diff --git a/docs/Config_params/CONFIG_PARAMS.en.md b/docs/Config_params/CONFIG_PARAMS.en.md index cbafacf..82c98f5 100644 --- a/docs/Config_params/CONFIG_PARAMS.en.md +++ b/docs/Config_params/CONFIG_PARAMS.en.md @@ -219,6 +219,7 @@ This document lists all configuration keys accepted by `config.toml`. | [`ntp_servers`](#cfg-general-ntp_servers) | `String[]` | `["pool.ntp.org"]` | | [`auto_degradation_enabled`](#cfg-general-auto_degradation_enabled) | `bool` | `true` | | [`degradation_min_unavailable_dc_groups`](#cfg-general-degradation_min_unavailable_dc_groups) | `u8` | `2` | +| [`rst_on_close`](#cfg-general-rst_on_close) | `"off"`, `"errors"`, or `"always"` | `"off"` | ## "cfg-general-data_path" - `data_path` @@ -1592,7 +1593,21 @@ This document lists all configuration keys accepted by `config.toml`. [general] degradation_min_unavailable_dc_groups = 2 ``` +## "cfg-general-rst_on_close" +- `rst_on_close` + - **Constraints / validation**: one of `"off"`, `"errors"`, `"always"`. + - **Description**: Controls `SO_LINGER(0)` behaviour on accepted client TCP sockets. + High-traffic proxy servers accumulate `FIN-WAIT-1` and orphaned sockets from connections that never complete the Telegram handshake (scanners, DPI probes, bots). + This option allows sending an immediate `RST` instead of a graceful `FIN` for such connections, freeing kernel resources instantly. + - `"off"` — default. Normal `FIN` on all closes; no behaviour change. + - `"errors"` — `SO_LINGER(0)` is set on `accept()`. If the client successfully completes authentication, linger is cleared and the relay session closes gracefully with `FIN`. Connections closed before handshake completion (timeouts, bad crypto, scanners) send `RST`. + - `"always"` — `SO_LINGER(0)` is set on `accept()` and never cleared. All closes send `RST` regardless of handshake outcome. + - **Example**: + ```toml + [general] + rst_on_close = "errors" + ``` # [general.modes] diff --git a/docs/Config_params/CONFIG_PARAMS.ru.md b/docs/Config_params/CONFIG_PARAMS.ru.md index 4302ba6..fd56556 100644 --- a/docs/Config_params/CONFIG_PARAMS.ru.md +++ b/docs/Config_params/CONFIG_PARAMS.ru.md @@ -219,6 +219,7 @@ | [`ntp_servers`](#cfg-general-ntp_servers) | `String[]` | `["pool.ntp.org"]` | | [`auto_degradation_enabled`](#cfg-general-auto_degradation_enabled) | `bool` | `true` | | [`degradation_min_unavailable_dc_groups`](#cfg-general-degradation_min_unavailable_dc_groups) | `u8` | `2` | +| [`rst_on_close`](#cfg-general-rst_on_close) | `"off"`, `"errors"` или `"always"` | `"off"` | ## "cfg-general-data_path" - `data_path` @@ -1592,7 +1593,21 @@ [general] degradation_min_unavailable_dc_groups = 2 ``` +## "cfg-general-rst_on_close" +- `rst_on_close` + - **Ограничения / валидация**: одно из `"off"`, `"errors"`, `"always"`. + - **Описание**: Управляет поведением `SO_LINGER(0)` на принятых клиентских TCP-сокетах. + На высоконагруженных прокси-серверах накапливаются `FIN-WAIT-1` и осиротевшие (orphan) сокеты от соединений, которые не завершают Telegram-рукопожатие (сканеры, DPI-зонды, боты). + Эта опция позволяет отправлять немедленный `RST` вместо корректного `FIN` для таких соединений, мгновенно освобождая ресурсы ядра. + - `"off"` — по умолчанию. Обычный `FIN` при закрытии всех соединений; поведение не меняется. + - `"errors"` — `SO_LINGER(0)` устанавливается при `accept()`. Если клиент успешно проходит аутентификацию, linger сбрасывается и relay-сессия закрывается корректно через `FIN`. Соединения, закрытые до завершения рукопожатия (таймауты, ошибки крипто, сканеры), отправляют `RST`. + - `"always"` — `SO_LINGER(0)` устанавливается при `accept()` и никогда не сбрасывается. Все закрытия отправляют `RST` независимо от результата рукопожатия. + - **Пример**: + ```toml + [general] + rst_on_close = "errors" + ``` # [general.modes] diff --git a/src/config/types.rs b/src/config/types.rs index 0a5af21..98c22a6 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -159,6 +159,21 @@ impl MeBindStaleMode { } } +/// RST-on-close mode for accepted client sockets. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum RstOnCloseMode { + /// Normal FIN on all closes (default, no behaviour change). + #[default] + Off, + /// SO_LINGER(0) on accept; cleared after successful auth. + /// Pre-handshake failures (scanners, DPI, timeouts) send RST; + /// authenticated relay sessions close gracefully with FIN. + Errors, + /// SO_LINGER(0) on accept, never cleared — all closes send RST. + Always, +} + /// Middle-End writer floor policy mode. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] @@ -925,6 +940,14 @@ pub struct GeneralConfig { /// Minimum unavailable ME DC groups before degrading. #[serde(default = "default_degradation_min_unavailable_dc_groups")] pub degradation_min_unavailable_dc_groups: u8, + + /// RST-on-close mode for accepted client sockets. + /// `off` — normal FIN on all closes (default). + /// `errors` — SO_LINGER(0) on accept, cleared after successful auth; + /// pre-handshake failures send RST, relayed sessions close gracefully. + /// `always` — SO_LINGER(0) on accept, never cleared; all closes send RST. + #[serde(default)] + pub rst_on_close: RstOnCloseMode, } impl Default for GeneralConfig { @@ -1086,6 +1109,7 @@ impl Default for GeneralConfig { ntp_servers: default_ntp_servers(), auto_degradation_enabled: default_true(), degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(), + rst_on_close: RstOnCloseMode::default(), } } } diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs index 96d4cd9..f032d77 100644 --- a/src/maestro/listeners.rs +++ b/src/maestro/listeners.rs @@ -9,7 +9,7 @@ use tokio::net::UnixListener; use tokio::sync::{Semaphore, watch}; use tracing::{debug, error, info, warn}; -use crate::config::ProxyConfig; +use crate::config::{ProxyConfig, RstOnCloseMode}; use crate::crypto::SecureRandom; use crate::ip_tracker::UserIpTracker; use crate::proxy::ClientHandler; @@ -21,6 +21,7 @@ use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::tls_front::TlsFrontCache; use crate::transport::middle_proxy::MePool; +use crate::transport::socket::set_linger_zero; use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; use super::helpers::{is_expected_handshake_eof, print_proxy_links}; @@ -380,6 +381,15 @@ pub(crate) fn spawn_tcp_accept_loops( loop { match listener.accept().await { Ok((stream, peer_addr)) => { + let rst_mode = config_rx.borrow().general.rst_on_close; + #[cfg(unix)] + let raw_fd = { + use std::os::unix::io::AsRawFd; + stream.as_raw_fd() + }; + if matches!(rst_mode, RstOnCloseMode::Errors | RstOnCloseMode::Always) { + let _ = set_linger_zero(&stream); + } if !*admission_rx_tcp.borrow() { debug!(peer = %peer_addr, "Admission gate closed, dropping connection"); drop(stream); @@ -454,6 +464,9 @@ pub(crate) fn spawn_tcp_accept_loops( shared, proxy_protocol_enabled, real_peer_report_for_handler, + #[cfg(unix)] + raw_fd, + rst_mode, ) .run() .await diff --git a/src/proxy/client.rs b/src/proxy/client.rs index fb73db2..0937a8f 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -804,6 +804,9 @@ pub struct RunningClientHandler { beobachten: Arc, shared: Arc, proxy_protocol_enabled: bool, + #[cfg(unix)] + raw_fd: std::os::unix::io::RawFd, + rst_on_close: crate::config::RstOnCloseMode, } impl ClientHandler { @@ -825,6 +828,11 @@ impl ClientHandler { proxy_protocol_enabled: bool, real_peer_report: Arc>>, ) -> RunningClientHandler { + #[cfg(unix)] + let raw_fd = { + use std::os::unix::io::AsRawFd; + stream.as_raw_fd() + }; Self::new_with_shared( stream, peer, @@ -842,6 +850,9 @@ impl ClientHandler { ProxySharedState::new(), proxy_protocol_enabled, real_peer_report, + #[cfg(unix)] + raw_fd, + crate::config::RstOnCloseMode::Off, ) } @@ -863,6 +874,8 @@ impl ClientHandler { shared: Arc, proxy_protocol_enabled: bool, real_peer_report: Arc>>, + #[cfg(unix)] raw_fd: std::os::unix::io::RawFd, + rst_on_close: crate::config::RstOnCloseMode, ) -> RunningClientHandler { let normalized_peer = normalize_ip(peer); RunningClientHandler { @@ -883,6 +896,9 @@ impl ClientHandler { beobachten, shared, proxy_protocol_enabled, + #[cfg(unix)] + raw_fd, + rst_on_close, } } } @@ -901,6 +917,10 @@ impl RunningClientHandler { debug!(peer = %peer, error = %e, "Failed to configure client socket"); } + #[cfg(unix)] + let raw_fd = self.raw_fd; + let rst_on_close = self.rst_on_close; + let outcome = match self.do_handshake().await? { Some(outcome) => outcome, None => return Ok(()), @@ -908,7 +928,14 @@ impl RunningClientHandler { // Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts) match outcome { - HandshakeOutcome::NeedsRelay(fut) | HandshakeOutcome::NeedsMasking(fut) => fut.await, + HandshakeOutcome::NeedsRelay(fut) => { + #[cfg(unix)] + if matches!(rst_on_close, crate::config::RstOnCloseMode::Errors) { + let _ = crate::transport::socket::clear_linger_fd(raw_fd); + } + fut.await + } + HandshakeOutcome::NeedsMasking(fut) => fut.await, } } diff --git a/src/transport/socket.rs b/src/transport/socket.rs index 32400f4..b751a30 100644 --- a/src/transport/socket.rs +++ b/src/transport/socket.rs @@ -102,14 +102,29 @@ pub fn configure_client_socket( Ok(()) } -/// Set socket to send RST on close (for masking) -#[allow(dead_code)] +/// Set socket to send RST on close instead of FIN, eliminating +/// FIN-WAIT-1 and orphan socket accumulation on high-churn workloads. pub fn set_linger_zero(stream: &TcpStream) -> Result<()> { let socket = socket2::SockRef::from(stream); socket.set_linger(Some(Duration::ZERO))?; Ok(()) } +/// Restore default linger behaviour (graceful FIN) on a socket +/// identified by its raw file descriptor. Safe to call after +/// `TcpStream::into_split()` because the fd remains valid until +/// both halves are dropped. +#[cfg(unix)] +pub fn clear_linger_fd(fd: std::os::unix::io::RawFd) -> Result<()> { + use std::os::unix::io::BorrowedFd; + // SAFETY: the fd is still open — the caller guarantees the + // TcpStream (or its split halves) is alive. + let borrowed = unsafe { BorrowedFd::borrow_raw(fd) }; + let socket = socket2::SockRef::from(&borrowed); + socket.set_linger(None)?; + Ok(()) +} + /// Create a new TCP socket for outgoing connections #[allow(dead_code)] pub fn create_outgoing_socket(addr: SocketAddr) -> Result {