mirror of
https://github.com/telemt/telemt.git
synced 2026-05-23 04:01:44 +03:00
Compare commits
11 Commits
456bcb9d61
...
cc229182fd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cc229182fd | ||
|
|
6d5a1a29df | ||
|
|
026ca5cc1d | ||
|
|
b11dec7f91 | ||
|
|
edd1405562 | ||
|
|
6748ed920e | ||
|
|
303b273c77 | ||
|
|
3bcc129b8d | ||
|
|
3ffbd294d2 | ||
|
|
ddeda8d914 | ||
|
|
06eb112efd |
@@ -219,6 +219,7 @@ This document lists all configuration keys accepted by `config.toml`.
|
||||
| [`ntp_servers`](#cfg-general-ntp_servers) | `String[]` | `["pool.ntp.org"]` |
|
||||
| [`auto_degradation_enabled`](#cfg-general-auto_degradation_enabled) | `bool` | `true` |
|
||||
| [`degradation_min_unavailable_dc_groups`](#cfg-general-degradation_min_unavailable_dc_groups) | `u8` | `2` |
|
||||
| [`rst_on_close`](#cfg-general-rst_on_close) | `"off"`, `"errors"`, or `"always"` | `"off"` |
|
||||
|
||||
## "cfg-general-data_path"
|
||||
- `data_path`
|
||||
@@ -1592,7 +1593,21 @@ This document lists all configuration keys accepted by `config.toml`.
|
||||
[general]
|
||||
degradation_min_unavailable_dc_groups = 2
|
||||
```
|
||||
## "cfg-general-rst_on_close"
|
||||
- `rst_on_close`
|
||||
- **Constraints / validation**: one of `"off"`, `"errors"`, `"always"`.
|
||||
- **Description**: Controls `SO_LINGER(0)` behaviour on accepted client TCP sockets.
|
||||
High-traffic proxy servers accumulate `FIN-WAIT-1` and orphaned sockets from connections that never complete the Telegram handshake (scanners, DPI probes, bots).
|
||||
This option allows sending an immediate `RST` instead of a graceful `FIN` for such connections, freeing kernel resources instantly.
|
||||
- `"off"` — default. Normal `FIN` on all closes; no behaviour change.
|
||||
- `"errors"` — `SO_LINGER(0)` is set on `accept()`. If the client successfully completes authentication, linger is cleared and the relay session closes gracefully with `FIN`. Connections closed before handshake completion (timeouts, bad crypto, scanners) send `RST`.
|
||||
- `"always"` — `SO_LINGER(0)` is set on `accept()` and never cleared. All closes send `RST` regardless of handshake outcome.
|
||||
- **Example**:
|
||||
|
||||
```toml
|
||||
[general]
|
||||
rst_on_close = "errors"
|
||||
```
|
||||
|
||||
# [general.modes]
|
||||
|
||||
@@ -1959,7 +1974,7 @@ This document lists all configuration keys accepted by `config.toml`.
|
||||
## "cfg-server-proxy_protocol_trusted_cidrs"
|
||||
- `proxy_protocol_trusted_cidrs`
|
||||
- **Constraints / validation**: `IpNetwork[]`.
|
||||
- If omitted, defaults to trust-all CIDRs (`0.0.0.0/0` and `::/0`).
|
||||
- If omitted, defaults to an empty list and incoming PROXY headers are rejected.
|
||||
- If explicitly set to an empty array, all PROXY headers are rejected.
|
||||
- **Description**: Trusted source CIDRs allowed to provide PROXY protocol headers (security control).
|
||||
- **Example**:
|
||||
@@ -3313,4 +3328,3 @@ If your backend or network is very bandwidth-constrained, reduce cap first. If p
|
||||
password = "secret"
|
||||
```
|
||||
|
||||
|
||||
|
||||
@@ -219,6 +219,7 @@
|
||||
| [`ntp_servers`](#cfg-general-ntp_servers) | `String[]` | `["pool.ntp.org"]` |
|
||||
| [`auto_degradation_enabled`](#cfg-general-auto_degradation_enabled) | `bool` | `true` |
|
||||
| [`degradation_min_unavailable_dc_groups`](#cfg-general-degradation_min_unavailable_dc_groups) | `u8` | `2` |
|
||||
| [`rst_on_close`](#cfg-general-rst_on_close) | `"off"`, `"errors"` или `"always"` | `"off"` |
|
||||
|
||||
## "cfg-general-data_path"
|
||||
- `data_path`
|
||||
@@ -1592,7 +1593,21 @@
|
||||
[general]
|
||||
degradation_min_unavailable_dc_groups = 2
|
||||
```
|
||||
## "cfg-general-rst_on_close"
|
||||
- `rst_on_close`
|
||||
- **Ограничения / валидация**: одно из `"off"`, `"errors"`, `"always"`.
|
||||
- **Описание**: Управляет поведением `SO_LINGER(0)` на принятых клиентских TCP-сокетах.
|
||||
На высоконагруженных прокси-серверах накапливаются `FIN-WAIT-1` и осиротевшие (orphan) сокеты от соединений, которые не завершают Telegram-рукопожатие (сканеры, DPI-зонды, боты).
|
||||
Эта опция позволяет отправлять немедленный `RST` вместо корректного `FIN` для таких соединений, мгновенно освобождая ресурсы ядра.
|
||||
- `"off"` — по умолчанию. Обычный `FIN` при закрытии всех соединений; поведение не меняется.
|
||||
- `"errors"` — `SO_LINGER(0)` устанавливается при `accept()`. Если клиент успешно проходит аутентификацию, linger сбрасывается и relay-сессия закрывается корректно через `FIN`. Соединения, закрытые до завершения рукопожатия (таймауты, ошибки крипто, сканеры), отправляют `RST`.
|
||||
- `"always"` — `SO_LINGER(0)` устанавливается при `accept()` и никогда не сбрасывается. Все закрытия отправляют `RST` независимо от результата рукопожатия.
|
||||
- **Пример**:
|
||||
|
||||
```toml
|
||||
[general]
|
||||
rst_on_close = "errors"
|
||||
```
|
||||
|
||||
# [general.modes]
|
||||
|
||||
|
||||
@@ -163,7 +163,7 @@ PING 10.10.10.1 (10.10.10.1) 56(84) bytes of data.
|
||||
---
|
||||
|
||||
## Step 2. Installing telemt on Server B (conditionally Netherlands)
|
||||
Installation and configuration are described [here](https://github.com/telemt/telemt/blob/main/docs/QUICK_START_GUIDE.ru.md) or [here](https://gitlab.com/An0nX/telemt-docker#-quick-start-docker-compose).\
|
||||
Installation and configuration are described [here](https://github.com/telemt/telemt/blob/main/docs/Quick_start/QUICK_START_GUIDE.en.md) or [here](https://gitlab.com/An0nX/telemt-docker#-quick-start-docker-compose).\
|
||||
It is assumed that telemt expects connections on port `443\tcp`.
|
||||
|
||||
In the telemt config, you must enable the `Proxy` protocol and restrict connections to it only through the tunnel.
|
||||
|
||||
@@ -166,7 +166,7 @@ PING 10.10.10.1 (10.10.10.1) 56(84) bytes of data.
|
||||
|
||||
## Шаг 2. Установка telemt на Сервере B (_условно Нидерланды_)
|
||||
|
||||
Установка и настройка описаны [здесь](https://github.com/telemt/telemt/blob/main/docs/QUICK_START_GUIDE.ru.md) или [здесь](https://gitlab.com/An0nX/telemt-docker#-quick-start-docker-compose).\
|
||||
Установка и настройка описаны [здесь](https://github.com/telemt/telemt/blob/main/docs/Quick_start/QUICK_START_GUIDE.ru.md) или [здесь](https://gitlab.com/An0nX/telemt-docker#-quick-start-docker-compose).\
|
||||
Подразумевается что telemt ожидает подключения на порту `443\tcp`.
|
||||
|
||||
В конфиге telemt необходимо включить протокол `Proxy` и ограничить подключения к нему только через туннель.
|
||||
|
||||
@@ -210,7 +210,7 @@ pub(crate) fn default_proxy_protocol_header_timeout_ms() -> u64 {
|
||||
}
|
||||
|
||||
pub(crate) fn default_proxy_protocol_trusted_cidrs() -> Vec<IpNetwork> {
|
||||
vec!["0.0.0.0/0".parse().unwrap(), "::/0".parse().unwrap()]
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
pub(crate) fn default_server_max_connections() -> u32 {
|
||||
|
||||
@@ -47,12 +47,18 @@ pub(crate) struct UserAuthEntry {
|
||||
|
||||
impl UserAuthSnapshot {
|
||||
fn from_users(users: &HashMap<String, String>) -> Result<Self> {
|
||||
// Keep runtime user ids stable across reloads so overload scans and
|
||||
// sticky hints do not depend on HashMap iteration order.
|
||||
let mut sorted_users: Vec<_> = users.iter().collect();
|
||||
sorted_users
|
||||
.sort_unstable_by(|(left, _), (right, _)| left.as_bytes().cmp(right.as_bytes()));
|
||||
|
||||
let mut entries = Vec::with_capacity(users.len());
|
||||
let mut by_name = HashMap::with_capacity(users.len());
|
||||
let mut sni_index = HashMap::with_capacity(users.len());
|
||||
let mut sni_initial_index = HashMap::with_capacity(users.len());
|
||||
|
||||
for (user, secret_hex) in users {
|
||||
for (user, secret_hex) in sorted_users {
|
||||
let decoded = hex::decode(secret_hex).map_err(|_| ProxyError::InvalidSecret {
|
||||
user: user.clone(),
|
||||
reason: "Must be 32 hex characters".to_string(),
|
||||
@@ -1734,10 +1740,7 @@ mod tests {
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
cfg_missing.server.proxy_protocol_trusted_cidrs,
|
||||
default_proxy_protocol_trusted_cidrs()
|
||||
);
|
||||
assert!(cfg_missing.server.proxy_protocol_trusted_cidrs.is_empty());
|
||||
|
||||
let cfg_explicit_empty: ProxyConfig = toml::from_str(
|
||||
r#"
|
||||
@@ -1758,6 +1761,46 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_user_auth_snapshot_order_is_stable_across_hashmap_insertion_orders() {
|
||||
let mut left_users = HashMap::new();
|
||||
left_users.insert(
|
||||
"beta".to_string(),
|
||||
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string(),
|
||||
);
|
||||
left_users.insert(
|
||||
"alpha".to_string(),
|
||||
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
|
||||
);
|
||||
|
||||
let mut right_users = HashMap::new();
|
||||
right_users.insert(
|
||||
"alpha".to_string(),
|
||||
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
|
||||
);
|
||||
right_users.insert(
|
||||
"beta".to_string(),
|
||||
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string(),
|
||||
);
|
||||
|
||||
let left_snapshot = UserAuthSnapshot::from_users(&left_users).unwrap();
|
||||
let right_snapshot = UserAuthSnapshot::from_users(&right_users).unwrap();
|
||||
|
||||
let left_names: Vec<_> = left_snapshot
|
||||
.entries()
|
||||
.iter()
|
||||
.map(|entry| entry.user.as_str())
|
||||
.collect();
|
||||
let right_names: Vec<_> = right_snapshot
|
||||
.entries()
|
||||
.iter()
|
||||
.map(|entry| entry.user.as_str())
|
||||
.collect();
|
||||
|
||||
assert_eq!(left_names, ["alpha", "beta"]);
|
||||
assert_eq!(left_names, right_names);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_sni_action_parses_and_defaults_to_drop() {
|
||||
let cfg_default: ProxyConfig = toml::from_str(
|
||||
|
||||
@@ -159,6 +159,21 @@ impl MeBindStaleMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// RST-on-close mode for accepted client sockets.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum RstOnCloseMode {
|
||||
/// Normal FIN on all closes (default, no behaviour change).
|
||||
#[default]
|
||||
Off,
|
||||
/// SO_LINGER(0) on accept; cleared after successful auth.
|
||||
/// Pre-handshake failures (scanners, DPI, timeouts) send RST;
|
||||
/// authenticated relay sessions close gracefully with FIN.
|
||||
Errors,
|
||||
/// SO_LINGER(0) on accept, never cleared — all closes send RST.
|
||||
Always,
|
||||
}
|
||||
|
||||
/// Middle-End writer floor policy mode.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
@@ -925,6 +940,14 @@ pub struct GeneralConfig {
|
||||
/// Minimum unavailable ME DC groups before degrading.
|
||||
#[serde(default = "default_degradation_min_unavailable_dc_groups")]
|
||||
pub degradation_min_unavailable_dc_groups: u8,
|
||||
|
||||
/// RST-on-close mode for accepted client sockets.
|
||||
/// `off` — normal FIN on all closes (default).
|
||||
/// `errors` — SO_LINGER(0) on accept, cleared after successful auth;
|
||||
/// pre-handshake failures send RST, relayed sessions close gracefully.
|
||||
/// `always` — SO_LINGER(0) on accept, never cleared; all closes send RST.
|
||||
#[serde(default)]
|
||||
pub rst_on_close: RstOnCloseMode,
|
||||
}
|
||||
|
||||
impl Default for GeneralConfig {
|
||||
@@ -1086,6 +1109,7 @@ impl Default for GeneralConfig {
|
||||
ntp_servers: default_ntp_servers(),
|
||||
auto_degradation_enabled: default_true(),
|
||||
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
|
||||
rst_on_close: RstOnCloseMode::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1363,9 +1387,8 @@ pub struct ServerConfig {
|
||||
|
||||
/// Trusted source CIDRs allowed to send incoming PROXY protocol headers.
|
||||
///
|
||||
/// If this field is omitted in config, it defaults to trust-all CIDRs
|
||||
/// (`0.0.0.0/0` and `::/0`). If it is explicitly set to an empty list,
|
||||
/// all PROXY protocol headers are rejected.
|
||||
/// If this field is omitted in config, it defaults to an empty list and
|
||||
/// all PROXY protocol headers are rejected until trusted CIDRs are set.
|
||||
#[serde(default = "default_proxy_protocol_trusted_cidrs")]
|
||||
pub proxy_protocol_trusted_cidrs: Vec<IpNetwork>,
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ use tokio::net::UnixListener;
|
||||
use tokio::sync::{Semaphore, watch};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::config::{ProxyConfig, RstOnCloseMode};
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::ip_tracker::UserIpTracker;
|
||||
use crate::proxy::ClientHandler;
|
||||
@@ -21,6 +21,7 @@ use crate::stats::{ReplayChecker, Stats};
|
||||
use crate::stream::BufferPool;
|
||||
use crate::tls_front::TlsFrontCache;
|
||||
use crate::transport::middle_proxy::MePool;
|
||||
use crate::transport::socket::set_linger_zero;
|
||||
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
|
||||
|
||||
use super::helpers::{is_expected_handshake_eof, print_proxy_links};
|
||||
@@ -380,6 +381,15 @@ pub(crate) fn spawn_tcp_accept_loops(
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((stream, peer_addr)) => {
|
||||
let rst_mode = config_rx.borrow().general.rst_on_close;
|
||||
#[cfg(unix)]
|
||||
let raw_fd = {
|
||||
use std::os::unix::io::AsRawFd;
|
||||
stream.as_raw_fd()
|
||||
};
|
||||
if matches!(rst_mode, RstOnCloseMode::Errors | RstOnCloseMode::Always) {
|
||||
let _ = set_linger_zero(&stream);
|
||||
}
|
||||
if !*admission_rx_tcp.borrow() {
|
||||
debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
|
||||
drop(stream);
|
||||
@@ -454,6 +464,9 @@ pub(crate) fn spawn_tcp_accept_loops(
|
||||
shared,
|
||||
proxy_protocol_enabled,
|
||||
real_peer_report_for_handler,
|
||||
#[cfg(unix)]
|
||||
raw_fd,
|
||||
rst_mode,
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
|
||||
@@ -804,6 +804,9 @@ pub struct RunningClientHandler {
|
||||
beobachten: Arc<BeobachtenStore>,
|
||||
shared: Arc<ProxySharedState>,
|
||||
proxy_protocol_enabled: bool,
|
||||
#[cfg(unix)]
|
||||
raw_fd: std::os::unix::io::RawFd,
|
||||
rst_on_close: crate::config::RstOnCloseMode,
|
||||
}
|
||||
|
||||
impl ClientHandler {
|
||||
@@ -825,6 +828,11 @@ impl ClientHandler {
|
||||
proxy_protocol_enabled: bool,
|
||||
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
|
||||
) -> RunningClientHandler {
|
||||
#[cfg(unix)]
|
||||
let raw_fd = {
|
||||
use std::os::unix::io::AsRawFd;
|
||||
stream.as_raw_fd()
|
||||
};
|
||||
Self::new_with_shared(
|
||||
stream,
|
||||
peer,
|
||||
@@ -842,6 +850,9 @@ impl ClientHandler {
|
||||
ProxySharedState::new(),
|
||||
proxy_protocol_enabled,
|
||||
real_peer_report,
|
||||
#[cfg(unix)]
|
||||
raw_fd,
|
||||
crate::config::RstOnCloseMode::Off,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -863,6 +874,8 @@ impl ClientHandler {
|
||||
shared: Arc<ProxySharedState>,
|
||||
proxy_protocol_enabled: bool,
|
||||
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
|
||||
#[cfg(unix)] raw_fd: std::os::unix::io::RawFd,
|
||||
rst_on_close: crate::config::RstOnCloseMode,
|
||||
) -> RunningClientHandler {
|
||||
let normalized_peer = normalize_ip(peer);
|
||||
RunningClientHandler {
|
||||
@@ -883,6 +896,9 @@ impl ClientHandler {
|
||||
beobachten,
|
||||
shared,
|
||||
proxy_protocol_enabled,
|
||||
#[cfg(unix)]
|
||||
raw_fd,
|
||||
rst_on_close,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -901,6 +917,10 @@ impl RunningClientHandler {
|
||||
debug!(peer = %peer, error = %e, "Failed to configure client socket");
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
let raw_fd = self.raw_fd;
|
||||
let rst_on_close = self.rst_on_close;
|
||||
|
||||
let outcome = match self.do_handshake().await? {
|
||||
Some(outcome) => outcome,
|
||||
None => return Ok(()),
|
||||
@@ -908,7 +928,14 @@ impl RunningClientHandler {
|
||||
|
||||
// Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts)
|
||||
match outcome {
|
||||
HandshakeOutcome::NeedsRelay(fut) | HandshakeOutcome::NeedsMasking(fut) => fut.await,
|
||||
HandshakeOutcome::NeedsRelay(fut) => {
|
||||
#[cfg(unix)]
|
||||
if matches!(rst_on_close, crate::config::RstOnCloseMode::Errors) {
|
||||
let _ = crate::transport::socket::clear_linger_fd(raw_fd);
|
||||
}
|
||||
fut.await
|
||||
}
|
||||
HandshakeOutcome::NeedsMasking(fut) => fut.await,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -55,6 +55,7 @@ const STICKY_HINT_MAX_ENTRIES: usize = 65_536;
|
||||
const CANDIDATE_HINT_TRACK_CAP: usize = 64;
|
||||
const OVERLOAD_CANDIDATE_BUDGET_HINTED: usize = 16;
|
||||
const OVERLOAD_CANDIDATE_BUDGET_UNHINTED: usize = 8;
|
||||
const OVERLOAD_FULL_SCAN_USER_THRESHOLD: usize = CANDIDATE_HINT_TRACK_CAP;
|
||||
const RECENT_USER_RING_SCAN_LIMIT: usize = 32;
|
||||
|
||||
type HmacSha256 = Hmac<Sha256>;
|
||||
@@ -242,6 +243,9 @@ fn budget_for_validation(total_users: usize, overload: bool, has_hint: bool) ->
|
||||
if !overload {
|
||||
return total_users;
|
||||
}
|
||||
if total_users <= OVERLOAD_FULL_SCAN_USER_THRESHOLD {
|
||||
return total_users;
|
||||
}
|
||||
let cap = if has_hint {
|
||||
OVERLOAD_CANDIDATE_BUDGET_HINTED
|
||||
} else {
|
||||
@@ -250,6 +254,28 @@ fn budget_for_validation(total_users: usize, overload: bool, has_hint: bool) ->
|
||||
total_users.min(cap.max(1))
|
||||
}
|
||||
|
||||
// Rotate partial overload scans across larger snapshots so one truncated
|
||||
// validation window does not permanently starve the same cold users.
|
||||
fn candidate_scan_start_offset_in(
|
||||
shared: &ProxySharedState,
|
||||
peer_ip: IpAddr,
|
||||
total_users: usize,
|
||||
candidate_budget: usize,
|
||||
) -> usize {
|
||||
if total_users == 0 || candidate_budget >= total_users {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let seq = shared
|
||||
.handshake
|
||||
.auth_candidate_scan_seq
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
let mut hasher = shared.handshake.auth_probe_eviction_hasher.build_hasher();
|
||||
peer_ip.hash(&mut hasher);
|
||||
seq.hash(&mut hasher);
|
||||
hasher.finish() as usize % total_users
|
||||
}
|
||||
|
||||
fn parse_tls_auth_material(
|
||||
handshake: &[u8],
|
||||
ignore_time_skew: bool,
|
||||
@@ -1312,7 +1338,14 @@ where
|
||||
}
|
||||
|
||||
if !matched && !budget_exhausted {
|
||||
for idx in 0..snapshot.entries().len() {
|
||||
let fallback_start = candidate_scan_start_offset_in(
|
||||
shared,
|
||||
peer.ip(),
|
||||
snapshot.entries().len(),
|
||||
candidate_budget,
|
||||
);
|
||||
for offset in 0..snapshot.entries().len() {
|
||||
let idx = (fallback_start + offset) % snapshot.entries().len();
|
||||
let Some(user_id) = u32::try_from(idx).ok() else {
|
||||
break;
|
||||
};
|
||||
@@ -1679,7 +1712,14 @@ where
|
||||
}
|
||||
|
||||
if !matched && !budget_exhausted {
|
||||
for idx in 0..snapshot.entries().len() {
|
||||
let fallback_start = candidate_scan_start_offset_in(
|
||||
shared,
|
||||
peer.ip(),
|
||||
snapshot.entries().len(),
|
||||
candidate_budget,
|
||||
);
|
||||
for offset in 0..snapshot.entries().len() {
|
||||
let idx = (fallback_start + offset) % snapshot.entries().len();
|
||||
let Some(user_id) = u32::try_from(idx).ok() else {
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -506,6 +506,40 @@ fn is_mask_target_local_listener_with_interfaces(
|
||||
local_addr: SocketAddr,
|
||||
resolved_override: Option<SocketAddr>,
|
||||
interface_ips: &[IpAddr],
|
||||
) -> bool {
|
||||
let resolved_candidates = resolved_override
|
||||
.as_ref()
|
||||
.map(std::slice::from_ref)
|
||||
.unwrap_or(&[]);
|
||||
is_mask_target_local_listener_candidates_with_interfaces(
|
||||
mask_host,
|
||||
mask_port,
|
||||
local_addr,
|
||||
resolved_candidates,
|
||||
interface_ips,
|
||||
)
|
||||
}
|
||||
|
||||
fn mask_ip_targets_local_listener(
|
||||
mask_ip: IpAddr,
|
||||
local_ip: IpAddr,
|
||||
interface_ips: &[IpAddr],
|
||||
) -> bool {
|
||||
let mask_ip = canonical_ip(mask_ip);
|
||||
if mask_ip == local_ip {
|
||||
return true;
|
||||
}
|
||||
|
||||
local_ip.is_unspecified()
|
||||
&& (mask_ip.is_loopback() || mask_ip.is_unspecified() || interface_ips.contains(&mask_ip))
|
||||
}
|
||||
|
||||
fn is_mask_target_local_listener_candidates_with_interfaces(
|
||||
mask_host: &str,
|
||||
mask_port: u16,
|
||||
local_addr: SocketAddr,
|
||||
resolved_candidates: &[SocketAddr],
|
||||
interface_ips: &[IpAddr],
|
||||
) -> bool {
|
||||
if mask_port != local_addr.port() {
|
||||
return false;
|
||||
@@ -514,31 +548,14 @@ fn is_mask_target_local_listener_with_interfaces(
|
||||
let local_ip = canonical_ip(local_addr.ip());
|
||||
let literal_mask_ip = parse_mask_host_ip_literal(mask_host).map(canonical_ip);
|
||||
|
||||
if let Some(addr) = resolved_override {
|
||||
let resolved_ip = canonical_ip(addr.ip());
|
||||
if resolved_ip == local_ip {
|
||||
return true;
|
||||
}
|
||||
|
||||
if local_ip.is_unspecified()
|
||||
&& (resolved_ip.is_loopback()
|
||||
|| resolved_ip.is_unspecified()
|
||||
|| interface_ips.contains(&resolved_ip))
|
||||
{
|
||||
for addr in resolved_candidates {
|
||||
if mask_ip_targets_local_listener(addr.ip(), local_ip, interface_ips) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(mask_ip) = literal_mask_ip {
|
||||
if mask_ip == local_ip {
|
||||
return true;
|
||||
}
|
||||
|
||||
if local_ip.is_unspecified()
|
||||
&& (mask_ip.is_loopback()
|
||||
|| mask_ip.is_unspecified()
|
||||
|| interface_ips.contains(&mask_ip))
|
||||
{
|
||||
if mask_ip_targets_local_listener(mask_ip, local_ip, interface_ips) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -572,21 +589,67 @@ async fn is_mask_target_local_listener_async(
|
||||
mask_port: u16,
|
||||
local_addr: SocketAddr,
|
||||
resolved_override: Option<SocketAddr>,
|
||||
) -> bool {
|
||||
let resolved_candidates = resolved_override
|
||||
.as_ref()
|
||||
.map(std::slice::from_ref)
|
||||
.unwrap_or(&[]);
|
||||
is_mask_target_local_listener_candidates_async(
|
||||
mask_host,
|
||||
mask_port,
|
||||
local_addr,
|
||||
resolved_candidates,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn is_mask_target_local_listener_candidates_async(
|
||||
mask_host: &str,
|
||||
mask_port: u16,
|
||||
local_addr: SocketAddr,
|
||||
resolved_candidates: &[SocketAddr],
|
||||
) -> bool {
|
||||
if mask_port != local_addr.port() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let interfaces = local_interface_ips_async().await;
|
||||
is_mask_target_local_listener_with_interfaces(
|
||||
is_mask_target_local_listener_candidates_with_interfaces(
|
||||
mask_host,
|
||||
mask_port,
|
||||
local_addr,
|
||||
resolved_override,
|
||||
resolved_candidates,
|
||||
&interfaces,
|
||||
)
|
||||
}
|
||||
|
||||
// Resolve hostnames through the same OS DNS path `TcpStream::connect` uses so
|
||||
// self-target rejection also catches loopback and local-interface hostnames.
|
||||
async fn resolve_mask_target_candidates(
|
||||
mask_host: &str,
|
||||
mask_port: u16,
|
||||
resolved_override: Option<SocketAddr>,
|
||||
) -> Vec<SocketAddr> {
|
||||
if let Some(addr) = resolved_override {
|
||||
return vec![addr];
|
||||
}
|
||||
|
||||
if parse_mask_host_ip_literal(mask_host).is_some() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let mut resolved = Vec::new();
|
||||
if let Ok(addrs) = tokio::net::lookup_host((mask_host, mask_port)).await {
|
||||
for addr in addrs {
|
||||
if !resolved.contains(&addr) {
|
||||
resolved.push(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resolved
|
||||
}
|
||||
|
||||
fn masking_beobachten_ttl(config: &ProxyConfig) -> Duration {
|
||||
let minutes = config.general.beobachten_minutes;
|
||||
let clamped = minutes.clamp(1, 24 * 60);
|
||||
@@ -731,8 +794,15 @@ pub async fn handle_bad_client<R, W>(
|
||||
// Self-referential masking can create recursive proxy loops under
|
||||
// misconfiguration and leak distinguishable load spikes to adversaries.
|
||||
let resolved_mask_addr = resolve_socket_addr(mask_host, mask_port);
|
||||
if is_mask_target_local_listener_async(mask_host, mask_port, local_addr, resolved_mask_addr)
|
||||
.await
|
||||
let resolved_mask_candidates =
|
||||
resolve_mask_target_candidates(mask_host, mask_port, resolved_mask_addr).await;
|
||||
if is_mask_target_local_listener_candidates_async(
|
||||
mask_host,
|
||||
mask_port,
|
||||
local_addr,
|
||||
&resolved_mask_candidates,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let outcome_started = Instant::now();
|
||||
debug!(
|
||||
|
||||
@@ -48,6 +48,7 @@ pub(crate) struct HandshakeSharedState {
|
||||
pub(crate) sticky_user_by_sni_hash: DashMap<u64, u32>,
|
||||
pub(crate) recent_user_ring: Box<[AtomicU32]>,
|
||||
pub(crate) recent_user_ring_seq: AtomicU64,
|
||||
pub(crate) auth_candidate_scan_seq: AtomicU64,
|
||||
pub(crate) auth_expensive_checks_total: AtomicU64,
|
||||
pub(crate) auth_budget_exhausted_total: AtomicU64,
|
||||
}
|
||||
@@ -86,6 +87,7 @@ impl ProxySharedState {
|
||||
.collect::<Vec<_>>()
|
||||
.into_boxed_slice(),
|
||||
recent_user_ring_seq: AtomicU64::new(0),
|
||||
auth_candidate_scan_seq: AtomicU64::new(0),
|
||||
auth_expensive_checks_total: AtomicU64::new(0),
|
||||
auth_budget_exhausted_total: AtomicU64::new(0),
|
||||
},
|
||||
|
||||
@@ -1146,9 +1146,9 @@ async fn tls_overload_budget_limits_candidate_scan_depth() {
|
||||
let mut config = ProxyConfig::default();
|
||||
config.access.users.clear();
|
||||
config.access.ignore_time_skew = true;
|
||||
for idx in 0..32u8 {
|
||||
for idx in 0..96u8 {
|
||||
config.access.users.insert(
|
||||
format!("user-{idx}"),
|
||||
format!("user-{idx:02}"),
|
||||
format!("{:032x}", u128::from(idx) + 1),
|
||||
);
|
||||
}
|
||||
@@ -1203,6 +1203,64 @@ async fn tls_overload_budget_limits_candidate_scan_depth() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tls_overload_full_scans_small_runtime_snapshot_to_preserve_cold_user_auth() {
|
||||
let mut config = ProxyConfig::default();
|
||||
config.access.users.clear();
|
||||
config.access.ignore_time_skew = true;
|
||||
for idx in 0..32u8 {
|
||||
config.access.users.insert(
|
||||
format!("user-{idx:02}"),
|
||||
format!("{:032x}", u128::from(idx) + 1),
|
||||
);
|
||||
}
|
||||
config.rebuild_runtime_user_auth().unwrap();
|
||||
|
||||
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
|
||||
let rng = SecureRandom::new();
|
||||
let shared = ProxySharedState::new();
|
||||
let now = Instant::now();
|
||||
{
|
||||
let mut saturation = shared.handshake.auth_probe_saturation.lock().unwrap();
|
||||
*saturation = Some(AuthProbeSaturationState {
|
||||
fail_streak: AUTH_PROBE_BACKOFF_START_FAILS,
|
||||
blocked_until: now + Duration::from_millis(200),
|
||||
last_seen: now,
|
||||
});
|
||||
}
|
||||
|
||||
let peer: SocketAddr = "198.51.100.214:44326".parse().unwrap();
|
||||
let mut secret = [0u8; 16];
|
||||
secret[15] = 32;
|
||||
let handshake = make_valid_tls_handshake(&secret, 0);
|
||||
|
||||
let result = handle_tls_handshake_with_shared(
|
||||
&handshake,
|
||||
tokio::io::empty(),
|
||||
tokio::io::sink(),
|
||||
peer,
|
||||
&config,
|
||||
&replay_checker,
|
||||
&rng,
|
||||
None,
|
||||
shared.as_ref(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
matches!(result, HandshakeResult::Success(_)),
|
||||
"overload mode must still authenticate valid cold users when runtime snapshot stays small"
|
||||
);
|
||||
assert_eq!(
|
||||
shared
|
||||
.handshake
|
||||
.auth_expensive_checks_total
|
||||
.load(Ordering::Relaxed),
|
||||
32,
|
||||
"small saturated snapshots must remain fully scannable"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mtproto_runtime_snapshot_prefers_preferred_user_hint() {
|
||||
let mut config = ProxyConfig::default();
|
||||
@@ -1255,6 +1313,66 @@ async fn mtproto_runtime_snapshot_prefers_preferred_user_hint() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mtproto_overload_full_scans_small_runtime_snapshot_to_preserve_cold_user_auth() {
|
||||
let mut config = ProxyConfig::default();
|
||||
config.general.modes.secure = true;
|
||||
config.access.users.clear();
|
||||
config.access.ignore_time_skew = true;
|
||||
for idx in 0..32u8 {
|
||||
config.access.users.insert(
|
||||
format!("user-{idx:02}"),
|
||||
format!("{:032x}", u128::from(idx) + 1),
|
||||
);
|
||||
}
|
||||
config.rebuild_runtime_user_auth().unwrap();
|
||||
|
||||
let shared = ProxySharedState::new();
|
||||
let now = Instant::now();
|
||||
{
|
||||
let mut saturation = shared.handshake.auth_probe_saturation.lock().unwrap();
|
||||
*saturation = Some(AuthProbeSaturationState {
|
||||
fail_streak: AUTH_PROBE_BACKOFF_START_FAILS,
|
||||
blocked_until: now + Duration::from_millis(200),
|
||||
last_seen: now,
|
||||
});
|
||||
}
|
||||
|
||||
let replay_checker = ReplayChecker::new(128, Duration::from_secs(60));
|
||||
let handshake = make_valid_mtproto_handshake(
|
||||
"00000000000000000000000000000020",
|
||||
ProtoTag::Secure,
|
||||
2,
|
||||
);
|
||||
let peer: SocketAddr = "198.51.100.215:44326".parse().unwrap();
|
||||
|
||||
let result = handle_mtproto_handshake_with_shared(
|
||||
&handshake,
|
||||
tokio::io::empty(),
|
||||
tokio::io::sink(),
|
||||
peer,
|
||||
&config,
|
||||
&replay_checker,
|
||||
false,
|
||||
None,
|
||||
shared.as_ref(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
matches!(result, HandshakeResult::Success(_)),
|
||||
"overload mode must still authenticate valid direct MTProto users when runtime snapshot stays small"
|
||||
);
|
||||
assert_eq!(
|
||||
shared
|
||||
.handshake
|
||||
.auth_expensive_checks_total
|
||||
.load(Ordering::Relaxed),
|
||||
32,
|
||||
"small saturated MTProto snapshots must remain fully scannable"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn alpn_enforce_rejects_unsupported_client_alpn() {
|
||||
let secret = [0x33u8; 16];
|
||||
|
||||
@@ -88,6 +88,45 @@ async fn self_target_fallback_refuses_recursive_loopback_connect() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn self_target_fallback_refuses_recursive_hostname_connect() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = listener.local_addr().unwrap();
|
||||
let accept_task = tokio::spawn(async move {
|
||||
timeout(Duration::from_millis(120), listener.accept())
|
||||
.await
|
||||
.is_ok()
|
||||
});
|
||||
|
||||
let mut config = ProxyConfig::default();
|
||||
config.general.beobachten = false;
|
||||
config.censorship.mask = true;
|
||||
config.censorship.mask_unix_sock = None;
|
||||
config.censorship.mask_host = Some("localhost".to_string());
|
||||
config.censorship.mask_port = local_addr.port();
|
||||
config.censorship.mask_proxy_protocol = 0;
|
||||
|
||||
let peer: SocketAddr = "203.0.113.99:55099".parse().unwrap();
|
||||
let beobachten = BeobachtenStore::new();
|
||||
|
||||
handle_bad_client(
|
||||
tokio::io::empty(),
|
||||
tokio::io::sink(),
|
||||
b"GET /",
|
||||
peer,
|
||||
local_addr,
|
||||
&config,
|
||||
&beobachten,
|
||||
)
|
||||
.await;
|
||||
|
||||
let accepted = accept_task.await.unwrap();
|
||||
assert!(
|
||||
!accepted,
|
||||
"hostname self-target masking must fail closed without connecting to local listener"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn same_ip_different_port_still_forwards_to_mask_backend() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
|
||||
@@ -102,14 +102,29 @@ pub fn configure_client_socket(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set socket to send RST on close (for masking)
|
||||
#[allow(dead_code)]
|
||||
/// Set socket to send RST on close instead of FIN, eliminating
|
||||
/// FIN-WAIT-1 and orphan socket accumulation on high-churn workloads.
|
||||
pub fn set_linger_zero(stream: &TcpStream) -> Result<()> {
|
||||
let socket = socket2::SockRef::from(stream);
|
||||
socket.set_linger(Some(Duration::ZERO))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Restore default linger behaviour (graceful FIN) on a socket
|
||||
/// identified by its raw file descriptor. Safe to call after
|
||||
/// `TcpStream::into_split()` because the fd remains valid until
|
||||
/// both halves are dropped.
|
||||
#[cfg(unix)]
|
||||
pub fn clear_linger_fd(fd: std::os::unix::io::RawFd) -> Result<()> {
|
||||
use std::os::unix::io::BorrowedFd;
|
||||
// SAFETY: the fd is still open — the caller guarantees the
|
||||
// TcpStream (or its split halves) is alive.
|
||||
let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
let socket = socket2::SockRef::from(&borrowed);
|
||||
socket.set_linger(None)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a new TCP socket for outgoing connections
|
||||
#[allow(dead_code)]
|
||||
pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
|
||||
|
||||
Reference in New Issue
Block a user