feat: add configurable RST-on-close mode for client sockets

Add `rst_on_close` config option (off/errors/always) to control
SO_LINGER(0) behaviour on accepted TCP connections.

- `off` (default): normal FIN on all closes, no behaviour change.
- `errors`: SO_LINGER(0) set on accept, cleared after successful
  handshake auth. Pre-handshake failures (scanners, DPI probes,
  timeouts) send RST instead of FIN, eliminating FIN-WAIT-1 and
  orphan socket accumulation. Authenticated relay sessions still
  close gracefully with FIN.
- `always`: SO_LINGER(0) on accept, never cleared — all closes
  send RST regardless of handshake outcome.
This commit is contained in:
sintanial
2026-04-10 05:01:38 +03:00
parent 17fd01a2c4
commit ddeda8d914
6 changed files with 113 additions and 4 deletions

View File

@@ -159,6 +159,21 @@ impl MeBindStaleMode {
}
}
/// RST-on-close mode for accepted client sockets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum RstOnCloseMode {
/// Normal FIN on all closes (default, no behaviour change).
#[default]
Off,
/// SO_LINGER(0) on accept; cleared after successful auth.
/// Pre-handshake failures (scanners, DPI, timeouts) send RST;
/// authenticated relay sessions close gracefully with FIN.
Errors,
/// SO_LINGER(0) on accept, never cleared — all closes send RST.
Always,
}
/// Middle-End writer floor policy mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
@@ -925,6 +940,14 @@ pub struct GeneralConfig {
/// Minimum unavailable ME DC groups before degrading.
#[serde(default = "default_degradation_min_unavailable_dc_groups")]
pub degradation_min_unavailable_dc_groups: u8,
/// RST-on-close mode for accepted client sockets.
/// `off` — normal FIN on all closes (default).
/// `errors` — SO_LINGER(0) on accept, cleared after successful auth;
/// pre-handshake failures send RST, relayed sessions close gracefully.
/// `always` — SO_LINGER(0) on accept, never cleared; all closes send RST.
#[serde(default)]
pub rst_on_close: RstOnCloseMode,
}
impl Default for GeneralConfig {
@@ -1086,6 +1109,7 @@ impl Default for GeneralConfig {
ntp_servers: default_ntp_servers(),
auto_degradation_enabled: default_true(),
degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(),
rst_on_close: RstOnCloseMode::default(),
}
}
}

View File

@@ -9,7 +9,7 @@ use tokio::net::UnixListener;
use tokio::sync::{Semaphore, watch};
use tracing::{debug, error, info, warn};
use crate::config::ProxyConfig;
use crate::config::{ProxyConfig, RstOnCloseMode};
use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker;
use crate::proxy::ClientHandler;
@@ -21,6 +21,7 @@ use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool;
use crate::tls_front::TlsFrontCache;
use crate::transport::middle_proxy::MePool;
use crate::transport::socket::set_linger_zero;
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
use super::helpers::{is_expected_handshake_eof, print_proxy_links};
@@ -380,6 +381,15 @@ pub(crate) fn spawn_tcp_accept_loops(
loop {
match listener.accept().await {
Ok((stream, peer_addr)) => {
let rst_mode = config_rx.borrow().general.rst_on_close;
#[cfg(unix)]
let raw_fd = {
use std::os::unix::io::AsRawFd;
stream.as_raw_fd()
};
if matches!(rst_mode, RstOnCloseMode::Errors | RstOnCloseMode::Always) {
let _ = set_linger_zero(&stream);
}
if !*admission_rx_tcp.borrow() {
debug!(peer = %peer_addr, "Admission gate closed, dropping connection");
drop(stream);
@@ -454,6 +464,9 @@ pub(crate) fn spawn_tcp_accept_loops(
shared,
proxy_protocol_enabled,
real_peer_report_for_handler,
#[cfg(unix)]
raw_fd,
rst_mode,
)
.run()
.await

View File

@@ -804,6 +804,9 @@ pub struct RunningClientHandler {
beobachten: Arc<BeobachtenStore>,
shared: Arc<ProxySharedState>,
proxy_protocol_enabled: bool,
#[cfg(unix)]
raw_fd: std::os::unix::io::RawFd,
rst_on_close: crate::config::RstOnCloseMode,
}
impl ClientHandler {
@@ -825,6 +828,11 @@ impl ClientHandler {
proxy_protocol_enabled: bool,
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
) -> RunningClientHandler {
#[cfg(unix)]
let raw_fd = {
use std::os::unix::io::AsRawFd;
stream.as_raw_fd()
};
Self::new_with_shared(
stream,
peer,
@@ -842,6 +850,9 @@ impl ClientHandler {
ProxySharedState::new(),
proxy_protocol_enabled,
real_peer_report,
#[cfg(unix)]
raw_fd,
crate::config::RstOnCloseMode::Off,
)
}
@@ -863,6 +874,8 @@ impl ClientHandler {
shared: Arc<ProxySharedState>,
proxy_protocol_enabled: bool,
real_peer_report: Arc<std::sync::Mutex<Option<SocketAddr>>>,
#[cfg(unix)] raw_fd: std::os::unix::io::RawFd,
rst_on_close: crate::config::RstOnCloseMode,
) -> RunningClientHandler {
let normalized_peer = normalize_ip(peer);
RunningClientHandler {
@@ -883,6 +896,9 @@ impl ClientHandler {
beobachten,
shared,
proxy_protocol_enabled,
#[cfg(unix)]
raw_fd,
rst_on_close,
}
}
}
@@ -901,6 +917,10 @@ impl RunningClientHandler {
debug!(peer = %peer, error = %e, "Failed to configure client socket");
}
#[cfg(unix)]
let raw_fd = self.raw_fd;
let rst_on_close = self.rst_on_close;
let outcome = match self.do_handshake().await? {
Some(outcome) => outcome,
None => return Ok(()),
@@ -908,7 +928,14 @@ impl RunningClientHandler {
// Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts)
match outcome {
HandshakeOutcome::NeedsRelay(fut) | HandshakeOutcome::NeedsMasking(fut) => fut.await,
HandshakeOutcome::NeedsRelay(fut) => {
#[cfg(unix)]
if matches!(rst_on_close, crate::config::RstOnCloseMode::Errors) {
let _ = crate::transport::socket::clear_linger_fd(raw_fd);
}
fut.await
}
HandshakeOutcome::NeedsMasking(fut) => fut.await,
}
}

View File

@@ -102,14 +102,29 @@ pub fn configure_client_socket(
Ok(())
}
/// Set socket to send RST on close (for masking)
#[allow(dead_code)]
/// Set socket to send RST on close instead of FIN, eliminating
/// FIN-WAIT-1 and orphan socket accumulation on high-churn workloads.
pub fn set_linger_zero(stream: &TcpStream) -> Result<()> {
let socket = socket2::SockRef::from(stream);
socket.set_linger(Some(Duration::ZERO))?;
Ok(())
}
/// Restore default linger behaviour (graceful FIN) on a socket
/// identified by its raw file descriptor. Safe to call after
/// `TcpStream::into_split()` because the fd remains valid until
/// both halves are dropped.
#[cfg(unix)]
pub fn clear_linger_fd(fd: std::os::unix::io::RawFd) -> Result<()> {
use std::os::unix::io::BorrowedFd;
// SAFETY: the fd is still open — the caller guarantees the
// TcpStream (or its split halves) is alive.
let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
let socket = socket2::SockRef::from(&borrowed);
socket.set_linger(None)?;
Ok(())
}
/// Create a new TCP socket for outgoing connections
#[allow(dead_code)]
pub fn create_outgoing_socket(addr: SocketAddr) -> Result<Socket> {