mirror of
https://github.com/telemt/telemt.git
synced 2026-04-15 09:34:10 +03:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d99df37ac5 | ||
|
|
d0f253b49b | ||
|
|
ef2ed3daa0 | ||
|
|
fc52cad109 | ||
|
|
98f365be44 | ||
|
|
b6c3cae2ad | ||
|
|
5f7fb15dd8 | ||
|
|
3a89f16332 | ||
|
|
aa3fcfbbe1 | ||
|
|
a616775f6d | ||
|
|
633af93b19 |
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.3.12"
|
version = "3.3.14"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
35
README.md
35
README.md
@@ -104,28 +104,19 @@ We welcome ideas, architectural feedback, and pull requests.
|
|||||||
- Extensive logging via `trace` and `debug` with `RUST_LOG` method
|
- Extensive logging via `trace` and `debug` with `RUST_LOG` method
|
||||||
|
|
||||||
# GOTO
|
# GOTO
|
||||||
- [Telemt - MTProxy on Rust + Tokio](#telemt---mtproxy-on-rust--tokio)
|
- [Quick Start Guide](#quick-start-guide)
|
||||||
- [NEWS and EMERGENCY](#news-and-emergency)
|
- [FAQ](#faq)
|
||||||
- [✈️ Telemt 3 is released!](#️-telemt-3-is-released)
|
- [Recognizability for DPI and crawler](#recognizability-for-dpi-and-crawler)
|
||||||
- [🇷🇺 RU](#-ru)
|
- [Client WITH secret-key accesses the MTProxy resource:](#client-with-secret-key-accesses-the-mtproxy-resource)
|
||||||
- [Релиз 3.3.5 LTS - 6 марта](#релиз-335-lts---6-марта)
|
- [Client WITHOUT secret-key gets transparent access to the specified resource:](#client-without-secret-key-gets-transparent-access-to-the-specified-resource)
|
||||||
- [🇬🇧 EN](#-en)
|
- [Telegram Calls via MTProxy](#telegram-calls-via-mtproxy)
|
||||||
- [Release 3.3.5 LTS - March 6](#release-335-lts---march-6)
|
- [How does DPI see MTProxy TLS?](#how-does-dpi-see-mtproxy-tls)
|
||||||
- [Features](#features)
|
- [Whitelist on IP](#whitelist-on-ip)
|
||||||
- [GOTO](#goto)
|
- [Too many open files](#too-many-open-files)
|
||||||
- [Quick Start Guide](#quick-start-guide)
|
- [Build](#build)
|
||||||
- [FAQ](#faq)
|
- [Why Rust?](#why-rust)
|
||||||
- [Recognizability for DPI and crawler](#recognizability-for-dpi-and-crawler)
|
- [Issues](#issues)
|
||||||
- [Client WITH secret-key accesses the MTProxy resource:](#client-with-secret-key-accesses-the-mtproxy-resource)
|
- [Roadmap](#roadmap)
|
||||||
- [Client WITHOUT secret-key gets transparent access to the specified resource:](#client-without-secret-key-gets-transparent-access-to-the-specified-resource)
|
|
||||||
- [Telegram Calls via MTProxy](#telegram-calls-via-mtproxy)
|
|
||||||
- [How does DPI see MTProxy TLS?](#how-does-dpi-see-mtproxy-tls)
|
|
||||||
- [Whitelist on IP](#whitelist-on-ip)
|
|
||||||
- [Too many open files](#too-many-open-files)
|
|
||||||
- [Build](#build)
|
|
||||||
- [Why Rust?](#why-rust)
|
|
||||||
- [Issues](#issues)
|
|
||||||
- [Roadmap](#roadmap)
|
|
||||||
|
|
||||||
|
|
||||||
## Quick Start Guide
|
## Quick Start Guide
|
||||||
|
|||||||
@@ -24,6 +24,13 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256;
|
|||||||
const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 4096;
|
const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 4096;
|
||||||
const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 768;
|
const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 768;
|
||||||
const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024;
|
const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024;
|
||||||
|
const DEFAULT_ME_READER_ROUTE_DATA_WAIT_MS: u64 = 2;
|
||||||
|
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
|
||||||
|
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
|
||||||
|
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 1500;
|
||||||
|
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = false;
|
||||||
|
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
|
||||||
|
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
|
||||||
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
|
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
|
||||||
const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
|
const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
|
||||||
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
|
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
|
||||||
@@ -316,6 +323,34 @@ pub(crate) fn default_me_c2me_channel_capacity() -> usize {
|
|||||||
DEFAULT_ME_C2ME_CHANNEL_CAPACITY
|
DEFAULT_ME_C2ME_CHANNEL_CAPACITY
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_reader_route_data_wait_ms() -> u64 {
|
||||||
|
DEFAULT_ME_READER_ROUTE_DATA_WAIT_MS
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_d2c_flush_batch_max_frames() -> usize {
|
||||||
|
DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_d2c_flush_batch_max_bytes() -> usize {
|
||||||
|
DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_d2c_flush_batch_max_delay_us() -> u64 {
|
||||||
|
DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_d2c_ack_flush_immediate() -> bool {
|
||||||
|
DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_direct_relay_copy_buf_c2s_bytes() -> usize {
|
||||||
|
DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_direct_relay_copy_buf_s2c_bytes() -> usize {
|
||||||
|
DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_me_writer_pick_sample_size() -> u8 {
|
pub(crate) fn default_me_writer_pick_sample_size() -> u8 {
|
||||||
DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE
|
DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -96,6 +96,13 @@ pub struct HotFields {
|
|||||||
pub me_route_backpressure_base_timeout_ms: u64,
|
pub me_route_backpressure_base_timeout_ms: u64,
|
||||||
pub me_route_backpressure_high_timeout_ms: u64,
|
pub me_route_backpressure_high_timeout_ms: u64,
|
||||||
pub me_route_backpressure_high_watermark_pct: u8,
|
pub me_route_backpressure_high_watermark_pct: u8,
|
||||||
|
pub me_reader_route_data_wait_ms: u64,
|
||||||
|
pub me_d2c_flush_batch_max_frames: usize,
|
||||||
|
pub me_d2c_flush_batch_max_bytes: usize,
|
||||||
|
pub me_d2c_flush_batch_max_delay_us: u64,
|
||||||
|
pub me_d2c_ack_flush_immediate: bool,
|
||||||
|
pub direct_relay_copy_buf_c2s_bytes: usize,
|
||||||
|
pub direct_relay_copy_buf_s2c_bytes: usize,
|
||||||
pub me_health_interval_ms_unhealthy: u64,
|
pub me_health_interval_ms_unhealthy: u64,
|
||||||
pub me_health_interval_ms_healthy: u64,
|
pub me_health_interval_ms_healthy: u64,
|
||||||
pub me_admission_poll_ms: u64,
|
pub me_admission_poll_ms: u64,
|
||||||
@@ -203,6 +210,13 @@ impl HotFields {
|
|||||||
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
||||||
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
||||||
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
||||||
|
me_reader_route_data_wait_ms: cfg.general.me_reader_route_data_wait_ms,
|
||||||
|
me_d2c_flush_batch_max_frames: cfg.general.me_d2c_flush_batch_max_frames,
|
||||||
|
me_d2c_flush_batch_max_bytes: cfg.general.me_d2c_flush_batch_max_bytes,
|
||||||
|
me_d2c_flush_batch_max_delay_us: cfg.general.me_d2c_flush_batch_max_delay_us,
|
||||||
|
me_d2c_ack_flush_immediate: cfg.general.me_d2c_ack_flush_immediate,
|
||||||
|
direct_relay_copy_buf_c2s_bytes: cfg.general.direct_relay_copy_buf_c2s_bytes,
|
||||||
|
direct_relay_copy_buf_s2c_bytes: cfg.general.direct_relay_copy_buf_s2c_bytes,
|
||||||
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
|
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
|
||||||
me_health_interval_ms_healthy: cfg.general.me_health_interval_ms_healthy,
|
me_health_interval_ms_healthy: cfg.general.me_health_interval_ms_healthy,
|
||||||
me_admission_poll_ms: cfg.general.me_admission_poll_ms,
|
me_admission_poll_ms: cfg.general.me_admission_poll_ms,
|
||||||
@@ -352,6 +366,13 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
|||||||
new.general.me_route_backpressure_high_timeout_ms;
|
new.general.me_route_backpressure_high_timeout_ms;
|
||||||
cfg.general.me_route_backpressure_high_watermark_pct =
|
cfg.general.me_route_backpressure_high_watermark_pct =
|
||||||
new.general.me_route_backpressure_high_watermark_pct;
|
new.general.me_route_backpressure_high_watermark_pct;
|
||||||
|
cfg.general.me_reader_route_data_wait_ms = new.general.me_reader_route_data_wait_ms;
|
||||||
|
cfg.general.me_d2c_flush_batch_max_frames = new.general.me_d2c_flush_batch_max_frames;
|
||||||
|
cfg.general.me_d2c_flush_batch_max_bytes = new.general.me_d2c_flush_batch_max_bytes;
|
||||||
|
cfg.general.me_d2c_flush_batch_max_delay_us = new.general.me_d2c_flush_batch_max_delay_us;
|
||||||
|
cfg.general.me_d2c_ack_flush_immediate = new.general.me_d2c_ack_flush_immediate;
|
||||||
|
cfg.general.direct_relay_copy_buf_c2s_bytes = new.general.direct_relay_copy_buf_c2s_bytes;
|
||||||
|
cfg.general.direct_relay_copy_buf_s2c_bytes = new.general.direct_relay_copy_buf_s2c_bytes;
|
||||||
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
|
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
|
||||||
cfg.general.me_health_interval_ms_healthy = new.general.me_health_interval_ms_healthy;
|
cfg.general.me_health_interval_ms_healthy = new.general.me_health_interval_ms_healthy;
|
||||||
cfg.general.me_admission_poll_ms = new.general.me_admission_poll_ms;
|
cfg.general.me_admission_poll_ms = new.general.me_admission_poll_ms;
|
||||||
@@ -821,6 +842,7 @@ fn log_changes(
|
|||||||
!= new_hot.me_route_backpressure_high_timeout_ms
|
!= new_hot.me_route_backpressure_high_timeout_ms
|
||||||
|| old_hot.me_route_backpressure_high_watermark_pct
|
|| old_hot.me_route_backpressure_high_watermark_pct
|
||||||
!= new_hot.me_route_backpressure_high_watermark_pct
|
!= new_hot.me_route_backpressure_high_watermark_pct
|
||||||
|
|| old_hot.me_reader_route_data_wait_ms != new_hot.me_reader_route_data_wait_ms
|
||||||
|| old_hot.me_health_interval_ms_unhealthy
|
|| old_hot.me_health_interval_ms_unhealthy
|
||||||
!= new_hot.me_health_interval_ms_unhealthy
|
!= new_hot.me_health_interval_ms_unhealthy
|
||||||
|| old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy
|
|| old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy
|
||||||
@@ -828,10 +850,11 @@ fn log_changes(
|
|||||||
|| old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms
|
|| old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms",
|
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%; me_reader_route_data_wait_ms={}; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms",
|
||||||
new_hot.me_route_backpressure_base_timeout_ms,
|
new_hot.me_route_backpressure_base_timeout_ms,
|
||||||
new_hot.me_route_backpressure_high_timeout_ms,
|
new_hot.me_route_backpressure_high_timeout_ms,
|
||||||
new_hot.me_route_backpressure_high_watermark_pct,
|
new_hot.me_route_backpressure_high_watermark_pct,
|
||||||
|
new_hot.me_reader_route_data_wait_ms,
|
||||||
new_hot.me_health_interval_ms_unhealthy,
|
new_hot.me_health_interval_ms_unhealthy,
|
||||||
new_hot.me_health_interval_ms_healthy,
|
new_hot.me_health_interval_ms_healthy,
|
||||||
new_hot.me_admission_poll_ms,
|
new_hot.me_admission_poll_ms,
|
||||||
@@ -839,6 +862,24 @@ fn log_changes(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if old_hot.me_d2c_flush_batch_max_frames != new_hot.me_d2c_flush_batch_max_frames
|
||||||
|
|| old_hot.me_d2c_flush_batch_max_bytes != new_hot.me_d2c_flush_batch_max_bytes
|
||||||
|
|| old_hot.me_d2c_flush_batch_max_delay_us != new_hot.me_d2c_flush_batch_max_delay_us
|
||||||
|
|| old_hot.me_d2c_ack_flush_immediate != new_hot.me_d2c_ack_flush_immediate
|
||||||
|
|| old_hot.direct_relay_copy_buf_c2s_bytes != new_hot.direct_relay_copy_buf_c2s_bytes
|
||||||
|
|| old_hot.direct_relay_copy_buf_s2c_bytes != new_hot.direct_relay_copy_buf_s2c_bytes
|
||||||
|
{
|
||||||
|
info!(
|
||||||
|
"config reload: relay_tuning: me_d2c_frames={} me_d2c_bytes={} me_d2c_delay_us={} me_ack_flush_immediate={} direct_buf_c2s={} direct_buf_s2c={}",
|
||||||
|
new_hot.me_d2c_flush_batch_max_frames,
|
||||||
|
new_hot.me_d2c_flush_batch_max_bytes,
|
||||||
|
new_hot.me_d2c_flush_batch_max_delay_us,
|
||||||
|
new_hot.me_d2c_ack_flush_immediate,
|
||||||
|
new_hot.direct_relay_copy_buf_c2s_bytes,
|
||||||
|
new_hot.direct_relay_copy_buf_s2c_bytes,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if old_hot.users != new_hot.users {
|
if old_hot.users != new_hot.users {
|
||||||
let mut added: Vec<&String> = new_hot.users.keys()
|
let mut added: Vec<&String> = new_hot.users.keys()
|
||||||
.filter(|u| !old_hot.users.contains_key(*u))
|
.filter(|u| !old_hot.users.contains_key(*u))
|
||||||
|
|||||||
@@ -303,6 +303,42 @@ impl ProxyConfig {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.general.me_reader_route_data_wait_ms > 20 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_reader_route_data_wait_ms must be within [0, 20]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(1..=512).contains(&config.general.me_d2c_flush_batch_max_frames) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_d2c_flush_batch_max_frames must be within [1, 512]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(4096..=2 * 1024 * 1024).contains(&config.general.me_d2c_flush_batch_max_bytes) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_d2c_flush_batch_max_bytes must be within [4096, 2097152]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_d2c_flush_batch_max_delay_us > 5000 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_d2c_flush_batch_max_delay_us must be within [0, 5000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(4096..=1024 * 1024).contains(&config.general.direct_relay_copy_buf_c2s_bytes) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.direct_relay_copy_buf_c2s_bytes must be within [4096, 1048576]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(8192..=2 * 1024 * 1024).contains(&config.general.direct_relay_copy_buf_s2c_bytes) {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.direct_relay_copy_buf_s2c_bytes must be within [8192, 2097152]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.me_health_interval_ms_unhealthy == 0 {
|
if config.general.me_health_interval_ms_unhealthy == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.me_health_interval_ms_unhealthy must be > 0".to_string(),
|
"general.me_health_interval_ms_unhealthy must be > 0".to_string(),
|
||||||
|
|||||||
@@ -458,6 +458,36 @@ pub struct GeneralConfig {
|
|||||||
#[serde(default = "default_me_c2me_channel_capacity")]
|
#[serde(default = "default_me_c2me_channel_capacity")]
|
||||||
pub me_c2me_channel_capacity: usize,
|
pub me_c2me_channel_capacity: usize,
|
||||||
|
|
||||||
|
/// Bounded wait in milliseconds for routing ME DATA to per-connection queue.
|
||||||
|
/// `0` keeps legacy no-wait behavior.
|
||||||
|
#[serde(default = "default_me_reader_route_data_wait_ms")]
|
||||||
|
pub me_reader_route_data_wait_ms: u64,
|
||||||
|
|
||||||
|
/// Maximum number of ME->Client responses coalesced before flush.
|
||||||
|
#[serde(default = "default_me_d2c_flush_batch_max_frames")]
|
||||||
|
pub me_d2c_flush_batch_max_frames: usize,
|
||||||
|
|
||||||
|
/// Maximum total payload bytes coalesced before flush.
|
||||||
|
#[serde(default = "default_me_d2c_flush_batch_max_bytes")]
|
||||||
|
pub me_d2c_flush_batch_max_bytes: usize,
|
||||||
|
|
||||||
|
/// Maximum wait in microseconds to coalesce additional ME->Client responses.
|
||||||
|
/// `0` disables timed coalescing.
|
||||||
|
#[serde(default = "default_me_d2c_flush_batch_max_delay_us")]
|
||||||
|
pub me_d2c_flush_batch_max_delay_us: u64,
|
||||||
|
|
||||||
|
/// Flush client writer immediately after quick-ack write.
|
||||||
|
#[serde(default = "default_me_d2c_ack_flush_immediate")]
|
||||||
|
pub me_d2c_ack_flush_immediate: bool,
|
||||||
|
|
||||||
|
/// Copy buffer size for client->DC direction in direct relay.
|
||||||
|
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
|
||||||
|
pub direct_relay_copy_buf_c2s_bytes: usize,
|
||||||
|
|
||||||
|
/// Copy buffer size for DC->client direction in direct relay.
|
||||||
|
#[serde(default = "default_direct_relay_copy_buf_s2c_bytes")]
|
||||||
|
pub direct_relay_copy_buf_s2c_bytes: usize,
|
||||||
|
|
||||||
/// Max pending ciphertext buffer per client writer (bytes).
|
/// Max pending ciphertext buffer per client writer (bytes).
|
||||||
/// Controls FakeTLS backpressure vs throughput.
|
/// Controls FakeTLS backpressure vs throughput.
|
||||||
#[serde(default = "default_crypto_pending_buffer")]
|
#[serde(default = "default_crypto_pending_buffer")]
|
||||||
@@ -861,6 +891,13 @@ impl Default for GeneralConfig {
|
|||||||
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
|
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
|
||||||
me_route_channel_capacity: default_me_route_channel_capacity(),
|
me_route_channel_capacity: default_me_route_channel_capacity(),
|
||||||
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
|
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
|
||||||
|
me_reader_route_data_wait_ms: default_me_reader_route_data_wait_ms(),
|
||||||
|
me_d2c_flush_batch_max_frames: default_me_d2c_flush_batch_max_frames(),
|
||||||
|
me_d2c_flush_batch_max_bytes: default_me_d2c_flush_batch_max_bytes(),
|
||||||
|
me_d2c_flush_batch_max_delay_us: default_me_d2c_flush_batch_max_delay_us(),
|
||||||
|
me_d2c_ack_flush_immediate: default_me_d2c_ack_flush_immediate(),
|
||||||
|
direct_relay_copy_buf_c2s_bytes: default_direct_relay_copy_buf_c2s_bytes(),
|
||||||
|
direct_relay_copy_buf_s2c_bytes: default_direct_relay_copy_buf_s2c_bytes(),
|
||||||
me_warmup_stagger_enabled: default_true(),
|
me_warmup_stagger_enabled: default_true(),
|
||||||
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
|
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
|
||||||
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
|
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
|
||||||
|
|||||||
157
src/main.rs
157
src/main.rs
@@ -37,6 +37,7 @@ use crate::crypto::SecureRandom;
|
|||||||
use crate::ip_tracker::UserIpTracker;
|
use crate::ip_tracker::UserIpTracker;
|
||||||
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
|
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
|
||||||
use crate::proxy::ClientHandler;
|
use crate::proxy::ClientHandler;
|
||||||
|
use crate::proxy::route_mode::{ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteRuntimeController};
|
||||||
use crate::stats::beobachten::BeobachtenStore;
|
use crate::stats::beobachten::BeobachtenStore;
|
||||||
use crate::stats::telemetry::TelemetryPolicy;
|
use crate::stats::telemetry::TelemetryPolicy;
|
||||||
use crate::stats::{ReplayChecker, Stats};
|
use crate::stats::{ReplayChecker, Stats};
|
||||||
@@ -261,6 +262,10 @@ async fn wait_until_admission_open(admission_rx: &mut watch::Receiver<bool>) ->
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn is_expected_handshake_eof(err: &crate::error::ProxyError) -> bool {
|
||||||
|
err.to_string().contains("expected 64 bytes, got 0")
|
||||||
|
}
|
||||||
|
|
||||||
async fn load_startup_proxy_config_snapshot(
|
async fn load_startup_proxy_config_snapshot(
|
||||||
url: &str,
|
url: &str,
|
||||||
cache_path: Option<&str>,
|
cache_path: Option<&str>,
|
||||||
@@ -519,6 +524,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone()));
|
let (api_config_tx, api_config_rx) = watch::channel(Arc::new(config.clone()));
|
||||||
let initial_admission_open = !config.general.use_middle_proxy;
|
let initial_admission_open = !config.general.use_middle_proxy;
|
||||||
let (admission_tx, admission_rx) = watch::channel(initial_admission_open);
|
let (admission_tx, admission_rx) = watch::channel(initial_admission_open);
|
||||||
|
let initial_route_mode = if config.general.use_middle_proxy {
|
||||||
|
RelayRouteMode::Middle
|
||||||
|
} else {
|
||||||
|
RelayRouteMode::Direct
|
||||||
|
};
|
||||||
|
let route_runtime = Arc::new(RouteRuntimeController::new(initial_route_mode));
|
||||||
let api_me_pool = Arc::new(RwLock::new(None::<Arc<MePool>>));
|
let api_me_pool = Arc::new(RwLock::new(None::<Arc<MePool>>));
|
||||||
startup_tracker
|
startup_tracker
|
||||||
.start_component(COMPONENT_API_BOOTSTRAP, Some("spawn API listener task".to_string()))
|
.start_component(COMPONENT_API_BOOTSTRAP, Some("spawn API listener task".to_string()))
|
||||||
@@ -1055,6 +1066,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
config.general.me_route_backpressure_base_timeout_ms,
|
config.general.me_route_backpressure_base_timeout_ms,
|
||||||
config.general.me_route_backpressure_high_timeout_ms,
|
config.general.me_route_backpressure_high_timeout_ms,
|
||||||
config.general.me_route_backpressure_high_watermark_pct,
|
config.general.me_route_backpressure_high_watermark_pct,
|
||||||
|
config.general.me_reader_route_data_wait_ms,
|
||||||
config.general.me_health_interval_ms_unhealthy,
|
config.general.me_health_interval_ms_unhealthy,
|
||||||
config.general.me_health_interval_ms_healthy,
|
config.general.me_health_interval_ms_healthy,
|
||||||
config.general.me_warn_rate_limit_ms,
|
config.general.me_warn_rate_limit_ms,
|
||||||
@@ -1559,6 +1571,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
cfg.general.me_route_backpressure_base_timeout_ms,
|
cfg.general.me_route_backpressure_base_timeout_ms,
|
||||||
cfg.general.me_route_backpressure_high_timeout_ms,
|
cfg.general.me_route_backpressure_high_timeout_ms,
|
||||||
cfg.general.me_route_backpressure_high_watermark_pct,
|
cfg.general.me_route_backpressure_high_watermark_pct,
|
||||||
|
cfg.general.me_reader_route_data_wait_ms,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1781,9 +1794,11 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
if config.general.use_middle_proxy {
|
if config.general.use_middle_proxy {
|
||||||
if let Some(pool) = me_pool.as_ref() {
|
if let Some(pool) = me_pool.as_ref() {
|
||||||
let initial_open = pool.admission_ready_conditional_cast().await;
|
let fallback_after = Duration::from_secs(6);
|
||||||
admission_tx.send_replace(initial_open);
|
let initial_ready = pool.admission_ready_conditional_cast().await;
|
||||||
if initial_open {
|
admission_tx.send_replace(initial_ready);
|
||||||
|
let _ = route_runtime.set_mode(RelayRouteMode::Middle);
|
||||||
|
if initial_ready {
|
||||||
info!("Conditional-admission gate: open (ME pool ready)");
|
info!("Conditional-admission gate: open (ME pool ready)");
|
||||||
} else {
|
} else {
|
||||||
warn!("Conditional-admission gate: closed (ME pool is not ready)");
|
warn!("Conditional-admission gate: closed (ME pool is not ready)");
|
||||||
@@ -1791,12 +1806,18 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let pool_for_gate = pool.clone();
|
let pool_for_gate = pool.clone();
|
||||||
let admission_tx_gate = admission_tx.clone();
|
let admission_tx_gate = admission_tx.clone();
|
||||||
|
let route_runtime_gate = route_runtime.clone();
|
||||||
let mut config_rx_gate = config_rx.clone();
|
let mut config_rx_gate = config_rx.clone();
|
||||||
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
|
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
|
||||||
|
let mut fallback_enabled = config.general.me2dc_fallback;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut gate_open = initial_open;
|
let mut gate_open = initial_ready;
|
||||||
let mut open_streak = if initial_open { 1u32 } else { 0u32 };
|
let mut route_mode = RelayRouteMode::Middle;
|
||||||
let mut close_streak = if initial_open { 0u32 } else { 1u32 };
|
let mut not_ready_since = if initial_ready {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Instant::now())
|
||||||
|
};
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
changed = config_rx_gate.changed() => {
|
changed = config_rx_gate.changed() => {
|
||||||
@@ -1805,42 +1826,70 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
let cfg = config_rx_gate.borrow_and_update().clone();
|
let cfg = config_rx_gate.borrow_and_update().clone();
|
||||||
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
|
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
|
||||||
|
fallback_enabled = cfg.general.me2dc_fallback;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
|
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
|
||||||
}
|
}
|
||||||
let ready = pool_for_gate.admission_ready_conditional_cast().await;
|
let ready = pool_for_gate.admission_ready_conditional_cast().await;
|
||||||
if ready {
|
let now = Instant::now();
|
||||||
open_streak = open_streak.saturating_add(1);
|
let (next_gate_open, next_route_mode, next_fallback_active) = if ready {
|
||||||
close_streak = 0;
|
not_ready_since = None;
|
||||||
if !gate_open && open_streak >= 2 {
|
(true, RelayRouteMode::Middle, false)
|
||||||
gate_open = true;
|
|
||||||
admission_tx_gate.send_replace(true);
|
|
||||||
info!(
|
|
||||||
open_streak,
|
|
||||||
"Conditional-admission gate opened (ME pool recovered)"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
close_streak = close_streak.saturating_add(1);
|
let not_ready_started_at = *not_ready_since.get_or_insert(now);
|
||||||
open_streak = 0;
|
let not_ready_for = now.saturating_duration_since(not_ready_started_at);
|
||||||
if gate_open && close_streak >= 2 {
|
if fallback_enabled && not_ready_for > fallback_after {
|
||||||
gate_open = false;
|
(true, RelayRouteMode::Direct, true)
|
||||||
admission_tx_gate.send_replace(false);
|
} else {
|
||||||
warn!(
|
(false, RelayRouteMode::Middle, false)
|
||||||
close_streak,
|
}
|
||||||
"Conditional-admission gate closed (ME pool has uncovered DC groups)"
|
};
|
||||||
);
|
|
||||||
|
if next_route_mode != route_mode {
|
||||||
|
route_mode = next_route_mode;
|
||||||
|
if let Some(snapshot) = route_runtime_gate.set_mode(route_mode) {
|
||||||
|
if matches!(route_mode, RelayRouteMode::Middle) {
|
||||||
|
info!(
|
||||||
|
target_mode = route_mode.as_str(),
|
||||||
|
cutover_generation = snapshot.generation,
|
||||||
|
"Middle-End routing restored for new sessions"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
warn!(
|
||||||
|
target_mode = route_mode.as_str(),
|
||||||
|
cutover_generation = snapshot.generation,
|
||||||
|
grace_secs = fallback_after.as_secs(),
|
||||||
|
"ME pool stayed not-ready beyond grace; routing new sessions via Direct-DC"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if next_gate_open != gate_open {
|
||||||
|
gate_open = next_gate_open;
|
||||||
|
admission_tx_gate.send_replace(gate_open);
|
||||||
|
if gate_open {
|
||||||
|
if next_fallback_active {
|
||||||
|
warn!("Conditional-admission gate opened in ME fallback mode");
|
||||||
|
} else {
|
||||||
|
info!("Conditional-admission gate opened (ME pool ready)");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!("Conditional-admission gate closed (ME pool is not ready)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
admission_tx.send_replace(false);
|
admission_tx.send_replace(false);
|
||||||
|
let _ = route_runtime.set_mode(RelayRouteMode::Direct);
|
||||||
warn!("Conditional-admission gate: closed (ME pool is unavailable)");
|
warn!("Conditional-admission gate: closed (ME pool is unavailable)");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
admission_tx.send_replace(true);
|
admission_tx.send_replace(true);
|
||||||
|
let _ = route_runtime.set_mode(RelayRouteMode::Direct);
|
||||||
}
|
}
|
||||||
let _admission_tx_hold = admission_tx;
|
let _admission_tx_hold = admission_tx;
|
||||||
|
|
||||||
@@ -1884,6 +1933,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
let buffer_pool = buffer_pool.clone();
|
let buffer_pool = buffer_pool.clone();
|
||||||
let rng = rng.clone();
|
let rng = rng.clone();
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
|
let route_runtime = route_runtime.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
let beobachten = beobachten.clone();
|
let beobachten = beobachten.clone();
|
||||||
@@ -1916,6 +1966,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
let buffer_pool = buffer_pool.clone();
|
let buffer_pool = buffer_pool.clone();
|
||||||
let rng = rng.clone();
|
let rng = rng.clone();
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
|
let route_runtime = route_runtime.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
let beobachten = beobachten.clone();
|
let beobachten = beobachten.clone();
|
||||||
@@ -1926,7 +1977,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
if let Err(e) = crate::proxy::client::handle_client_stream(
|
if let Err(e) = crate::proxy::client::handle_client_stream(
|
||||||
stream, fake_peer, config, stats,
|
stream, fake_peer, config, stats,
|
||||||
upstream_manager, replay_checker, buffer_pool, rng,
|
upstream_manager, replay_checker, buffer_pool, rng,
|
||||||
me_pool, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled,
|
me_pool, route_runtime, tls_cache, ip_tracker, beobachten, proxy_protocol_enabled,
|
||||||
).await {
|
).await {
|
||||||
debug!(error = %e, "Unix socket connection error");
|
debug!(error = %e, "Unix socket connection error");
|
||||||
}
|
}
|
||||||
@@ -2037,6 +2088,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
let buffer_pool = buffer_pool.clone();
|
let buffer_pool = buffer_pool.clone();
|
||||||
let rng = rng.clone();
|
let rng = rng.clone();
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
|
let route_runtime = route_runtime.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
let beobachten = beobachten.clone();
|
let beobachten = beobachten.clone();
|
||||||
@@ -2064,10 +2116,13 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
let buffer_pool = buffer_pool.clone();
|
let buffer_pool = buffer_pool.clone();
|
||||||
let rng = rng.clone();
|
let rng = rng.clone();
|
||||||
let me_pool = me_pool.clone();
|
let me_pool = me_pool.clone();
|
||||||
|
let route_runtime = route_runtime.clone();
|
||||||
let tls_cache = tls_cache.clone();
|
let tls_cache = tls_cache.clone();
|
||||||
let ip_tracker = ip_tracker.clone();
|
let ip_tracker = ip_tracker.clone();
|
||||||
let beobachten = beobachten.clone();
|
let beobachten = beobachten.clone();
|
||||||
let proxy_protocol_enabled = listener_proxy_protocol;
|
let proxy_protocol_enabled = listener_proxy_protocol;
|
||||||
|
let real_peer_report = Arc::new(std::sync::Mutex::new(None));
|
||||||
|
let real_peer_report_for_handler = real_peer_report.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _permit = permit;
|
let _permit = permit;
|
||||||
@@ -2081,14 +2136,20 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
buffer_pool,
|
buffer_pool,
|
||||||
rng,
|
rng,
|
||||||
me_pool,
|
me_pool,
|
||||||
|
route_runtime,
|
||||||
tls_cache,
|
tls_cache,
|
||||||
ip_tracker,
|
ip_tracker,
|
||||||
beobachten,
|
beobachten,
|
||||||
proxy_protocol_enabled,
|
proxy_protocol_enabled,
|
||||||
|
real_peer_report_for_handler,
|
||||||
)
|
)
|
||||||
.run()
|
.run()
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
let real_peer = match real_peer_report.lock() {
|
||||||
|
Ok(guard) => *guard,
|
||||||
|
Err(_) => None,
|
||||||
|
};
|
||||||
let peer_closed = matches!(
|
let peer_closed = matches!(
|
||||||
&e,
|
&e,
|
||||||
crate::error::ProxyError::Io(ioe)
|
crate::error::ProxyError::Io(ioe)
|
||||||
@@ -2117,11 +2178,47 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||||||
&e,
|
&e,
|
||||||
crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
|
crate::error::ProxyError::Proxy(msg) if msg == "ME connection lost"
|
||||||
);
|
);
|
||||||
|
let route_switched = matches!(
|
||||||
|
&e,
|
||||||
|
crate::error::ProxyError::Proxy(msg) if msg == ROUTE_SWITCH_ERROR_MSG
|
||||||
|
);
|
||||||
|
|
||||||
match (peer_closed, me_closed) {
|
match (peer_closed, me_closed) {
|
||||||
(true, _) => debug!(peer = %peer_addr, error = %e, "Connection closed by client"),
|
(true, _) => {
|
||||||
(_, true) => warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session"),
|
if let Some(real_peer) = real_peer {
|
||||||
_ => warn!(peer = %peer_addr, error = %e, "Connection closed with error"),
|
debug!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed by client");
|
||||||
|
} else {
|
||||||
|
debug!(peer = %peer_addr, error = %e, "Connection closed by client");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(_, true) => {
|
||||||
|
if let Some(real_peer) = real_peer {
|
||||||
|
warn!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed: Middle-End dropped session");
|
||||||
|
} else {
|
||||||
|
warn!(peer = %peer_addr, error = %e, "Connection closed: Middle-End dropped session");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ if route_switched => {
|
||||||
|
if let Some(real_peer) = real_peer {
|
||||||
|
info!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed by controlled route cutover");
|
||||||
|
} else {
|
||||||
|
info!(peer = %peer_addr, error = %e, "Connection closed by controlled route cutover");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ if is_expected_handshake_eof(&e) => {
|
||||||
|
if let Some(real_peer) = real_peer {
|
||||||
|
info!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed during initial handshake");
|
||||||
|
} else {
|
||||||
|
info!(peer = %peer_addr, error = %e, "Connection closed during initial handshake");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
if let Some(real_peer) = real_peer {
|
||||||
|
warn!(peer = %peer_addr, real_peer = %real_peer, error = %e, "Connection closed with error");
|
||||||
|
} else {
|
||||||
|
warn!(peer = %peer_addr, error = %e, "Connection closed with error");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ use crate::proxy::direct_relay::handle_via_direct;
|
|||||||
use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle_tls_handshake};
|
use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle_tls_handshake};
|
||||||
use crate::proxy::masking::handle_bad_client;
|
use crate::proxy::masking::handle_bad_client;
|
||||||
use crate::proxy::middle_relay::handle_via_middle_proxy;
|
use crate::proxy::middle_relay::handle_via_middle_proxy;
|
||||||
|
use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController};
|
||||||
|
|
||||||
fn beobachten_ttl(config: &ProxyConfig) -> Duration {
|
fn beobachten_ttl(config: &ProxyConfig) -> Duration {
|
||||||
Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60))
|
Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60))
|
||||||
@@ -80,6 +81,7 @@ pub async fn handle_client_stream<S>(
|
|||||||
buffer_pool: Arc<BufferPool>,
|
buffer_pool: Arc<BufferPool>,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
|
route_runtime: Arc<RouteRuntimeController>,
|
||||||
tls_cache: Option<Arc<TlsFrontCache>>,
|
tls_cache: Option<Arc<TlsFrontCache>>,
|
||||||
ip_tracker: Arc<UserIpTracker>,
|
ip_tracker: Arc<UserIpTracker>,
|
||||||
beobachten: Arc<BeobachtenStore>,
|
beobachten: Arc<BeobachtenStore>,
|
||||||
@@ -214,6 +216,7 @@ where
|
|||||||
RunningClientHandler::handle_authenticated_static(
|
RunningClientHandler::handle_authenticated_static(
|
||||||
crypto_reader, crypto_writer, success,
|
crypto_reader, crypto_writer, success,
|
||||||
upstream_manager, stats, config, buffer_pool, rng, me_pool,
|
upstream_manager, stats, config, buffer_pool, rng, me_pool,
|
||||||
|
route_runtime.clone(),
|
||||||
local_addr, real_peer, ip_tracker.clone(),
|
local_addr, real_peer, ip_tracker.clone(),
|
||||||
),
|
),
|
||||||
)))
|
)))
|
||||||
@@ -274,6 +277,7 @@ where
|
|||||||
buffer_pool,
|
buffer_pool,
|
||||||
rng,
|
rng,
|
||||||
me_pool,
|
me_pool,
|
||||||
|
route_runtime.clone(),
|
||||||
local_addr,
|
local_addr,
|
||||||
real_peer,
|
real_peer,
|
||||||
ip_tracker.clone(),
|
ip_tracker.clone(),
|
||||||
@@ -317,6 +321,8 @@ pub struct ClientHandler;
|
|||||||
pub struct RunningClientHandler {
|
pub struct RunningClientHandler {
|
||||||
stream: TcpStream,
|
stream: TcpStream,
|
||||||
peer: SocketAddr,
|
peer: SocketAddr,
|
||||||
|
real_peer_from_proxy: Option<SocketAddr>,
|
||||||
|
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
|
||||||
config: Arc<ProxyConfig>,
|
config: Arc<ProxyConfig>,
|
||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
replay_checker: Arc<ReplayChecker>,
|
replay_checker: Arc<ReplayChecker>,
|
||||||
@@ -324,6 +330,7 @@ pub struct RunningClientHandler {
|
|||||||
buffer_pool: Arc<BufferPool>,
|
buffer_pool: Arc<BufferPool>,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
|
route_runtime: Arc<RouteRuntimeController>,
|
||||||
tls_cache: Option<Arc<TlsFrontCache>>,
|
tls_cache: Option<Arc<TlsFrontCache>>,
|
||||||
ip_tracker: Arc<UserIpTracker>,
|
ip_tracker: Arc<UserIpTracker>,
|
||||||
beobachten: Arc<BeobachtenStore>,
|
beobachten: Arc<BeobachtenStore>,
|
||||||
@@ -341,14 +348,19 @@ impl ClientHandler {
|
|||||||
buffer_pool: Arc<BufferPool>,
|
buffer_pool: Arc<BufferPool>,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
|
route_runtime: Arc<RouteRuntimeController>,
|
||||||
tls_cache: Option<Arc<TlsFrontCache>>,
|
tls_cache: Option<Arc<TlsFrontCache>>,
|
||||||
ip_tracker: Arc<UserIpTracker>,
|
ip_tracker: Arc<UserIpTracker>,
|
||||||
beobachten: Arc<BeobachtenStore>,
|
beobachten: Arc<BeobachtenStore>,
|
||||||
proxy_protocol_enabled: bool,
|
proxy_protocol_enabled: bool,
|
||||||
|
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
|
||||||
) -> RunningClientHandler {
|
) -> RunningClientHandler {
|
||||||
|
let normalized_peer = normalize_ip(peer);
|
||||||
RunningClientHandler {
|
RunningClientHandler {
|
||||||
stream,
|
stream,
|
||||||
peer,
|
peer: normalized_peer,
|
||||||
|
real_peer_from_proxy: None,
|
||||||
|
real_peer_report,
|
||||||
config,
|
config,
|
||||||
stats,
|
stats,
|
||||||
replay_checker,
|
replay_checker,
|
||||||
@@ -356,6 +368,7 @@ impl ClientHandler {
|
|||||||
buffer_pool,
|
buffer_pool,
|
||||||
rng,
|
rng,
|
||||||
me_pool,
|
me_pool,
|
||||||
|
route_runtime,
|
||||||
tls_cache,
|
tls_cache,
|
||||||
ip_tracker,
|
ip_tracker,
|
||||||
beobachten,
|
beobachten,
|
||||||
@@ -365,10 +378,8 @@ impl ClientHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RunningClientHandler {
|
impl RunningClientHandler {
|
||||||
pub async fn run(mut self) -> Result<()> {
|
pub async fn run(self) -> Result<()> {
|
||||||
self.stats.increment_connects_all();
|
self.stats.increment_connects_all();
|
||||||
|
|
||||||
self.peer = normalize_ip(self.peer);
|
|
||||||
let peer = self.peer;
|
let peer = self.peer;
|
||||||
let _ip_tracker = self.ip_tracker.clone();
|
let _ip_tracker = self.ip_tracker.clone();
|
||||||
debug!(peer = %peer, "New connection");
|
debug!(peer = %peer, "New connection");
|
||||||
@@ -441,6 +452,10 @@ impl RunningClientHandler {
|
|||||||
"PROXY protocol header parsed"
|
"PROXY protocol header parsed"
|
||||||
);
|
);
|
||||||
self.peer = normalize_ip(info.src_addr);
|
self.peer = normalize_ip(info.src_addr);
|
||||||
|
self.real_peer_from_proxy = Some(self.peer);
|
||||||
|
if let Ok(mut slot) = self.real_peer_report.lock() {
|
||||||
|
*slot = Some(self.peer);
|
||||||
|
}
|
||||||
if let Some(dst) = info.dst_addr {
|
if let Some(dst) = info.dst_addr {
|
||||||
local_addr = dst;
|
local_addr = dst;
|
||||||
}
|
}
|
||||||
@@ -597,6 +612,7 @@ impl RunningClientHandler {
|
|||||||
buffer_pool,
|
buffer_pool,
|
||||||
self.rng,
|
self.rng,
|
||||||
self.me_pool,
|
self.me_pool,
|
||||||
|
self.route_runtime.clone(),
|
||||||
local_addr,
|
local_addr,
|
||||||
peer,
|
peer,
|
||||||
self.ip_tracker,
|
self.ip_tracker,
|
||||||
@@ -677,6 +693,7 @@ impl RunningClientHandler {
|
|||||||
buffer_pool,
|
buffer_pool,
|
||||||
self.rng,
|
self.rng,
|
||||||
self.me_pool,
|
self.me_pool,
|
||||||
|
self.route_runtime.clone(),
|
||||||
local_addr,
|
local_addr,
|
||||||
peer,
|
peer,
|
||||||
self.ip_tracker,
|
self.ip_tracker,
|
||||||
@@ -698,6 +715,7 @@ impl RunningClientHandler {
|
|||||||
buffer_pool: Arc<BufferPool>,
|
buffer_pool: Arc<BufferPool>,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
me_pool: Option<Arc<MePool>>,
|
me_pool: Option<Arc<MePool>>,
|
||||||
|
route_runtime: Arc<RouteRuntimeController>,
|
||||||
local_addr: SocketAddr,
|
local_addr: SocketAddr,
|
||||||
peer_addr: SocketAddr,
|
peer_addr: SocketAddr,
|
||||||
ip_tracker: Arc<UserIpTracker>,
|
ip_tracker: Arc<UserIpTracker>,
|
||||||
@@ -713,7 +731,11 @@ impl RunningClientHandler {
|
|||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
let relay_result = if config.general.use_middle_proxy {
|
let route_snapshot = route_runtime.snapshot();
|
||||||
|
let session_id = rng.u64();
|
||||||
|
let relay_result = if config.general.use_middle_proxy
|
||||||
|
&& matches!(route_snapshot.mode, RelayRouteMode::Middle)
|
||||||
|
{
|
||||||
if let Some(ref pool) = me_pool {
|
if let Some(ref pool) = me_pool {
|
||||||
handle_via_middle_proxy(
|
handle_via_middle_proxy(
|
||||||
client_reader,
|
client_reader,
|
||||||
@@ -725,6 +747,9 @@ impl RunningClientHandler {
|
|||||||
buffer_pool,
|
buffer_pool,
|
||||||
local_addr,
|
local_addr,
|
||||||
rng,
|
rng,
|
||||||
|
route_runtime.subscribe(),
|
||||||
|
route_snapshot,
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
@@ -738,6 +763,9 @@ impl RunningClientHandler {
|
|||||||
config,
|
config,
|
||||||
buffer_pool,
|
buffer_pool,
|
||||||
rng,
|
rng,
|
||||||
|
route_runtime.subscribe(),
|
||||||
|
route_snapshot,
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -752,6 +780,9 @@ impl RunningClientHandler {
|
|||||||
config,
|
config,
|
||||||
buffer_pool,
|
buffer_pool,
|
||||||
rng,
|
rng,
|
||||||
|
route_runtime.subscribe(),
|
||||||
|
route_snapshot,
|
||||||
|
session_id,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -5,14 +5,19 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::sync::watch;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
use crate::config::ProxyConfig;
|
use crate::config::ProxyConfig;
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
use crate::error::Result;
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::protocol::constants::*;
|
use crate::protocol::constants::*;
|
||||||
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
|
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
|
||||||
use crate::proxy::relay::relay_bidirectional;
|
use crate::proxy::relay::relay_bidirectional;
|
||||||
|
use crate::proxy::route_mode::{
|
||||||
|
RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state,
|
||||||
|
cutover_stagger_delay,
|
||||||
|
};
|
||||||
use crate::stats::Stats;
|
use crate::stats::Stats;
|
||||||
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
||||||
use crate::transport::UpstreamManager;
|
use crate::transport::UpstreamManager;
|
||||||
@@ -26,6 +31,9 @@ pub(crate) async fn handle_via_direct<R, W>(
|
|||||||
config: Arc<ProxyConfig>,
|
config: Arc<ProxyConfig>,
|
||||||
buffer_pool: Arc<BufferPool>,
|
buffer_pool: Arc<BufferPool>,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
|
mut route_rx: watch::Receiver<RouteCutoverState>,
|
||||||
|
route_snapshot: RouteCutoverState,
|
||||||
|
session_id: u64,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
@@ -64,11 +72,41 @@ where
|
|||||||
client_writer,
|
client_writer,
|
||||||
tg_reader,
|
tg_reader,
|
||||||
tg_writer,
|
tg_writer,
|
||||||
|
config.general.direct_relay_copy_buf_c2s_bytes,
|
||||||
|
config.general.direct_relay_copy_buf_s2c_bytes,
|
||||||
user,
|
user,
|
||||||
Arc::clone(&stats),
|
Arc::clone(&stats),
|
||||||
buffer_pool,
|
buffer_pool,
|
||||||
)
|
);
|
||||||
.await;
|
tokio::pin!(relay_result);
|
||||||
|
let relay_result = loop {
|
||||||
|
if let Some(cutover) = affected_cutover_state(
|
||||||
|
&route_rx,
|
||||||
|
RelayRouteMode::Direct,
|
||||||
|
route_snapshot.generation,
|
||||||
|
) {
|
||||||
|
let delay = cutover_stagger_delay(session_id, cutover.generation);
|
||||||
|
warn!(
|
||||||
|
user = %user,
|
||||||
|
target_mode = cutover.mode.as_str(),
|
||||||
|
cutover_generation = cutover.generation,
|
||||||
|
delay_ms = delay.as_millis() as u64,
|
||||||
|
"Cutover affected direct session, closing client connection"
|
||||||
|
);
|
||||||
|
tokio::time::sleep(delay).await;
|
||||||
|
break Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
||||||
|
}
|
||||||
|
tokio::select! {
|
||||||
|
result = &mut relay_result => {
|
||||||
|
break result;
|
||||||
|
}
|
||||||
|
changed = route_rx.changed() => {
|
||||||
|
if changed.is_err() {
|
||||||
|
break relay_result.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
stats.decrement_current_connections_direct();
|
stats.decrement_current_connections_direct();
|
||||||
stats.decrement_user_curr_connects(user);
|
stats.decrement_user_curr_connects(user);
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ use std::time::{Duration, Instant};
|
|||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot, watch};
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
|
|
||||||
use crate::config::ProxyConfig;
|
use crate::config::ProxyConfig;
|
||||||
@@ -16,6 +16,10 @@ use crate::crypto::SecureRandom;
|
|||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
use crate::protocol::constants::{*, secure_padding_len};
|
use crate::protocol::constants::{*, secure_padding_len};
|
||||||
use crate::proxy::handshake::HandshakeSuccess;
|
use crate::proxy::handshake::HandshakeSuccess;
|
||||||
|
use crate::proxy::route_mode::{
|
||||||
|
RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state,
|
||||||
|
cutover_stagger_delay,
|
||||||
|
};
|
||||||
use crate::stats::Stats;
|
use crate::stats::Stats;
|
||||||
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
|
||||||
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
|
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
|
||||||
@@ -30,6 +34,8 @@ const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
|
|||||||
const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128;
|
const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128;
|
||||||
const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64;
|
const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64;
|
||||||
const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32;
|
const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32;
|
||||||
|
const ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN: usize = 1;
|
||||||
|
const ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN: usize = 4096;
|
||||||
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
|
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
|
||||||
|
|
||||||
struct RelayForensicsState {
|
struct RelayForensicsState {
|
||||||
@@ -44,6 +50,31 @@ struct RelayForensicsState {
|
|||||||
desync_all_full: bool,
|
desync_all_full: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
struct MeD2cFlushPolicy {
|
||||||
|
max_frames: usize,
|
||||||
|
max_bytes: usize,
|
||||||
|
max_delay: Duration,
|
||||||
|
ack_flush_immediate: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MeD2cFlushPolicy {
|
||||||
|
fn from_config(config: &ProxyConfig) -> Self {
|
||||||
|
Self {
|
||||||
|
max_frames: config
|
||||||
|
.general
|
||||||
|
.me_d2c_flush_batch_max_frames
|
||||||
|
.max(ME_D2C_FLUSH_BATCH_MAX_FRAMES_MIN),
|
||||||
|
max_bytes: config
|
||||||
|
.general
|
||||||
|
.me_d2c_flush_batch_max_bytes
|
||||||
|
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
|
||||||
|
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
|
||||||
|
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn hash_value<T: Hash>(value: &T) -> u64 {
|
fn hash_value<T: Hash>(value: &T) -> u64 {
|
||||||
let mut hasher = DefaultHasher::new();
|
let mut hasher = DefaultHasher::new();
|
||||||
value.hash(&mut hasher);
|
value.hash(&mut hasher);
|
||||||
@@ -201,6 +232,9 @@ pub(crate) async fn handle_via_middle_proxy<R, W>(
|
|||||||
_buffer_pool: Arc<BufferPool>,
|
_buffer_pool: Arc<BufferPool>,
|
||||||
local_addr: SocketAddr,
|
local_addr: SocketAddr,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
|
mut route_rx: watch::Receiver<RouteCutoverState>,
|
||||||
|
route_snapshot: RouteCutoverState,
|
||||||
|
session_id: u64,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
R: AsyncRead + Unpin + Send + 'static,
|
R: AsyncRead + Unpin + Send + 'static,
|
||||||
@@ -240,6 +274,27 @@ where
|
|||||||
stats.increment_user_curr_connects(&user);
|
stats.increment_user_curr_connects(&user);
|
||||||
stats.increment_current_connections_me();
|
stats.increment_current_connections_me();
|
||||||
|
|
||||||
|
if let Some(cutover) = affected_cutover_state(
|
||||||
|
&route_rx,
|
||||||
|
RelayRouteMode::Middle,
|
||||||
|
route_snapshot.generation,
|
||||||
|
) {
|
||||||
|
let delay = cutover_stagger_delay(session_id, cutover.generation);
|
||||||
|
warn!(
|
||||||
|
conn_id,
|
||||||
|
target_mode = cutover.mode.as_str(),
|
||||||
|
cutover_generation = cutover.generation,
|
||||||
|
delay_ms = delay.as_millis() as u64,
|
||||||
|
"Cutover affected middle session before relay start, closing client connection"
|
||||||
|
);
|
||||||
|
tokio::time::sleep(delay).await;
|
||||||
|
let _ = me_pool.send_close(conn_id).await;
|
||||||
|
me_pool.registry().unregister(conn_id).await;
|
||||||
|
stats.decrement_current_connections_me();
|
||||||
|
stats.decrement_user_curr_connects(&user);
|
||||||
|
return Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
// Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable)
|
// Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable)
|
||||||
let user_tag: Option<Vec<u8>> = config
|
let user_tag: Option<Vec<u8>> = config
|
||||||
.access
|
.access
|
||||||
@@ -313,71 +368,152 @@ where
|
|||||||
let rng_clone = rng.clone();
|
let rng_clone = rng.clone();
|
||||||
let user_clone = user.clone();
|
let user_clone = user.clone();
|
||||||
let bytes_me2c_clone = bytes_me2c.clone();
|
let bytes_me2c_clone = bytes_me2c.clone();
|
||||||
|
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
|
||||||
let me_writer = tokio::spawn(async move {
|
let me_writer = tokio::spawn(async move {
|
||||||
let mut writer = crypto_writer;
|
let mut writer = crypto_writer;
|
||||||
let mut frame_buf = Vec::with_capacity(16 * 1024);
|
let mut frame_buf = Vec::with_capacity(16 * 1024);
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
msg = me_rx_task.recv() => {
|
msg = me_rx_task.recv() => {
|
||||||
match msg {
|
let Some(first) = msg else {
|
||||||
Some(MeResponse::Data { flags, data }) => {
|
debug!(conn_id, "ME channel closed");
|
||||||
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed);
|
};
|
||||||
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
|
||||||
write_client_payload(
|
|
||||||
&mut writer,
|
|
||||||
proto_tag,
|
|
||||||
flags,
|
|
||||||
&data,
|
|
||||||
rng_clone.as_ref(),
|
|
||||||
&mut frame_buf,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Drain all immediately queued ME responses and flush once.
|
let mut batch_frames = 0usize;
|
||||||
while let Ok(next) = me_rx_task.try_recv() {
|
let mut batch_bytes = 0usize;
|
||||||
match next {
|
let mut flush_immediately = false;
|
||||||
MeResponse::Data { flags, data } => {
|
|
||||||
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
|
match process_me_writer_response(
|
||||||
bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed);
|
first,
|
||||||
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
&mut writer,
|
||||||
write_client_payload(
|
proto_tag,
|
||||||
&mut writer,
|
rng_clone.as_ref(),
|
||||||
proto_tag,
|
&mut frame_buf,
|
||||||
flags,
|
stats_clone.as_ref(),
|
||||||
&data,
|
&user_clone,
|
||||||
rng_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
&mut frame_buf,
|
conn_id,
|
||||||
).await?;
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
false,
|
||||||
|
).await? {
|
||||||
|
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
|
||||||
|
batch_frames = batch_frames.saturating_add(frames);
|
||||||
|
batch_bytes = batch_bytes.saturating_add(bytes);
|
||||||
|
flush_immediately = immediate;
|
||||||
|
}
|
||||||
|
MeWriterResponseOutcome::Close => {
|
||||||
|
let _ = writer.flush().await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while !flush_immediately
|
||||||
|
&& batch_frames < d2c_flush_policy.max_frames
|
||||||
|
&& batch_bytes < d2c_flush_policy.max_bytes
|
||||||
|
{
|
||||||
|
let Ok(next) = me_rx_task.try_recv() else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
|
||||||
|
match process_me_writer_response(
|
||||||
|
next,
|
||||||
|
&mut writer,
|
||||||
|
proto_tag,
|
||||||
|
rng_clone.as_ref(),
|
||||||
|
&mut frame_buf,
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
&user_clone,
|
||||||
|
bytes_me2c_clone.as_ref(),
|
||||||
|
conn_id,
|
||||||
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
true,
|
||||||
|
).await? {
|
||||||
|
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
|
||||||
|
batch_frames = batch_frames.saturating_add(frames);
|
||||||
|
batch_bytes = batch_bytes.saturating_add(bytes);
|
||||||
|
flush_immediately |= immediate;
|
||||||
|
}
|
||||||
|
MeWriterResponseOutcome::Close => {
|
||||||
|
let _ = writer.flush().await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !flush_immediately
|
||||||
|
&& !d2c_flush_policy.max_delay.is_zero()
|
||||||
|
&& batch_frames < d2c_flush_policy.max_frames
|
||||||
|
&& batch_bytes < d2c_flush_policy.max_bytes
|
||||||
|
{
|
||||||
|
match tokio::time::timeout(d2c_flush_policy.max_delay, me_rx_task.recv()).await {
|
||||||
|
Ok(Some(next)) => {
|
||||||
|
match process_me_writer_response(
|
||||||
|
next,
|
||||||
|
&mut writer,
|
||||||
|
proto_tag,
|
||||||
|
rng_clone.as_ref(),
|
||||||
|
&mut frame_buf,
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
&user_clone,
|
||||||
|
bytes_me2c_clone.as_ref(),
|
||||||
|
conn_id,
|
||||||
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
true,
|
||||||
|
).await? {
|
||||||
|
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
|
||||||
|
batch_frames = batch_frames.saturating_add(frames);
|
||||||
|
batch_bytes = batch_bytes.saturating_add(bytes);
|
||||||
|
flush_immediately |= immediate;
|
||||||
}
|
}
|
||||||
MeResponse::Ack(confirm) => {
|
MeWriterResponseOutcome::Close => {
|
||||||
trace!(conn_id, confirm, "ME->C quickack (batched)");
|
|
||||||
write_client_ack(&mut writer, proto_tag, confirm).await?;
|
|
||||||
}
|
|
||||||
MeResponse::Close => {
|
|
||||||
debug!(conn_id, "ME sent close (batched)");
|
|
||||||
let _ = writer.flush().await;
|
let _ = writer.flush().await;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
writer.flush().await.map_err(ProxyError::Io)?;
|
while !flush_immediately
|
||||||
}
|
&& batch_frames < d2c_flush_policy.max_frames
|
||||||
Some(MeResponse::Ack(confirm)) => {
|
&& batch_bytes < d2c_flush_policy.max_bytes
|
||||||
trace!(conn_id, confirm, "ME->C quickack");
|
{
|
||||||
write_client_ack(&mut writer, proto_tag, confirm).await?;
|
let Ok(extra) = me_rx_task.try_recv() else {
|
||||||
}
|
break;
|
||||||
Some(MeResponse::Close) => {
|
};
|
||||||
debug!(conn_id, "ME sent close");
|
|
||||||
let _ = writer.flush().await;
|
match process_me_writer_response(
|
||||||
return Ok(());
|
extra,
|
||||||
}
|
&mut writer,
|
||||||
None => {
|
proto_tag,
|
||||||
debug!(conn_id, "ME channel closed");
|
rng_clone.as_ref(),
|
||||||
return Err(ProxyError::Proxy("ME connection lost".into()));
|
&mut frame_buf,
|
||||||
|
stats_clone.as_ref(),
|
||||||
|
&user_clone,
|
||||||
|
bytes_me2c_clone.as_ref(),
|
||||||
|
conn_id,
|
||||||
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
|
true,
|
||||||
|
).await? {
|
||||||
|
MeWriterResponseOutcome::Continue { frames, bytes, flush_immediately: immediate } => {
|
||||||
|
batch_frames = batch_frames.saturating_add(frames);
|
||||||
|
batch_bytes = batch_bytes.saturating_add(bytes);
|
||||||
|
flush_immediately |= immediate;
|
||||||
|
}
|
||||||
|
MeWriterResponseOutcome::Close => {
|
||||||
|
let _ = writer.flush().await;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
debug!(conn_id, "ME channel closed");
|
||||||
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
|
}
|
||||||
|
Err(_) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
writer.flush().await.map_err(ProxyError::Io)?;
|
||||||
}
|
}
|
||||||
_ = &mut stop_rx => {
|
_ = &mut stop_rx => {
|
||||||
debug!(conn_id, "ME writer stop signal");
|
debug!(conn_id, "ME writer stop signal");
|
||||||
@@ -390,46 +526,75 @@ where
|
|||||||
let mut main_result: Result<()> = Ok(());
|
let mut main_result: Result<()> = Ok(());
|
||||||
let mut client_closed = false;
|
let mut client_closed = false;
|
||||||
let mut frame_counter: u64 = 0;
|
let mut frame_counter: u64 = 0;
|
||||||
|
let mut route_watch_open = true;
|
||||||
loop {
|
loop {
|
||||||
match read_client_payload(
|
if let Some(cutover) = affected_cutover_state(
|
||||||
&mut crypto_reader,
|
&route_rx,
|
||||||
proto_tag,
|
RelayRouteMode::Middle,
|
||||||
frame_limit,
|
route_snapshot.generation,
|
||||||
&forensics,
|
) {
|
||||||
&mut frame_counter,
|
let delay = cutover_stagger_delay(session_id, cutover.generation);
|
||||||
&stats,
|
warn!(
|
||||||
).await {
|
conn_id,
|
||||||
Ok(Some((payload, quickack))) => {
|
target_mode = cutover.mode.as_str(),
|
||||||
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
cutover_generation = cutover.generation,
|
||||||
forensics.bytes_c2me = forensics
|
delay_ms = delay.as_millis() as u64,
|
||||||
.bytes_c2me
|
"Cutover affected middle session, closing client connection"
|
||||||
.saturating_add(payload.len() as u64);
|
);
|
||||||
stats.add_user_octets_from(&user, payload.len() as u64);
|
tokio::time::sleep(delay).await;
|
||||||
let mut flags = proto_flags;
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
||||||
if quickack {
|
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
||||||
flags |= RPC_FLAG_QUICKACK;
|
break;
|
||||||
}
|
}
|
||||||
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
|
|
||||||
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
tokio::select! {
|
||||||
}
|
changed = route_rx.changed(), if route_watch_open => {
|
||||||
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
|
if changed.is_err() {
|
||||||
if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
|
route_watch_open = false;
|
||||||
.await
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
payload_result = read_client_payload(
|
||||||
debug!(conn_id, "Client EOF");
|
&mut crypto_reader,
|
||||||
client_closed = true;
|
proto_tag,
|
||||||
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
frame_limit,
|
||||||
break;
|
&forensics,
|
||||||
}
|
&mut frame_counter,
|
||||||
Err(e) => {
|
&stats,
|
||||||
main_result = Err(e);
|
) => {
|
||||||
break;
|
match payload_result {
|
||||||
|
Ok(Some((payload, quickack))) => {
|
||||||
|
trace!(conn_id, bytes = payload.len(), "C->ME frame");
|
||||||
|
forensics.bytes_c2me = forensics
|
||||||
|
.bytes_c2me
|
||||||
|
.saturating_add(payload.len() as u64);
|
||||||
|
stats.add_user_octets_from(&user, payload.len() as u64);
|
||||||
|
let mut flags = proto_flags;
|
||||||
|
if quickack {
|
||||||
|
flags |= RPC_FLAG_QUICKACK;
|
||||||
|
}
|
||||||
|
if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) {
|
||||||
|
flags |= RPC_FLAG_NOT_ENCRYPTED;
|
||||||
|
}
|
||||||
|
// Keep client read loop lightweight: route heavy ME send path via a dedicated task.
|
||||||
|
if enqueue_c2me_command(&c2me_tx, C2MeCommand::Data { payload, flags })
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
main_result = Err(ProxyError::Proxy("ME sender channel closed".into()));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
debug!(conn_id, "Client EOF");
|
||||||
|
client_closed = true;
|
||||||
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
main_result = Err(e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -587,6 +752,81 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum MeWriterResponseOutcome {
|
||||||
|
Continue {
|
||||||
|
frames: usize,
|
||||||
|
bytes: usize,
|
||||||
|
flush_immediately: bool,
|
||||||
|
},
|
||||||
|
Close,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_me_writer_response<W>(
|
||||||
|
response: MeResponse,
|
||||||
|
client_writer: &mut CryptoWriter<W>,
|
||||||
|
proto_tag: ProtoTag,
|
||||||
|
rng: &SecureRandom,
|
||||||
|
frame_buf: &mut Vec<u8>,
|
||||||
|
stats: &Stats,
|
||||||
|
user: &str,
|
||||||
|
bytes_me2c: &AtomicU64,
|
||||||
|
conn_id: u64,
|
||||||
|
ack_flush_immediate: bool,
|
||||||
|
batched: bool,
|
||||||
|
) -> Result<MeWriterResponseOutcome>
|
||||||
|
where
|
||||||
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
|
{
|
||||||
|
match response {
|
||||||
|
MeResponse::Data { flags, data } => {
|
||||||
|
if batched {
|
||||||
|
trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)");
|
||||||
|
} else {
|
||||||
|
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
||||||
|
}
|
||||||
|
bytes_me2c.fetch_add(data.len() as u64, Ordering::Relaxed);
|
||||||
|
stats.add_user_octets_to(user, data.len() as u64);
|
||||||
|
write_client_payload(
|
||||||
|
client_writer,
|
||||||
|
proto_tag,
|
||||||
|
flags,
|
||||||
|
&data,
|
||||||
|
rng,
|
||||||
|
frame_buf,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(MeWriterResponseOutcome::Continue {
|
||||||
|
frames: 1,
|
||||||
|
bytes: data.len(),
|
||||||
|
flush_immediately: false,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
MeResponse::Ack(confirm) => {
|
||||||
|
if batched {
|
||||||
|
trace!(conn_id, confirm, "ME->C quickack (batched)");
|
||||||
|
} else {
|
||||||
|
trace!(conn_id, confirm, "ME->C quickack");
|
||||||
|
}
|
||||||
|
write_client_ack(client_writer, proto_tag, confirm).await?;
|
||||||
|
|
||||||
|
Ok(MeWriterResponseOutcome::Continue {
|
||||||
|
frames: 1,
|
||||||
|
bytes: 4,
|
||||||
|
flush_immediately: ack_flush_immediate,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
MeResponse::Close => {
|
||||||
|
if batched {
|
||||||
|
debug!(conn_id, "ME sent close (batched)");
|
||||||
|
} else {
|
||||||
|
debug!(conn_id, "ME sent close");
|
||||||
|
}
|
||||||
|
Ok(MeWriterResponseOutcome::Close)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn write_client_payload<W>(
|
async fn write_client_payload<W>(
|
||||||
client_writer: &mut CryptoWriter<W>,
|
client_writer: &mut CryptoWriter<W>,
|
||||||
proto_tag: ProtoTag,
|
proto_tag: ProtoTag,
|
||||||
@@ -696,9 +936,7 @@ where
|
|||||||
client_writer
|
client_writer
|
||||||
.write_all(&bytes)
|
.write_all(&bytes)
|
||||||
.await
|
.await
|
||||||
.map_err(ProxyError::Io)?;
|
.map_err(ProxyError::Io)
|
||||||
// ACK should remain low-latency.
|
|
||||||
client_writer.flush().await.map_err(ProxyError::Io)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ pub mod direct_relay;
|
|||||||
pub mod handshake;
|
pub mod handshake;
|
||||||
pub mod masking;
|
pub mod masking;
|
||||||
pub mod middle_relay;
|
pub mod middle_relay;
|
||||||
|
pub mod route_mode;
|
||||||
pub mod relay;
|
pub mod relay;
|
||||||
|
|
||||||
pub use client::ClientHandler;
|
pub use client::ClientHandler;
|
||||||
|
|||||||
@@ -57,7 +57,9 @@ use std::sync::Arc;
|
|||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional};
|
use tokio::io::{
|
||||||
|
AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf, copy_bidirectional_with_sizes,
|
||||||
|
};
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
@@ -296,9 +298,8 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
|||||||
///
|
///
|
||||||
/// ## API compatibility
|
/// ## API compatibility
|
||||||
///
|
///
|
||||||
/// Signature is identical to the previous implementation. The `_buffer_pool`
|
/// The `_buffer_pool` parameter is retained for call-site compatibility.
|
||||||
/// parameter is retained for call-site compatibility — `copy_bidirectional`
|
/// Effective relay copy buffers are configured by `c2s_buf_size` / `s2c_buf_size`.
|
||||||
/// manages its own internal buffers (8 KB per direction).
|
|
||||||
///
|
///
|
||||||
/// ## Guarantees preserved
|
/// ## Guarantees preserved
|
||||||
///
|
///
|
||||||
@@ -312,6 +313,8 @@ pub async fn relay_bidirectional<CR, CW, SR, SW>(
|
|||||||
client_writer: CW,
|
client_writer: CW,
|
||||||
server_reader: SR,
|
server_reader: SR,
|
||||||
server_writer: SW,
|
server_writer: SW,
|
||||||
|
c2s_buf_size: usize,
|
||||||
|
s2c_buf_size: usize,
|
||||||
user: &str,
|
user: &str,
|
||||||
stats: Arc<Stats>,
|
stats: Arc<Stats>,
|
||||||
_buffer_pool: Arc<BufferPool>,
|
_buffer_pool: Arc<BufferPool>,
|
||||||
@@ -402,7 +405,12 @@ where
|
|||||||
// When the watchdog fires, select! drops the copy future,
|
// When the watchdog fires, select! drops the copy future,
|
||||||
// releasing the &mut borrows on client and server.
|
// releasing the &mut borrows on client and server.
|
||||||
let copy_result = tokio::select! {
|
let copy_result = tokio::select! {
|
||||||
result = copy_bidirectional(&mut client, &mut server) => Some(result),
|
result = copy_bidirectional_with_sizes(
|
||||||
|
&mut client,
|
||||||
|
&mut server,
|
||||||
|
c2s_buf_size.max(1),
|
||||||
|
s2c_buf_size.max(1),
|
||||||
|
) => Some(result),
|
||||||
_ = watchdog => None, // Activity timeout — cancel relay
|
_ = watchdog => None, // Activity timeout — cancel relay
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -463,4 +471,4 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
117
src/proxy/route_mode.rs
Normal file
117
src/proxy/route_mode.rs
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
pub(crate) const ROUTE_SWITCH_ERROR_MSG: &str = "Route mode switched by cutover";
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
|
#[repr(u8)]
|
||||||
|
pub(crate) enum RelayRouteMode {
|
||||||
|
Direct = 0,
|
||||||
|
Middle = 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayRouteMode {
|
||||||
|
pub(crate) fn as_u8(self) -> u8 {
|
||||||
|
self as u8
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn from_u8(value: u8) -> Self {
|
||||||
|
match value {
|
||||||
|
1 => Self::Middle,
|
||||||
|
_ => Self::Direct,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::Direct => "direct",
|
||||||
|
Self::Middle => "middle",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
|
pub(crate) struct RouteCutoverState {
|
||||||
|
pub mode: RelayRouteMode,
|
||||||
|
pub generation: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct RouteRuntimeController {
|
||||||
|
mode: Arc<AtomicU8>,
|
||||||
|
generation: Arc<AtomicU64>,
|
||||||
|
tx: watch::Sender<RouteCutoverState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RouteRuntimeController {
|
||||||
|
pub(crate) fn new(initial_mode: RelayRouteMode) -> Self {
|
||||||
|
let initial = RouteCutoverState {
|
||||||
|
mode: initial_mode,
|
||||||
|
generation: 0,
|
||||||
|
};
|
||||||
|
let (tx, _rx) = watch::channel(initial);
|
||||||
|
Self {
|
||||||
|
mode: Arc::new(AtomicU8::new(initial_mode.as_u8())),
|
||||||
|
generation: Arc::new(AtomicU64::new(0)),
|
||||||
|
tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn snapshot(&self) -> RouteCutoverState {
|
||||||
|
RouteCutoverState {
|
||||||
|
mode: RelayRouteMode::from_u8(self.mode.load(Ordering::Relaxed)),
|
||||||
|
generation: self.generation.load(Ordering::Relaxed),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn subscribe(&self) -> watch::Receiver<RouteCutoverState> {
|
||||||
|
self.tx.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_mode(&self, mode: RelayRouteMode) -> Option<RouteCutoverState> {
|
||||||
|
let previous = self.mode.swap(mode.as_u8(), Ordering::Relaxed);
|
||||||
|
if previous == mode.as_u8() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
let next = RouteCutoverState { mode, generation };
|
||||||
|
self.tx.send_replace(next);
|
||||||
|
Some(next)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn is_session_affected_by_cutover(
|
||||||
|
current: RouteCutoverState,
|
||||||
|
_session_mode: RelayRouteMode,
|
||||||
|
session_generation: u64,
|
||||||
|
) -> bool {
|
||||||
|
current.generation > session_generation
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn affected_cutover_state(
|
||||||
|
rx: &watch::Receiver<RouteCutoverState>,
|
||||||
|
session_mode: RelayRouteMode,
|
||||||
|
session_generation: u64,
|
||||||
|
) -> Option<RouteCutoverState> {
|
||||||
|
let current = *rx.borrow();
|
||||||
|
if is_session_affected_by_cutover(current, session_mode, session_generation) {
|
||||||
|
return Some(current);
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn cutover_stagger_delay(session_id: u64, generation: u64) -> Duration {
|
||||||
|
let mut value = session_id
|
||||||
|
^ generation.rotate_left(17)
|
||||||
|
^ 0x9e37_79b9_7f4a_7c15;
|
||||||
|
value ^= value >> 30;
|
||||||
|
value = value.wrapping_mul(0xbf58_476d_1ce4_e5b9);
|
||||||
|
value ^= value >> 27;
|
||||||
|
value = value.wrapping_mul(0x94d0_49bb_1331_11eb);
|
||||||
|
value ^= value >> 31;
|
||||||
|
let ms = 1000 + (value % 1000);
|
||||||
|
Duration::from_millis(ms)
|
||||||
|
}
|
||||||
@@ -183,6 +183,7 @@ pub struct MePool {
|
|||||||
pub(super) me_writer_pick_mode: AtomicU8,
|
pub(super) me_writer_pick_mode: AtomicU8,
|
||||||
pub(super) me_writer_pick_sample_size: AtomicU8,
|
pub(super) me_writer_pick_sample_size: AtomicU8,
|
||||||
pub(super) me_socks_kdf_policy: AtomicU8,
|
pub(super) me_socks_kdf_policy: AtomicU8,
|
||||||
|
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
|
||||||
pub(super) me_route_no_writer_mode: AtomicU8,
|
pub(super) me_route_no_writer_mode: AtomicU8,
|
||||||
pub(super) me_route_no_writer_wait: Duration,
|
pub(super) me_route_no_writer_wait: Duration,
|
||||||
pub(super) me_route_inline_recovery_attempts: u32,
|
pub(super) me_route_inline_recovery_attempts: u32,
|
||||||
@@ -287,6 +288,7 @@ impl MePool {
|
|||||||
me_route_backpressure_base_timeout_ms: u64,
|
me_route_backpressure_base_timeout_ms: u64,
|
||||||
me_route_backpressure_high_timeout_ms: u64,
|
me_route_backpressure_high_timeout_ms: u64,
|
||||||
me_route_backpressure_high_watermark_pct: u8,
|
me_route_backpressure_high_watermark_pct: u8,
|
||||||
|
me_reader_route_data_wait_ms: u64,
|
||||||
me_health_interval_ms_unhealthy: u64,
|
me_health_interval_ms_unhealthy: u64,
|
||||||
me_health_interval_ms_healthy: u64,
|
me_health_interval_ms_healthy: u64,
|
||||||
me_warn_rate_limit_ms: u64,
|
me_warn_rate_limit_ms: u64,
|
||||||
@@ -460,6 +462,7 @@ impl MePool {
|
|||||||
me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()),
|
me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()),
|
||||||
me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)),
|
me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)),
|
||||||
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
|
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
|
||||||
|
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
|
||||||
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
|
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
|
||||||
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
||||||
me_route_inline_recovery_attempts,
|
me_route_inline_recovery_attempts,
|
||||||
@@ -650,9 +653,12 @@ impl MePool {
|
|||||||
route_backpressure_base_timeout_ms: u64,
|
route_backpressure_base_timeout_ms: u64,
|
||||||
route_backpressure_high_timeout_ms: u64,
|
route_backpressure_high_timeout_ms: u64,
|
||||||
route_backpressure_high_watermark_pct: u8,
|
route_backpressure_high_watermark_pct: u8,
|
||||||
|
reader_route_data_wait_ms: u64,
|
||||||
) {
|
) {
|
||||||
self.me_socks_kdf_policy
|
self.me_socks_kdf_policy
|
||||||
.store(socks_kdf_policy.as_u8(), Ordering::Relaxed);
|
.store(socks_kdf_policy.as_u8(), Ordering::Relaxed);
|
||||||
|
self.me_reader_route_data_wait_ms
|
||||||
|
.store(reader_route_data_wait_ms, Ordering::Relaxed);
|
||||||
self.registry.update_route_backpressure_policy(
|
self.registry.update_route_backpressure_policy(
|
||||||
route_backpressure_base_timeout_ms,
|
route_backpressure_base_timeout_ms,
|
||||||
route_backpressure_high_timeout_ms,
|
route_backpressure_high_timeout_ms,
|
||||||
@@ -822,10 +828,29 @@ impl MePool {
|
|||||||
effective
|
effective
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Keeps per-contour (active/warm) writer budget bounded by CPU count.
|
||||||
|
// Baseline is 86 writers on the first core and +48 for each extra core.
|
||||||
|
fn adaptive_floor_cpu_budget_per_contour_cap(&self, cores: usize) -> usize {
|
||||||
|
const FIRST_CORE_WRITER_BUDGET: usize = 86;
|
||||||
|
const EXTRA_CORE_WRITER_BUDGET: usize = 48;
|
||||||
|
if cores == 0 {
|
||||||
|
return FIRST_CORE_WRITER_BUDGET;
|
||||||
|
}
|
||||||
|
FIRST_CORE_WRITER_BUDGET.saturating_add(
|
||||||
|
cores
|
||||||
|
.saturating_sub(1)
|
||||||
|
.saturating_mul(EXTRA_CORE_WRITER_BUDGET),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn adaptive_floor_active_cap_configured_total(&self) -> usize {
|
pub(super) fn adaptive_floor_active_cap_configured_total(&self) -> usize {
|
||||||
let cores = self.adaptive_floor_effective_cpu_cores();
|
let cores = self.adaptive_floor_effective_cpu_cores();
|
||||||
let per_core_cap = cores.saturating_mul(self.adaptive_floor_max_active_writers_per_core());
|
let per_contour_budget = self.adaptive_floor_cpu_budget_per_contour_cap(cores);
|
||||||
let configured = per_core_cap.min(self.adaptive_floor_max_active_writers_global());
|
let configured = cores
|
||||||
|
.saturating_mul(self.adaptive_floor_max_active_writers_per_core())
|
||||||
|
.min(self.adaptive_floor_max_active_writers_global())
|
||||||
|
.min(per_contour_budget)
|
||||||
|
.max(1);
|
||||||
self.me_adaptive_floor_active_cap_configured
|
self.me_adaptive_floor_active_cap_configured
|
||||||
.store(configured as u64, Ordering::Relaxed);
|
.store(configured as u64, Ordering::Relaxed);
|
||||||
self.stats
|
self.stats
|
||||||
@@ -835,8 +860,12 @@ impl MePool {
|
|||||||
|
|
||||||
pub(super) fn adaptive_floor_warm_cap_configured_total(&self) -> usize {
|
pub(super) fn adaptive_floor_warm_cap_configured_total(&self) -> usize {
|
||||||
let cores = self.adaptive_floor_effective_cpu_cores();
|
let cores = self.adaptive_floor_effective_cpu_cores();
|
||||||
let per_core_cap = cores.saturating_mul(self.adaptive_floor_max_warm_writers_per_core());
|
let per_contour_budget = self.adaptive_floor_cpu_budget_per_contour_cap(cores);
|
||||||
let configured = per_core_cap.min(self.adaptive_floor_max_warm_writers_global());
|
let configured = cores
|
||||||
|
.saturating_mul(self.adaptive_floor_max_warm_writers_per_core())
|
||||||
|
.min(self.adaptive_floor_max_warm_writers_global())
|
||||||
|
.min(per_contour_budget)
|
||||||
|
.max(1);
|
||||||
self.me_adaptive_floor_warm_cap_configured
|
self.me_adaptive_floor_warm_cap_configured
|
||||||
.store(configured as u64, Ordering::Relaxed);
|
.store(configured as u64, Ordering::Relaxed);
|
||||||
self.stats
|
self.stats
|
||||||
|
|||||||
@@ -208,6 +208,7 @@ impl MePool {
|
|||||||
let keepalive_jitter_signal = self.me_keepalive_jitter;
|
let keepalive_jitter_signal = self.me_keepalive_jitter;
|
||||||
let cancel_reader_token = cancel.clone();
|
let cancel_reader_token = cancel.clone();
|
||||||
let cancel_ping_token = cancel_ping.clone();
|
let cancel_ping_token = cancel_ping.clone();
|
||||||
|
let reader_route_data_wait_ms = self.me_reader_route_data_wait_ms.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let res = reader_loop(
|
let res = reader_loop(
|
||||||
@@ -225,6 +226,7 @@ impl MePool {
|
|||||||
writer_id,
|
writer_id,
|
||||||
degraded.clone(),
|
degraded.clone(),
|
||||||
rtt_ema_ms_x10.clone(),
|
rtt_ema_ms_x10.clone(),
|
||||||
|
reader_route_data_wait_ms,
|
||||||
cancel_reader_token.clone(),
|
cancel_reader_token.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::ErrorKind;
|
use std::io::ErrorKind;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
@@ -35,6 +35,7 @@ pub(crate) async fn reader_loop(
|
|||||||
_writer_id: u64,
|
_writer_id: u64,
|
||||||
degraded: Arc<AtomicBool>,
|
degraded: Arc<AtomicBool>,
|
||||||
writer_rtt_ema_ms_x10: Arc<AtomicU32>,
|
writer_rtt_ema_ms_x10: Arc<AtomicU32>,
|
||||||
|
reader_route_data_wait_ms: Arc<AtomicU64>,
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut raw = enc_leftover;
|
let mut raw = enc_leftover;
|
||||||
@@ -57,17 +58,14 @@ pub(crate) async fn reader_loop(
|
|||||||
|
|
||||||
let blocks = raw.len() / 16 * 16;
|
let blocks = raw.len() / 16 * 16;
|
||||||
if blocks > 0 {
|
if blocks > 0 {
|
||||||
|
let mut chunk = raw.split_to(blocks);
|
||||||
let mut new_iv = [0u8; 16];
|
let mut new_iv = [0u8; 16];
|
||||||
new_iv.copy_from_slice(&raw[blocks - 16..blocks]);
|
new_iv.copy_from_slice(&chunk[blocks - 16..blocks]);
|
||||||
|
|
||||||
let mut chunk = vec![0u8; blocks];
|
|
||||||
chunk.copy_from_slice(&raw[..blocks]);
|
|
||||||
AesCbc::new(dk, div)
|
AesCbc::new(dk, div)
|
||||||
.decrypt_in_place(&mut chunk)
|
.decrypt_in_place(&mut chunk[..])
|
||||||
.map_err(|e| ProxyError::Crypto(format!("{e}")))?;
|
.map_err(|e| ProxyError::Crypto(format!("{e}")))?;
|
||||||
div = new_iv;
|
div = new_iv;
|
||||||
dec.extend_from_slice(&chunk);
|
dec.extend_from_slice(&chunk);
|
||||||
let _ = raw.split_to(blocks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while dec.len() >= 12 {
|
while dec.len() >= 12 {
|
||||||
@@ -85,7 +83,7 @@ pub(crate) async fn reader_loop(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
let frame = dec.split_to(fl);
|
let frame = dec.split_to(fl).freeze();
|
||||||
let pe = fl - 4;
|
let pe = fl - 4;
|
||||||
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
|
let ec = u32::from_le_bytes(frame[pe..pe + 4].try_into().unwrap());
|
||||||
let actual_crc = rpc_crc(crc_mode, &frame[..pe]);
|
let actual_crc = rpc_crc(crc_mode, &frame[..pe]);
|
||||||
@@ -111,21 +109,27 @@ pub(crate) async fn reader_loop(
|
|||||||
}
|
}
|
||||||
expected_seq = expected_seq.wrapping_add(1);
|
expected_seq = expected_seq.wrapping_add(1);
|
||||||
|
|
||||||
let payload = &frame[8..pe];
|
let payload = frame.slice(8..pe);
|
||||||
if payload.len() < 4 {
|
if payload.len() < 4 {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let pt = u32::from_le_bytes(payload[0..4].try_into().unwrap());
|
let pt = u32::from_le_bytes(payload[0..4].try_into().unwrap());
|
||||||
let body = &payload[4..];
|
let body = payload.slice(4..);
|
||||||
|
|
||||||
if pt == RPC_PROXY_ANS_U32 && body.len() >= 12 {
|
if pt == RPC_PROXY_ANS_U32 && body.len() >= 12 {
|
||||||
let flags = u32::from_le_bytes(body[0..4].try_into().unwrap());
|
let flags = u32::from_le_bytes(body[0..4].try_into().unwrap());
|
||||||
let cid = u64::from_le_bytes(body[4..12].try_into().unwrap());
|
let cid = u64::from_le_bytes(body[4..12].try_into().unwrap());
|
||||||
let data = Bytes::copy_from_slice(&body[12..]);
|
let data = body.slice(12..);
|
||||||
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
||||||
|
|
||||||
let routed = reg.route_nowait(cid, MeResponse::Data { flags, data }).await;
|
let data_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
|
||||||
|
let routed = if data_wait_ms == 0 {
|
||||||
|
reg.route_nowait(cid, MeResponse::Data { flags, data }).await
|
||||||
|
} else {
|
||||||
|
reg.route_with_timeout(cid, MeResponse::Data { flags, data }, data_wait_ms)
|
||||||
|
.await
|
||||||
|
};
|
||||||
if !matches!(routed, RouteResult::Routed) {
|
if !matches!(routed, RouteResult::Routed) {
|
||||||
match routed {
|
match routed {
|
||||||
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
|
RouteResult::NoConn => stats.increment_me_route_drop_no_conn(),
|
||||||
|
|||||||
@@ -231,6 +231,57 @@ impl ConnRegistry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn route_with_timeout(
|
||||||
|
&self,
|
||||||
|
id: u64,
|
||||||
|
resp: MeResponse,
|
||||||
|
timeout_ms: u64,
|
||||||
|
) -> RouteResult {
|
||||||
|
if timeout_ms == 0 {
|
||||||
|
return self.route_nowait(id, resp).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let tx = {
|
||||||
|
let inner = self.inner.read().await;
|
||||||
|
inner.map.get(&id).cloned()
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(tx) = tx else {
|
||||||
|
return RouteResult::NoConn;
|
||||||
|
};
|
||||||
|
|
||||||
|
match tx.try_send(resp) {
|
||||||
|
Ok(()) => RouteResult::Routed,
|
||||||
|
Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
|
||||||
|
Err(TrySendError::Full(resp)) => {
|
||||||
|
let high_watermark_pct = self
|
||||||
|
.route_backpressure_high_watermark_pct
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
.clamp(1, 100);
|
||||||
|
let used = self.route_channel_capacity.saturating_sub(tx.capacity());
|
||||||
|
let used_pct = if self.route_channel_capacity == 0 {
|
||||||
|
100
|
||||||
|
} else {
|
||||||
|
(used.saturating_mul(100) / self.route_channel_capacity) as u8
|
||||||
|
};
|
||||||
|
let high_profile = used_pct >= high_watermark_pct;
|
||||||
|
let timeout_dur = Duration::from_millis(timeout_ms.max(1));
|
||||||
|
|
||||||
|
match tokio::time::timeout(timeout_dur, tx.send(resp)).await {
|
||||||
|
Ok(Ok(())) => RouteResult::Routed,
|
||||||
|
Ok(Err(_)) => RouteResult::ChannelClosed,
|
||||||
|
Err(_) => {
|
||||||
|
if high_profile {
|
||||||
|
RouteResult::QueueFullHigh
|
||||||
|
} else {
|
||||||
|
RouteResult::QueueFullBase
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn bind_writer(
|
pub async fn bind_writer(
|
||||||
&self,
|
&self,
|
||||||
conn_id: u64,
|
conn_id: u64,
|
||||||
|
|||||||
Reference in New Issue
Block a user