diff --git a/src/cli.rs b/src/cli.rs index 25d14f0..f737ff9 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -213,6 +213,7 @@ listen_addr_ipv6 = "::" [[server.listeners]] ip = "0.0.0.0" +# reuse_allow = false # Set true only when intentionally running multiple telemt instances on same port [[server.listeners]] ip = "::" diff --git a/src/config/load.rs b/src/config/load.rs index ec8011a..827687a 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -227,6 +227,7 @@ impl ProxyConfig { announce: None, announce_ip: None, proxy_protocol: None, + reuse_allow: false, }); } if let Some(ipv6_str) = &config.server.listen_addr_ipv6 { @@ -236,6 +237,7 @@ impl ProxyConfig { announce: None, announce_ip: None, proxy_protocol: None, + reuse_allow: false, }); } } diff --git a/src/config/types.rs b/src/config/types.rs index 6c54598..2be7704 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -603,6 +603,10 @@ pub struct ListenerConfig { /// Per-listener PROXY protocol override. When set, overrides global server.proxy_protocol. #[serde(default)] pub proxy_protocol: Option, + /// Allow multiple telemt instances to listen on the same IP:port (SO_REUSEPORT). + /// Default is false for safety. + #[serde(default)] + pub reuse_allow: bool, } // ============= ShowLink ============= diff --git a/src/main.rs b/src/main.rs index a9b0e0a..0f3757b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,7 +38,7 @@ use crate::stream::BufferPool; use crate::transport::middle_proxy::{ MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, format_sample_line, }; -use crate::transport::{ListenOptions, UpstreamManager, create_listener}; +use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; use crate::tls_front::TlsFrontCache; fn parse_cli() -> (String, bool, Option) { @@ -715,6 +715,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai continue; } let options = ListenOptions { + reuse_port: listener_conf.reuse_allow, ipv6_only: listener_conf.ip.is_ipv6(), ..Default::default() }; @@ -753,7 +754,33 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai listeners.push((listener, listener_proxy_protocol)); } Err(e) => { - error!("Failed to bind to {}: {}", addr, e); + if e.kind() == std::io::ErrorKind::AddrInUse { + let owners = find_listener_processes(addr); + if owners.is_empty() { + error!( + %addr, + "Failed to bind: address already in use (owner process unresolved)" + ); + } else { + for owner in owners { + error!( + %addr, + pid = owner.pid, + process = %owner.process, + "Failed to bind: address already in use" + ); + } + } + + if !listener_conf.reuse_allow { + error!( + %addr, + "reuse_allow=false; set [[server.listeners]].reuse_allow=true to allow multi-instance listening" + ); + } + } else { + error!("Failed to bind to {}: {}", addr, e); + } } } } diff --git a/src/transport/socket.rs b/src/transport/socket.rs index f353c52..b41cfd1 100644 --- a/src/transport/socket.rs +++ b/src/transport/socket.rs @@ -1,5 +1,7 @@ //! TCP Socket Configuration +use std::collections::HashSet; +use std::fs; use std::io::Result; use std::net::{SocketAddr, IpAddr}; use std::time::Duration; @@ -234,6 +236,133 @@ pub fn create_listener(addr: SocketAddr, options: &ListenOptions) -> Result Vec { + #[cfg(target_os = "linux")] + { + find_listener_processes_linux(addr) + } + #[cfg(not(target_os = "linux"))] + { + let _ = addr; + Vec::new() + } +} + +#[cfg(target_os = "linux")] +fn find_listener_processes_linux(addr: SocketAddr) -> Vec { + let inodes = listening_inodes_for_port(addr); + if inodes.is_empty() { + return Vec::new(); + } + + let mut out = Vec::new(); + + let proc_entries = match fs::read_dir("/proc") { + Ok(entries) => entries, + Err(_) => return out, + }; + + for entry in proc_entries.flatten() { + let pid = match entry.file_name().to_string_lossy().parse::() { + Ok(pid) => pid, + Err(_) => continue, + }; + + let fd_dir = entry.path().join("fd"); + let fd_entries = match fs::read_dir(fd_dir) { + Ok(entries) => entries, + Err(_) => continue, + }; + + let mut matched = false; + for fd in fd_entries.flatten() { + let link_target = match fs::read_link(fd.path()) { + Ok(link) => link, + Err(_) => continue, + }; + + let link_str = link_target.to_string_lossy(); + let Some(rest) = link_str.strip_prefix("socket:[") else { + continue; + }; + let Some(inode_str) = rest.strip_suffix(']') else { + continue; + }; + let Ok(inode) = inode_str.parse::() else { + continue; + }; + + if inodes.contains(&inode) { + matched = true; + break; + } + } + + if matched { + let process = fs::read_to_string(entry.path().join("comm")) + .ok() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| "unknown".to_string()); + out.push(ListenerProcessInfo { pid, process }); + } + } + + out.sort_by_key(|p| p.pid); + out.dedup_by_key(|p| p.pid); + out +} + +#[cfg(target_os = "linux")] +fn listening_inodes_for_port(addr: SocketAddr) -> HashSet { + let path = match addr { + SocketAddr::V4(_) => "/proc/net/tcp", + SocketAddr::V6(_) => "/proc/net/tcp6", + }; + + let mut inodes = HashSet::new(); + let Ok(data) = fs::read_to_string(path) else { + return inodes; + }; + + for line in data.lines().skip(1) { + let cols: Vec<&str> = line.split_whitespace().collect(); + if cols.len() < 10 { + continue; + } + + // LISTEN state in /proc/net/tcp* + if cols[3] != "0A" { + continue; + } + + let Some(port_hex) = cols[1].split(':').nth(1) else { + continue; + }; + let Ok(port) = u16::from_str_radix(port_hex, 16) else { + continue; + }; + if port != addr.port() { + continue; + } + + if let Ok(inode) = cols[9].parse::() { + inodes.insert(inode); + } + } + + inodes +} + #[cfg(test)] mod tests { use super::*;