Add Rust coding conventions and self-explanatory commenting guidelines; update dependencies and version in Cargo files; enhance OpenBSD support in installation and documentation; improve TCP socket configuration and testing

This commit is contained in:
David Osipov
2026-03-11 20:49:51 +04:00
parent be24b47300
commit 8b5cbb7b4b
10 changed files with 678 additions and 5 deletions

View File

@@ -199,10 +199,26 @@ impl MePool {
fn configure_keepalive(stream: &TcpStream) -> std::io::Result<()> {
let sock = SockRef::from(stream);
let ka = TcpKeepalive::new()
.with_time(Duration::from_secs(30))
.with_interval(Duration::from_secs(10))
.with_retries(3);
let ka = TcpKeepalive::new().with_time(Duration::from_secs(30));
// Mirror socket2 v0.5.10 target gate for with_retries(), the stricter method.
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "illumos",
target_os = "ios",
target_os = "visionos",
target_os = "linux",
target_os = "macos",
target_os = "netbsd",
target_os = "tvos",
target_os = "watchos",
target_os = "cygwin",
))]
let ka = ka.with_interval(Duration::from_secs(10)).with_retries(3);
sock.set_tcp_keepalive(&ka)?;
sock.set_keepalive(true)?;
Ok(())
@@ -697,3 +713,66 @@ fn hex_dump(data: &[u8]) -> String {
}
out
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::ErrorKind;
use tokio::net::{TcpListener, TcpStream};
#[tokio::test]
async fn test_configure_keepalive_loopback() {
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(listener) => listener,
Err(error) if error.kind() == ErrorKind::PermissionDenied => return,
Err(error) => panic!("bind failed: {error}"),
};
let addr = match listener.local_addr() {
Ok(addr) => addr,
Err(error) => panic!("local_addr failed: {error}"),
};
let stream = match TcpStream::connect(addr).await {
Ok(stream) => stream,
Err(error) if error.kind() == ErrorKind::PermissionDenied => return,
Err(error) => panic!("connect failed: {error}"),
};
if let Err(error) = MePool::configure_keepalive(&stream) {
if error.kind() == ErrorKind::PermissionDenied {
return;
}
panic!("configure_keepalive failed: {error}");
}
}
#[test]
#[cfg(target_os = "openbsd")]
fn test_openbsd_keepalive_cfg_path_compiles() {
let _ka = TcpKeepalive::new().with_time(Duration::from_secs(30));
}
#[test]
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "illumos",
target_os = "ios",
target_os = "visionos",
target_os = "linux",
target_os = "macos",
target_os = "netbsd",
target_os = "tvos",
target_os = "watchos",
target_os = "cygwin",
))]
fn test_retry_keepalive_cfg_path_compiles() {
let _ka = TcpKeepalive::new()
.with_time(Duration::from_secs(30))
.with_interval(Duration::from_secs(10))
.with_retries(3);
}
}

View File

@@ -1,6 +1,8 @@
//! TCP Socket Configuration
#[cfg(target_os = "linux")]
use std::collections::HashSet;
#[cfg(target_os = "linux")]
use std::fs;
use std::io::Result;
use std::net::{SocketAddr, IpAddr};
@@ -44,6 +46,7 @@ pub fn configure_tcp_socket(
pub fn configure_client_socket(
stream: &TcpStream,
keepalive_secs: u64,
#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
ack_timeout_secs: u64,
) -> Result<()> {
let socket = socket2::SockRef::from(stream);
@@ -373,6 +376,7 @@ fn listening_inodes_for_port(addr: SocketAddr) -> HashSet<u64> {
mod tests {
use super::*;
use std::io::ErrorKind;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::test]
@@ -396,6 +400,115 @@ mod tests {
panic!("configure_tcp_socket failed: {e}");
}
}
#[tokio::test]
async fn test_configure_client_socket() {
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(l) => l,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("bind failed: {e}"),
};
let addr = match listener.local_addr() {
Ok(addr) => addr,
Err(e) => panic!("local_addr failed: {e}"),
};
let stream = match TcpStream::connect(addr).await {
Ok(s) => s,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("connect failed: {e}"),
};
if let Err(e) = configure_client_socket(&stream, 30, 30) {
if e.kind() == ErrorKind::PermissionDenied {
return;
}
panic!("configure_client_socket failed: {e}");
}
}
#[tokio::test]
async fn test_configure_client_socket_zero_ack_timeout() {
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(l) => l,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("bind failed: {e}"),
};
let addr = match listener.local_addr() {
Ok(addr) => addr,
Err(e) => panic!("local_addr failed: {e}"),
};
let stream = match TcpStream::connect(addr).await {
Ok(s) => s,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("connect failed: {e}"),
};
if let Err(e) = configure_client_socket(&stream, 30, 0) {
if e.kind() == ErrorKind::PermissionDenied {
return;
}
panic!("configure_client_socket with zero ack timeout failed: {e}");
}
}
#[tokio::test]
async fn test_configure_client_socket_roundtrip_io() {
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(l) => l,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("bind failed: {e}"),
};
let addr = match listener.local_addr() {
Ok(addr) => addr,
Err(e) => panic!("local_addr failed: {e}"),
};
let server_task = tokio::spawn(async move {
let (mut accepted, _) = match listener.accept().await {
Ok(v) => v,
Err(e) => panic!("accept failed: {e}"),
};
let mut payload = [0u8; 4];
if let Err(e) = accepted.read_exact(&mut payload).await {
panic!("server read_exact failed: {e}");
}
if let Err(e) = accepted.write_all(b"pong").await {
panic!("server write_all failed: {e}");
}
payload
});
let mut stream = match TcpStream::connect(addr).await {
Ok(s) => s,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("connect failed: {e}"),
};
if let Err(e) = configure_client_socket(&stream, 30, 30) {
if e.kind() == ErrorKind::PermissionDenied {
return;
}
panic!("configure_client_socket failed: {e}");
}
if let Err(e) = stream.write_all(b"ping").await {
panic!("client write_all failed: {e}");
}
let mut reply = [0u8; 4];
if let Err(e) = stream.read_exact(&mut reply).await {
panic!("client read_exact failed: {e}");
}
assert_eq!(&reply, b"pong");
let server_seen = match server_task.await {
Ok(value) => value,
Err(e) => panic!("server task join failed: {e}"),
};
assert_eq!(&server_seen, b"ping");
}
#[test]
fn test_normalize_ip() {