mirror of https://github.com/telemt/telemt.git
Merge pull request #677 from xaosproxy/feat/rst-on-close
feat: add configurable RST-on-close mode for client sockets
This commit is contained in:
commit
6d5a1a29df
|
|
@ -219,6 +219,7 @@ This document lists all configuration keys accepted by `config.toml`.
|
|||
| [`ntp_servers`](#cfg-general-ntp_servers) | `String[]` | `["pool.ntp.org"]` |
|
||||
| [`auto_degradation_enabled`](#cfg-general-auto_degradation_enabled) | `bool` | `true` |
|
||||
| [`degradation_min_unavailable_dc_groups`](#cfg-general-degradation_min_unavailable_dc_groups) | `u8` | `2` |
|
||||
| [`rst_on_close`](#cfg-general-rst_on_close) | `"off"`, `"errors"`, or `"always"` | `"off"` |
|
||||
|
||||
## "cfg-general-data_path"
|
||||
- `data_path`
|
||||
|
|
@ -1592,7 +1593,21 @@ This document lists all configuration keys accepted by `config.toml`.
|
|||
[general]
|
||||
degradation_min_unavailable_dc_groups = 2
|
||||
```
|
||||
## "cfg-general-rst_on_close"
|
||||
- `rst_on_close`
|
||||
- **Constraints / validation**: one of `"off"`, `"errors"`, `"always"`.
|
||||
- **Description**: Controls `SO_LINGER(0)` behaviour on accepted client TCP sockets.
|
||||
High-traffic proxy servers accumulate `FIN-WAIT-1` and orphaned sockets from connections that never complete the Telegram handshake (scanners, DPI probes, bots).
|
||||
This option allows sending an immediate `RST` instead of a graceful `FIN` for such connections, freeing kernel resources instantly.
|
||||
- `"off"` — default. Normal `FIN` on all closes; no behaviour change.
|
||||
- `"errors"` — `SO_LINGER(0)` is set on `accept()`. If the client successfully completes authentication, linger is cleared and the relay session closes gracefully with `FIN`. Connections closed before handshake completion (timeouts, bad crypto, scanners) send `RST`.
|
||||
- `"always"` — `SO_LINGER(0)` is set on `accept()` and never cleared. All closes send `RST` regardless of handshake outcome.
|
||||
- **Example**:
|
||||
|
||||
```toml
|
||||
[general]
|
||||
rst_on_close = "errors"
|
||||
```
|
||||
|
||||
# [general.modes]
|
||||
|
||||
|
|
|
|||
|
|
@ -219,6 +219,7 @@
|
|||
| [`ntp_servers`](#cfg-general-ntp_servers) | `String[]` | `["pool.ntp.org"]` |
|
||||
| [`auto_degradation_enabled`](#cfg-general-auto_degradation_enabled) | `bool` | `true` |
|
||||
| [`degradation_min_unavailable_dc_groups`](#cfg-general-degradation_min_unavailable_dc_groups) | `u8` | `2` |
|
||||
| [`rst_on_close`](#cfg-general-rst_on_close) | `"off"`, `"errors"` или `"always"` | `"off"` |
|
||||
|
||||
## "cfg-general-data_path"
|
||||
- `data_path`
|
||||
|
|
@ -1592,7 +1593,21 @@
|
|||
[general]
|
||||
degradation_min_unavailable_dc_groups = 2
|
||||
```
|
||||
## "cfg-general-rst_on_close"
|
||||
- `rst_on_close`
|
||||
- **Ограничения / валидация**: одно из `"off"`, `"errors"`, `"always"`.
|
||||
- **Описание**: Управляет поведением `SO_LINGER(0)` на принятых клиентских TCP-сокетах.
|
||||
На высоконагруженных прокси-серверах накапливаются `FIN-WAIT-1` и осиротевшие (orphan) сокеты от соединений, которые не завершают Telegram-рукопожатие (сканеры, DPI-зонды, боты).
|
||||
Эта опция позволяет отправлять немедленный `RST` вместо корректного `FIN` для таких соединений, мгновенно освобождая ресурсы ядра.
|
||||
- `"off"` — по умолчанию. Обычный `FIN` при закрытии всех соединений; поведение не меняется.
|
||||
- `"errors"` — `SO_LINGER(0)` устанавливается при `accept()`. Если клиент успешно проходит аутентификацию, linger сбрасывается и relay-сессия закрывается корректно через `FIN`. Соединения, закрытые до завершения рукопожатия (таймауты, ошибки крипто, сканеры), отправляют `RST`.
|
||||
- `"always"` — `SO_LINGER(0)` устанавливается при `accept()` и никогда не сбрасывается. Все закрытия отправляют `RST` независимо от результата рукопожатия.
|
||||
- **Пример**:
|
||||
|
||||
```toml
|
||||
[general]
|
||||
rst_on_close = "errors"
|
||||
```
|
||||
|
||||
# [general.modes]
|
||||
|
||||
|
|
|
|||
|
|
@ -159,6 +159,21 @@ impl MeBindStaleMode {
|
|||
}
|
||||
}
|
||||
|
||||
/// RST-on-close mode for accepted client sockets.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum RstOnCloseMode {
|
||||
/// Normal FIN on all closes (default, no behaviour change).
|
||||
#[default]
|
||||
Off,
|
||||
/// SO_LINGER(0) on accept; cleared after successful auth.
|
||||
/// Pre-handshake failures (scanners, DPI, timeouts) send RST;
|
||||
/// authenticated relay sessions close gracefully with FIN.
|
||||
Errors,
|
||||
/// SO_LINGER(0) on accept, never cleared — all closes send RST.
|
||||
Always,
|
||||
}
|
||||
|
||||
/// Middle-End writer floor policy mode.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
|
|
@ -925,6 +940,14 @@ pub struct GeneralConfig {
|
|||
/// Minimum unavailable ME DC groups before degrading.
|
||||
#[serde(default = "default_degradation_min_unavailable_dc_groups")]
|
||||
pub degradation_min_unavailable_dc_groups: u8,
|
||||
|
||||
/// RST-on-close mode for accepted client sockets.
|
||||
/// `off` — normal FIN on all closes (default).
|
||||
/// `errors` — SO_LINGER(0) on accept, cleared after successful auth;
|
||||
/// pre-handshake failures send RST, relayed sessions close gracefully.
|
||||
/// `always` — SO_LINGER(0) on accept, never cleared; all closes send RST.
|
||||
#[serde(default)]
|
||||
pub rst_on_close: RstOnCloseMode,
|
||||
}
|
||||
|
||||
impl Default for GeneralConfig {
|
||||
|
|
@ -1086,6 +1109,7 @@ impl Default for GeneralConfig {
|
|||
ntp_servers: default_ntp_servers(),
|
||||
auto_degradation_enabled: default_true(),
|
||||
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
|
||||
rst_on_close: RstOnCloseMode::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use tokio::net::UnixListener;
|
|||
use tokio::sync::{Semaphore, watch};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::config::{ProxyConfig, RstOnCloseMode};
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::ip_tracker::UserIpTracker;
|
||||
use crate::proxy::ClientHandler;
|
||||
|
|
@ -21,6 +21,7 @@ use crate::stats::{ReplayChecker, Stats};
|
|||
use crate::stream::BufferPool;
|
||||
use crate::tls_front::TlsFrontCache;
|
||||
use crate::transport::middle_proxy::MePool;
|
||||
use crate::transport::socket::set_linger_zero;
|
||||
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
|
||||
|
||||
use super::helpers::{is_expected_handshake_eof, print_proxy_links};
|
||||
|
|
@ -380,6 +381,15 @@ pub(crate) fn spawn_tcp_accept_loops(
|
|||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((stream, peer_addr)) => {
|
||||
let rst_mode = config_rx.borrow().general.rst_on_close;
|
||||
#[cfg(unix)]
|
||||
let raw_fd = {
|
||||
use std::os::unix::io::AsRawFd;
|
||||
stream.as_raw_fd()
|
||||
};
|
||||
if matches!(rst_mode, RstOnCloseMode::Errors | RstOnCloseMode::Always) {
|
||||
let _ = set_linger_zero(&stream);
|
||||
}
|
||||
if !*admission_rx_tcp.borrow() {
|
||||
debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
|
||||
drop(stream);
|
||||
|
|
@ -454,6 +464,9 @@ pub(crate) fn spawn_tcp_accept_loops(
|
|||
shared,
|
||||
proxy_protocol_enabled,
|
||||
real_peer_report_for_handler,
|
||||
#[cfg(unix)]
|
||||
raw_fd,
|
||||
rst_mode,
|
||||
)
|
||||
.run()
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -804,6 +804,9 @@ pub struct RunningClientHandler {
|
|||
beobachten: Arc<BeobachtenStore>,
|
||||
shared: Arc<ProxySharedState>,
|
||||
proxy_protocol_enabled: bool,
|
||||
#[cfg(unix)]
|
||||
raw_fd: std::os::unix::io::RawFd,
|
||||
rst_on_close: crate::config::RstOnCloseMode,
|
||||
}
|
||||
|
||||
impl ClientHandler {
|
||||
|
|
@ -825,6 +828,11 @@ impl ClientHandler {
|
|||
proxy_protocol_enabled: bool,
|
||||
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
|
||||
) -> RunningClientHandler {
|
||||
#[cfg(unix)]
|
||||
let raw_fd = {
|
||||
use std::os::unix::io::AsRawFd;
|
||||
stream.as_raw_fd()
|
||||
};
|
||||
Self::new_with_shared(
|
||||
stream,
|
||||
peer,
|
||||
|
|
@ -842,6 +850,9 @@ impl ClientHandler {
|
|||
ProxySharedState::new(),
|
||||
proxy_protocol_enabled,
|
||||
real_peer_report,
|
||||
#[cfg(unix)]
|
||||
raw_fd,
|
||||
crate::config::RstOnCloseMode::Off,
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -863,6 +874,8 @@ impl ClientHandler {
|
|||
shared: Arc<ProxySharedState>,
|
||||
proxy_protocol_enabled: bool,
|
||||
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
|
||||
#[cfg(unix)] raw_fd: std::os::unix::io::RawFd,
|
||||
rst_on_close: crate::config::RstOnCloseMode,
|
||||
) -> RunningClientHandler {
|
||||
let normalized_peer = normalize_ip(peer);
|
||||
RunningClientHandler {
|
||||
|
|
@ -883,6 +896,9 @@ impl ClientHandler {
|
|||
beobachten,
|
||||
shared,
|
||||
proxy_protocol_enabled,
|
||||
#[cfg(unix)]
|
||||
raw_fd,
|
||||
rst_on_close,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -901,6 +917,10 @@ impl RunningClientHandler {
|
|||
debug!(peer = %peer, error = %e, "Failed to configure client socket");
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
let raw_fd = self.raw_fd;
|
||||
let rst_on_close = self.rst_on_close;
|
||||
|
||||
let outcome = match self.do_handshake().await? {
|
||||
Some(outcome) => outcome,
|
||||
None => return Ok(()),
|
||||
|
|
@ -908,7 +928,14 @@ impl RunningClientHandler {
|
|||
|
||||
// Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts)
|
||||
match outcome {
|
||||
HandshakeOutcome::NeedsRelay(fut) | HandshakeOutcome::NeedsMasking(fut) => fut.await,
|
||||
HandshakeOutcome::NeedsRelay(fut) => {
|
||||
#[cfg(unix)]
|
||||
if matches!(rst_on_close, crate::config::RstOnCloseMode::Errors) {
|
||||
let _ = crate::transport::socket::clear_linger_fd(raw_fd);
|
||||
}
|
||||
fut.await
|
||||
}
|
||||
HandshakeOutcome::NeedsMasking(fut) => fut.await,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -102,14 +102,29 @@ pub fn configure_client_socket(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Set socket to send RST on close (for masking)
|
||||
#[allow(dead_code)]
|
||||
/// Set socket to send RST on close instead of FIN, eliminating
|
||||
/// FIN-WAIT-1 and orphan socket accumulation on high-churn workloads.
|
||||
pub fn set_linger_zero(stream: &TcpStream) -> Result<()> {
|
||||
let socket = socket2::SockRef::from(stream);
|
||||
socket.set_linger(Some(Duration::ZERO))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Restore default linger behaviour (graceful FIN) on a socket
|
||||
/// identified by its raw file descriptor. Safe to call after
|
||||
/// `TcpStream::into_split()` because the fd remains valid until
|
||||
/// both halves are dropped.
|
||||
#[cfg(unix)]
|
||||
pub fn clear_linger_fd(fd: std::os::unix::io::RawFd) -> Result<()> {
|
||||
use std::os::unix::io::BorrowedFd;
|
||||
// SAFETY: the fd is still open — the caller guarantees the
|
||||
// TcpStream (or its split halves) is alive.
|
||||
let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
|
||||
let socket = socket2::SockRef::from(&borrowed);
|
||||
socket.set_linger(None)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a new TCP socket for outgoing connections
|
||||
#[allow(dead_code)]
|
||||
pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {
|
||||
|
|
|
|||
Loading…
Reference in New Issue