Compare commits

...

28 Commits
3.3.37 ... main

Author SHA1 Message Date
Alexey 4d87a790cc
Merge pull request #626 from vladon/fix/strip-release-binaries
[codex] Strip release binaries before packaging
2026-04-05 21:12:07 +03:00
Alexey 07fed8f871
Merge pull request #632 from SysAdminKo/main
Актуализация документации CONFIG_PARAMS
2026-04-05 21:10:58 +03:00
Alexey 407d686d49
Merge pull request #638 from Dimasssss/patch-1
Update install.sh - add port availability check and new CLI arguments + update QUICK_START_GUIDE - add CAP_NET_ADMIN Service
2026-04-05 21:06:29 +03:00
Dimasssss eac5cc81fb
Update QUICK_START_GUIDE.ru.md 2026-04-05 18:53:16 +03:00
Dimasssss c51d16f403
Update QUICK_START_GUIDE.en.md 2026-04-05 18:53:06 +03:00
Dimasssss b5146bba94
Update install.sh 2026-04-05 18:43:08 +03:00
SysAdminKo 5ed525fa48
Add server.conntrack_control configuration section with detailed parameters and descriptions
This update introduces a new section in the configuration documentation for `server.conntrack_control`, outlining various parameters such as `inline_conntrack_control`, `mode`, `backend`, `profile`, `hybrid_listener_ips`, `pressure_high_watermark_pct`, `pressure_low_watermark_pct`, and `delete_budget_per_sec`. Each parameter includes constraints, descriptions, and examples to assist users in configuring conntrack control effectively.
2026-04-05 18:05:13 +03:00
Олегсей Бреднев 9f7c1693ce
Merge branch 'telemt:main' into main 2026-04-05 17:42:08 +03:00
Dimasssss 1524396e10
Update install.sh
Новые аргументы командной строки:
-d, --domain : TLS-домен (дефолт: petrovich.ru)
-p, --port : Порт сервера (дефолт: 443)
-s, --secret : Секрет пользователя (32 hex-символа)
-a, --ad-tag : Установка ad_tag

⚠️ Если эти флаги переданы при запуске, они заменят собой старые сохраненные значения.
2026-04-05 17:32:21 +03:00
Alexey e630ea0045
Bump 2026-04-05 17:31:48 +03:00
Alexey 4574e423c6
New Relay Methods + Conntrack Control + Cleanup Methods for Memory + Buffer Pool Trim + Shrink Session Vec + ME2DC Fast for unstoppable init + Config Fallback + Working Directory Setup + Logging fixes with --syslog: merge pull request #637 from telemt/flow
New Relay Methods + Conntrack Control + Cleanup Methods for Memory + Buffer Pool Trim + Shrink Session Vec + ME2DC Fast for unstoppable init + Config Fallback + Working Directory Setup + Logging fixes with --syslog
2026-04-05 17:30:43 +03:00
Alexey 5f5582865e
Rustfmt 2026-04-05 17:23:40 +03:00
Alexey 1f54e4a203
Logging fixes with --syslog
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-04-05 17:21:47 +03:00
Alexey defa37da05
Merge pull request #636 from Dimasssss/patch-3
Update install.sh - add x86_64-v3 support + Add -d/--domain argument
2026-04-05 15:38:26 +03:00
Dimasssss 5fd058b6fd
Update install.sh - Add -d/--domain
**Example usage:**
`./install.sh -d example.com`
`./install.sh --domain example.com`
2026-04-05 14:49:31 +03:00
Alexey 977ee53b72
Config Fallback + Working Directory Setup
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-04-05 14:40:17 +03:00
Dimasssss 5b11522620
Update install.sh 2026-04-05 13:26:52 +03:00
Alexey 8fe6fcb7eb
ME2DC Fast for unstoppable init 2026-04-05 13:10:35 +03:00
SysAdminKo 444a20672d
Refine CONFIG_PARAMS documentation by updating default values to use a dash (—) for optional parameters instead of null. Adjust constraints for clarity, ensuring all types are accurately represented as required. Enhance descriptions for better understanding of configuration options. 2026-04-04 21:56:24 +03:00
Alexey c2f16a343a
Update README.md 2026-04-03 19:13:57 +03:00
Vlad Yaroslavlev d673935b6d
Merge branch 'main' into fix/strip-release-binaries 2026-04-03 00:35:20 +03:00
Vladislav Yaroslavlev 363b5014f7
Strip release binaries before packaging 2026-04-03 00:17:43 +03:00
Alexey bb6237151c
Update README.md 2026-04-03 00:06:34 +03:00
Alexey f6704d7d65
Update README.md 2026-04-02 10:59:19 +03:00
Alexey 3d20002e56
Update README.md 2026-04-02 10:58:50 +03:00
Alexey 8fcd0fa950
Merge pull request #618 from SysAdminKo/main
Переработка документации CONFIG_PARAMS
2026-04-01 17:24:22 +03:00
SysAdminKo 645e968778
Enhance CONFIG_PARAMS documentation with AI-assisted notes and detailed parameter descriptions. Update formatting for clarity and include examples for key configuration options. 2026-04-01 16:04:11 +03:00
Alexey b46216d357
Update README.md 2026-04-01 11:52:13 +03:00
43 changed files with 4315 additions and 629 deletions

View File

@ -151,6 +151,14 @@ jobs:
mkdir -p dist mkdir -p dist
cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-gnu" ]; then
STRIP_BIN=aarch64-linux-gnu-strip
else
STRIP_BIN=strip
fi
"${STRIP_BIN}" dist/telemt
cd dist cd dist
tar -czf "${{ matrix.asset }}.tar.gz" \ tar -czf "${{ matrix.asset }}.tar.gz" \
--owner=0 --group=0 --numeric-owner \ --owner=0 --group=0 --numeric-owner \
@ -279,6 +287,14 @@ jobs:
mkdir -p dist mkdir -p dist
cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-musl" ]; then
STRIP_BIN=aarch64-linux-musl-strip
else
STRIP_BIN=strip
fi
"${STRIP_BIN}" dist/telemt
cd dist cd dist
tar -czf "${{ matrix.asset }}.tar.gz" \ tar -czf "${{ matrix.asset }}.tar.gz" \
--owner=0 --group=0 --numeric-owner \ --owner=0 --group=0 --numeric-owner \

2
Cargo.lock generated
View File

@ -2780,7 +2780,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.3.37" version = "3.3.38"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.37" version = "3.3.38"
edition = "2024" edition = "2024"
[features] [features]

View File

@ -2,7 +2,10 @@
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist*** ***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
[**Telemt Chat in Telegram**](https://t.me/telemtrs) ### [**Telemt Chat in Telegram**](https://t.me/telemtrs)
#### Fixed TLS ClientHello is now available in Telegram Desktop starting from version 6.7.2: to work with EE-MTProxy, please update your client;
#### Fixed TLS ClientHello for Telegram Android Client is available in [our chat](https://t.me/telemtrs/30234/36441); official releases for Android and iOS are "work in progress";
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as: **Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md) - [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)
@ -51,8 +54,12 @@
- [FAQ EN](docs/FAQ.en.md) - [FAQ EN](docs/FAQ.en.md)
### Recognizability for DPI and crawler ### Recognizability for DPI and crawler
Since version 1.1.0.0, we have debugged masking perfectly: for all clients without "presenting" a key,
we transparently direct traffic to the target host! On April 1, 2026, we became aware of a method for detecting MTProxy Fake-TLS,
based on the ECH extension and the ordering of cipher suites,
as well as an overall unique JA3/JA4 fingerprint
that does not occur in modern browsers:
we have already submitted initial changes to the Telegram Desktop developers and are working on updates for other clients.
- We consider this a breakthrough aspect, which has no stable analogues today - We consider this a breakthrough aspect, which has no stable analogues today
- Based on this: if `telemt` configured correctly, **TLS mode is completely identical to real-life handshake + communication** with a specified host - Based on this: if `telemt` configured correctly, **TLS mode is completely identical to real-life handshake + communication** with a specified host

File diff suppressed because it is too large Load Diff

View File

@ -128,8 +128,8 @@ WorkingDirectory=/opt/telemt
ExecStart=/bin/telemt /etc/telemt/telemt.toml ExecStart=/bin/telemt /etc/telemt/telemt.toml
Restart=on-failure Restart=on-failure
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_BIND_SERVICE AmbientCapabilities=CAP_NET_ADMIN CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CapabilityBoundingSet=CAP_NET_ADMIN CAP_NET_BIND_SERVICE
NoNewPrivileges=true NoNewPrivileges=true
[Install] [Install]

View File

@ -128,8 +128,8 @@ WorkingDirectory=/opt/telemt
ExecStart=/bin/telemt /etc/telemt/telemt.toml ExecStart=/bin/telemt /etc/telemt/telemt.toml
Restart=on-failure Restart=on-failure
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_BIND_SERVICE AmbientCapabilities=CAP_NET_ADMIN CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CapabilityBoundingSet=CAP_NET_ADMIN CAP_NET_BIND_SERVICE
NoNewPrivileges=true NoNewPrivileges=true
[Install] [Install]

View File

@ -8,18 +8,62 @@ CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}"
CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}" CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}"
WORK_DIR="${WORK_DIR:-/opt/telemt}" WORK_DIR="${WORK_DIR:-/opt/telemt}"
TLS_DOMAIN="${TLS_DOMAIN:-petrovich.ru}" TLS_DOMAIN="${TLS_DOMAIN:-petrovich.ru}"
SERVER_PORT="${SERVER_PORT:-443}"
USER_SECRET=""
AD_TAG=""
SERVICE_NAME="telemt" SERVICE_NAME="telemt"
TEMP_DIR="" TEMP_DIR=""
SUDO="" SUDO=""
CONFIG_PARENT_DIR="" CONFIG_PARENT_DIR=""
SERVICE_START_FAILED=0 SERVICE_START_FAILED=0
PORT_PROVIDED=0
SECRET_PROVIDED=0
AD_TAG_PROVIDED=0
DOMAIN_PROVIDED=0
ACTION="install" ACTION="install"
TARGET_VERSION="${VERSION:-latest}" TARGET_VERSION="${VERSION:-latest}"
while [ $# -gt 0 ]; do while [ $# -gt 0 ]; do
case "$1" in case "$1" in
-h|--help) ACTION="help"; shift ;; -h|--help) ACTION="help"; shift ;;
-d|--domain)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires a domain argument.\n' "$1" >&2
exit 1
fi
TLS_DOMAIN="$2"; DOMAIN_PROVIDED=1; shift 2 ;;
-p|--port)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires a port argument.\n' "$1" >&2; exit 1
fi
case "$2" in
*[!0-9]*) printf '[ERROR] Port must be a valid number.\n' >&2; exit 1 ;;
esac
port_num="$(printf '%s\n' "$2" | sed 's/^0*//')"
[ -z "$port_num" ] && port_num="0"
if [ "${#port_num}" -gt 5 ] || [ "$port_num" -lt 1 ] || [ "$port_num" -gt 65535 ]; then
printf '[ERROR] Port must be between 1 and 65535.\n' >&2; exit 1
fi
SERVER_PORT="$port_num"; PORT_PROVIDED=1; shift 2 ;;
-s|--secret)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires a secret argument.\n' "$1" >&2; exit 1
fi
case "$2" in
*[!0-9a-fA-F]*)
printf '[ERROR] Secret must contain only hex characters.\n' >&2; exit 1 ;;
esac
if [ "${#2}" -ne 32 ]; then
printf '[ERROR] Secret must be exactly 32 chars.\n' >&2; exit 1
fi
USER_SECRET="$2"; SECRET_PROVIDED=1; shift 2 ;;
-a|--ad-tag|--ad_tag)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires an ad_tag argument.\n' "$1" >&2; exit 1
fi
AD_TAG="$2"; AD_TAG_PROVIDED=1; shift 2 ;;
uninstall|--uninstall) uninstall|--uninstall)
if [ "$ACTION" != "purge" ]; then ACTION="uninstall"; fi if [ "$ACTION" != "purge" ]; then ACTION="uninstall"; fi
shift ;; shift ;;
@ -52,11 +96,17 @@ cleanup() {
trap cleanup EXIT INT TERM trap cleanup EXIT INT TERM
show_help() { show_help() {
say "Usage: $0 [ <version> | install | uninstall | purge | --help ]" say "Usage: $0 [ <version> | install | uninstall | purge ] [ options ]"
say " <version> Install specific version (e.g. 3.3.15, default: latest)" say " <version> Install specific version (e.g. 3.3.15, default: latest)"
say " install Install the latest version" say " install Install the latest version"
say " uninstall Remove the binary and service (keeps config and user)" say " uninstall Remove the binary and service"
say " purge Remove everything including configuration, data, and user" say " purge Remove everything including configuration, data, and user"
say ""
say "Options:"
say " -d, --domain Set TLS domain (default: petrovich.ru)"
say " -p, --port Set server port (default: 443)"
say " -s, --secret Set specific user secret (32 hex characters)"
say " -a, --ad-tag Set ad_tag"
exit 0 exit 0
} }
@ -112,6 +162,14 @@ get_svc_mgr() {
else echo "none"; fi else echo "none"; fi
} }
is_config_exists() {
if [ -n "$SUDO" ]; then
$SUDO sh -c '[ -f "$1" ]' _ "$CONFIG_FILE"
else
[ -f "$CONFIG_FILE" ]
fi
}
verify_common() { verify_common() {
[ -n "$BIN_NAME" ] || die "BIN_NAME cannot be empty." [ -n "$BIN_NAME" ] || die "BIN_NAME cannot be empty."
[ -n "$INSTALL_DIR" ] || die "INSTALL_DIR cannot be empty." [ -n "$INSTALL_DIR" ] || die "INSTALL_DIR cannot be empty."
@ -119,7 +177,7 @@ verify_common() {
[ -n "$CONFIG_FILE" ] || die "CONFIG_FILE cannot be empty." [ -n "$CONFIG_FILE" ] || die "CONFIG_FILE cannot be empty."
case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}${CONFIG_FILE}" in case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}${CONFIG_FILE}" in
*[!a-zA-Z0-9_./-]*) die "Invalid characters in paths. Only alphanumeric, _, ., -, and / allowed." ;; *[!a-zA-Z0-9_./-]*) die "Invalid characters in paths." ;;
esac esac
case "$TARGET_VERSION" in *[!a-zA-Z0-9_.-]*) die "Invalid characters in version." ;; esac case "$TARGET_VERSION" in *[!a-zA-Z0-9_.-]*) die "Invalid characters in version." ;; esac
@ -137,11 +195,11 @@ verify_common() {
if [ "$(id -u)" -eq 0 ]; then if [ "$(id -u)" -eq 0 ]; then
SUDO="" SUDO=""
else else
command -v sudo >/dev/null 2>&1 || die "This script requires root or sudo. Neither found." command -v sudo >/dev/null 2>&1 || die "This script requires root or sudo."
SUDO="sudo" SUDO="sudo"
if ! sudo -n true 2>/dev/null; then if ! sudo -n true 2>/dev/null; then
if ! [ -t 0 ]; then if ! [ -t 0 ]; then
die "sudo requires a password, but no TTY detected. Aborting to prevent hang." die "sudo requires a password, but no TTY detected."
fi fi
fi fi
fi fi
@ -154,21 +212,7 @@ verify_common() {
die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory." die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory."
fi fi
for path in "$CONFIG_DIR" "$CONFIG_PARENT_DIR" "$WORK_DIR"; do for cmd in id uname awk grep find rm chown chmod mv mktemp mkdir tr dd sed ps head sleep cat tar gzip; do
check_path="$(get_realpath "$path")"
case "$check_path" in
/|/bin|/sbin|/usr|/usr/bin|/usr/sbin|/usr/local|/usr/local/bin|/usr/local/sbin|/usr/local/etc|/usr/local/share|/etc|/var|/var/lib|/var/log|/var/run|/home|/root|/tmp|/lib|/lib64|/opt|/run|/boot|/dev|/sys|/proc)
die "Safety check failed: '$path' (resolved to '$check_path') is a critical system directory." ;;
esac
done
check_install_dir="$(get_realpath "$INSTALL_DIR")"
case "$check_install_dir" in
/|/etc|/var|/home|/root|/tmp|/usr|/usr/local|/opt|/boot|/dev|/sys|/proc|/run)
die "Safety check failed: INSTALL_DIR '$INSTALL_DIR' is a critical system directory." ;;
esac
for cmd in id uname grep find rm chown chmod mv mktemp mkdir tr dd sed ps head sleep cat tar gzip rmdir; do
command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd" command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd"
done done
} }
@ -177,14 +221,41 @@ verify_install_deps() {
command -v curl >/dev/null 2>&1 || command -v wget >/dev/null 2>&1 || die "Neither curl nor wget is installed." command -v curl >/dev/null 2>&1 || command -v wget >/dev/null 2>&1 || die "Neither curl nor wget is installed."
command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install" command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install"
if ! command -v setcap >/dev/null 2>&1; then if ! command -v setcap >/dev/null 2>&1 || ! command -v conntrack >/dev/null 2>&1; then
if command -v apk >/dev/null 2>&1; then if command -v apk >/dev/null 2>&1; then
$SUDO apk add --no-cache libcap-utils >/dev/null 2>&1 || $SUDO apk add --no-cache libcap >/dev/null 2>&1 || true $SUDO apk add --no-cache libcap-utils libcap conntrack-tools >/dev/null 2>&1 || true
elif command -v apt-get >/dev/null 2>&1; then elif command -v apt-get >/dev/null 2>&1; then
$SUDO apt-get update -q >/dev/null 2>&1 || true $SUDO env DEBIAN_FRONTEND=noninteractive apt-get install -y -q libcap2-bin conntrack >/dev/null 2>&1 || {
$SUDO apt-get install -y -q libcap2-bin >/dev/null 2>&1 || true $SUDO env DEBIAN_FRONTEND=noninteractive apt-get update -q >/dev/null 2>&1 || true
elif command -v dnf >/dev/null 2>&1; then $SUDO dnf install -y -q libcap >/dev/null 2>&1 || true $SUDO env DEBIAN_FRONTEND=noninteractive apt-get install -y -q libcap2-bin conntrack >/dev/null 2>&1 || true
elif command -v yum >/dev/null 2>&1; then $SUDO yum install -y -q libcap >/dev/null 2>&1 || true }
elif command -v dnf >/dev/null 2>&1; then $SUDO dnf install -y -q libcap conntrack-tools >/dev/null 2>&1 || true
elif command -v yum >/dev/null 2>&1; then $SUDO yum install -y -q libcap conntrack-tools >/dev/null 2>&1 || true
fi
fi
}
check_port_availability() {
port_info=""
if command -v ss >/dev/null 2>&1; then
port_info=$($SUDO ss -tulnp 2>/dev/null | grep -E ":${SERVER_PORT}([[:space:]]|$)" || true)
elif command -v netstat >/dev/null 2>&1; then
port_info=$($SUDO netstat -tulnp 2>/dev/null | grep -E ":${SERVER_PORT}([[:space:]]|$)" || true)
elif command -v lsof >/dev/null 2>&1; then
port_info=$($SUDO lsof -i :${SERVER_PORT} 2>/dev/null | grep LISTEN || true)
else
say "[WARNING] Network diagnostic tools (ss, netstat, lsof) not found. Skipping port check."
return 0
fi
if [ -n "$port_info" ]; then
if printf '%s\n' "$port_info" | grep -q "${BIN_NAME}"; then
say " -> Port ${SERVER_PORT} is in use by ${BIN_NAME}. Ignoring as it will be restarted."
else
say "[ERROR] Port ${SERVER_PORT} is already in use by another process:"
printf ' %s\n' "$port_info"
die "Please free the port ${SERVER_PORT} or change it and try again."
fi fi
fi fi
} }
@ -192,7 +263,13 @@ verify_install_deps() {
detect_arch() { detect_arch() {
sys_arch="$(uname -m)" sys_arch="$(uname -m)"
case "$sys_arch" in case "$sys_arch" in
x86_64|amd64) echo "x86_64" ;; x86_64|amd64)
if [ -r /proc/cpuinfo ] && grep -q "avx2" /proc/cpuinfo 2>/dev/null && grep -q "bmi2" /proc/cpuinfo 2>/dev/null; then
echo "x86_64-v3"
else
echo "x86_64"
fi
;;
aarch64|arm64) echo "aarch64" ;; aarch64|arm64) echo "aarch64" ;;
*) die "Unsupported architecture: $sys_arch" ;; *) die "Unsupported architecture: $sys_arch" ;;
esac esac
@ -261,17 +338,19 @@ install_binary() {
fi fi
$SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory" $SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory"
$SUDO rm -f "$bin_dst" 2>/dev/null || true
if command -v install >/dev/null 2>&1; then if command -v install >/dev/null 2>&1; then
$SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary" $SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary"
else else
$SUDO rm -f "$bin_dst" 2>/dev/null || true
$SUDO cp "$bin_src" "$bin_dst" && $SUDO chmod 0755 "$bin_dst" || die "Failed to copy binary" $SUDO cp "$bin_src" "$bin_dst" && $SUDO chmod 0755 "$bin_dst" || die "Failed to copy binary"
fi fi
$SUDO sh -c '[ -x "$1" ]' _ "$bin_dst" || die "Binary not executable: $bin_dst" $SUDO sh -c '[ -x "$1" ]' _ "$bin_dst" || die "Binary not executable: $bin_dst"
if command -v setcap >/dev/null 2>&1; then if command -v setcap >/dev/null 2>&1; then
$SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null || true $SUDO setcap cap_net_bind_service,cap_net_admin=+ep "$bin_dst" 2>/dev/null || true
fi fi
} }
@ -287,11 +366,20 @@ generate_secret() {
} }
generate_config_content() { generate_config_content() {
conf_secret="$1"
conf_tag="$2"
escaped_tls_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')" escaped_tls_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')"
cat <<EOF cat <<EOF
[general] [general]
use_middle_proxy = false use_middle_proxy = true
EOF
if [ -n "$conf_tag" ]; then
echo "ad_tag = \"${conf_tag}\""
fi
cat <<EOF
[general.modes] [general.modes]
classic = false classic = false
@ -299,7 +387,7 @@ secure = false
tls = true tls = true
[server] [server]
port = 443 port = ${SERVER_PORT}
[server.api] [server.api]
enabled = true enabled = true
@ -310,28 +398,73 @@ whitelist = ["127.0.0.1/32"]
tls_domain = "${escaped_tls_domain}" tls_domain = "${escaped_tls_domain}"
[access.users] [access.users]
hello = "$1" hello = "${conf_secret}"
EOF EOF
} }
install_config() { install_config() {
if [ -n "$SUDO" ]; then if is_config_exists; then
if $SUDO sh -c '[ -f "$1" ]' _ "$CONFIG_FILE"; then say " -> Config already exists at $CONFIG_FILE. Updating parameters..."
say " -> Config already exists at $CONFIG_FILE. Skipping creation."
return 0 tmp_conf="${TEMP_DIR}/config.tmp"
fi $SUDO cat "$CONFIG_FILE" > "$tmp_conf"
elif [ -f "$CONFIG_FILE" ]; then
say " -> Config already exists at $CONFIG_FILE. Skipping creation." escaped_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')"
export AWK_PORT="$SERVER_PORT"
export AWK_SECRET="$USER_SECRET"
export AWK_DOMAIN="$escaped_domain"
export AWK_AD_TAG="$AD_TAG"
export AWK_FLAG_P="$PORT_PROVIDED"
export AWK_FLAG_S="$SECRET_PROVIDED"
export AWK_FLAG_D="$DOMAIN_PROVIDED"
export AWK_FLAG_A="$AD_TAG_PROVIDED"
awk '
BEGIN { ad_tag_handled = 0 }
ENVIRON["AWK_FLAG_P"] == "1" && /^[ \t]*port[ \t]*=/ { print "port = " ENVIRON["AWK_PORT"]; next }
ENVIRON["AWK_FLAG_S"] == "1" && /^[ \t]*hello[ \t]*=/ { print "hello = \"" ENVIRON["AWK_SECRET"] "\""; next }
ENVIRON["AWK_FLAG_D"] == "1" && /^[ \t]*tls_domain[ \t]*=/ { print "tls_domain = \"" ENVIRON["AWK_DOMAIN"] "\""; next }
ENVIRON["AWK_FLAG_A"] == "1" && /^[ \t]*ad_tag[ \t]*=/ {
if (!ad_tag_handled) {
print "ad_tag = \"" ENVIRON["AWK_AD_TAG"] "\"";
ad_tag_handled = 1;
}
next
}
ENVIRON["AWK_FLAG_A"] == "1" && /^\[general\]/ {
print;
if (!ad_tag_handled) {
print "ad_tag = \"" ENVIRON["AWK_AD_TAG"] "\"";
ad_tag_handled = 1;
}
next
}
{ print }
' "$tmp_conf" > "${tmp_conf}.new" && mv "${tmp_conf}.new" "$tmp_conf"
[ "$PORT_PROVIDED" -eq 1 ] && say " -> Updated port: $SERVER_PORT"
[ "$SECRET_PROVIDED" -eq 1 ] && say " -> Updated secret for user 'hello'"
[ "$DOMAIN_PROVIDED" -eq 1 ] && say " -> Updated tls_domain: $TLS_DOMAIN"
[ "$AD_TAG_PROVIDED" -eq 1 ] && say " -> Updated ad_tag"
write_root "$CONFIG_FILE" < "$tmp_conf"
rm -f "$tmp_conf"
return 0 return 0
fi fi
toml_secret="$(generate_secret)" || die "Failed to generate secret." if [ -z "$USER_SECRET" ]; then
USER_SECRET="$(generate_secret)" || die "Failed to generate secret."
fi
generate_config_content "$toml_secret" | write_root "$CONFIG_FILE" || die "Failed to install config" generate_config_content "$USER_SECRET" "$AD_TAG" | write_root "$CONFIG_FILE" || die "Failed to install config"
$SUDO chown root:telemt "$CONFIG_FILE" && $SUDO chmod 640 "$CONFIG_FILE" $SUDO chown root:telemt "$CONFIG_FILE" && $SUDO chmod 640 "$CONFIG_FILE"
say " -> Config created successfully." say " -> Config created successfully."
say " -> Generated secret for default user 'hello': $toml_secret" say " -> Configured secret for user 'hello': $USER_SECRET"
} }
generate_systemd_content() { generate_systemd_content() {
@ -348,9 +481,10 @@ Group=telemt
WorkingDirectory=$WORK_DIR WorkingDirectory=$WORK_DIR
ExecStart="${INSTALL_DIR}/${BIN_NAME}" "${CONFIG_FILE}" ExecStart="${INSTALL_DIR}/${BIN_NAME}" "${CONFIG_FILE}"
Restart=on-failure Restart=on-failure
RestartSec=5
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_BIND_SERVICE AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target
@ -415,7 +549,8 @@ kill_user_procs() {
if command -v pgrep >/dev/null 2>&1; then if command -v pgrep >/dev/null 2>&1; then
pids="$(pgrep -u telemt 2>/dev/null || true)" pids="$(pgrep -u telemt 2>/dev/null || true)"
else else
pids="$(ps -u telemt -o pid= 2>/dev/null || true)" pids="$(ps -ef 2>/dev/null | awk '$1=="telemt"{print $2}' || true)"
[ -z "$pids" ] && pids="$(ps 2>/dev/null | awk '$2=="telemt"{print $1}' || true)"
fi fi
if [ -n "$pids" ]; then if [ -n "$pids" ]; then
@ -457,11 +592,12 @@ uninstall() {
say ">>> Stage 5: Purging configuration, data, and user" say ">>> Stage 5: Purging configuration, data, and user"
$SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR" $SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR"
$SUDO rm -f "$CONFIG_FILE" $SUDO rm -f "$CONFIG_FILE"
if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then sleep 1
$SUDO rmdir "$CONFIG_PARENT_DIR" 2>/dev/null || true
fi
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true $SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true
if check_os_entity group telemt; then
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true
fi
else else
say "Note: Configuration and user kept. Run with 'purge' to remove completely." say "Note: Configuration and user kept. Run with 'purge' to remove completely."
fi fi
@ -479,7 +615,17 @@ case "$ACTION" in
say "Starting installation of $BIN_NAME (Version: $TARGET_VERSION)" say "Starting installation of $BIN_NAME (Version: $TARGET_VERSION)"
say ">>> Stage 1: Verifying environment and dependencies" say ">>> Stage 1: Verifying environment and dependencies"
verify_common; verify_install_deps verify_common
verify_install_deps
if is_config_exists && [ "$PORT_PROVIDED" -eq 0 ]; then
ext_port="$($SUDO awk -F'=' '/^[ \t]*port[ \t]*=/ {gsub(/[^0-9]/, "", $2); print $2; exit}' "$CONFIG_FILE" 2>/dev/null || true)"
if [ -n "$ext_port" ]; then
SERVER_PORT="$ext_port"
fi
fi
check_port_availability
if [ "$TARGET_VERSION" != "latest" ]; then if [ "$TARGET_VERSION" != "latest" ]; then
TARGET_VERSION="${TARGET_VERSION#v}" TARGET_VERSION="${TARGET_VERSION#v}"
@ -500,7 +646,21 @@ case "$ACTION" in
die "Temp directory is invalid or was not created" die "Temp directory is invalid or was not created"
fi fi
fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}" || die "Download failed" if ! fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}"; then
if [ "$ARCH" = "x86_64-v3" ]; then
say " -> x86_64-v3 build not found, falling back to standard x86_64..."
ARCH="x86_64"
FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
if [ "$TARGET_VERSION" = "latest" ]; then
DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}"
else
DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}"
fi
fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}" || die "Download failed"
else
die "Download failed"
fi
fi
say ">>> Stage 3: Extracting archive" say ">>> Stage 3: Extracting archive"
if ! gzip -dc "${TEMP_DIR}/${FILE_NAME}" | tar -xf - -C "$TEMP_DIR" 2>/dev/null; then if ! gzip -dc "${TEMP_DIR}/${FILE_NAME}" | tar -xf - -C "$TEMP_DIR" 2>/dev/null; then
@ -516,7 +676,7 @@ case "$ACTION" in
say ">>> Stage 5: Installing binary" say ">>> Stage 5: Installing binary"
install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}" install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}"
say ">>> Stage 6: Generating configuration" say ">>> Stage 6: Generating/Updating configuration"
install_config install_config
say ">>> Stage 7: Installing and starting service" say ">>> Stage 7: Installing and starting service"
@ -543,11 +703,14 @@ case "$ACTION" in
printf ' rc-service %s status\n\n' "$SERVICE_NAME" printf ' rc-service %s status\n\n' "$SERVICE_NAME"
fi fi
API_LISTEN="$($SUDO awk -F'"' '/^[ \t]*listen[ \t]*=/ {print $2; exit}' "$CONFIG_FILE" 2>/dev/null || true)"
API_LISTEN="${API_LISTEN:-127.0.0.1:9091}"
printf 'To get your user connection links (for Telegram), run:\n' printf 'To get your user connection links (for Telegram), run:\n'
if command -v jq >/dev/null 2>&1; then if command -v jq >/dev/null 2>&1; then
printf ' curl -s http://127.0.0.1:9091/v1/users | jq -r '\''.data[] | "User: \\(.username)\\n\\(.links.tls[0] // empty)\\n"'\''\n' printf ' curl -s http://%s/v1/users | jq -r '\''.data[]? | "User: \\(.username)\\n\\(.links.tls[0] // empty)\\n"'\''\n' "$API_LISTEN"
else else
printf ' curl -s http://127.0.0.1:9091/v1/users\n' printf ' curl -s http://%s/v1/users\n' "$API_LISTEN"
printf ' (Tip: Install '\''jq'\'' for a much cleaner output)\n' printf ' (Tip: Install '\''jq'\'' for a much cleaner output)\n'
fi fi

View File

@ -100,7 +100,7 @@ pub(crate) fn default_fake_cert_len() -> usize {
} }
pub(crate) fn default_tls_front_dir() -> String { pub(crate) fn default_tls_front_dir() -> String {
"tlsfront".to_string() "/etc/telemt/tlsfront".to_string()
} }
pub(crate) fn default_replay_check_len() -> usize { pub(crate) fn default_replay_check_len() -> usize {
@ -302,7 +302,7 @@ pub(crate) fn default_me2dc_fallback() -> bool {
} }
pub(crate) fn default_me2dc_fast() -> bool { pub(crate) fn default_me2dc_fast() -> bool {
false true
} }
pub(crate) fn default_keepalive_interval() -> u64 { pub(crate) fn default_keepalive_interval() -> u64 {
@ -558,7 +558,7 @@ pub(crate) fn default_beobachten_flush_secs() -> u64 {
} }
pub(crate) fn default_beobachten_file() -> String { pub(crate) fn default_beobachten_file() -> String {
"cache/beobachten.txt".to_string() "/etc/telemt/beobachten.txt".to_string()
} }
pub(crate) fn default_tls_new_session_tickets() -> u8 { pub(crate) fn default_tls_new_session_tickets() -> u8 {

View File

@ -947,7 +947,11 @@ impl ProxyConfig {
} }
if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid) if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid)
&& config.server.conntrack_control.hybrid_listener_ips.is_empty() && config
.server
.conntrack_control
.hybrid_listener_ips
.is_empty()
{ {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid" "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid"
@ -2503,9 +2507,9 @@ mod tests {
let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml"); let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml");
std::fs::write(&path, toml).unwrap(); std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string(); let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!( assert!(err.contains(
err.contains("server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]") "server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]"
); ));
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
@ -2570,9 +2574,9 @@ mod tests {
let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml"); let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml");
std::fs::write(&path, toml).unwrap(); std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string(); let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!( assert!(err.contains(
err.contains("server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid") "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid"
); ));
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }

View File

@ -56,7 +56,11 @@ pub(crate) fn spawn_conntrack_controller(
shared: Arc<ProxySharedState>, shared: Arc<ProxySharedState>,
) { ) {
if !cfg!(target_os = "linux") { if !cfg!(target_os = "linux") {
let enabled = config_rx.borrow().server.conntrack_control.inline_conntrack_control; let enabled = config_rx
.borrow()
.server
.conntrack_control
.inline_conntrack_control;
stats.set_conntrack_control_enabled(enabled); stats.set_conntrack_control_enabled(enabled);
stats.set_conntrack_control_available(false); stats.set_conntrack_control_available(false);
stats.set_conntrack_pressure_active(false); stats.set_conntrack_pressure_active(false);
@ -65,7 +69,9 @@ pub(crate) fn spawn_conntrack_controller(
shared.disable_conntrack_close_sender(); shared.disable_conntrack_close_sender();
shared.set_conntrack_pressure_active(false); shared.set_conntrack_pressure_active(false);
if enabled { if enabled {
warn!("conntrack control is configured but unsupported on this OS; disabling runtime worker"); warn!(
"conntrack control is configured but unsupported on this OS; disabling runtime worker"
);
} }
return; return;
} }
@ -88,7 +94,13 @@ async fn run_conntrack_controller(
let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
let mut backend = pick_backend(cfg.server.conntrack_control.backend); let mut backend = pick_backend(cfg.server.conntrack_control.backend);
apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), false); apply_runtime_state(
stats.as_ref(),
shared.as_ref(),
&cfg,
backend.is_some(),
false,
);
reconcile_rules(&cfg, backend, stats.as_ref()).await; reconcile_rules(&cfg, backend, stats.as_ref()).await;
loop { loop {
@ -315,7 +327,9 @@ fn pick_backend(configured: ConntrackBackend) -> Option<NetfilterBackend> {
} }
} }
ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables), ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables),
ConntrackBackend::Iptables => command_exists("iptables").then_some(NetfilterBackend::Iptables), ConntrackBackend::Iptables => {
command_exists("iptables").then_some(NetfilterBackend::Iptables)
}
} }
} }
@ -396,7 +410,12 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec<Option<IpAddr>>, Vec<Option<IpAddr
} }
async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> { async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> {
let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await; let _ = run_command(
"nft",
&["delete", "table", "inet", "telemt_conntrack"],
None,
)
.await;
if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) { if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
return Ok(()); return Ok(());
} }
@ -446,7 +465,12 @@ async fn apply_iptables_rules_for_binary(
return Ok(()); return Ok(());
} }
let chain = "TELEMT_NOTRACK"; let chain = "TELEMT_NOTRACK";
let _ = run_command(binary, &["-t", "raw", "-D", "PREROUTING", "-j", chain], None).await; let _ = run_command(
binary,
&["-t", "raw", "-D", "PREROUTING", "-j", chain],
None,
)
.await;
let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await; let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await;
let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await; let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await;
@ -456,8 +480,20 @@ async fn apply_iptables_rules_for_binary(
run_command(binary, &["-t", "raw", "-N", chain], None).await?; run_command(binary, &["-t", "raw", "-N", chain], None).await?;
run_command(binary, &["-t", "raw", "-F", chain], None).await?; run_command(binary, &["-t", "raw", "-F", chain], None).await?;
if run_command(binary, &["-t", "raw", "-C", "PREROUTING", "-j", chain], None).await.is_err() { if run_command(
run_command(binary, &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], None).await?; binary,
&["-t", "raw", "-C", "PREROUTING", "-j", chain],
None,
)
.await
.is_err()
{
run_command(
binary,
&["-t", "raw", "-I", "PREROUTING", "1", "-j", chain],
None,
)
.await?;
} }
let (v4_targets, v6_targets) = notrack_targets(cfg); let (v4_targets, v6_targets) = notrack_targets(cfg);
@ -487,11 +523,26 @@ async fn apply_iptables_rules_for_binary(
} }
async fn clear_notrack_rules_all_backends() { async fn clear_notrack_rules_all_backends() {
let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await; let _ = run_command(
let _ = run_command("iptables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await; "nft",
&["delete", "table", "inet", "telemt_conntrack"],
None,
)
.await;
let _ = run_command(
"iptables",
&["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"],
None,
)
.await;
let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await; let _ = run_command(
"ip6tables",
&["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"],
None,
)
.await;
let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
} }

View File

@ -88,8 +88,10 @@ pub fn init_logging(
// Use a custom fmt layer that writes to syslog // Use a custom fmt layer that writes to syslog
let fmt_layer = fmt::Layer::default() let fmt_layer = fmt::Layer::default()
.with_ansi(false) .with_ansi(false)
.with_target(true) .with_target(false)
.with_writer(SyslogWriter::new); .with_level(false)
.without_time()
.with_writer(SyslogMakeWriter::new());
tracing_subscriber::registry() tracing_subscriber::registry()
.with(filter_layer) .with(filter_layer)
@ -137,12 +139,17 @@ pub fn init_logging(
/// Syslog writer for tracing. /// Syslog writer for tracing.
#[cfg(unix)] #[cfg(unix)]
#[derive(Clone, Copy)]
struct SyslogMakeWriter;
#[cfg(unix)]
#[derive(Clone, Copy)]
struct SyslogWriter { struct SyslogWriter {
_private: (), priority: libc::c_int,
} }
#[cfg(unix)] #[cfg(unix)]
impl SyslogWriter { impl SyslogMakeWriter {
fn new() -> Self { fn new() -> Self {
// Open syslog connection on first use // Open syslog connection on first use
static INIT: std::sync::Once = std::sync::Once::new(); static INIT: std::sync::Once = std::sync::Once::new();
@ -153,7 +160,18 @@ impl SyslogWriter {
libc::openlog(ident, libc::LOG_PID | libc::LOG_NDELAY, libc::LOG_DAEMON); libc::openlog(ident, libc::LOG_PID | libc::LOG_NDELAY, libc::LOG_DAEMON);
} }
}); });
Self { _private: () } Self
}
}
#[cfg(unix)]
fn syslog_priority_for_level(level: &tracing::Level) -> libc::c_int {
match *level {
tracing::Level::ERROR => libc::LOG_ERR,
tracing::Level::WARN => libc::LOG_WARNING,
tracing::Level::INFO => libc::LOG_INFO,
tracing::Level::DEBUG => libc::LOG_DEBUG,
tracing::Level::TRACE => libc::LOG_DEBUG,
} }
} }
@ -168,26 +186,13 @@ impl std::io::Write for SyslogWriter {
return Ok(buf.len()); return Ok(buf.len());
} }
// Determine priority based on log level in the message
let priority = if msg.contains(" ERROR ") || msg.contains(" error ") {
libc::LOG_ERR
} else if msg.contains(" WARN ") || msg.contains(" warn ") {
libc::LOG_WARNING
} else if msg.contains(" INFO ") || msg.contains(" info ") {
libc::LOG_INFO
} else if msg.contains(" DEBUG ") || msg.contains(" debug ") {
libc::LOG_DEBUG
} else {
libc::LOG_INFO
};
// Write to syslog // Write to syslog
let c_msg = std::ffi::CString::new(msg.as_bytes()) let c_msg = std::ffi::CString::new(msg.as_bytes())
.unwrap_or_else(|_| std::ffi::CString::new("(invalid utf8)").unwrap()); .unwrap_or_else(|_| std::ffi::CString::new("(invalid utf8)").unwrap());
unsafe { unsafe {
libc::syslog( libc::syslog(
priority, self.priority,
b"%s\0".as_ptr() as *const libc::c_char, b"%s\0".as_ptr() as *const libc::c_char,
c_msg.as_ptr(), c_msg.as_ptr(),
); );
@ -202,11 +207,19 @@ impl std::io::Write for SyslogWriter {
} }
#[cfg(unix)] #[cfg(unix)]
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SyslogWriter { impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SyslogMakeWriter {
type Writer = SyslogWriter; type Writer = SyslogWriter;
fn make_writer(&'a self) -> Self::Writer { fn make_writer(&'a self) -> Self::Writer {
SyslogWriter::new() SyslogWriter {
priority: libc::LOG_INFO,
}
}
fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
SyslogWriter {
priority: syslog_priority_for_level(meta.level()),
}
} }
} }
@ -302,4 +315,29 @@ mod tests {
LogDestination::Syslog LogDestination::Syslog
)); ));
} }
#[cfg(unix)]
#[test]
fn test_syslog_priority_for_level_mapping() {
assert_eq!(
syslog_priority_for_level(&tracing::Level::ERROR),
libc::LOG_ERR
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::WARN),
libc::LOG_WARNING
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::INFO),
libc::LOG_INFO
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::DEBUG),
libc::LOG_DEBUG
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::TRACE),
libc::LOG_DEBUG
);
}
} }

View File

@ -18,19 +18,38 @@ use crate::transport::middle_proxy::{
pub(crate) fn resolve_runtime_config_path( pub(crate) fn resolve_runtime_config_path(
config_path_cli: &str, config_path_cli: &str,
startup_cwd: &std::path::Path, startup_cwd: &std::path::Path,
config_path_explicit: bool,
) -> PathBuf { ) -> PathBuf {
let raw = PathBuf::from(config_path_cli); if config_path_explicit {
let absolute = if raw.is_absolute() { let raw = PathBuf::from(config_path_cli);
raw let absolute = if raw.is_absolute() {
} else { raw
startup_cwd.join(raw) } else {
}; startup_cwd.join(raw)
absolute.canonicalize().unwrap_or(absolute) };
return absolute.canonicalize().unwrap_or(absolute);
}
let etc_telemt = std::path::Path::new("/etc/telemt");
let candidates = [
startup_cwd.join("config.toml"),
startup_cwd.join("telemt.toml"),
etc_telemt.join("telemt.toml"),
etc_telemt.join("config.toml"),
];
for candidate in candidates {
if candidate.is_file() {
return candidate.canonicalize().unwrap_or(candidate);
}
}
startup_cwd.join("config.toml")
} }
/// Parsed CLI arguments. /// Parsed CLI arguments.
pub(crate) struct CliArgs { pub(crate) struct CliArgs {
pub config_path: String, pub config_path: String,
pub config_path_explicit: bool,
pub data_path: Option<PathBuf>, pub data_path: Option<PathBuf>,
pub silent: bool, pub silent: bool,
pub log_level: Option<String>, pub log_level: Option<String>,
@ -39,6 +58,7 @@ pub(crate) struct CliArgs {
pub(crate) fn parse_cli() -> CliArgs { pub(crate) fn parse_cli() -> CliArgs {
let mut config_path = "config.toml".to_string(); let mut config_path = "config.toml".to_string();
let mut config_path_explicit = false;
let mut data_path: Option<PathBuf> = None; let mut data_path: Option<PathBuf> = None;
let mut silent = false; let mut silent = false;
let mut log_level: Option<String> = None; let mut log_level: Option<String> = None;
@ -74,6 +94,20 @@ pub(crate) fn parse_cli() -> CliArgs {
s.trim_start_matches("--data-path=").to_string(), s.trim_start_matches("--data-path=").to_string(),
)); ));
} }
"--working-dir" => {
i += 1;
if i < args.len() {
data_path = Some(PathBuf::from(args[i].clone()));
} else {
eprintln!("Missing value for --working-dir");
std::process::exit(0);
}
}
s if s.starts_with("--working-dir=") => {
data_path = Some(PathBuf::from(
s.trim_start_matches("--working-dir=").to_string(),
));
}
"--silent" | "-s" => { "--silent" | "-s" => {
silent = true; silent = true;
} }
@ -111,13 +145,11 @@ pub(crate) fn parse_cli() -> CliArgs {
i += 1; i += 1;
} }
} }
s if s.starts_with("--working-dir") => {
if !s.contains('=') {
i += 1;
}
}
s if !s.starts_with('-') => { s if !s.starts_with('-') => {
config_path = s.to_string(); if !matches!(s, "run" | "start" | "stop" | "reload" | "status") {
config_path = s.to_string();
config_path_explicit = true;
}
} }
other => { other => {
eprintln!("Unknown option: {}", other); eprintln!("Unknown option: {}", other);
@ -128,6 +160,7 @@ pub(crate) fn parse_cli() -> CliArgs {
CliArgs { CliArgs {
config_path, config_path,
config_path_explicit,
data_path, data_path,
silent, silent,
log_level, log_level,
@ -152,6 +185,7 @@ fn print_help() {
eprintln!( eprintln!(
" --data-path <DIR> Set data directory (absolute path; overrides config value)" " --data-path <DIR> Set data directory (absolute path; overrides config value)"
); );
eprintln!(" --working-dir <DIR> Alias for --data-path");
eprintln!(" --silent, -s Suppress info logs"); eprintln!(" --silent, -s Suppress info logs");
eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent"); eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent");
eprintln!(" --help, -h Show this help"); eprintln!(" --help, -h Show this help");
@ -210,7 +244,7 @@ mod tests {
let target = startup_cwd.join("config.toml"); let target = startup_cwd.join("config.toml");
std::fs::write(&target, " ").unwrap(); std::fs::write(&target, " ").unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd); let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, true);
assert_eq!(resolved, target.canonicalize().unwrap()); assert_eq!(resolved, target.canonicalize().unwrap());
let _ = std::fs::remove_file(&target); let _ = std::fs::remove_file(&target);
@ -226,11 +260,45 @@ mod tests {
let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_path_missing_{nonce}")); let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_path_missing_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap(); std::fs::create_dir_all(&startup_cwd).unwrap();
let resolved = resolve_runtime_config_path("missing.toml", &startup_cwd); let resolved = resolve_runtime_config_path("missing.toml", &startup_cwd, true);
assert_eq!(resolved, startup_cwd.join("missing.toml")); assert_eq!(resolved, startup_cwd.join("missing.toml"));
let _ = std::fs::remove_dir(&startup_cwd); let _ = std::fs::remove_dir(&startup_cwd);
} }
#[test]
fn resolve_runtime_config_path_uses_startup_candidates_when_not_explicit() {
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let startup_cwd =
std::env::temp_dir().join(format!("telemt_cfg_startup_candidates_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap();
let telemt = startup_cwd.join("telemt.toml");
std::fs::write(&telemt, " ").unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, false);
assert_eq!(resolved, telemt.canonicalize().unwrap());
let _ = std::fs::remove_file(&telemt);
let _ = std::fs::remove_dir(&startup_cwd);
}
#[test]
fn resolve_runtime_config_path_defaults_to_startup_config_when_none_found() {
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_startup_default_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, false);
assert_eq!(resolved, startup_cwd.join("config.toml"));
let _ = std::fs::remove_dir(&startup_cwd);
}
} }
pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {

View File

@ -28,8 +28,8 @@ use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
use crate::api; use crate::api;
use crate::conntrack_control;
use crate::config::{LogLevel, ProxyConfig}; use crate::config::{LogLevel, ProxyConfig};
use crate::conntrack_control;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
@ -112,6 +112,7 @@ async fn run_inner(
.await; .await;
let cli_args = parse_cli(); let cli_args = parse_cli();
let config_path_cli = cli_args.config_path; let config_path_cli = cli_args.config_path;
let config_path_explicit = cli_args.config_path_explicit;
let data_path = cli_args.data_path; let data_path = cli_args.data_path;
let cli_silent = cli_args.silent; let cli_silent = cli_args.silent;
let cli_log_level = cli_args.log_level; let cli_log_level = cli_args.log_level;
@ -123,7 +124,8 @@ async fn run_inner(
std::process::exit(1); std::process::exit(1);
} }
}; };
let config_path = resolve_runtime_config_path(&config_path_cli, &startup_cwd); let mut config_path =
resolve_runtime_config_path(&config_path_cli, &startup_cwd, config_path_explicit);
let mut config = match ProxyConfig::load(&config_path) { let mut config = match ProxyConfig::load(&config_path) {
Ok(c) => c, Ok(c) => c,
@ -133,11 +135,99 @@ async fn run_inner(
std::process::exit(1); std::process::exit(1);
} else { } else {
let default = ProxyConfig::default(); let default = ProxyConfig::default();
std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap();
eprintln!( let serialized =
"[telemt] Created default config at {}", match toml::to_string_pretty(&default).or_else(|_| toml::to_string(&default)) {
config_path.display() Ok(value) => Some(value),
); Err(serialize_error) => {
eprintln!(
"[telemt] Warning: failed to serialize default config: {}",
serialize_error
);
None
}
};
if config_path_explicit {
if let Some(serialized) = serialized.as_ref() {
if let Err(write_error) = std::fs::write(&config_path, serialized) {
eprintln!(
"[telemt] Error: failed to create explicit config at {}: {}",
config_path.display(),
write_error
);
std::process::exit(1);
}
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
} else {
eprintln!(
"[telemt] Warning: running with in-memory default config without writing to disk"
);
}
} else {
let system_dir = std::path::Path::new("/etc/telemt");
let system_config_path = system_dir.join("telemt.toml");
let startup_config_path = startup_cwd.join("config.toml");
let mut persisted = false;
if let Some(serialized) = serialized.as_ref() {
match std::fs::create_dir_all(system_dir) {
Ok(()) => match std::fs::write(&system_config_path, serialized) {
Ok(()) => {
config_path = system_config_path;
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
persisted = true;
}
Err(write_error) => {
eprintln!(
"[telemt] Warning: failed to write default config at {}: {}",
system_config_path.display(),
write_error
);
}
},
Err(create_error) => {
eprintln!(
"[telemt] Warning: failed to create {}: {}",
system_dir.display(),
create_error
);
}
}
if !persisted {
match std::fs::write(&startup_config_path, serialized) {
Ok(()) => {
config_path = startup_config_path;
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
persisted = true;
}
Err(write_error) => {
eprintln!(
"[telemt] Warning: failed to write default config at {}: {}",
startup_config_path.display(),
write_error
);
}
}
}
}
if !persisted {
eprintln!(
"[telemt] Warning: running with in-memory default config without writing to disk"
);
}
}
default default
} }
} }

View File

@ -2,8 +2,8 @@
mod api; mod api;
mod cli; mod cli;
mod conntrack_control;
mod config; mod config;
mod conntrack_control;
mod crypto; mod crypto;
#[cfg(unix)] #[cfg(unix)]
mod daemon; mod daemon;

View File

@ -246,7 +246,9 @@ pub fn seed_tier_for_user(user: &str) -> AdaptiveTier {
if now.saturating_duration_since(value.seen_at) <= PROFILE_TTL { if now.saturating_duration_since(value.seen_at) <= PROFILE_TTL {
return value.tier; return value.tier;
} }
profiles().remove_if(user, |_, v| now.saturating_duration_since(v.seen_at) > PROFILE_TTL); profiles().remove_if(user, |_, v| {
now.saturating_duration_since(v.seen_at) > PROFILE_TTL
});
} }
AdaptiveTier::Base AdaptiveTier::Base
} }

View File

@ -518,15 +518,15 @@ where
); );
return Err(ProxyError::Io(e)); return Err(ProxyError::Io(e));
} }
Err(_) => { Err(_) => {
debug!( debug!(
peer = %real_peer, peer = %real_peer,
idle_secs = first_byte_idle_secs, idle_secs = first_byte_idle_secs,
"Closing idle pooled connection before first client byte" "Closing idle pooled connection before first client byte"
); );
return Ok(()); return Ok(());
}
} }
}
}; };
let handshake_timeout = handshake_timeout_with_mask_grace(&config); let handshake_timeout = handshake_timeout_with_mask_grace(&config);

View File

@ -17,13 +17,13 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::*; use crate::protocol::constants::*;
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce}; use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay, cutover_stagger_delay,
}; };
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::stats::Stats; use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::UpstreamManager; use crate::transport::UpstreamManager;

View File

@ -118,7 +118,11 @@ fn auth_probe_state_expired(state: &AuthProbeState, now: Instant) -> bool {
now.duration_since(state.last_seen) > retention now.duration_since(state.last_seen) > retention
} }
fn auth_probe_eviction_offset_in(shared: &ProxySharedState, peer_ip: IpAddr, now: Instant) -> usize { fn auth_probe_eviction_offset_in(
shared: &ProxySharedState,
peer_ip: IpAddr,
now: Instant,
) -> usize {
let hasher_state = &shared.handshake.auth_probe_eviction_hasher; let hasher_state = &shared.handshake.auth_probe_eviction_hasher;
let mut hasher = hasher_state.build_hasher(); let mut hasher = hasher_state.build_hasher();
peer_ip.hash(&mut hasher); peer_ip.hash(&mut hasher);

View File

@ -255,7 +255,11 @@ async fn wait_mask_connect_budget(started: Instant) {
// sigma is chosen so ~99% of raw samples land inside [floor, ceiling] before clamp. // sigma is chosen so ~99% of raw samples land inside [floor, ceiling] before clamp.
// When floor > ceiling (misconfiguration), returns ceiling (the smaller value). // When floor > ceiling (misconfiguration), returns ceiling (the smaller value).
// When floor == ceiling, returns that value. When both are 0, returns 0. // When floor == ceiling, returns that value. When both are 0, returns 0.
pub(crate) fn sample_lognormal_percentile_bounded(floor: u64, ceiling: u64, rng: &mut impl Rng) -> u64 { pub(crate) fn sample_lognormal_percentile_bounded(
floor: u64,
ceiling: u64,
rng: &mut impl Rng,
) -> u64 {
if ceiling == 0 && floor == 0 { if ceiling == 0 && floor == 0 {
return 0; return 0;
} }
@ -296,7 +300,9 @@ fn mask_outcome_target_budget(config: &ProxyConfig) -> Duration {
} }
if ceiling > floor { if ceiling > floor {
let mut rng = rand::rng(); let mut rng = rand::rng();
return Duration::from_millis(sample_lognormal_percentile_bounded(floor, ceiling, &mut rng)); return Duration::from_millis(sample_lognormal_percentile_bounded(
floor, ceiling, &mut rng,
));
} }
// ceiling <= floor: use the larger value (fail-closed: preserve longer delay) // ceiling <= floor: use the larger value (fail-closed: preserve longer delay)
return Duration::from_millis(floor.max(ceiling)); return Duration::from_millis(floor.max(ceiling));

View File

@ -3,12 +3,12 @@ use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeSet, HashMap};
#[cfg(test)] #[cfg(test)]
use std::future::Future; use std::future::Future;
use std::hash::{BuildHasher, Hash};
#[cfg(test)] #[cfg(test)]
use std::hash::Hasher; use std::hash::Hasher;
use std::hash::{BuildHasher, Hash};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
@ -21,13 +21,13 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *}; use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess; use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay, cutover_stagger_delay,
}; };
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::stats::{ use crate::stats::{
MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats, MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats,
}; };
@ -257,9 +257,7 @@ impl RelayClientIdlePolicy {
if self.soft_idle > self.hard_idle { if self.soft_idle > self.hard_idle {
self.soft_idle = self.hard_idle; self.soft_idle = self.hard_idle;
} }
self.legacy_frame_read_timeout = self self.legacy_frame_read_timeout = self.legacy_frame_read_timeout.min(pressure_hard_idle_cap);
.legacy_frame_read_timeout
.min(pressure_hard_idle_cap);
if self.grace_after_downstream_activity > self.hard_idle { if self.grace_after_downstream_activity > self.hard_idle {
self.grace_after_downstream_activity = self.hard_idle; self.grace_after_downstream_activity = self.hard_idle;
} }
@ -461,12 +459,15 @@ fn report_desync_frame_too_large_in(
.map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D')) .map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
.unwrap_or(false); .unwrap_or(false);
let now = Instant::now(); let now = Instant::now();
let dedup_key = hash_value_in(shared, &( let dedup_key = hash_value_in(
state.user.as_str(), shared,
state.peer_hash, &(
proto_tag, state.user.as_str(),
DESYNC_ERROR_CLASS, state.peer_hash,
)); proto_tag,
DESYNC_ERROR_CLASS,
),
);
let emit_full = should_emit_full_desync_in(shared, dedup_key, state.desync_all_full, now); let emit_full = should_emit_full_desync_in(shared, dedup_key, state.desync_all_full, now);
let duration_ms = state.started_at.elapsed().as_millis() as u64; let duration_ms = state.started_at.elapsed().as_millis() as u64;
let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed); let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed);
@ -631,7 +632,10 @@ fn observe_me_d2c_flush_event(
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn mark_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) -> bool { pub(crate) fn mark_relay_idle_candidate_for_testing(
shared: &ProxySharedState,
conn_id: u64,
) -> bool {
let registry = &shared.middle_relay.relay_idle_registry; let registry = &shared.middle_relay.relay_idle_registry;
let mut guard = match registry.lock() { let mut guard = match registry.lock() {
Ok(guard) => guard, Ok(guard) => guard,
@ -716,7 +720,10 @@ pub(crate) fn relay_pressure_event_seq_for_testing(shared: &ProxySharedState) ->
#[cfg(test)] #[cfg(test)]
pub(crate) fn relay_idle_mark_seq_for_testing(shared: &ProxySharedState) -> u64 { pub(crate) fn relay_idle_mark_seq_for_testing(shared: &ProxySharedState) -> u64 {
shared.middle_relay.relay_idle_mark_seq.load(Ordering::Relaxed) shared
.middle_relay
.relay_idle_mark_seq
.load(Ordering::Relaxed)
} }
#[cfg(test)] #[cfg(test)]
@ -865,10 +872,7 @@ pub(crate) fn desync_dedup_insert_for_testing(shared: &ProxySharedState, key: u6
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn desync_dedup_get_for_testing( pub(crate) fn desync_dedup_get_for_testing(shared: &ProxySharedState, key: u64) -> Option<Instant> {
shared: &ProxySharedState,
key: u64,
) -> Option<Instant> {
shared shared
.middle_relay .middle_relay
.desync_dedup .desync_dedup
@ -877,7 +881,9 @@ pub(crate) fn desync_dedup_get_for_testing(
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn desync_dedup_keys_for_testing(shared: &ProxySharedState) -> std::collections::HashSet<u64> { pub(crate) fn desync_dedup_keys_for_testing(
shared: &ProxySharedState,
) -> std::collections::HashSet<u64> {
shared shared
.middle_relay .middle_relay
.desync_dedup .desync_dedup

View File

@ -8,7 +8,7 @@ use std::time::Instant;
use dashmap::DashMap; use dashmap::DashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::proxy::handshake::{AuthProbeState, AuthProbeSaturationState}; use crate::proxy::handshake::{AuthProbeSaturationState, AuthProbeState};
use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry}; use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -136,7 +136,8 @@ impl ProxySharedState {
} }
pub(crate) fn set_conntrack_pressure_active(&self, active: bool) { pub(crate) fn set_conntrack_pressure_active(&self, active: bool) {
self.conntrack_pressure_active.store(active, Ordering::Relaxed); self.conntrack_pressure_active
.store(active, Ordering::Relaxed);
} }
pub(crate) fn conntrack_pressure_active(&self) -> bool { pub(crate) fn conntrack_pressure_active(&self) -> bool {

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
static RACE_TEST_KEY_COUNTER: AtomicUsize = AtomicUsize::new(1_000_000); static RACE_TEST_KEY_COUNTER: AtomicUsize = AtomicUsize::new(1_000_000);

View File

@ -65,9 +65,15 @@ fn adaptive_base_tier_buffers_unchanged() {
fn adaptive_tier1_buffers_within_caps() { fn adaptive_tier1_buffers_within_caps() {
let (c2s, s2c) = direct_copy_buffers_for_tier(AdaptiveTier::Tier1, 65536, 262144); let (c2s, s2c) = direct_copy_buffers_for_tier(AdaptiveTier::Tier1, 65536, 262144);
assert!(c2s > 65536, "Tier1 c2s should exceed Base"); assert!(c2s > 65536, "Tier1 c2s should exceed Base");
assert!(c2s <= 128 * 1024, "Tier1 c2s should not exceed DIRECT_C2S_CAP_BYTES"); assert!(
c2s <= 128 * 1024,
"Tier1 c2s should not exceed DIRECT_C2S_CAP_BYTES"
);
assert!(s2c > 262144, "Tier1 s2c should exceed Base"); assert!(s2c > 262144, "Tier1 s2c should exceed Base");
assert!(s2c <= 512 * 1024, "Tier1 s2c should not exceed DIRECT_S2C_CAP_BYTES"); assert!(
s2c <= 512 * 1024,
"Tier1 s2c should not exceed DIRECT_S2C_CAP_BYTES"
);
} }
#[test] #[test]

View File

@ -19,7 +19,8 @@ fn adversarial_large_state_offsets_escape_first_scan_window() {
((i.wrapping_mul(131)) & 0xff) as u8, ((i.wrapping_mul(131)) & 0xff) as u8,
)); ));
let now = base + Duration::from_nanos(i); let now = base + Duration::from_nanos(i);
let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); let start =
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
if start >= scan_limit { if start >= scan_limit {
saw_offset_outside_first_window = true; saw_offset_outside_first_window = true;
break; break;
@ -48,7 +49,8 @@ fn stress_large_state_offsets_cover_many_scan_windows() {
((i.wrapping_mul(17)) & 0xff) as u8, ((i.wrapping_mul(17)) & 0xff) as u8,
)); ));
let now = base + Duration::from_micros(i); let now = base + Duration::from_micros(i);
let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); let start =
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
covered_windows.insert(start / scan_limit); covered_windows.insert(start / scan_limit);
} }
@ -80,7 +82,8 @@ fn light_fuzz_offset_always_stays_inside_state_len() {
let state_len = ((seed >> 16) as usize % 200_000).saturating_add(1); let state_len = ((seed >> 16) as usize % 200_000).saturating_add(1);
let scan_limit = ((seed >> 40) as usize % 2_048).saturating_add(1); let scan_limit = ((seed >> 40) as usize % 2_048).saturating_add(1);
let now = base + Duration::from_nanos(seed & 0x0fff); let now = base + Duration::from_nanos(seed & 0x0fff);
let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); let start =
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
start < state_len, start < state_len,

View File

@ -87,7 +87,11 @@ fn adversarial_saturation_grace_requires_extra_failures_before_preauth_throttle(
} }
assert!( assert!(
auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), ip, now + Duration::from_millis(1)), auth_probe_should_apply_preauth_throttle_in(
shared.as_ref(),
ip,
now + Duration::from_millis(1)
),
"after grace failures are exhausted, preauth throttle must activate" "after grace failures are exhausted, preauth throttle must activate"
); );
} }
@ -134,7 +138,11 @@ fn light_fuzz_randomized_failures_preserve_cap_and_nonzero_streaks() {
(seed >> 8) as u8, (seed >> 8) as u8,
seed as u8, seed as u8,
)); ));
auth_probe_record_failure_in(shared.as_ref(), ip, now + Duration::from_millis((seed & 0x3f) as u64)); auth_probe_record_failure_in(
shared.as_ref(),
ip,
now + Duration::from_millis((seed & 0x3f) as u64),
);
} }
let state = auth_probe_state_for_testing_in_shared(shared.as_ref()); let state = auth_probe_state_for_testing_in_shared(shared.as_ref());
@ -162,7 +170,11 @@ async fn stress_parallel_failure_flood_keeps_state_hard_capped() {
((i >> 8) & 0xff) as u8, ((i >> 8) & 0xff) as u8,
(i & 0xff) as u8, (i & 0xff) as u8,
)); ));
auth_probe_record_failure_in(shared.as_ref(), ip, start + Duration::from_millis((i % 4) as u64)); auth_probe_record_failure_in(
shared.as_ref(),
ip,
start + Duration::from_millis((i % 4) as u64),
);
} }
})); }));
} }

View File

@ -31,7 +31,8 @@ fn adversarial_large_state_must_allow_start_offset_outside_scan_budget_window()
(i & 0xff) as u8, (i & 0xff) as u8,
)); ));
let now = base + Duration::from_micros(i as u64); let now = base + Duration::from_micros(i as u64);
let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); let start =
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
start < state_len, start < state_len,
"start offset must stay within state length; start={start}, len={state_len}" "start offset must stay within state length; start={start}, len={state_len}"
@ -83,7 +84,8 @@ fn light_fuzz_scan_offset_budget_never_exceeds_effective_window() {
let state_len = ((seed >> 8) as usize % 131_072).saturating_add(1); let state_len = ((seed >> 8) as usize % 131_072).saturating_add(1);
let scan_limit = ((seed >> 32) as usize % 512).saturating_add(1); let scan_limit = ((seed >> 32) as usize % 512).saturating_add(1);
let now = base + Duration::from_nanos(seed & 0xffff); let now = base + Duration::from_nanos(seed & 0xffff);
let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); let start =
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
start < state_len, start < state_len,

View File

@ -36,7 +36,13 @@ fn adversarial_many_ips_same_time_spreads_offsets_without_bias_collapse() {
i as u8, i as u8,
(255 - (i as u8)), (255 - (i as u8)),
)); ));
uniq.insert(auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, 65_536, 16)); uniq.insert(auth_probe_scan_start_offset_in(
shared.as_ref(),
ip,
now,
65_536,
16,
));
} }
assert!( assert!(
@ -63,7 +69,11 @@ async fn stress_parallel_failure_churn_under_saturation_remains_capped_and_live(
((i >> 8) & 0xff) as u8, ((i >> 8) & 0xff) as u8,
(i & 0xff) as u8, (i & 0xff) as u8,
)); ));
auth_probe_record_failure_in(shared.as_ref(), ip, start + Duration::from_micros((i % 128) as u64)); auth_probe_record_failure_in(
shared.as_ref(),
ip,
start + Duration::from_micros((i % 128) as u64),
);
} }
})); }));
} }
@ -73,12 +83,17 @@ async fn stress_parallel_failure_churn_under_saturation_remains_capped_and_live(
} }
assert!( assert!(
auth_probe_state_for_testing_in_shared(shared.as_ref()).len() <= AUTH_PROBE_TRACK_MAX_ENTRIES, auth_probe_state_for_testing_in_shared(shared.as_ref()).len()
<= AUTH_PROBE_TRACK_MAX_ENTRIES,
"state must remain hard-capped under parallel saturation churn" "state must remain hard-capped under parallel saturation churn"
); );
let probe = IpAddr::V4(Ipv4Addr::new(10, 4, 1, 1)); let probe = IpAddr::V4(Ipv4Addr::new(10, 4, 1, 1));
let _ = auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), probe, start + Duration::from_millis(1)); let _ = auth_probe_should_apply_preauth_throttle_in(
shared.as_ref(),
probe,
start + Duration::from_millis(1),
);
} }
#[test] #[test]
@ -102,7 +117,8 @@ fn light_fuzz_scan_offset_stays_within_window_for_randomized_inputs() {
let scan_limit = ((seed >> 40) as usize % 1024).saturating_add(1); let scan_limit = ((seed >> 40) as usize % 1024).saturating_add(1);
let now = base + Duration::from_nanos(seed & 0x1fff); let now = base + Duration::from_nanos(seed & 0x1fff);
let offset = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit); let offset =
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
offset < state_len, offset < state_len,
"scan offset must always remain inside state length" "scan offset must always remain inside state length"

View File

@ -116,8 +116,14 @@ async fn handshake_baseline_auth_probe_streak_increments_per_ip() {
) )
.await; .await;
assert!(matches!(res, HandshakeResult::BadClient { .. })); assert!(matches!(res, HandshakeResult::BadClient { .. }));
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), Some(expected)); assert_eq!(
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), untouched_ip), None); auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()),
Some(expected)
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), untouched_ip),
None
);
} }
} }
@ -149,7 +155,8 @@ fn handshake_baseline_repeated_probes_streak_monotonic() {
for _ in 0..100 { for _ in 0..100 {
auth_probe_record_failure_in(shared.as_ref(), ip, now); auth_probe_record_failure_in(shared.as_ref(), ip, now);
let current = auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), ip).unwrap_or(0); let current =
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), ip).unwrap_or(0);
assert!(current >= prev, "streak must be monotonic"); assert!(current >= prev, "streak must be monotonic");
prev = current; prev = current;
} }
@ -173,8 +180,16 @@ fn handshake_baseline_throttled_ip_incurs_backoff_delay() {
let before_expiry = now + delay.saturating_sub(Duration::from_millis(1)); let before_expiry = now + delay.saturating_sub(Duration::from_millis(1));
let after_expiry = now + delay + Duration::from_millis(1); let after_expiry = now + delay + Duration::from_millis(1);
assert!(auth_probe_is_throttled_in(shared.as_ref(), ip, before_expiry)); assert!(auth_probe_is_throttled_in(
assert!(!auth_probe_is_throttled_in(shared.as_ref(), ip, after_expiry)); shared.as_ref(),
ip,
before_expiry
));
assert!(!auth_probe_is_throttled_in(
shared.as_ref(),
ip,
after_expiry
));
} }
#[tokio::test] #[tokio::test]
@ -212,7 +227,10 @@ async fn handshake_baseline_malformed_probe_frames_fail_closed_to_masking() {
.expect("malformed probe handling must complete in bounded time"); .expect("malformed probe handling must complete in bounded time");
assert!( assert!(
matches!(res, HandshakeResult::BadClient { .. } | HandshakeResult::Error(_)), matches!(
res,
HandshakeResult::BadClient { .. } | HandshakeResult::Error(_)
),
"malformed probe must fail closed" "malformed probe must fail closed"
); );
} }

View File

@ -332,7 +332,13 @@ async fn invalid_secret_warning_lock_contention_and_bound() {
b.wait().await; b.wait().await;
for i in 0..iterations_per_task { for i in 0..iterations_per_task {
let user_name = format!("contention_user_{}_{}", t, i); let user_name = format!("contention_user_{}_{}", t, i);
warn_invalid_secret_once_in(shared.as_ref(), &user_name, "invalid_hex", ACCESS_SECRET_BYTES, None); warn_invalid_secret_once_in(
shared.as_ref(),
&user_name,
"invalid_hex",
ACCESS_SECRET_BYTES,
None,
);
} }
})); }));
} }
@ -629,7 +635,8 @@ fn auth_probe_saturation_note_resets_retention_window() {
// This call may return false if backoff has elapsed, but it must not clear // This call may return false if backoff has elapsed, but it must not clear
// the saturation state because `later` refreshed last_seen. // the saturation state because `later` refreshed last_seen.
let _ = auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), check_time); let _ =
auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), check_time);
let guard = auth_probe_saturation_state_lock_for_testing_in_shared(shared.as_ref()); let guard = auth_probe_saturation_state_lock_for_testing_in_shared(shared.as_ref());
assert!( assert!(
guard.is_some(), guard.is_some(),

View File

@ -206,7 +206,12 @@ fn auth_probe_eviction_identical_timestamps_keeps_map_bounded() {
} }
let new_ip = IpAddr::V4(Ipv4Addr::new(192, 168, 21, 21)); let new_ip = IpAddr::V4(Ipv4Addr::new(192, 168, 21, 21));
auth_probe_record_failure_with_state_in(shared.as_ref(), state, new_ip, same + Duration::from_millis(1)); auth_probe_record_failure_with_state_in(
shared.as_ref(),
state,
new_ip,
same + Duration::from_millis(1),
);
assert_eq!(state.len(), AUTH_PROBE_TRACK_MAX_ENTRIES); assert_eq!(state.len(), AUTH_PROBE_TRACK_MAX_ENTRIES);
assert!(state.contains_key(&new_ip)); assert!(state.contains_key(&new_ip));
@ -325,7 +330,8 @@ async fn saturation_grace_exhaustion_under_concurrency_keeps_peer_throttled() {
final_state.fail_streak final_state.fail_streak
>= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS >= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS
); );
assert!(auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), assert!(auth_probe_should_apply_preauth_throttle_in(
shared.as_ref(),
peer_ip, peer_ip,
Instant::now() Instant::now()
)); ));

View File

@ -54,7 +54,9 @@ fn clear_auth_probe_state_clears_saturation_even_if_poisoned() {
poison_saturation_mutex(shared.as_ref()); poison_saturation_mutex(shared.as_ref());
auth_probe_note_saturation_in(shared.as_ref(), Instant::now()); auth_probe_note_saturation_in(shared.as_ref(), Instant::now());
assert!(auth_probe_saturation_is_throttled_for_testing_in_shared(shared.as_ref())); assert!(auth_probe_saturation_is_throttled_for_testing_in_shared(
shared.as_ref()
));
clear_auth_probe_state_for_testing_in_shared(shared.as_ref()); clear_auth_probe_state_for_testing_in_shared(shared.as_ref());
assert!( assert!(

View File

@ -1427,7 +1427,13 @@ fn invalid_secret_warning_cache_is_bounded() {
for idx in 0..(WARNED_SECRET_MAX_ENTRIES + 32) { for idx in 0..(WARNED_SECRET_MAX_ENTRIES + 32) {
let user = format!("warned_user_{idx}"); let user = format!("warned_user_{idx}");
warn_invalid_secret_once_in(shared.as_ref(), &user, "invalid_length", ACCESS_SECRET_BYTES, Some(idx)); warn_invalid_secret_once_in(
shared.as_ref(),
&user,
"invalid_length",
ACCESS_SECRET_BYTES,
Some(idx),
);
} }
let warned = warned_secrets_for_testing_in_shared(shared.as_ref()); let warned = warned_secrets_for_testing_in_shared(shared.as_ref());
@ -1640,11 +1646,15 @@ fn unknown_sni_warn_cooldown_first_event_is_warn_and_repeated_events_are_info_un
"first unknown SNI event must be eligible for WARN emission" "first unknown SNI event must be eligible for WARN emission"
); );
assert!( assert!(
!should_emit_unknown_sni_warn_for_testing_in_shared(shared.as_ref(), now + Duration::from_secs(1)), !should_emit_unknown_sni_warn_for_testing_in_shared(
shared.as_ref(),
now + Duration::from_secs(1)
),
"events inside cooldown window must be demoted from WARN to INFO" "events inside cooldown window must be demoted from WARN to INFO"
); );
assert!( assert!(
should_emit_unknown_sni_warn_for_testing_in_shared(shared.as_ref(), should_emit_unknown_sni_warn_for_testing_in_shared(
shared.as_ref(),
now + Duration::from_secs(UNKNOWN_SNI_WARN_COOLDOWN_SECS) now + Duration::from_secs(UNKNOWN_SNI_WARN_COOLDOWN_SECS)
), ),
"once cooldown expires, next unknown SNI event must be WARN-eligible again" "once cooldown expires, next unknown SNI event must be WARN-eligible again"
@ -1725,7 +1735,12 @@ fn auth_probe_over_cap_churn_still_tracks_newcomer_after_round_limit() {
} }
let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 114, 77)); let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 114, 77));
auth_probe_record_failure_with_state_in(shared.as_ref(), &state, newcomer, now + Duration::from_secs(1)); auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state,
newcomer,
now + Duration::from_secs(1),
);
assert!( assert!(
state.get(&newcomer).is_some(), state.get(&newcomer).is_some(),
@ -1931,8 +1946,18 @@ fn auth_probe_ipv6_is_bucketed_by_prefix_64() {
let ip_a = IpAddr::V6("2001:db8:abcd:1234:1:2:3:4".parse().unwrap()); let ip_a = IpAddr::V6("2001:db8:abcd:1234:1:2:3:4".parse().unwrap());
let ip_b = IpAddr::V6("2001:db8:abcd:1234:ffff:eeee:dddd:cccc".parse().unwrap()); let ip_b = IpAddr::V6("2001:db8:abcd:1234:ffff:eeee:dddd:cccc".parse().unwrap());
auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_a), now); auth_probe_record_failure_with_state_in(
auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_b), now); shared.as_ref(),
&state,
normalize_auth_probe_ip(ip_a),
now,
);
auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state,
normalize_auth_probe_ip(ip_b),
now,
);
let normalized = normalize_auth_probe_ip(ip_a); let normalized = normalize_auth_probe_ip(ip_a);
assert_eq!( assert_eq!(
@ -1956,8 +1981,18 @@ fn auth_probe_ipv6_different_prefixes_use_distinct_buckets() {
let ip_a = IpAddr::V6("2001:db8:1111:2222:1:2:3:4".parse().unwrap()); let ip_a = IpAddr::V6("2001:db8:1111:2222:1:2:3:4".parse().unwrap());
let ip_b = IpAddr::V6("2001:db8:1111:3333:1:2:3:4".parse().unwrap()); let ip_b = IpAddr::V6("2001:db8:1111:3333:1:2:3:4".parse().unwrap());
auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_a), now); auth_probe_record_failure_with_state_in(
auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_b), now); shared.as_ref(),
&state,
normalize_auth_probe_ip(ip_a),
now,
);
auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state,
normalize_auth_probe_ip(ip_b),
now,
);
assert_eq!( assert_eq!(
state.len(), state.len(),
@ -2070,7 +2105,12 @@ fn auth_probe_round_limited_overcap_eviction_marks_saturation_and_keeps_newcomer
} }
let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 40)); let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 40));
auth_probe_record_failure_with_state_in(shared.as_ref(), &state, newcomer, now + Duration::from_millis(1)); auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state,
newcomer,
now + Duration::from_millis(1),
);
assert!( assert!(
state.get(&newcomer).is_some(), state.get(&newcomer).is_some(),
@ -2081,7 +2121,10 @@ fn auth_probe_round_limited_overcap_eviction_marks_saturation_and_keeps_newcomer
"high fail-streak sentinel must survive round-limited eviction" "high fail-streak sentinel must survive round-limited eviction"
); );
assert!( assert!(
auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), now + Duration::from_millis(1)), auth_probe_saturation_is_throttled_at_for_testing_in_shared(
shared.as_ref(),
now + Duration::from_millis(1)
),
"round-limited over-cap path must activate saturation throttle marker" "round-limited over-cap path must activate saturation throttle marker"
); );
} }
@ -2163,7 +2206,8 @@ fn stress_auth_probe_overcap_churn_does_not_starve_high_threat_sentinel_bucket()
((step >> 8) & 0xff) as u8, ((step >> 8) & 0xff) as u8,
(step & 0xff) as u8, (step & 0xff) as u8,
)); ));
auth_probe_record_failure_with_state_in(shared.as_ref(), auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state, &state,
newcomer, newcomer,
base_now + Duration::from_millis(step as u64 + 1), base_now + Duration::from_millis(step as u64 + 1),
@ -2226,7 +2270,8 @@ fn light_fuzz_auth_probe_overcap_eviction_prefers_less_threatening_entries() {
((round >> 8) & 0xff) as u8, ((round >> 8) & 0xff) as u8,
(round & 0xff) as u8, (round & 0xff) as u8,
)); ));
auth_probe_record_failure_with_state_in(shared.as_ref(), auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state, &state,
newcomer, newcomer,
now + Duration::from_millis(round as u64 + 1), now + Duration::from_millis(round as u64 + 1),
@ -3105,7 +3150,10 @@ async fn saturation_grace_boundary_still_admits_valid_tls_before_exhaustion() {
matches!(result, HandshakeResult::Success(_)), matches!(result, HandshakeResult::Success(_)),
"valid TLS should still pass while peer remains within saturation grace budget" "valid TLS should still pass while peer remains within saturation grace budget"
); );
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), None); assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()),
None
);
} }
#[tokio::test] #[tokio::test]
@ -3171,7 +3219,10 @@ async fn saturation_grace_exhaustion_blocks_valid_tls_until_backoff_expires() {
matches!(allowed, HandshakeResult::Success(_)), matches!(allowed, HandshakeResult::Success(_)),
"valid TLS should recover after peer-specific pre-auth backoff has elapsed" "valid TLS should recover after peer-specific pre-auth backoff has elapsed"
); );
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), None); assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()),
None
);
} }
#[tokio::test] #[tokio::test]

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
use rand::rngs::StdRng;
use rand::SeedableRng; use rand::SeedableRng;
use rand::rngs::StdRng;
fn seeded_rng(seed: u64) -> StdRng { fn seeded_rng(seed: u64) -> StdRng {
StdRng::seed_from_u64(seed) StdRng::seed_from_u64(seed)
@ -57,7 +57,10 @@ fn masking_lognormal_degenerate_floor_eq_ceiling_returns_floor() {
let mut rng = seeded_rng(99); let mut rng = seeded_rng(99);
for _ in 0..100 { for _ in 0..100 {
let val = sample_lognormal_percentile_bounded(1000, 1000, &mut rng); let val = sample_lognormal_percentile_bounded(1000, 1000, &mut rng);
assert_eq!(val, 1000, "floor == ceiling must always return exactly that value"); assert_eq!(
val, 1000,
"floor == ceiling must always return exactly that value"
);
} }
} }

View File

@ -7,13 +7,22 @@ fn middle_relay_baseline_public_api_idle_roundtrip_contract() {
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001));
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7001)
);
clear_relay_idle_candidate_for_testing(shared.as_ref(), 7001); clear_relay_idle_candidate_for_testing(shared.as_ref(), 7001);
assert_ne!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001)); assert_ne!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7001)
);
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001));
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7001)
);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
} }
@ -26,7 +35,12 @@ fn middle_relay_baseline_public_api_desync_window_contract() {
let key = 0xDEAD_BEEF_0000_0001u64; let key = 0xDEAD_BEEF_0000_0001u64;
let t0 = Instant::now(); let t0 = Instant::now();
assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, false, t0)); assert!(should_emit_full_desync_for_testing(
shared.as_ref(),
key,
false,
t0
));
assert!(!should_emit_full_desync_for_testing( assert!(!should_emit_full_desync_for_testing(
shared.as_ref(), shared.as_ref(),
key, key,
@ -35,7 +49,12 @@ fn middle_relay_baseline_public_api_desync_window_contract() {
)); ));
let t1 = t0 + DESYNC_DEDUP_WINDOW + Duration::from_millis(10); let t1 = t0 + DESYNC_DEDUP_WINDOW + Duration::from_millis(10);
assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, false, t1)); assert!(should_emit_full_desync_for_testing(
shared.as_ref(),
key,
false,
t1
));
clear_desync_dedup_for_testing_in_shared(shared.as_ref()); clear_desync_dedup_for_testing_in_shared(shared.as_ref());
} }

View File

@ -13,7 +13,12 @@ fn desync_all_full_bypass_does_not_initialize_or_grow_dedup_cache() {
for i in 0..20_000u64 { for i in 0..20_000u64 {
assert!( assert!(
should_emit_full_desync_for_testing(shared.as_ref(), 0xD35E_D000_0000_0000u64 ^ i, true, now), should_emit_full_desync_for_testing(
shared.as_ref(),
0xD35E_D000_0000_0000u64 ^ i,
true,
now
),
"desync_all_full path must always emit" "desync_all_full path must always emit"
); );
} }
@ -37,7 +42,12 @@ fn desync_all_full_bypass_keeps_existing_dedup_entries_unchanged() {
let now = Instant::now(); let now = Instant::now();
for i in 0..2048u64 { for i in 0..2048u64 {
assert!( assert!(
should_emit_full_desync_for_testing(shared.as_ref(), 0xF011_F000_0000_0000u64 ^ i, true, now), should_emit_full_desync_for_testing(
shared.as_ref(),
0xF011_F000_0000_0000u64 ^ i,
true,
now
),
"desync_all_full must bypass suppression and dedup refresh" "desync_all_full must bypass suppression and dedup refresh"
); );
} }
@ -68,7 +78,8 @@ fn edge_all_full_burst_does_not_poison_later_false_path_tracking() {
let now = Instant::now(); let now = Instant::now();
for i in 0..8192u64 { for i in 0..8192u64 {
assert!(should_emit_full_desync_for_testing(shared.as_ref(), assert!(should_emit_full_desync_for_testing(
shared.as_ref(),
0xABCD_0000_0000_0000 ^ i, 0xABCD_0000_0000_0000 ^ i,
true, true,
now now
@ -102,7 +113,12 @@ fn adversarial_mixed_sequence_true_steps_never_change_cache_len() {
let flag_all_full = (seed & 0x1) == 1; let flag_all_full = (seed & 0x1) == 1;
let key = 0x7000_0000_0000_0000u64 ^ i ^ seed; let key = 0x7000_0000_0000_0000u64 ^ i ^ seed;
let before = desync_dedup_len_for_testing(shared.as_ref()); let before = desync_dedup_len_for_testing(shared.as_ref());
let _ = should_emit_full_desync_for_testing(shared.as_ref(), key, flag_all_full, Instant::now()); let _ = should_emit_full_desync_for_testing(
shared.as_ref(),
key,
flag_all_full,
Instant::now(),
);
let after = desync_dedup_len_for_testing(shared.as_ref()); let after = desync_dedup_len_for_testing(shared.as_ref());
if flag_all_full { if flag_all_full {
@ -124,7 +140,12 @@ fn light_fuzz_all_full_mode_always_emits_and_stays_bounded() {
seed ^= seed >> 9; seed ^= seed >> 9;
seed ^= seed << 8; seed ^= seed << 8;
let key = seed ^ 0x55AA_55AA_55AA_55AAu64; let key = seed ^ 0x55AA_55AA_55AA_55AAu64;
assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, true, Instant::now())); assert!(should_emit_full_desync_for_testing(
shared.as_ref(),
key,
true,
Instant::now()
));
} }
let after = desync_dedup_len_for_testing(shared.as_ref()); let after = desync_dedup_len_for_testing(shared.as_ref());

View File

@ -366,23 +366,42 @@ fn pressure_evicts_oldest_idle_candidate_with_deterministic_ordering() {
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 10)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 10));
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 11)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 11));
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(10)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(10)
);
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
let mut seen_for_newer = 0u64; let mut seen_for_newer = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 11, &mut seen_for_newer, &stats), !maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
11,
&mut seen_for_newer,
&stats
),
"newer idle candidate must not be evicted while older candidate exists" "newer idle candidate must not be evicted while older candidate exists"
); );
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(10)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(10)
);
let mut seen_for_oldest = 0u64; let mut seen_for_oldest = 0u64;
assert!( assert!(
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 10, &mut seen_for_oldest, &stats), maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
10,
&mut seen_for_oldest,
&stats
),
"oldest idle candidate must be evicted first under pressure" "oldest idle candidate must be evicted first under pressure"
); );
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(11)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(11)
);
assert_eq!(stats.get_relay_pressure_evict_total(), 1); assert_eq!(stats.get_relay_pressure_evict_total(), 1);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
@ -402,7 +421,10 @@ fn pressure_does_not_evict_without_new_pressure_signal() {
"without new pressure signal, candidate must stay" "without new pressure signal, candidate must stay"
); );
assert_eq!(stats.get_relay_pressure_evict_total(), 0); assert_eq!(stats.get_relay_pressure_evict_total(), 0);
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(21)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(21)
);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
} }
@ -415,7 +437,10 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() {
let mut seen_per_conn = std::collections::HashMap::new(); let mut seen_per_conn = std::collections::HashMap::new();
for conn_id in 1000u64..1064u64 { for conn_id in 1000u64..1064u64 {
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), conn_id)); assert!(mark_relay_idle_candidate_for_testing(
shared.as_ref(),
conn_id
));
seen_per_conn.insert(conn_id, 0u64); seen_per_conn.insert(conn_id, 0u64);
} }
@ -426,7 +451,12 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() {
.get(&expected) .get(&expected)
.expect("per-conn pressure cursor must exist"); .expect("per-conn pressure cursor must exist");
assert!( assert!(
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), expected, &mut seen, &stats), maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
expected,
&mut seen,
&stats
),
"expected conn_id {expected} must be evicted next by deterministic FIFO ordering" "expected conn_id {expected} must be evicted next by deterministic FIFO ordering"
); );
seen_per_conn.insert(expected, seen); seen_per_conn.insert(expected, seen);
@ -436,7 +466,10 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() {
} else { } else {
Some(expected + 1) Some(expected + 1)
}; };
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), next); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
next
);
} }
assert_eq!(stats.get_relay_pressure_evict_total(), 64); assert_eq!(stats.get_relay_pressure_evict_total(), 64);
@ -460,9 +493,24 @@ fn blackhat_single_pressure_event_must_not_evict_more_than_one_candidate() {
// Single pressure event should authorize at most one eviction globally. // Single pressure event should authorize at most one eviction globally.
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
let evicted_301 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 301, &mut seen_301, &stats); let evicted_301 = maybe_evict_idle_candidate_on_pressure_for_testing(
let evicted_302 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 302, &mut seen_302, &stats); shared.as_ref(),
let evicted_303 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 303, &mut seen_303, &stats); 301,
&mut seen_301,
&stats,
);
let evicted_302 = maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
302,
&mut seen_302,
&stats,
);
let evicted_303 = maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
303,
&mut seen_303,
&stats,
);
let evicted_total = [evicted_301, evicted_302, evicted_303] let evicted_total = [evicted_301, evicted_302, evicted_303]
.iter() .iter()
@ -492,12 +540,22 @@ fn blackhat_pressure_counter_must_track_global_budget_not_per_session_cursor() {
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
assert!( assert!(
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 401, &mut seen_oldest, &stats), maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
401,
&mut seen_oldest,
&stats
),
"oldest candidate must consume pressure budget first" "oldest candidate must consume pressure budget first"
); );
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 402, &mut seen_next, &stats), !maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
402,
&mut seen_next,
&stats
),
"next candidate must not consume the same pressure budget" "next candidate must not consume the same pressure budget"
); );
@ -522,7 +580,12 @@ fn blackhat_stale_pressure_before_idle_mark_must_not_trigger_eviction() {
let mut seen = 0u64; let mut seen = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 501, &mut seen, &stats), !maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
501,
&mut seen,
&stats
),
"stale pressure (before soft-idle mark) must not evict newly marked candidate" "stale pressure (before soft-idle mark) must not evict newly marked candidate"
); );
@ -545,9 +608,24 @@ fn blackhat_stale_pressure_must_not_evict_any_of_newly_marked_batch() {
let mut seen_513 = 0u64; let mut seen_513 = 0u64;
let evicted = [ let evicted = [
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 511, &mut seen_511, &stats), maybe_evict_idle_candidate_on_pressure_for_testing(
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 512, &mut seen_512, &stats), shared.as_ref(),
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 513, &mut seen_513, &stats), 511,
&mut seen_511,
&stats,
),
maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
512,
&mut seen_512,
&stats,
),
maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
513,
&mut seen_513,
&stats,
),
] ]
.iter() .iter()
.filter(|value| **value) .filter(|value| **value)
@ -572,7 +650,12 @@ fn blackhat_stale_pressure_seen_without_candidates_must_be_globally_invalidated(
// Session A observed pressure while there were no candidates. // Session A observed pressure while there were no candidates.
let mut seen_a = 0u64; let mut seen_a = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 999_001, &mut seen_a, &stats), !maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
999_001,
&mut seen_a,
&stats
),
"no candidate existed, so no eviction is possible" "no candidate existed, so no eviction is possible"
); );
@ -580,7 +663,12 @@ fn blackhat_stale_pressure_seen_without_candidates_must_be_globally_invalidated(
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 521)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 521));
let mut seen_b = 0u64; let mut seen_b = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 521, &mut seen_b, &stats), !maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
521,
&mut seen_b,
&stats
),
"once pressure is observed with empty candidate set, it must not be replayed later" "once pressure is observed with empty candidate set, it must not be replayed later"
); );
@ -600,7 +688,12 @@ fn blackhat_stale_pressure_must_not_survive_candidate_churn() {
let mut seen = 0u64; let mut seen = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 532, &mut seen, &stats), !maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
532,
&mut seen,
&stats
),
"stale pressure must not survive clear+remark churn cycles" "stale pressure must not survive clear+remark churn cycles"
); );
@ -663,7 +756,10 @@ async fn integration_race_single_pressure_event_allows_at_most_one_eviction_unde
let mut seen_per_session = vec![0u64; sessions]; let mut seen_per_session = vec![0u64; sessions];
for conn_id in &conn_ids { for conn_id in &conn_ids {
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id)); assert!(mark_relay_idle_candidate_for_testing(
shared.as_ref(),
*conn_id
));
} }
for round in 0..rounds { for round in 0..rounds {
@ -676,8 +772,12 @@ async fn integration_race_single_pressure_event_allows_at_most_one_eviction_unde
let stats = stats.clone(); let stats = stats.clone();
let shared = shared.clone(); let shared = shared.clone();
joins.push(tokio::spawn(async move { joins.push(tokio::spawn(async move {
let evicted = let evicted = maybe_evict_idle_candidate_on_pressure_for_testing(
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), conn_id, &mut seen, stats.as_ref()); shared.as_ref(),
conn_id,
&mut seen,
stats.as_ref(),
);
(idx, conn_id, seen, evicted) (idx, conn_id, seen, evicted)
})); }));
} }
@ -729,7 +829,10 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
let mut seen_per_session = vec![0u64; sessions]; let mut seen_per_session = vec![0u64; sessions];
for conn_id in &conn_ids { for conn_id in &conn_ids {
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id)); assert!(mark_relay_idle_candidate_for_testing(
shared.as_ref(),
*conn_id
));
} }
let mut expected_total_evictions = 0u64; let mut expected_total_evictions = 0u64;
@ -751,8 +854,12 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
let stats = stats.clone(); let stats = stats.clone();
let shared = shared.clone(); let shared = shared.clone();
joins.push(tokio::spawn(async move { joins.push(tokio::spawn(async move {
let evicted = let evicted = maybe_evict_idle_candidate_on_pressure_for_testing(
maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), conn_id, &mut seen, stats.as_ref()); shared.as_ref(),
conn_id,
&mut seen,
stats.as_ref(),
);
(idx, conn_id, seen, evicted) (idx, conn_id, seen, evicted)
})); }));
} }
@ -774,7 +881,10 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
"round {round}: empty candidate phase must not allow stale-pressure eviction" "round {round}: empty candidate phase must not allow stale-pressure eviction"
); );
for conn_id in &conn_ids { for conn_id in &conn_ids {
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id)); assert!(mark_relay_idle_candidate_for_testing(
shared.as_ref(),
*conn_id
));
} }
} else { } else {
assert!( assert!(
@ -783,7 +893,10 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
); );
if let Some(conn_id) = evicted_conn { if let Some(conn_id) = evicted_conn {
expected_total_evictions = expected_total_evictions.saturating_add(1); expected_total_evictions = expected_total_evictions.saturating_add(1);
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), conn_id)); assert!(mark_relay_idle_candidate_for_testing(
shared.as_ref(),
conn_id
));
} }
} }
} }

View File

@ -25,7 +25,10 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account
// Helper lock must recover from poison, reset stale state, and continue. // Helper lock must recover from poison, reset stale state, and continue.
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42));
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(42)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(42)
);
let before = relay_pressure_event_seq_for_testing(shared.as_ref()); let before = relay_pressure_event_seq_for_testing(shared.as_ref());
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
@ -54,11 +57,17 @@ fn clear_state_helper_must_reset_poisoned_registry_for_deterministic_fifo_tests(
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), None); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
None
);
assert_eq!(relay_pressure_event_seq_for_testing(shared.as_ref()), 0); assert_eq!(relay_pressure_event_seq_for_testing(shared.as_ref()), 0);
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7));
assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7)); assert_eq!(
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7)
);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
} }

View File

@ -1,10 +1,10 @@
use crate::proxy::client::handle_client_stream_with_shared;
use crate::proxy::handshake::{ use crate::proxy::handshake::{
auth_probe_fail_streak_for_testing_in_shared, auth_probe_is_throttled_for_testing_in_shared, auth_probe_fail_streak_for_testing_in_shared, auth_probe_is_throttled_for_testing_in_shared,
auth_probe_record_failure_for_testing, clear_auth_probe_state_for_testing_in_shared, auth_probe_record_failure_for_testing, clear_auth_probe_state_for_testing_in_shared,
clear_unknown_sni_warn_state_for_testing_in_shared, clear_warned_secrets_for_testing_in_shared, clear_unknown_sni_warn_state_for_testing_in_shared, clear_warned_secrets_for_testing_in_shared,
should_emit_unknown_sni_warn_for_testing_in_shared, warned_secrets_for_testing_in_shared, should_emit_unknown_sni_warn_for_testing_in_shared, warned_secrets_for_testing_in_shared,
}; };
use crate::proxy::client::handle_client_stream_with_shared;
use crate::proxy::middle_relay::{ use crate::proxy::middle_relay::{
clear_desync_dedup_for_testing_in_shared, clear_relay_idle_candidate_for_testing, clear_desync_dedup_for_testing_in_shared, clear_relay_idle_candidate_for_testing,
clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing, clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing,
@ -81,7 +81,10 @@ fn new_client_harness() -> ClientHarness {
} }
} }
async fn drive_invalid_mtproto_handshake(shared: Arc<ProxySharedState>, peer: std::net::SocketAddr) { async fn drive_invalid_mtproto_handshake(
shared: Arc<ProxySharedState>,
peer: std::net::SocketAddr,
) {
let harness = new_client_harness(); let harness = new_client_harness();
let (server_side, mut client_side) = duplex(4096); let (server_side, mut client_side) = duplex(4096);
let invalid = [0u8; 64]; let invalid = [0u8; 64];
@ -108,7 +111,10 @@ async fn drive_invalid_mtproto_handshake(shared: Arc<ProxySharedState>, peer: st
.write_all(&invalid) .write_all(&invalid)
.await .await
.expect("failed to write invalid handshake"); .expect("failed to write invalid handshake");
client_side.shutdown().await.expect("failed to shutdown client"); client_side
.shutdown()
.await
.expect("failed to shutdown client");
let _ = tokio::time::timeout(Duration::from_secs(3), task) let _ = tokio::time::timeout(Duration::from_secs(3), task)
.await .await
.expect("client task timed out") .expect("client task timed out")
@ -128,7 +134,10 @@ fn proxy_shared_state_two_instances_do_not_share_auth_probe_state() {
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip),
Some(1) Some(1)
); );
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), None); assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
None
);
} }
#[test] #[test]
@ -139,8 +148,18 @@ fn proxy_shared_state_two_instances_do_not_share_desync_dedup() {
let now = Instant::now(); let now = Instant::now();
let key = 0xA5A5_u64; let key = 0xA5A5_u64;
assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now)); assert!(should_emit_full_desync_for_testing(
assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now)); a.as_ref(),
key,
false,
now
));
assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
} }
#[test] #[test]
@ -150,7 +169,10 @@ fn proxy_shared_state_two_instances_do_not_share_idle_registry() {
clear_relay_idle_pressure_state_for_testing_in_shared(a.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(a.as_ref());
assert!(mark_relay_idle_candidate_for_testing(a.as_ref(), 111)); assert!(mark_relay_idle_candidate_for_testing(a.as_ref(), 111));
assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), Some(111)); assert_eq!(
oldest_relay_idle_candidate_for_testing(a.as_ref()),
Some(111)
);
assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), None); assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), None);
} }
@ -168,7 +190,10 @@ fn proxy_shared_state_reset_in_one_instance_does_not_affect_another() {
auth_probe_record_failure_for_testing(b.as_ref(), ip_b, now); auth_probe_record_failure_for_testing(b.as_ref(), ip_b, now);
clear_auth_probe_state_for_testing_in_shared(a.as_ref()); clear_auth_probe_state_for_testing_in_shared(a.as_ref());
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None); assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a),
None
);
assert_eq!( assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b), auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b),
Some(1) Some(1)
@ -191,8 +216,14 @@ fn proxy_shared_state_parallel_auth_probe_updates_stay_per_instance() {
auth_probe_record_failure_for_testing(b.as_ref(), ip, now + Duration::from_millis(1)); auth_probe_record_failure_for_testing(b.as_ref(), ip, now + Duration::from_millis(1));
} }
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(5)); assert_eq!(
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), Some(3)); auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip),
Some(5)
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
Some(3)
);
} }
#[tokio::test] #[tokio::test]
@ -317,8 +348,14 @@ fn proxy_shared_state_auth_saturation_does_not_bleed_across_instances() {
auth_probe_record_failure_for_testing(a.as_ref(), ip, future_now); auth_probe_record_failure_for_testing(a.as_ref(), ip, future_now);
} }
assert!(auth_probe_is_throttled_for_testing_in_shared(a.as_ref(), ip)); assert!(auth_probe_is_throttled_for_testing_in_shared(
assert!(!auth_probe_is_throttled_for_testing_in_shared(b.as_ref(), ip)); a.as_ref(),
ip
));
assert!(!auth_probe_is_throttled_for_testing_in_shared(
b.as_ref(),
ip
));
} }
#[test] #[test]
@ -348,7 +385,10 @@ fn proxy_shared_state_poison_clear_in_one_instance_does_not_affect_other_instanc
clear_auth_probe_state_for_testing_in_shared(a.as_ref()); clear_auth_probe_state_for_testing_in_shared(a.as_ref());
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None); assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a),
None
);
assert_eq!( assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b), auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b),
Some(1), Some(1),
@ -463,7 +503,10 @@ fn proxy_shared_state_warned_secret_clear_in_one_instance_does_not_clear_other()
clear_warned_secrets_for_testing_in_shared(a.as_ref()); clear_warned_secrets_for_testing_in_shared(a.as_ref());
clear_warned_secrets_for_testing_in_shared(b.as_ref()); clear_warned_secrets_for_testing_in_shared(b.as_ref());
let key = ("clear-isolation-user".to_string(), "invalid_length".to_string()); let key = (
"clear-isolation-user".to_string(),
"invalid_length".to_string(),
);
{ {
let warned_a = warned_secrets_for_testing_in_shared(a.as_ref()); let warned_a = warned_secrets_for_testing_in_shared(a.as_ref());
let mut guard_a = warned_a let mut guard_a = warned_a
@ -508,14 +551,24 @@ fn proxy_shared_state_desync_duplicate_suppression_is_instance_scoped() {
let now = Instant::now(); let now = Instant::now();
let key = 0xBEEF_0000_0000_0001u64; let key = 0xBEEF_0000_0000_0001u64;
assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now)); assert!(should_emit_full_desync_for_testing(
a.as_ref(),
key,
false,
now
));
assert!(!should_emit_full_desync_for_testing( assert!(!should_emit_full_desync_for_testing(
a.as_ref(), a.as_ref(),
key, key,
false, false,
now + Duration::from_millis(1) now + Duration::from_millis(1)
)); ));
assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now)); assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
} }
#[test] #[test]
@ -527,8 +580,18 @@ fn proxy_shared_state_desync_clear_in_one_instance_does_not_clear_other() {
let now = Instant::now(); let now = Instant::now();
let key = 0xCAFE_0000_0000_0001u64; let key = 0xCAFE_0000_0000_0001u64;
assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now)); assert!(should_emit_full_desync_for_testing(
assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now)); a.as_ref(),
key,
false,
now
));
assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
clear_desync_dedup_for_testing_in_shared(a.as_ref()); clear_desync_dedup_for_testing_in_shared(a.as_ref());
@ -558,7 +621,10 @@ fn proxy_shared_state_idle_candidate_clear_in_one_instance_does_not_affect_other
clear_relay_idle_candidate_for_testing(a.as_ref(), 1001); clear_relay_idle_candidate_for_testing(a.as_ref(), 1001);
assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), None); assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), None);
assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), Some(2002)); assert_eq!(
oldest_relay_idle_candidate_for_testing(b.as_ref()),
Some(2002)
);
} }
#[test] #[test]

View File

@ -1,16 +1,17 @@
use crate::proxy::handshake::{ use crate::proxy::handshake::{
auth_probe_fail_streak_for_testing_in_shared, auth_probe_record_failure_for_testing, auth_probe_fail_streak_for_testing_in_shared, auth_probe_record_failure_for_testing,
clear_auth_probe_state_for_testing_in_shared, clear_unknown_sni_warn_state_for_testing_in_shared, clear_auth_probe_state_for_testing_in_shared,
clear_unknown_sni_warn_state_for_testing_in_shared,
should_emit_unknown_sni_warn_for_testing_in_shared, should_emit_unknown_sni_warn_for_testing_in_shared,
}; };
use crate::proxy::middle_relay::{ use crate::proxy::middle_relay::{
clear_desync_dedup_for_testing_in_shared, clear_relay_idle_pressure_state_for_testing_in_shared, clear_desync_dedup_for_testing_in_shared,
mark_relay_idle_candidate_for_testing, oldest_relay_idle_candidate_for_testing, clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing,
should_emit_full_desync_for_testing, oldest_relay_idle_candidate_for_testing, should_emit_full_desync_for_testing,
}; };
use crate::proxy::shared_state::ProxySharedState; use crate::proxy::shared_state::ProxySharedState;
use rand::SeedableRng;
use rand::RngExt; use rand::RngExt;
use rand::SeedableRng;
use rand::rngs::StdRng; use rand::rngs::StdRng;
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc; use std::sync::Arc;
@ -99,8 +100,14 @@ async fn proxy_shared_state_dual_instance_same_ip_high_contention_no_counter_ble
handle.await.expect("task join failed"); handle.await.expect("task join failed");
} }
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(64)); assert_eq!(
assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), Some(64)); auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip),
Some(64)
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
Some(64)
);
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
@ -183,12 +190,7 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed()
clear_auth_probe_state_for_testing_in_shared(shared_a.as_ref()); clear_auth_probe_state_for_testing_in_shared(shared_a.as_ref());
clear_auth_probe_state_for_testing_in_shared(shared_b.as_ref()); clear_auth_probe_state_for_testing_in_shared(shared_b.as_ref());
let ip = IpAddr::V4(Ipv4Addr::new( let ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, rng.random_range(1_u8..=250_u8)));
198,
51,
100,
rng.random_range(1_u8..=250_u8),
));
let workers = rng.random_range(16_usize..=48_usize); let workers = rng.random_range(16_usize..=48_usize);
let rounds = rng.random_range(4_usize..=10_usize); let rounds = rng.random_range(4_usize..=10_usize);
@ -210,7 +212,11 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed()
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
start_a.wait().await; start_a.wait().await;
for _ in 0..a_ops { for _ in 0..a_ops {
auth_probe_record_failure_for_testing(shared_a.as_ref(), ip, Instant::now()); auth_probe_record_failure_for_testing(
shared_a.as_ref(),
ip,
Instant::now(),
);
} }
})); }));
@ -219,7 +225,11 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed()
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
start_b.wait().await; start_b.wait().await;
for _ in 0..b_ops { for _ in 0..b_ops {
auth_probe_record_failure_for_testing(shared_b.as_ref(), ip, Instant::now()); auth_probe_record_failure_for_testing(
shared_b.as_ref(),
ip,
Instant::now(),
);
} }
})); }));
} }

View File

@ -69,7 +69,10 @@ async fn relay_baseline_activity_timeout_fires_after_inactivity() {
.expect("relay must complete after inactivity timeout") .expect("relay must complete after inactivity timeout")
.expect("relay task must not panic"); .expect("relay task must not panic");
assert!(done.is_ok(), "relay must return Ok(()) after inactivity timeout"); assert!(
done.is_ok(),
"relay must return Ok(()) after inactivity timeout"
);
} }
#[tokio::test] #[tokio::test]
@ -155,7 +158,10 @@ async fn relay_baseline_bidirectional_bytes_counted_symmetrically() {
.expect("relay task must not panic"); .expect("relay task must not panic");
assert!(done.is_ok()); assert!(done.is_ok());
assert_eq!(stats.get_user_total_octets(user), (c2s.len() + s2c.len()) as u64); assert_eq!(
stats.get_user_total_octets(user),
(c2s.len() + s2c.len()) as u64
);
} }
#[tokio::test] #[tokio::test]
@ -222,7 +228,10 @@ async fn relay_baseline_broken_pipe_midtransfer_returns_error() {
match done { match done {
Err(ProxyError::Io(err)) => { Err(ProxyError::Io(err)) => {
assert!( assert!(
matches!(err.kind(), io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset), matches!(
err.kind(),
io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset
),
"expected BrokenPipe/ConnectionReset, got {:?}", "expected BrokenPipe/ConnectionReset, got {:?}",
err.kind() err.kind()
); );

View File

@ -1,6 +1,6 @@
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use rand::rngs::StdRng;
use rand::SeedableRng; use rand::SeedableRng;
use rand::rngs::StdRng;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
@ -18,7 +18,10 @@ mod tests {
let arc = Arc::<AtomicUsize>::from_raw(data.cast::<AtomicUsize>()); let arc = Arc::<AtomicUsize>::from_raw(data.cast::<AtomicUsize>());
let cloned = Arc::clone(&arc); let cloned = Arc::clone(&arc);
let _ = Arc::into_raw(arc); let _ = Arc::into_raw(arc);
RawWaker::new(Arc::into_raw(cloned).cast::<()>(), &WAKE_COUNTER_WAKER_VTABLE) RawWaker::new(
Arc::into_raw(cloned).cast::<()>(),
&WAKE_COUNTER_WAKER_VTABLE,
)
} }
unsafe fn wake_counter_wake(data: *const ()) { unsafe fn wake_counter_wake(data: *const ()) {

View File

@ -1593,13 +1593,15 @@ impl Stats {
self.conntrack_delete_success_total.load(Ordering::Relaxed) self.conntrack_delete_success_total.load(Ordering::Relaxed)
} }
pub fn get_conntrack_delete_not_found_total(&self) -> u64 { pub fn get_conntrack_delete_not_found_total(&self) -> u64 {
self.conntrack_delete_not_found_total.load(Ordering::Relaxed) self.conntrack_delete_not_found_total
.load(Ordering::Relaxed)
} }
pub fn get_conntrack_delete_error_total(&self) -> u64 { pub fn get_conntrack_delete_error_total(&self) -> u64 {
self.conntrack_delete_error_total.load(Ordering::Relaxed) self.conntrack_delete_error_total.load(Ordering::Relaxed)
} }
pub fn get_conntrack_close_event_drop_total(&self) -> u64 { pub fn get_conntrack_close_event_drop_total(&self) -> u64 {
self.conntrack_close_event_drop_total.load(Ordering::Relaxed) self.conntrack_close_event_drop_total
.load(Ordering::Relaxed)
} }
pub fn get_me_keepalive_sent(&self) -> u64 { pub fn get_me_keepalive_sent(&self) -> u64 {
self.me_keepalive_sent.load(Ordering::Relaxed) self.me_keepalive_sent.load(Ordering::Relaxed)