Compare commits

..

No commits in common. "main" and "3.3.37" have entirely different histories.
main ... 3.3.37

44 changed files with 635 additions and 4325 deletions

View File

@ -151,14 +151,6 @@ jobs:
mkdir -p dist mkdir -p dist
cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-gnu" ]; then
STRIP_BIN=aarch64-linux-gnu-strip
else
STRIP_BIN=strip
fi
"${STRIP_BIN}" dist/telemt
cd dist cd dist
tar -czf "${{ matrix.asset }}.tar.gz" \ tar -czf "${{ matrix.asset }}.tar.gz" \
--owner=0 --group=0 --numeric-owner \ --owner=0 --group=0 --numeric-owner \
@ -287,14 +279,6 @@ jobs:
mkdir -p dist mkdir -p dist
cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt cp "target/${{ matrix.target }}/release/${{ env.BINARY_NAME }}" dist/telemt
if [ "${{ matrix.target }}" = "aarch64-unknown-linux-musl" ]; then
STRIP_BIN=aarch64-linux-musl-strip
else
STRIP_BIN=strip
fi
"${STRIP_BIN}" dist/telemt
cd dist cd dist
tar -czf "${{ matrix.asset }}.tar.gz" \ tar -czf "${{ matrix.asset }}.tar.gz" \
--owner=0 --group=0 --numeric-owner \ --owner=0 --group=0 --numeric-owner \

2
Cargo.lock generated
View File

@ -2780,7 +2780,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.3.38" version = "3.3.37"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.38" version = "3.3.37"
edition = "2024" edition = "2024"
[features] [features]

16
LICENSE
View File

@ -1,4 +1,4 @@
######## TELEMT LICENSE 3.3 ######### ###### TELEMT Public License 3 ######
##### Copyright (c) 2026 Telemt ##### ##### Copyright (c) 2026 Telemt #####
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
@ -14,15 +14,11 @@ are preserved and complied with.
The canonical version of this License is the English version. The canonical version of this License is the English version.
Official translations are provided for informational purposes only Official translations are provided for informational purposes only
and for convenience, and do not have legal force. In case of any and for convenience, and do not have legal force. In case of any
discrepancy, the English version of this License shall prevail discrepancy, the English version of this License shall prevail.
Available versions:
/----------------------------------------------------------\ - English in Markdown: docs/LICENSE/LICENSE.md
| Language | Location | - German: docs/LICENSE/LICENSE.de.md
|-------------|--------------------------------------------| - Russian: docs/LICENSE/LICENSE.ru.md
| English | docs/LICENSE/TELEMT-LICENSE.en.md |
| German | docs/LICENSE/TELEMT-LICENSE.de.md |
| Russian | docs/LICENSE/TELEMT-LICENSE.ru.md |
\----------------------------------------------------------/
### License Versioning Policy ### License Versioning Policy

View File

@ -2,10 +2,7 @@
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist*** ***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
### [**Telemt Chat in Telegram**](https://t.me/telemtrs) [**Telemt Chat in Telegram**](https://t.me/telemtrs)
#### Fixed TLS ClientHello is now available in Telegram Desktop starting from version 6.7.2: to work with EE-MTProxy, please update your client;
#### Fixed TLS ClientHello for Telegram Android Client is available in [our chat](https://t.me/telemtrs/30234/36441); official releases for Android and iOS are "work in progress";
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as: **Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md) - [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)
@ -54,12 +51,8 @@
- [FAQ EN](docs/FAQ.en.md) - [FAQ EN](docs/FAQ.en.md)
### Recognizability for DPI and crawler ### Recognizability for DPI and crawler
Since version 1.1.0.0, we have debugged masking perfectly: for all clients without "presenting" a key,
On April 1, 2026, we became aware of a method for detecting MTProxy Fake-TLS, we transparently direct traffic to the target host!
based on the ECH extension and the ordering of cipher suites,
as well as an overall unique JA3/JA4 fingerprint
that does not occur in modern browsers:
we have already submitted initial changes to the Telegram Desktop developers and are working on updates for other clients.
- We consider this a breakthrough aspect, which has no stable analogues today - We consider this a breakthrough aspect, which has no stable analogues today
- Based on this: if `telemt` configured correctly, **TLS mode is completely identical to real-life handshake + communication** with a specified host - Based on this: if `telemt` configured correctly, **TLS mode is completely identical to real-life handshake + communication** with a specified host

File diff suppressed because it is too large Load Diff

View File

@ -128,8 +128,8 @@ WorkingDirectory=/opt/telemt
ExecStart=/bin/telemt /etc/telemt/telemt.toml ExecStart=/bin/telemt /etc/telemt/telemt.toml
Restart=on-failure Restart=on-failure
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_ADMIN CAP_NET_BIND_SERVICE AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_ADMIN CAP_NET_BIND_SERVICE CapabilityBoundingSet=CAP_NET_BIND_SERVICE
NoNewPrivileges=true NoNewPrivileges=true
[Install] [Install]

View File

@ -128,8 +128,8 @@ WorkingDirectory=/opt/telemt
ExecStart=/bin/telemt /etc/telemt/telemt.toml ExecStart=/bin/telemt /etc/telemt/telemt.toml
Restart=on-failure Restart=on-failure
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_ADMIN CAP_NET_BIND_SERVICE AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_ADMIN CAP_NET_BIND_SERVICE CapabilityBoundingSet=CAP_NET_BIND_SERVICE
NoNewPrivileges=true NoNewPrivileges=true
[Install] [Install]

View File

@ -8,62 +8,18 @@ CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}"
CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}" CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}"
WORK_DIR="${WORK_DIR:-/opt/telemt}" WORK_DIR="${WORK_DIR:-/opt/telemt}"
TLS_DOMAIN="${TLS_DOMAIN:-petrovich.ru}" TLS_DOMAIN="${TLS_DOMAIN:-petrovich.ru}"
SERVER_PORT="${SERVER_PORT:-443}"
USER_SECRET=""
AD_TAG=""
SERVICE_NAME="telemt" SERVICE_NAME="telemt"
TEMP_DIR="" TEMP_DIR=""
SUDO="" SUDO=""
CONFIG_PARENT_DIR="" CONFIG_PARENT_DIR=""
SERVICE_START_FAILED=0 SERVICE_START_FAILED=0
PORT_PROVIDED=0
SECRET_PROVIDED=0
AD_TAG_PROVIDED=0
DOMAIN_PROVIDED=0
ACTION="install" ACTION="install"
TARGET_VERSION="${VERSION:-latest}" TARGET_VERSION="${VERSION:-latest}"
while [ $# -gt 0 ]; do while [ $# -gt 0 ]; do
case "$1" in case "$1" in
-h|--help) ACTION="help"; shift ;; -h|--help) ACTION="help"; shift ;;
-d|--domain)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires a domain argument.\n' "$1" >&2
exit 1
fi
TLS_DOMAIN="$2"; DOMAIN_PROVIDED=1; shift 2 ;;
-p|--port)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires a port argument.\n' "$1" >&2; exit 1
fi
case "$2" in
*[!0-9]*) printf '[ERROR] Port must be a valid number.\n' >&2; exit 1 ;;
esac
port_num="$(printf '%s\n' "$2" | sed 's/^0*//')"
[ -z "$port_num" ] && port_num="0"
if [ "${#port_num}" -gt 5 ] || [ "$port_num" -lt 1 ] || [ "$port_num" -gt 65535 ]; then
printf '[ERROR] Port must be between 1 and 65535.\n' >&2; exit 1
fi
SERVER_PORT="$port_num"; PORT_PROVIDED=1; shift 2 ;;
-s|--secret)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires a secret argument.\n' "$1" >&2; exit 1
fi
case "$2" in
*[!0-9a-fA-F]*)
printf '[ERROR] Secret must contain only hex characters.\n' >&2; exit 1 ;;
esac
if [ "${#2}" -ne 32 ]; then
printf '[ERROR] Secret must be exactly 32 chars.\n' >&2; exit 1
fi
USER_SECRET="$2"; SECRET_PROVIDED=1; shift 2 ;;
-a|--ad-tag|--ad_tag)
if [ "$#" -lt 2 ] || [ -z "$2" ]; then
printf '[ERROR] %s requires an ad_tag argument.\n' "$1" >&2; exit 1
fi
AD_TAG="$2"; AD_TAG_PROVIDED=1; shift 2 ;;
uninstall|--uninstall) uninstall|--uninstall)
if [ "$ACTION" != "purge" ]; then ACTION="uninstall"; fi if [ "$ACTION" != "purge" ]; then ACTION="uninstall"; fi
shift ;; shift ;;
@ -96,17 +52,11 @@ cleanup() {
trap cleanup EXIT INT TERM trap cleanup EXIT INT TERM
show_help() { show_help() {
say "Usage: $0 [ <version> | install | uninstall | purge ] [ options ]" say "Usage: $0 [ <version> | install | uninstall | purge | --help ]"
say " <version> Install specific version (e.g. 3.3.15, default: latest)" say " <version> Install specific version (e.g. 3.3.15, default: latest)"
say " install Install the latest version" say " install Install the latest version"
say " uninstall Remove the binary and service" say " uninstall Remove the binary and service (keeps config and user)"
say " purge Remove everything including configuration, data, and user" say " purge Remove everything including configuration, data, and user"
say ""
say "Options:"
say " -d, --domain Set TLS domain (default: petrovich.ru)"
say " -p, --port Set server port (default: 443)"
say " -s, --secret Set specific user secret (32 hex characters)"
say " -a, --ad-tag Set ad_tag"
exit 0 exit 0
} }
@ -162,14 +112,6 @@ get_svc_mgr() {
else echo "none"; fi else echo "none"; fi
} }
is_config_exists() {
if [ -n "$SUDO" ]; then
$SUDO sh -c '[ -f "$1" ]' _ "$CONFIG_FILE"
else
[ -f "$CONFIG_FILE" ]
fi
}
verify_common() { verify_common() {
[ -n "$BIN_NAME" ] || die "BIN_NAME cannot be empty." [ -n "$BIN_NAME" ] || die "BIN_NAME cannot be empty."
[ -n "$INSTALL_DIR" ] || die "INSTALL_DIR cannot be empty." [ -n "$INSTALL_DIR" ] || die "INSTALL_DIR cannot be empty."
@ -177,7 +119,7 @@ verify_common() {
[ -n "$CONFIG_FILE" ] || die "CONFIG_FILE cannot be empty." [ -n "$CONFIG_FILE" ] || die "CONFIG_FILE cannot be empty."
case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}${CONFIG_FILE}" in case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}${CONFIG_FILE}" in
*[!a-zA-Z0-9_./-]*) die "Invalid characters in paths." ;; *[!a-zA-Z0-9_./-]*) die "Invalid characters in paths. Only alphanumeric, _, ., -, and / allowed." ;;
esac esac
case "$TARGET_VERSION" in *[!a-zA-Z0-9_.-]*) die "Invalid characters in version." ;; esac case "$TARGET_VERSION" in *[!a-zA-Z0-9_.-]*) die "Invalid characters in version." ;; esac
@ -195,11 +137,11 @@ verify_common() {
if [ "$(id -u)" -eq 0 ]; then if [ "$(id -u)" -eq 0 ]; then
SUDO="" SUDO=""
else else
command -v sudo >/dev/null 2>&1 || die "This script requires root or sudo." command -v sudo >/dev/null 2>&1 || die "This script requires root or sudo. Neither found."
SUDO="sudo" SUDO="sudo"
if ! sudo -n true 2>/dev/null; then if ! sudo -n true 2>/dev/null; then
if ! [ -t 0 ]; then if ! [ -t 0 ]; then
die "sudo requires a password, but no TTY detected." die "sudo requires a password, but no TTY detected. Aborting to prevent hang."
fi fi
fi fi
fi fi
@ -212,7 +154,21 @@ verify_common() {
die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory." die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory."
fi fi
for cmd in id uname awk grep find rm chown chmod mv mktemp mkdir tr dd sed ps head sleep cat tar gzip; do for path in "$CONFIG_DIR" "$CONFIG_PARENT_DIR" "$WORK_DIR"; do
check_path="$(get_realpath "$path")"
case "$check_path" in
/|/bin|/sbin|/usr|/usr/bin|/usr/sbin|/usr/local|/usr/local/bin|/usr/local/sbin|/usr/local/etc|/usr/local/share|/etc|/var|/var/lib|/var/log|/var/run|/home|/root|/tmp|/lib|/lib64|/opt|/run|/boot|/dev|/sys|/proc)
die "Safety check failed: '$path' (resolved to '$check_path') is a critical system directory." ;;
esac
done
check_install_dir="$(get_realpath "$INSTALL_DIR")"
case "$check_install_dir" in
/|/etc|/var|/home|/root|/tmp|/usr|/usr/local|/opt|/boot|/dev|/sys|/proc|/run)
die "Safety check failed: INSTALL_DIR '$INSTALL_DIR' is a critical system directory." ;;
esac
for cmd in id uname grep find rm chown chmod mv mktemp mkdir tr dd sed ps head sleep cat tar gzip rmdir; do
command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd" command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd"
done done
} }
@ -221,41 +177,14 @@ verify_install_deps() {
command -v curl >/dev/null 2>&1 || command -v wget >/dev/null 2>&1 || die "Neither curl nor wget is installed." command -v curl >/dev/null 2>&1 || command -v wget >/dev/null 2>&1 || die "Neither curl nor wget is installed."
command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install" command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install"
if ! command -v setcap >/dev/null 2>&1 || ! command -v conntrack >/dev/null 2>&1; then if ! command -v setcap >/dev/null 2>&1; then
if command -v apk >/dev/null 2>&1; then if command -v apk >/dev/null 2>&1; then
$SUDO apk add --no-cache libcap-utils libcap conntrack-tools >/dev/null 2>&1 || true $SUDO apk add --no-cache libcap-utils >/dev/null 2>&1 || $SUDO apk add --no-cache libcap >/dev/null 2>&1 || true
elif command -v apt-get >/dev/null 2>&1; then elif command -v apt-get >/dev/null 2>&1; then
$SUDO env DEBIAN_FRONTEND=noninteractive apt-get install -y -q libcap2-bin conntrack >/dev/null 2>&1 || { $SUDO apt-get update -q >/dev/null 2>&1 || true
$SUDO env DEBIAN_FRONTEND=noninteractive apt-get update -q >/dev/null 2>&1 || true $SUDO apt-get install -y -q libcap2-bin >/dev/null 2>&1 || true
$SUDO env DEBIAN_FRONTEND=noninteractive apt-get install -y -q libcap2-bin conntrack >/dev/null 2>&1 || true elif command -v dnf >/dev/null 2>&1; then $SUDO dnf install -y -q libcap >/dev/null 2>&1 || true
} elif command -v yum >/dev/null 2>&1; then $SUDO yum install -y -q libcap >/dev/null 2>&1 || true
elif command -v dnf >/dev/null 2>&1; then $SUDO dnf install -y -q libcap conntrack-tools >/dev/null 2>&1 || true
elif command -v yum >/dev/null 2>&1; then $SUDO yum install -y -q libcap conntrack-tools >/dev/null 2>&1 || true
fi
fi
}
check_port_availability() {
port_info=""
if command -v ss >/dev/null 2>&1; then
port_info=$($SUDO ss -tulnp 2>/dev/null | grep -E ":${SERVER_PORT}([[:space:]]|$)" || true)
elif command -v netstat >/dev/null 2>&1; then
port_info=$($SUDO netstat -tulnp 2>/dev/null | grep -E ":${SERVER_PORT}([[:space:]]|$)" || true)
elif command -v lsof >/dev/null 2>&1; then
port_info=$($SUDO lsof -i :${SERVER_PORT} 2>/dev/null | grep LISTEN || true)
else
say "[WARNING] Network diagnostic tools (ss, netstat, lsof) not found. Skipping port check."
return 0
fi
if [ -n "$port_info" ]; then
if printf '%s\n' "$port_info" | grep -q "${BIN_NAME}"; then
say " -> Port ${SERVER_PORT} is in use by ${BIN_NAME}. Ignoring as it will be restarted."
else
say "[ERROR] Port ${SERVER_PORT} is already in use by another process:"
printf ' %s\n' "$port_info"
die "Please free the port ${SERVER_PORT} or change it and try again."
fi fi
fi fi
} }
@ -263,13 +192,7 @@ check_port_availability() {
detect_arch() { detect_arch() {
sys_arch="$(uname -m)" sys_arch="$(uname -m)"
case "$sys_arch" in case "$sys_arch" in
x86_64|amd64) x86_64|amd64) echo "x86_64" ;;
if [ -r /proc/cpuinfo ] && grep -q "avx2" /proc/cpuinfo 2>/dev/null && grep -q "bmi2" /proc/cpuinfo 2>/dev/null; then
echo "x86_64-v3"
else
echo "x86_64"
fi
;;
aarch64|arm64) echo "aarch64" ;; aarch64|arm64) echo "aarch64" ;;
*) die "Unsupported architecture: $sys_arch" ;; *) die "Unsupported architecture: $sys_arch" ;;
esac esac
@ -338,19 +261,17 @@ install_binary() {
fi fi
$SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory" $SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory"
$SUDO rm -f "$bin_dst" 2>/dev/null || true
if command -v install >/dev/null 2>&1; then if command -v install >/dev/null 2>&1; then
$SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary" $SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary"
else else
$SUDO rm -f "$bin_dst" 2>/dev/null || true
$SUDO cp "$bin_src" "$bin_dst" && $SUDO chmod 0755 "$bin_dst" || die "Failed to copy binary" $SUDO cp "$bin_src" "$bin_dst" && $SUDO chmod 0755 "$bin_dst" || die "Failed to copy binary"
fi fi
$SUDO sh -c '[ -x "$1" ]' _ "$bin_dst" || die "Binary not executable: $bin_dst" $SUDO sh -c '[ -x "$1" ]' _ "$bin_dst" || die "Binary not executable: $bin_dst"
if command -v setcap >/dev/null 2>&1; then if command -v setcap >/dev/null 2>&1; then
$SUDO setcap cap_net_bind_service,cap_net_admin=+ep "$bin_dst" 2>/dev/null || true $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null || true
fi fi
} }
@ -366,20 +287,11 @@ generate_secret() {
} }
generate_config_content() { generate_config_content() {
conf_secret="$1"
conf_tag="$2"
escaped_tls_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')" escaped_tls_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')"
cat <<EOF cat <<EOF
[general] [general]
use_middle_proxy = true use_middle_proxy = false
EOF
if [ -n "$conf_tag" ]; then
echo "ad_tag = \"${conf_tag}\""
fi
cat <<EOF
[general.modes] [general.modes]
classic = false classic = false
@ -387,7 +299,7 @@ secure = false
tls = true tls = true
[server] [server]
port = ${SERVER_PORT} port = 443
[server.api] [server.api]
enabled = true enabled = true
@ -398,73 +310,28 @@ whitelist = ["127.0.0.1/32"]
tls_domain = "${escaped_tls_domain}" tls_domain = "${escaped_tls_domain}"
[access.users] [access.users]
hello = "${conf_secret}" hello = "$1"
EOF EOF
} }
install_config() { install_config() {
if is_config_exists; then if [ -n "$SUDO" ]; then
say " -> Config already exists at $CONFIG_FILE. Updating parameters..." if $SUDO sh -c '[ -f "$1" ]' _ "$CONFIG_FILE"; then
say " -> Config already exists at $CONFIG_FILE. Skipping creation."
tmp_conf="${TEMP_DIR}/config.tmp" return 0
$SUDO cat "$CONFIG_FILE" > "$tmp_conf" fi
elif [ -f "$CONFIG_FILE" ]; then
escaped_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')" say " -> Config already exists at $CONFIG_FILE. Skipping creation."
export AWK_PORT="$SERVER_PORT"
export AWK_SECRET="$USER_SECRET"
export AWK_DOMAIN="$escaped_domain"
export AWK_AD_TAG="$AD_TAG"
export AWK_FLAG_P="$PORT_PROVIDED"
export AWK_FLAG_S="$SECRET_PROVIDED"
export AWK_FLAG_D="$DOMAIN_PROVIDED"
export AWK_FLAG_A="$AD_TAG_PROVIDED"
awk '
BEGIN { ad_tag_handled = 0 }
ENVIRON["AWK_FLAG_P"] == "1" && /^[ \t]*port[ \t]*=/ { print "port = " ENVIRON["AWK_PORT"]; next }
ENVIRON["AWK_FLAG_S"] == "1" && /^[ \t]*hello[ \t]*=/ { print "hello = \"" ENVIRON["AWK_SECRET"] "\""; next }
ENVIRON["AWK_FLAG_D"] == "1" && /^[ \t]*tls_domain[ \t]*=/ { print "tls_domain = \"" ENVIRON["AWK_DOMAIN"] "\""; next }
ENVIRON["AWK_FLAG_A"] == "1" && /^[ \t]*ad_tag[ \t]*=/ {
if (!ad_tag_handled) {
print "ad_tag = \"" ENVIRON["AWK_AD_TAG"] "\"";
ad_tag_handled = 1;
}
next
}
ENVIRON["AWK_FLAG_A"] == "1" && /^\[general\]/ {
print;
if (!ad_tag_handled) {
print "ad_tag = \"" ENVIRON["AWK_AD_TAG"] "\"";
ad_tag_handled = 1;
}
next
}
{ print }
' "$tmp_conf" > "${tmp_conf}.new" && mv "${tmp_conf}.new" "$tmp_conf"
[ "$PORT_PROVIDED" -eq 1 ] && say " -> Updated port: $SERVER_PORT"
[ "$SECRET_PROVIDED" -eq 1 ] && say " -> Updated secret for user 'hello'"
[ "$DOMAIN_PROVIDED" -eq 1 ] && say " -> Updated tls_domain: $TLS_DOMAIN"
[ "$AD_TAG_PROVIDED" -eq 1 ] && say " -> Updated ad_tag"
write_root "$CONFIG_FILE" < "$tmp_conf"
rm -f "$tmp_conf"
return 0 return 0
fi fi
if [ -z "$USER_SECRET" ]; then toml_secret="$(generate_secret)" || die "Failed to generate secret."
USER_SECRET="$(generate_secret)" || die "Failed to generate secret."
fi
generate_config_content "$USER_SECRET" "$AD_TAG" | write_root "$CONFIG_FILE" || die "Failed to install config" generate_config_content "$toml_secret" | write_root "$CONFIG_FILE" || die "Failed to install config"
$SUDO chown root:telemt "$CONFIG_FILE" && $SUDO chmod 640 "$CONFIG_FILE" $SUDO chown root:telemt "$CONFIG_FILE" && $SUDO chmod 640 "$CONFIG_FILE"
say " -> Config created successfully." say " -> Config created successfully."
say " -> Configured secret for user 'hello': $USER_SECRET" say " -> Generated secret for default user 'hello': $toml_secret"
} }
generate_systemd_content() { generate_systemd_content() {
@ -481,10 +348,9 @@ Group=telemt
WorkingDirectory=$WORK_DIR WorkingDirectory=$WORK_DIR
ExecStart="${INSTALL_DIR}/${BIN_NAME}" "${CONFIG_FILE}" ExecStart="${INSTALL_DIR}/${BIN_NAME}" "${CONFIG_FILE}"
Restart=on-failure Restart=on-failure
RestartSec=5
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE CAP_NET_ADMIN CapabilityBoundingSet=CAP_NET_BIND_SERVICE
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target
@ -549,8 +415,7 @@ kill_user_procs() {
if command -v pgrep >/dev/null 2>&1; then if command -v pgrep >/dev/null 2>&1; then
pids="$(pgrep -u telemt 2>/dev/null || true)" pids="$(pgrep -u telemt 2>/dev/null || true)"
else else
pids="$(ps -ef 2>/dev/null | awk '$1=="telemt"{print $2}' || true)" pids="$(ps -u telemt -o pid= 2>/dev/null || true)"
[ -z "$pids" ] && pids="$(ps 2>/dev/null | awk '$2=="telemt"{print $1}' || true)"
fi fi
if [ -n "$pids" ]; then if [ -n "$pids" ]; then
@ -592,12 +457,11 @@ uninstall() {
say ">>> Stage 5: Purging configuration, data, and user" say ">>> Stage 5: Purging configuration, data, and user"
$SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR" $SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR"
$SUDO rm -f "$CONFIG_FILE" $SUDO rm -f "$CONFIG_FILE"
sleep 1 if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true $SUDO rmdir "$CONFIG_PARENT_DIR" 2>/dev/null || true
if check_os_entity group telemt; then
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true
fi fi
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true
else else
say "Note: Configuration and user kept. Run with 'purge' to remove completely." say "Note: Configuration and user kept. Run with 'purge' to remove completely."
fi fi
@ -615,17 +479,7 @@ case "$ACTION" in
say "Starting installation of $BIN_NAME (Version: $TARGET_VERSION)" say "Starting installation of $BIN_NAME (Version: $TARGET_VERSION)"
say ">>> Stage 1: Verifying environment and dependencies" say ">>> Stage 1: Verifying environment and dependencies"
verify_common verify_common; verify_install_deps
verify_install_deps
if is_config_exists && [ "$PORT_PROVIDED" -eq 0 ]; then
ext_port="$($SUDO awk -F'=' '/^[ \t]*port[ \t]*=/ {gsub(/[^0-9]/, "", $2); print $2; exit}' "$CONFIG_FILE" 2>/dev/null || true)"
if [ -n "$ext_port" ]; then
SERVER_PORT="$ext_port"
fi
fi
check_port_availability
if [ "$TARGET_VERSION" != "latest" ]; then if [ "$TARGET_VERSION" != "latest" ]; then
TARGET_VERSION="${TARGET_VERSION#v}" TARGET_VERSION="${TARGET_VERSION#v}"
@ -646,21 +500,7 @@ case "$ACTION" in
die "Temp directory is invalid or was not created" die "Temp directory is invalid or was not created"
fi fi
if ! fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}"; then fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}" || die "Download failed"
if [ "$ARCH" = "x86_64-v3" ]; then
say " -> x86_64-v3 build not found, falling back to standard x86_64..."
ARCH="x86_64"
FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
if [ "$TARGET_VERSION" = "latest" ]; then
DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}"
else
DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}"
fi
fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}" || die "Download failed"
else
die "Download failed"
fi
fi
say ">>> Stage 3: Extracting archive" say ">>> Stage 3: Extracting archive"
if ! gzip -dc "${TEMP_DIR}/${FILE_NAME}" | tar -xf - -C "$TEMP_DIR" 2>/dev/null; then if ! gzip -dc "${TEMP_DIR}/${FILE_NAME}" | tar -xf - -C "$TEMP_DIR" 2>/dev/null; then
@ -676,7 +516,7 @@ case "$ACTION" in
say ">>> Stage 5: Installing binary" say ">>> Stage 5: Installing binary"
install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}" install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}"
say ">>> Stage 6: Generating/Updating configuration" say ">>> Stage 6: Generating configuration"
install_config install_config
say ">>> Stage 7: Installing and starting service" say ">>> Stage 7: Installing and starting service"
@ -703,14 +543,11 @@ case "$ACTION" in
printf ' rc-service %s status\n\n' "$SERVICE_NAME" printf ' rc-service %s status\n\n' "$SERVICE_NAME"
fi fi
API_LISTEN="$($SUDO awk -F'"' '/^[ \t]*listen[ \t]*=/ {print $2; exit}' "$CONFIG_FILE" 2>/dev/null || true)"
API_LISTEN="${API_LISTEN:-127.0.0.1:9091}"
printf 'To get your user connection links (for Telegram), run:\n' printf 'To get your user connection links (for Telegram), run:\n'
if command -v jq >/dev/null 2>&1; then if command -v jq >/dev/null 2>&1; then
printf ' curl -s http://%s/v1/users | jq -r '\''.data[]? | "User: \\(.username)\\n\\(.links.tls[0] // empty)\\n"'\''\n' "$API_LISTEN" printf ' curl -s http://127.0.0.1:9091/v1/users | jq -r '\''.data[] | "User: \\(.username)\\n\\(.links.tls[0] // empty)\\n"'\''\n'
else else
printf ' curl -s http://%s/v1/users\n' "$API_LISTEN" printf ' curl -s http://127.0.0.1:9091/v1/users\n'
printf ' (Tip: Install '\''jq'\'' for a much cleaner output)\n' printf ' (Tip: Install '\''jq'\'' for a much cleaner output)\n'
fi fi

View File

@ -100,7 +100,7 @@ pub(crate) fn default_fake_cert_len() -> usize {
} }
pub(crate) fn default_tls_front_dir() -> String { pub(crate) fn default_tls_front_dir() -> String {
"/etc/telemt/tlsfront".to_string() "tlsfront".to_string()
} }
pub(crate) fn default_replay_check_len() -> usize { pub(crate) fn default_replay_check_len() -> usize {
@ -302,7 +302,7 @@ pub(crate) fn default_me2dc_fallback() -> bool {
} }
pub(crate) fn default_me2dc_fast() -> bool { pub(crate) fn default_me2dc_fast() -> bool {
true false
} }
pub(crate) fn default_keepalive_interval() -> u64 { pub(crate) fn default_keepalive_interval() -> u64 {
@ -558,7 +558,7 @@ pub(crate) fn default_beobachten_flush_secs() -> u64 {
} }
pub(crate) fn default_beobachten_file() -> String { pub(crate) fn default_beobachten_file() -> String {
"/etc/telemt/beobachten.txt".to_string() "cache/beobachten.txt".to_string()
} }
pub(crate) fn default_tls_new_session_tickets() -> u8 { pub(crate) fn default_tls_new_session_tickets() -> u8 {

View File

@ -947,11 +947,7 @@ impl ProxyConfig {
} }
if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid) if matches!(config.server.conntrack_control.mode, ConntrackMode::Hybrid)
&& config && config.server.conntrack_control.hybrid_listener_ips.is_empty()
.server
.conntrack_control
.hybrid_listener_ips
.is_empty()
{ {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid" "server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid"
@ -2507,9 +2503,9 @@ mod tests {
let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml"); let path = dir.join("telemt_conntrack_high_watermark_invalid_test.toml");
std::fs::write(&path, toml).unwrap(); std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string(); let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains( assert!(
"server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]" err.contains("server.conntrack_control.pressure_high_watermark_pct must be within [1, 100]")
)); );
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
@ -2574,9 +2570,9 @@ mod tests {
let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml"); let path = dir.join("telemt_conntrack_hybrid_requires_ips_test.toml");
std::fs::write(&path, toml).unwrap(); std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string(); let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains( assert!(
"server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid" err.contains("server.conntrack_control.hybrid_listener_ips must be non-empty in mode=hybrid")
)); );
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }

View File

@ -56,11 +56,7 @@ pub(crate) fn spawn_conntrack_controller(
shared: Arc<ProxySharedState>, shared: Arc<ProxySharedState>,
) { ) {
if !cfg!(target_os = "linux") { if !cfg!(target_os = "linux") {
let enabled = config_rx let enabled = config_rx.borrow().server.conntrack_control.inline_conntrack_control;
.borrow()
.server
.conntrack_control
.inline_conntrack_control;
stats.set_conntrack_control_enabled(enabled); stats.set_conntrack_control_enabled(enabled);
stats.set_conntrack_control_available(false); stats.set_conntrack_control_available(false);
stats.set_conntrack_pressure_active(false); stats.set_conntrack_pressure_active(false);
@ -69,9 +65,7 @@ pub(crate) fn spawn_conntrack_controller(
shared.disable_conntrack_close_sender(); shared.disable_conntrack_close_sender();
shared.set_conntrack_pressure_active(false); shared.set_conntrack_pressure_active(false);
if enabled { if enabled {
warn!( warn!("conntrack control is configured but unsupported on this OS; disabling runtime worker");
"conntrack control is configured but unsupported on this OS; disabling runtime worker"
);
} }
return; return;
} }
@ -94,13 +88,7 @@ async fn run_conntrack_controller(
let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec; let mut delete_budget_tokens = cfg.server.conntrack_control.delete_budget_per_sec;
let mut backend = pick_backend(cfg.server.conntrack_control.backend); let mut backend = pick_backend(cfg.server.conntrack_control.backend);
apply_runtime_state( apply_runtime_state(stats.as_ref(), shared.as_ref(), &cfg, backend.is_some(), false);
stats.as_ref(),
shared.as_ref(),
&cfg,
backend.is_some(),
false,
);
reconcile_rules(&cfg, backend, stats.as_ref()).await; reconcile_rules(&cfg, backend, stats.as_ref()).await;
loop { loop {
@ -327,9 +315,7 @@ fn pick_backend(configured: ConntrackBackend) -> Option<NetfilterBackend> {
} }
} }
ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables), ConntrackBackend::Nftables => command_exists("nft").then_some(NetfilterBackend::Nftables),
ConntrackBackend::Iptables => { ConntrackBackend::Iptables => command_exists("iptables").then_some(NetfilterBackend::Iptables),
command_exists("iptables").then_some(NetfilterBackend::Iptables)
}
} }
} }
@ -410,12 +396,7 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec<Option<IpAddr>>, Vec<Option<IpAddr
} }
async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> { async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> {
let _ = run_command( let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
"nft",
&["delete", "table", "inet", "telemt_conntrack"],
None,
)
.await;
if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) { if matches!(cfg.server.conntrack_control.mode, ConntrackMode::Tracked) {
return Ok(()); return Ok(());
} }
@ -465,12 +446,7 @@ async fn apply_iptables_rules_for_binary(
return Ok(()); return Ok(());
} }
let chain = "TELEMT_NOTRACK"; let chain = "TELEMT_NOTRACK";
let _ = run_command( let _ = run_command(binary, &["-t", "raw", "-D", "PREROUTING", "-j", chain], None).await;
binary,
&["-t", "raw", "-D", "PREROUTING", "-j", chain],
None,
)
.await;
let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await; let _ = run_command(binary, &["-t", "raw", "-F", chain], None).await;
let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await; let _ = run_command(binary, &["-t", "raw", "-X", chain], None).await;
@ -480,20 +456,8 @@ async fn apply_iptables_rules_for_binary(
run_command(binary, &["-t", "raw", "-N", chain], None).await?; run_command(binary, &["-t", "raw", "-N", chain], None).await?;
run_command(binary, &["-t", "raw", "-F", chain], None).await?; run_command(binary, &["-t", "raw", "-F", chain], None).await?;
if run_command( if run_command(binary, &["-t", "raw", "-C", "PREROUTING", "-j", chain], None).await.is_err() {
binary, run_command(binary, &["-t", "raw", "-I", "PREROUTING", "1", "-j", chain], None).await?;
&["-t", "raw", "-C", "PREROUTING", "-j", chain],
None,
)
.await
.is_err()
{
run_command(
binary,
&["-t", "raw", "-I", "PREROUTING", "1", "-j", chain],
None,
)
.await?;
} }
let (v4_targets, v6_targets) = notrack_targets(cfg); let (v4_targets, v6_targets) = notrack_targets(cfg);
@ -523,26 +487,11 @@ async fn apply_iptables_rules_for_binary(
} }
async fn clear_notrack_rules_all_backends() { async fn clear_notrack_rules_all_backends() {
let _ = run_command( let _ = run_command("nft", &["delete", "table", "inet", "telemt_conntrack"], None).await;
"nft", let _ = run_command("iptables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
&["delete", "table", "inet", "telemt_conntrack"],
None,
)
.await;
let _ = run_command(
"iptables",
&["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"],
None,
)
.await;
let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; let _ = run_command("iptables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; let _ = run_command("iptables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
let _ = run_command( let _ = run_command("ip6tables", &["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"], None).await;
"ip6tables",
&["-t", "raw", "-D", "PREROUTING", "-j", "TELEMT_NOTRACK"],
None,
)
.await;
let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await; let _ = run_command("ip6tables", &["-t", "raw", "-F", "TELEMT_NOTRACK"], None).await;
let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await; let _ = run_command("ip6tables", &["-t", "raw", "-X", "TELEMT_NOTRACK"], None).await;
} }

View File

@ -88,10 +88,8 @@ pub fn init_logging(
// Use a custom fmt layer that writes to syslog // Use a custom fmt layer that writes to syslog
let fmt_layer = fmt::Layer::default() let fmt_layer = fmt::Layer::default()
.with_ansi(false) .with_ansi(false)
.with_target(false) .with_target(true)
.with_level(false) .with_writer(SyslogWriter::new);
.without_time()
.with_writer(SyslogMakeWriter::new());
tracing_subscriber::registry() tracing_subscriber::registry()
.with(filter_layer) .with(filter_layer)
@ -139,17 +137,12 @@ pub fn init_logging(
/// Syslog writer for tracing. /// Syslog writer for tracing.
#[cfg(unix)] #[cfg(unix)]
#[derive(Clone, Copy)]
struct SyslogMakeWriter;
#[cfg(unix)]
#[derive(Clone, Copy)]
struct SyslogWriter { struct SyslogWriter {
priority: libc::c_int, _private: (),
} }
#[cfg(unix)] #[cfg(unix)]
impl SyslogMakeWriter { impl SyslogWriter {
fn new() -> Self { fn new() -> Self {
// Open syslog connection on first use // Open syslog connection on first use
static INIT: std::sync::Once = std::sync::Once::new(); static INIT: std::sync::Once = std::sync::Once::new();
@ -160,18 +153,7 @@ impl SyslogMakeWriter {
libc::openlog(ident, libc::LOG_PID | libc::LOG_NDELAY, libc::LOG_DAEMON); libc::openlog(ident, libc::LOG_PID | libc::LOG_NDELAY, libc::LOG_DAEMON);
} }
}); });
Self Self { _private: () }
}
}
#[cfg(unix)]
fn syslog_priority_for_level(level: &tracing::Level) -> libc::c_int {
match *level {
tracing::Level::ERROR => libc::LOG_ERR,
tracing::Level::WARN => libc::LOG_WARNING,
tracing::Level::INFO => libc::LOG_INFO,
tracing::Level::DEBUG => libc::LOG_DEBUG,
tracing::Level::TRACE => libc::LOG_DEBUG,
} }
} }
@ -186,13 +168,26 @@ impl std::io::Write for SyslogWriter {
return Ok(buf.len()); return Ok(buf.len());
} }
// Determine priority based on log level in the message
let priority = if msg.contains(" ERROR ") || msg.contains(" error ") {
libc::LOG_ERR
} else if msg.contains(" WARN ") || msg.contains(" warn ") {
libc::LOG_WARNING
} else if msg.contains(" INFO ") || msg.contains(" info ") {
libc::LOG_INFO
} else if msg.contains(" DEBUG ") || msg.contains(" debug ") {
libc::LOG_DEBUG
} else {
libc::LOG_INFO
};
// Write to syslog // Write to syslog
let c_msg = std::ffi::CString::new(msg.as_bytes()) let c_msg = std::ffi::CString::new(msg.as_bytes())
.unwrap_or_else(|_| std::ffi::CString::new("(invalid utf8)").unwrap()); .unwrap_or_else(|_| std::ffi::CString::new("(invalid utf8)").unwrap());
unsafe { unsafe {
libc::syslog( libc::syslog(
self.priority, priority,
b"%s\0".as_ptr() as *const libc::c_char, b"%s\0".as_ptr() as *const libc::c_char,
c_msg.as_ptr(), c_msg.as_ptr(),
); );
@ -207,19 +202,11 @@ impl std::io::Write for SyslogWriter {
} }
#[cfg(unix)] #[cfg(unix)]
impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SyslogMakeWriter { impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for SyslogWriter {
type Writer = SyslogWriter; type Writer = SyslogWriter;
fn make_writer(&'a self) -> Self::Writer { fn make_writer(&'a self) -> Self::Writer {
SyslogWriter { SyslogWriter::new()
priority: libc::LOG_INFO,
}
}
fn make_writer_for(&'a self, meta: &tracing::Metadata<'_>) -> Self::Writer {
SyslogWriter {
priority: syslog_priority_for_level(meta.level()),
}
} }
} }
@ -315,29 +302,4 @@ mod tests {
LogDestination::Syslog LogDestination::Syslog
)); ));
} }
#[cfg(unix)]
#[test]
fn test_syslog_priority_for_level_mapping() {
assert_eq!(
syslog_priority_for_level(&tracing::Level::ERROR),
libc::LOG_ERR
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::WARN),
libc::LOG_WARNING
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::INFO),
libc::LOG_INFO
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::DEBUG),
libc::LOG_DEBUG
);
assert_eq!(
syslog_priority_for_level(&tracing::Level::TRACE),
libc::LOG_DEBUG
);
}
} }

View File

@ -18,38 +18,19 @@ use crate::transport::middle_proxy::{
pub(crate) fn resolve_runtime_config_path( pub(crate) fn resolve_runtime_config_path(
config_path_cli: &str, config_path_cli: &str,
startup_cwd: &std::path::Path, startup_cwd: &std::path::Path,
config_path_explicit: bool,
) -> PathBuf { ) -> PathBuf {
if config_path_explicit { let raw = PathBuf::from(config_path_cli);
let raw = PathBuf::from(config_path_cli); let absolute = if raw.is_absolute() {
let absolute = if raw.is_absolute() { raw
raw } else {
} else { startup_cwd.join(raw)
startup_cwd.join(raw) };
}; absolute.canonicalize().unwrap_or(absolute)
return absolute.canonicalize().unwrap_or(absolute);
}
let etc_telemt = std::path::Path::new("/etc/telemt");
let candidates = [
startup_cwd.join("config.toml"),
startup_cwd.join("telemt.toml"),
etc_telemt.join("telemt.toml"),
etc_telemt.join("config.toml"),
];
for candidate in candidates {
if candidate.is_file() {
return candidate.canonicalize().unwrap_or(candidate);
}
}
startup_cwd.join("config.toml")
} }
/// Parsed CLI arguments. /// Parsed CLI arguments.
pub(crate) struct CliArgs { pub(crate) struct CliArgs {
pub config_path: String, pub config_path: String,
pub config_path_explicit: bool,
pub data_path: Option<PathBuf>, pub data_path: Option<PathBuf>,
pub silent: bool, pub silent: bool,
pub log_level: Option<String>, pub log_level: Option<String>,
@ -58,7 +39,6 @@ pub(crate) struct CliArgs {
pub(crate) fn parse_cli() -> CliArgs { pub(crate) fn parse_cli() -> CliArgs {
let mut config_path = "config.toml".to_string(); let mut config_path = "config.toml".to_string();
let mut config_path_explicit = false;
let mut data_path: Option<PathBuf> = None; let mut data_path: Option<PathBuf> = None;
let mut silent = false; let mut silent = false;
let mut log_level: Option<String> = None; let mut log_level: Option<String> = None;
@ -94,20 +74,6 @@ pub(crate) fn parse_cli() -> CliArgs {
s.trim_start_matches("--data-path=").to_string(), s.trim_start_matches("--data-path=").to_string(),
)); ));
} }
"--working-dir" => {
i += 1;
if i < args.len() {
data_path = Some(PathBuf::from(args[i].clone()));
} else {
eprintln!("Missing value for --working-dir");
std::process::exit(0);
}
}
s if s.starts_with("--working-dir=") => {
data_path = Some(PathBuf::from(
s.trim_start_matches("--working-dir=").to_string(),
));
}
"--silent" | "-s" => { "--silent" | "-s" => {
silent = true; silent = true;
} }
@ -145,12 +111,14 @@ pub(crate) fn parse_cli() -> CliArgs {
i += 1; i += 1;
} }
} }
s if !s.starts_with('-') => { s if s.starts_with("--working-dir") => {
if !matches!(s, "run" | "start" | "stop" | "reload" | "status") { if !s.contains('=') {
config_path = s.to_string(); i += 1;
config_path_explicit = true;
} }
} }
s if !s.starts_with('-') => {
config_path = s.to_string();
}
other => { other => {
eprintln!("Unknown option: {}", other); eprintln!("Unknown option: {}", other);
} }
@ -160,7 +128,6 @@ pub(crate) fn parse_cli() -> CliArgs {
CliArgs { CliArgs {
config_path, config_path,
config_path_explicit,
data_path, data_path,
silent, silent,
log_level, log_level,
@ -185,7 +152,6 @@ fn print_help() {
eprintln!( eprintln!(
" --data-path <DIR> Set data directory (absolute path; overrides config value)" " --data-path <DIR> Set data directory (absolute path; overrides config value)"
); );
eprintln!(" --working-dir <DIR> Alias for --data-path");
eprintln!(" --silent, -s Suppress info logs"); eprintln!(" --silent, -s Suppress info logs");
eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent"); eprintln!(" --log-level <LEVEL> debug|verbose|normal|silent");
eprintln!(" --help, -h Show this help"); eprintln!(" --help, -h Show this help");
@ -244,7 +210,7 @@ mod tests {
let target = startup_cwd.join("config.toml"); let target = startup_cwd.join("config.toml");
std::fs::write(&target, " ").unwrap(); std::fs::write(&target, " ").unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, true); let resolved = resolve_runtime_config_path("config.toml", &startup_cwd);
assert_eq!(resolved, target.canonicalize().unwrap()); assert_eq!(resolved, target.canonicalize().unwrap());
let _ = std::fs::remove_file(&target); let _ = std::fs::remove_file(&target);
@ -260,45 +226,11 @@ mod tests {
let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_path_missing_{nonce}")); let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_path_missing_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap(); std::fs::create_dir_all(&startup_cwd).unwrap();
let resolved = resolve_runtime_config_path("missing.toml", &startup_cwd, true); let resolved = resolve_runtime_config_path("missing.toml", &startup_cwd);
assert_eq!(resolved, startup_cwd.join("missing.toml")); assert_eq!(resolved, startup_cwd.join("missing.toml"));
let _ = std::fs::remove_dir(&startup_cwd); let _ = std::fs::remove_dir(&startup_cwd);
} }
#[test]
fn resolve_runtime_config_path_uses_startup_candidates_when_not_explicit() {
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let startup_cwd =
std::env::temp_dir().join(format!("telemt_cfg_startup_candidates_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap();
let telemt = startup_cwd.join("telemt.toml");
std::fs::write(&telemt, " ").unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, false);
assert_eq!(resolved, telemt.canonicalize().unwrap());
let _ = std::fs::remove_file(&telemt);
let _ = std::fs::remove_dir(&startup_cwd);
}
#[test]
fn resolve_runtime_config_path_defaults_to_startup_config_when_none_found() {
let nonce = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let startup_cwd = std::env::temp_dir().join(format!("telemt_cfg_startup_default_{nonce}"));
std::fs::create_dir_all(&startup_cwd).unwrap();
let resolved = resolve_runtime_config_path("config.toml", &startup_cwd, false);
assert_eq!(resolved, startup_cwd.join("config.toml"));
let _ = std::fs::remove_dir(&startup_cwd);
}
} }
pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) { pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {

View File

@ -28,8 +28,8 @@ use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload}; use tracing_subscriber::{EnvFilter, fmt, prelude::*, reload};
use crate::api; use crate::api;
use crate::config::{LogLevel, ProxyConfig};
use crate::conntrack_control; use crate::conntrack_control;
use crate::config::{LogLevel, ProxyConfig};
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe};
@ -112,7 +112,6 @@ async fn run_inner(
.await; .await;
let cli_args = parse_cli(); let cli_args = parse_cli();
let config_path_cli = cli_args.config_path; let config_path_cli = cli_args.config_path;
let config_path_explicit = cli_args.config_path_explicit;
let data_path = cli_args.data_path; let data_path = cli_args.data_path;
let cli_silent = cli_args.silent; let cli_silent = cli_args.silent;
let cli_log_level = cli_args.log_level; let cli_log_level = cli_args.log_level;
@ -124,8 +123,7 @@ async fn run_inner(
std::process::exit(1); std::process::exit(1);
} }
}; };
let mut config_path = let config_path = resolve_runtime_config_path(&config_path_cli, &startup_cwd);
resolve_runtime_config_path(&config_path_cli, &startup_cwd, config_path_explicit);
let mut config = match ProxyConfig::load(&config_path) { let mut config = match ProxyConfig::load(&config_path) {
Ok(c) => c, Ok(c) => c,
@ -135,99 +133,11 @@ async fn run_inner(
std::process::exit(1); std::process::exit(1);
} else { } else {
let default = ProxyConfig::default(); let default = ProxyConfig::default();
std::fs::write(&config_path, toml::to_string_pretty(&default).unwrap()).unwrap();
let serialized = eprintln!(
match toml::to_string_pretty(&default).or_else(|_| toml::to_string(&default)) { "[telemt] Created default config at {}",
Ok(value) => Some(value), config_path.display()
Err(serialize_error) => { );
eprintln!(
"[telemt] Warning: failed to serialize default config: {}",
serialize_error
);
None
}
};
if config_path_explicit {
if let Some(serialized) = serialized.as_ref() {
if let Err(write_error) = std::fs::write(&config_path, serialized) {
eprintln!(
"[telemt] Error: failed to create explicit config at {}: {}",
config_path.display(),
write_error
);
std::process::exit(1);
}
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
} else {
eprintln!(
"[telemt] Warning: running with in-memory default config without writing to disk"
);
}
} else {
let system_dir = std::path::Path::new("/etc/telemt");
let system_config_path = system_dir.join("telemt.toml");
let startup_config_path = startup_cwd.join("config.toml");
let mut persisted = false;
if let Some(serialized) = serialized.as_ref() {
match std::fs::create_dir_all(system_dir) {
Ok(()) => match std::fs::write(&system_config_path, serialized) {
Ok(()) => {
config_path = system_config_path;
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
persisted = true;
}
Err(write_error) => {
eprintln!(
"[telemt] Warning: failed to write default config at {}: {}",
system_config_path.display(),
write_error
);
}
},
Err(create_error) => {
eprintln!(
"[telemt] Warning: failed to create {}: {}",
system_dir.display(),
create_error
);
}
}
if !persisted {
match std::fs::write(&startup_config_path, serialized) {
Ok(()) => {
config_path = startup_config_path;
eprintln!(
"[telemt] Created default config at {}",
config_path.display()
);
persisted = true;
}
Err(write_error) => {
eprintln!(
"[telemt] Warning: failed to write default config at {}: {}",
startup_config_path.display(),
write_error
);
}
}
}
}
if !persisted {
eprintln!(
"[telemt] Warning: running with in-memory default config without writing to disk"
);
}
}
default default
} }
} }

View File

@ -2,8 +2,8 @@
mod api; mod api;
mod cli; mod cli;
mod config;
mod conntrack_control; mod conntrack_control;
mod config;
mod crypto; mod crypto;
#[cfg(unix)] #[cfg(unix)]
mod daemon; mod daemon;

View File

@ -246,9 +246,7 @@ pub fn seed_tier_for_user(user: &str) -> AdaptiveTier {
if now.saturating_duration_since(value.seen_at) <= PROFILE_TTL { if now.saturating_duration_since(value.seen_at) <= PROFILE_TTL {
return value.tier; return value.tier;
} }
profiles().remove_if(user, |_, v| { profiles().remove_if(user, |_, v| now.saturating_duration_since(v.seen_at) > PROFILE_TTL);
now.saturating_duration_since(v.seen_at) > PROFILE_TTL
});
} }
AdaptiveTier::Base AdaptiveTier::Base
} }

View File

@ -518,15 +518,15 @@ where
); );
return Err(ProxyError::Io(e)); return Err(ProxyError::Io(e));
} }
Err(_) => { Err(_) => {
debug!( debug!(
peer = %real_peer, peer = %real_peer,
idle_secs = first_byte_idle_secs, idle_secs = first_byte_idle_secs,
"Closing idle pooled connection before first client byte" "Closing idle pooled connection before first client byte"
); );
return Ok(()); return Ok(());
}
} }
}
}; };
let handshake_timeout = handshake_timeout_with_mask_grace(&config); let handshake_timeout = handshake_timeout_with_mask_grace(&config);

View File

@ -17,13 +17,13 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::*; use crate::protocol::constants::*;
use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce}; use crate::proxy::handshake::{HandshakeSuccess, encrypt_tg_nonce_with_ciphers, generate_tg_nonce};
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay, cutover_stagger_delay,
}; };
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::stats::Stats; use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::UpstreamManager; use crate::transport::UpstreamManager;

View File

@ -118,11 +118,7 @@ fn auth_probe_state_expired(state: &AuthProbeState, now: Instant) -> bool {
now.duration_since(state.last_seen) > retention now.duration_since(state.last_seen) > retention
} }
fn auth_probe_eviction_offset_in( fn auth_probe_eviction_offset_in(shared: &ProxySharedState, peer_ip: IpAddr, now: Instant) -> usize {
shared: &ProxySharedState,
peer_ip: IpAddr,
now: Instant,
) -> usize {
let hasher_state = &shared.handshake.auth_probe_eviction_hasher; let hasher_state = &shared.handshake.auth_probe_eviction_hasher;
let mut hasher = hasher_state.build_hasher(); let mut hasher = hasher_state.build_hasher();
peer_ip.hash(&mut hasher); peer_ip.hash(&mut hasher);

View File

@ -255,11 +255,7 @@ async fn wait_mask_connect_budget(started: Instant) {
// sigma is chosen so ~99% of raw samples land inside [floor, ceiling] before clamp. // sigma is chosen so ~99% of raw samples land inside [floor, ceiling] before clamp.
// When floor > ceiling (misconfiguration), returns ceiling (the smaller value). // When floor > ceiling (misconfiguration), returns ceiling (the smaller value).
// When floor == ceiling, returns that value. When both are 0, returns 0. // When floor == ceiling, returns that value. When both are 0, returns 0.
pub(crate) fn sample_lognormal_percentile_bounded( pub(crate) fn sample_lognormal_percentile_bounded(floor: u64, ceiling: u64, rng: &mut impl Rng) -> u64 {
floor: u64,
ceiling: u64,
rng: &mut impl Rng,
) -> u64 {
if ceiling == 0 && floor == 0 { if ceiling == 0 && floor == 0 {
return 0; return 0;
} }
@ -300,9 +296,7 @@ fn mask_outcome_target_budget(config: &ProxyConfig) -> Duration {
} }
if ceiling > floor { if ceiling > floor {
let mut rng = rand::rng(); let mut rng = rand::rng();
return Duration::from_millis(sample_lognormal_percentile_bounded( return Duration::from_millis(sample_lognormal_percentile_bounded(floor, ceiling, &mut rng));
floor, ceiling, &mut rng,
));
} }
// ceiling <= floor: use the larger value (fail-closed: preserve longer delay) // ceiling <= floor: use the larger value (fail-closed: preserve longer delay)
return Duration::from_millis(floor.max(ceiling)); return Duration::from_millis(floor.max(ceiling));

View File

@ -3,12 +3,12 @@ use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeSet, HashMap};
#[cfg(test)] #[cfg(test)]
use std::future::Future; use std::future::Future;
use std::hash::{BuildHasher, Hash};
#[cfg(test)] #[cfg(test)]
use std::hash::Hasher; use std::hash::Hasher;
use std::hash::{BuildHasher, Hash};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
@ -21,13 +21,13 @@ use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::{secure_padding_len, *}; use crate::protocol::constants::{secure_padding_len, *};
use crate::proxy::handshake::HandshakeSuccess; use crate::proxy::handshake::HandshakeSuccess;
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::proxy::route_mode::{ use crate::proxy::route_mode::{
ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state, ROUTE_SWITCH_ERROR_MSG, RelayRouteMode, RouteCutoverState, affected_cutover_state,
cutover_stagger_delay, cutover_stagger_delay,
}; };
use crate::proxy::shared_state::{
ConntrackCloseEvent, ConntrackClosePublishResult, ConntrackCloseReason, ProxySharedState,
};
use crate::stats::{ use crate::stats::{
MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats, MeD2cFlushReason, MeD2cQuotaRejectStage, MeD2cWriteMode, QuotaReserveError, Stats, UserStats,
}; };
@ -257,7 +257,9 @@ impl RelayClientIdlePolicy {
if self.soft_idle > self.hard_idle { if self.soft_idle > self.hard_idle {
self.soft_idle = self.hard_idle; self.soft_idle = self.hard_idle;
} }
self.legacy_frame_read_timeout = self.legacy_frame_read_timeout.min(pressure_hard_idle_cap); self.legacy_frame_read_timeout = self
.legacy_frame_read_timeout
.min(pressure_hard_idle_cap);
if self.grace_after_downstream_activity > self.hard_idle { if self.grace_after_downstream_activity > self.hard_idle {
self.grace_after_downstream_activity = self.hard_idle; self.grace_after_downstream_activity = self.hard_idle;
} }
@ -459,15 +461,12 @@ fn report_desync_frame_too_large_in(
.map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D')) .map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
.unwrap_or(false); .unwrap_or(false);
let now = Instant::now(); let now = Instant::now();
let dedup_key = hash_value_in( let dedup_key = hash_value_in(shared, &(
shared, state.user.as_str(),
&( state.peer_hash,
state.user.as_str(), proto_tag,
state.peer_hash, DESYNC_ERROR_CLASS,
proto_tag, ));
DESYNC_ERROR_CLASS,
),
);
let emit_full = should_emit_full_desync_in(shared, dedup_key, state.desync_all_full, now); let emit_full = should_emit_full_desync_in(shared, dedup_key, state.desync_all_full, now);
let duration_ms = state.started_at.elapsed().as_millis() as u64; let duration_ms = state.started_at.elapsed().as_millis() as u64;
let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed); let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed);
@ -632,10 +631,7 @@ fn observe_me_d2c_flush_event(
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn mark_relay_idle_candidate_for_testing( pub(crate) fn mark_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) -> bool {
shared: &ProxySharedState,
conn_id: u64,
) -> bool {
let registry = &shared.middle_relay.relay_idle_registry; let registry = &shared.middle_relay.relay_idle_registry;
let mut guard = match registry.lock() { let mut guard = match registry.lock() {
Ok(guard) => guard, Ok(guard) => guard,
@ -720,10 +716,7 @@ pub(crate) fn relay_pressure_event_seq_for_testing(shared: &ProxySharedState) ->
#[cfg(test)] #[cfg(test)]
pub(crate) fn relay_idle_mark_seq_for_testing(shared: &ProxySharedState) -> u64 { pub(crate) fn relay_idle_mark_seq_for_testing(shared: &ProxySharedState) -> u64 {
shared shared.middle_relay.relay_idle_mark_seq.load(Ordering::Relaxed)
.middle_relay
.relay_idle_mark_seq
.load(Ordering::Relaxed)
} }
#[cfg(test)] #[cfg(test)]
@ -872,7 +865,10 @@ pub(crate) fn desync_dedup_insert_for_testing(shared: &ProxySharedState, key: u6
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn desync_dedup_get_for_testing(shared: &ProxySharedState, key: u64) -> Option<Instant> { pub(crate) fn desync_dedup_get_for_testing(
shared: &ProxySharedState,
key: u64,
) -> Option<Instant> {
shared shared
.middle_relay .middle_relay
.desync_dedup .desync_dedup
@ -881,9 +877,7 @@ pub(crate) fn desync_dedup_get_for_testing(shared: &ProxySharedState, key: u64)
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn desync_dedup_keys_for_testing( pub(crate) fn desync_dedup_keys_for_testing(shared: &ProxySharedState) -> std::collections::HashSet<u64> {
shared: &ProxySharedState,
) -> std::collections::HashSet<u64> {
shared shared
.middle_relay .middle_relay
.desync_dedup .desync_dedup

View File

@ -8,7 +8,7 @@ use std::time::Instant;
use dashmap::DashMap; use dashmap::DashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::proxy::handshake::{AuthProbeSaturationState, AuthProbeState}; use crate::proxy::handshake::{AuthProbeState, AuthProbeSaturationState};
use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry}; use crate::proxy::middle_relay::{DesyncDedupRotationState, RelayIdleCandidateRegistry};
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -136,8 +136,7 @@ impl ProxySharedState {
} }
pub(crate) fn set_conntrack_pressure_active(&self, active: bool) { pub(crate) fn set_conntrack_pressure_active(&self, active: bool) {
self.conntrack_pressure_active self.conntrack_pressure_active.store(active, Ordering::Relaxed);
.store(active, Ordering::Relaxed);
} }
pub(crate) fn conntrack_pressure_active(&self) -> bool { pub(crate) fn conntrack_pressure_active(&self) -> bool {

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
static RACE_TEST_KEY_COUNTER: AtomicUsize = AtomicUsize::new(1_000_000); static RACE_TEST_KEY_COUNTER: AtomicUsize = AtomicUsize::new(1_000_000);

View File

@ -65,15 +65,9 @@ fn adaptive_base_tier_buffers_unchanged() {
fn adaptive_tier1_buffers_within_caps() { fn adaptive_tier1_buffers_within_caps() {
let (c2s, s2c) = direct_copy_buffers_for_tier(AdaptiveTier::Tier1, 65536, 262144); let (c2s, s2c) = direct_copy_buffers_for_tier(AdaptiveTier::Tier1, 65536, 262144);
assert!(c2s > 65536, "Tier1 c2s should exceed Base"); assert!(c2s > 65536, "Tier1 c2s should exceed Base");
assert!( assert!(c2s <= 128 * 1024, "Tier1 c2s should not exceed DIRECT_C2S_CAP_BYTES");
c2s <= 128 * 1024,
"Tier1 c2s should not exceed DIRECT_C2S_CAP_BYTES"
);
assert!(s2c > 262144, "Tier1 s2c should exceed Base"); assert!(s2c > 262144, "Tier1 s2c should exceed Base");
assert!( assert!(s2c <= 512 * 1024, "Tier1 s2c should not exceed DIRECT_S2C_CAP_BYTES");
s2c <= 512 * 1024,
"Tier1 s2c should not exceed DIRECT_S2C_CAP_BYTES"
);
} }
#[test] #[test]

View File

@ -19,8 +19,7 @@ fn adversarial_large_state_offsets_escape_first_scan_window() {
((i.wrapping_mul(131)) & 0xff) as u8, ((i.wrapping_mul(131)) & 0xff) as u8,
)); ));
let now = base + Duration::from_nanos(i); let now = base + Duration::from_nanos(i);
let start = let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
if start >= scan_limit { if start >= scan_limit {
saw_offset_outside_first_window = true; saw_offset_outside_first_window = true;
break; break;
@ -49,8 +48,7 @@ fn stress_large_state_offsets_cover_many_scan_windows() {
((i.wrapping_mul(17)) & 0xff) as u8, ((i.wrapping_mul(17)) & 0xff) as u8,
)); ));
let now = base + Duration::from_micros(i); let now = base + Duration::from_micros(i);
let start = let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
covered_windows.insert(start / scan_limit); covered_windows.insert(start / scan_limit);
} }
@ -82,8 +80,7 @@ fn light_fuzz_offset_always_stays_inside_state_len() {
let state_len = ((seed >> 16) as usize % 200_000).saturating_add(1); let state_len = ((seed >> 16) as usize % 200_000).saturating_add(1);
let scan_limit = ((seed >> 40) as usize % 2_048).saturating_add(1); let scan_limit = ((seed >> 40) as usize % 2_048).saturating_add(1);
let now = base + Duration::from_nanos(seed & 0x0fff); let now = base + Duration::from_nanos(seed & 0x0fff);
let start = let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
start < state_len, start < state_len,

View File

@ -87,11 +87,7 @@ fn adversarial_saturation_grace_requires_extra_failures_before_preauth_throttle(
} }
assert!( assert!(
auth_probe_should_apply_preauth_throttle_in( auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), ip, now + Duration::from_millis(1)),
shared.as_ref(),
ip,
now + Duration::from_millis(1)
),
"after grace failures are exhausted, preauth throttle must activate" "after grace failures are exhausted, preauth throttle must activate"
); );
} }
@ -138,11 +134,7 @@ fn light_fuzz_randomized_failures_preserve_cap_and_nonzero_streaks() {
(seed >> 8) as u8, (seed >> 8) as u8,
seed as u8, seed as u8,
)); ));
auth_probe_record_failure_in( auth_probe_record_failure_in(shared.as_ref(), ip, now + Duration::from_millis((seed & 0x3f) as u64));
shared.as_ref(),
ip,
now + Duration::from_millis((seed & 0x3f) as u64),
);
} }
let state = auth_probe_state_for_testing_in_shared(shared.as_ref()); let state = auth_probe_state_for_testing_in_shared(shared.as_ref());
@ -170,11 +162,7 @@ async fn stress_parallel_failure_flood_keeps_state_hard_capped() {
((i >> 8) & 0xff) as u8, ((i >> 8) & 0xff) as u8,
(i & 0xff) as u8, (i & 0xff) as u8,
)); ));
auth_probe_record_failure_in( auth_probe_record_failure_in(shared.as_ref(), ip, start + Duration::from_millis((i % 4) as u64));
shared.as_ref(),
ip,
start + Duration::from_millis((i % 4) as u64),
);
} }
})); }));
} }

View File

@ -31,8 +31,7 @@ fn adversarial_large_state_must_allow_start_offset_outside_scan_budget_window()
(i & 0xff) as u8, (i & 0xff) as u8,
)); ));
let now = base + Duration::from_micros(i as u64); let now = base + Duration::from_micros(i as u64);
let start = let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
start < state_len, start < state_len,
"start offset must stay within state length; start={start}, len={state_len}" "start offset must stay within state length; start={start}, len={state_len}"
@ -84,8 +83,7 @@ fn light_fuzz_scan_offset_budget_never_exceeds_effective_window() {
let state_len = ((seed >> 8) as usize % 131_072).saturating_add(1); let state_len = ((seed >> 8) as usize % 131_072).saturating_add(1);
let scan_limit = ((seed >> 32) as usize % 512).saturating_add(1); let scan_limit = ((seed >> 32) as usize % 512).saturating_add(1);
let now = base + Duration::from_nanos(seed & 0xffff); let now = base + Duration::from_nanos(seed & 0xffff);
let start = let start = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
start < state_len, start < state_len,

View File

@ -36,13 +36,7 @@ fn adversarial_many_ips_same_time_spreads_offsets_without_bias_collapse() {
i as u8, i as u8,
(255 - (i as u8)), (255 - (i as u8)),
)); ));
uniq.insert(auth_probe_scan_start_offset_in( uniq.insert(auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, 65_536, 16));
shared.as_ref(),
ip,
now,
65_536,
16,
));
} }
assert!( assert!(
@ -69,11 +63,7 @@ async fn stress_parallel_failure_churn_under_saturation_remains_capped_and_live(
((i >> 8) & 0xff) as u8, ((i >> 8) & 0xff) as u8,
(i & 0xff) as u8, (i & 0xff) as u8,
)); ));
auth_probe_record_failure_in( auth_probe_record_failure_in(shared.as_ref(), ip, start + Duration::from_micros((i % 128) as u64));
shared.as_ref(),
ip,
start + Duration::from_micros((i % 128) as u64),
);
} }
})); }));
} }
@ -83,17 +73,12 @@ async fn stress_parallel_failure_churn_under_saturation_remains_capped_and_live(
} }
assert!( assert!(
auth_probe_state_for_testing_in_shared(shared.as_ref()).len() auth_probe_state_for_testing_in_shared(shared.as_ref()).len() <= AUTH_PROBE_TRACK_MAX_ENTRIES,
<= AUTH_PROBE_TRACK_MAX_ENTRIES,
"state must remain hard-capped under parallel saturation churn" "state must remain hard-capped under parallel saturation churn"
); );
let probe = IpAddr::V4(Ipv4Addr::new(10, 4, 1, 1)); let probe = IpAddr::V4(Ipv4Addr::new(10, 4, 1, 1));
let _ = auth_probe_should_apply_preauth_throttle_in( let _ = auth_probe_should_apply_preauth_throttle_in(shared.as_ref(), probe, start + Duration::from_millis(1));
shared.as_ref(),
probe,
start + Duration::from_millis(1),
);
} }
#[test] #[test]
@ -117,8 +102,7 @@ fn light_fuzz_scan_offset_stays_within_window_for_randomized_inputs() {
let scan_limit = ((seed >> 40) as usize % 1024).saturating_add(1); let scan_limit = ((seed >> 40) as usize % 1024).saturating_add(1);
let now = base + Duration::from_nanos(seed & 0x1fff); let now = base + Duration::from_nanos(seed & 0x1fff);
let offset = let offset = auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
auth_probe_scan_start_offset_in(shared.as_ref(), ip, now, state_len, scan_limit);
assert!( assert!(
offset < state_len, offset < state_len,
"scan offset must always remain inside state length" "scan offset must always remain inside state length"

View File

@ -116,14 +116,8 @@ async fn handshake_baseline_auth_probe_streak_increments_per_ip() {
) )
.await; .await;
assert!(matches!(res, HandshakeResult::BadClient { .. })); assert!(matches!(res, HandshakeResult::BadClient { .. }));
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), Some(expected));
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), untouched_ip), None);
Some(expected)
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), untouched_ip),
None
);
} }
} }
@ -155,8 +149,7 @@ fn handshake_baseline_repeated_probes_streak_monotonic() {
for _ in 0..100 { for _ in 0..100 {
auth_probe_record_failure_in(shared.as_ref(), ip, now); auth_probe_record_failure_in(shared.as_ref(), ip, now);
let current = let current = auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), ip).unwrap_or(0);
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), ip).unwrap_or(0);
assert!(current >= prev, "streak must be monotonic"); assert!(current >= prev, "streak must be monotonic");
prev = current; prev = current;
} }
@ -180,16 +173,8 @@ fn handshake_baseline_throttled_ip_incurs_backoff_delay() {
let before_expiry = now + delay.saturating_sub(Duration::from_millis(1)); let before_expiry = now + delay.saturating_sub(Duration::from_millis(1));
let after_expiry = now + delay + Duration::from_millis(1); let after_expiry = now + delay + Duration::from_millis(1);
assert!(auth_probe_is_throttled_in( assert!(auth_probe_is_throttled_in(shared.as_ref(), ip, before_expiry));
shared.as_ref(), assert!(!auth_probe_is_throttled_in(shared.as_ref(), ip, after_expiry));
ip,
before_expiry
));
assert!(!auth_probe_is_throttled_in(
shared.as_ref(),
ip,
after_expiry
));
} }
#[tokio::test] #[tokio::test]
@ -227,10 +212,7 @@ async fn handshake_baseline_malformed_probe_frames_fail_closed_to_masking() {
.expect("malformed probe handling must complete in bounded time"); .expect("malformed probe handling must complete in bounded time");
assert!( assert!(
matches!( matches!(res, HandshakeResult::BadClient { .. } | HandshakeResult::Error(_)),
res,
HandshakeResult::BadClient { .. } | HandshakeResult::Error(_)
),
"malformed probe must fail closed" "malformed probe must fail closed"
); );
} }

View File

@ -332,13 +332,7 @@ async fn invalid_secret_warning_lock_contention_and_bound() {
b.wait().await; b.wait().await;
for i in 0..iterations_per_task { for i in 0..iterations_per_task {
let user_name = format!("contention_user_{}_{}", t, i); let user_name = format!("contention_user_{}_{}", t, i);
warn_invalid_secret_once_in( warn_invalid_secret_once_in(shared.as_ref(), &user_name, "invalid_hex", ACCESS_SECRET_BYTES, None);
shared.as_ref(),
&user_name,
"invalid_hex",
ACCESS_SECRET_BYTES,
None,
);
} }
})); }));
} }
@ -635,8 +629,7 @@ fn auth_probe_saturation_note_resets_retention_window() {
// This call may return false if backoff has elapsed, but it must not clear // This call may return false if backoff has elapsed, but it must not clear
// the saturation state because `later` refreshed last_seen. // the saturation state because `later` refreshed last_seen.
let _ = let _ = auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), check_time);
auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), check_time);
let guard = auth_probe_saturation_state_lock_for_testing_in_shared(shared.as_ref()); let guard = auth_probe_saturation_state_lock_for_testing_in_shared(shared.as_ref());
assert!( assert!(
guard.is_some(), guard.is_some(),

View File

@ -206,12 +206,7 @@ fn auth_probe_eviction_identical_timestamps_keeps_map_bounded() {
} }
let new_ip = IpAddr::V4(Ipv4Addr::new(192, 168, 21, 21)); let new_ip = IpAddr::V4(Ipv4Addr::new(192, 168, 21, 21));
auth_probe_record_failure_with_state_in( auth_probe_record_failure_with_state_in(shared.as_ref(), state, new_ip, same + Duration::from_millis(1));
shared.as_ref(),
state,
new_ip,
same + Duration::from_millis(1),
);
assert_eq!(state.len(), AUTH_PROBE_TRACK_MAX_ENTRIES); assert_eq!(state.len(), AUTH_PROBE_TRACK_MAX_ENTRIES);
assert!(state.contains_key(&new_ip)); assert!(state.contains_key(&new_ip));
@ -330,8 +325,7 @@ async fn saturation_grace_exhaustion_under_concurrency_keeps_peer_throttled() {
final_state.fail_streak final_state.fail_streak
>= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS >= AUTH_PROBE_BACKOFF_START_FAILS + AUTH_PROBE_SATURATION_GRACE_FAILS
); );
assert!(auth_probe_should_apply_preauth_throttle_in( assert!(auth_probe_should_apply_preauth_throttle_in(shared.as_ref(),
shared.as_ref(),
peer_ip, peer_ip,
Instant::now() Instant::now()
)); ));

View File

@ -54,9 +54,7 @@ fn clear_auth_probe_state_clears_saturation_even_if_poisoned() {
poison_saturation_mutex(shared.as_ref()); poison_saturation_mutex(shared.as_ref());
auth_probe_note_saturation_in(shared.as_ref(), Instant::now()); auth_probe_note_saturation_in(shared.as_ref(), Instant::now());
assert!(auth_probe_saturation_is_throttled_for_testing_in_shared( assert!(auth_probe_saturation_is_throttled_for_testing_in_shared(shared.as_ref()));
shared.as_ref()
));
clear_auth_probe_state_for_testing_in_shared(shared.as_ref()); clear_auth_probe_state_for_testing_in_shared(shared.as_ref());
assert!( assert!(

View File

@ -1427,13 +1427,7 @@ fn invalid_secret_warning_cache_is_bounded() {
for idx in 0..(WARNED_SECRET_MAX_ENTRIES + 32) { for idx in 0..(WARNED_SECRET_MAX_ENTRIES + 32) {
let user = format!("warned_user_{idx}"); let user = format!("warned_user_{idx}");
warn_invalid_secret_once_in( warn_invalid_secret_once_in(shared.as_ref(), &user, "invalid_length", ACCESS_SECRET_BYTES, Some(idx));
shared.as_ref(),
&user,
"invalid_length",
ACCESS_SECRET_BYTES,
Some(idx),
);
} }
let warned = warned_secrets_for_testing_in_shared(shared.as_ref()); let warned = warned_secrets_for_testing_in_shared(shared.as_ref());
@ -1646,15 +1640,11 @@ fn unknown_sni_warn_cooldown_first_event_is_warn_and_repeated_events_are_info_un
"first unknown SNI event must be eligible for WARN emission" "first unknown SNI event must be eligible for WARN emission"
); );
assert!( assert!(
!should_emit_unknown_sni_warn_for_testing_in_shared( !should_emit_unknown_sni_warn_for_testing_in_shared(shared.as_ref(), now + Duration::from_secs(1)),
shared.as_ref(),
now + Duration::from_secs(1)
),
"events inside cooldown window must be demoted from WARN to INFO" "events inside cooldown window must be demoted from WARN to INFO"
); );
assert!( assert!(
should_emit_unknown_sni_warn_for_testing_in_shared( should_emit_unknown_sni_warn_for_testing_in_shared(shared.as_ref(),
shared.as_ref(),
now + Duration::from_secs(UNKNOWN_SNI_WARN_COOLDOWN_SECS) now + Duration::from_secs(UNKNOWN_SNI_WARN_COOLDOWN_SECS)
), ),
"once cooldown expires, next unknown SNI event must be WARN-eligible again" "once cooldown expires, next unknown SNI event must be WARN-eligible again"
@ -1735,12 +1725,7 @@ fn auth_probe_over_cap_churn_still_tracks_newcomer_after_round_limit() {
} }
let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 114, 77)); let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 114, 77));
auth_probe_record_failure_with_state_in( auth_probe_record_failure_with_state_in(shared.as_ref(), &state, newcomer, now + Duration::from_secs(1));
shared.as_ref(),
&state,
newcomer,
now + Duration::from_secs(1),
);
assert!( assert!(
state.get(&newcomer).is_some(), state.get(&newcomer).is_some(),
@ -1946,18 +1931,8 @@ fn auth_probe_ipv6_is_bucketed_by_prefix_64() {
let ip_a = IpAddr::V6("2001:db8:abcd:1234:1:2:3:4".parse().unwrap()); let ip_a = IpAddr::V6("2001:db8:abcd:1234:1:2:3:4".parse().unwrap());
let ip_b = IpAddr::V6("2001:db8:abcd:1234:ffff:eeee:dddd:cccc".parse().unwrap()); let ip_b = IpAddr::V6("2001:db8:abcd:1234:ffff:eeee:dddd:cccc".parse().unwrap());
auth_probe_record_failure_with_state_in( auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_a), now);
shared.as_ref(), auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_b), now);
&state,
normalize_auth_probe_ip(ip_a),
now,
);
auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state,
normalize_auth_probe_ip(ip_b),
now,
);
let normalized = normalize_auth_probe_ip(ip_a); let normalized = normalize_auth_probe_ip(ip_a);
assert_eq!( assert_eq!(
@ -1981,18 +1956,8 @@ fn auth_probe_ipv6_different_prefixes_use_distinct_buckets() {
let ip_a = IpAddr::V6("2001:db8:1111:2222:1:2:3:4".parse().unwrap()); let ip_a = IpAddr::V6("2001:db8:1111:2222:1:2:3:4".parse().unwrap());
let ip_b = IpAddr::V6("2001:db8:1111:3333:1:2:3:4".parse().unwrap()); let ip_b = IpAddr::V6("2001:db8:1111:3333:1:2:3:4".parse().unwrap());
auth_probe_record_failure_with_state_in( auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_a), now);
shared.as_ref(), auth_probe_record_failure_with_state_in(shared.as_ref(), &state, normalize_auth_probe_ip(ip_b), now);
&state,
normalize_auth_probe_ip(ip_a),
now,
);
auth_probe_record_failure_with_state_in(
shared.as_ref(),
&state,
normalize_auth_probe_ip(ip_b),
now,
);
assert_eq!( assert_eq!(
state.len(), state.len(),
@ -2105,12 +2070,7 @@ fn auth_probe_round_limited_overcap_eviction_marks_saturation_and_keeps_newcomer
} }
let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 40)); let newcomer = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 40));
auth_probe_record_failure_with_state_in( auth_probe_record_failure_with_state_in(shared.as_ref(), &state, newcomer, now + Duration::from_millis(1));
shared.as_ref(),
&state,
newcomer,
now + Duration::from_millis(1),
);
assert!( assert!(
state.get(&newcomer).is_some(), state.get(&newcomer).is_some(),
@ -2121,10 +2081,7 @@ fn auth_probe_round_limited_overcap_eviction_marks_saturation_and_keeps_newcomer
"high fail-streak sentinel must survive round-limited eviction" "high fail-streak sentinel must survive round-limited eviction"
); );
assert!( assert!(
auth_probe_saturation_is_throttled_at_for_testing_in_shared( auth_probe_saturation_is_throttled_at_for_testing_in_shared(shared.as_ref(), now + Duration::from_millis(1)),
shared.as_ref(),
now + Duration::from_millis(1)
),
"round-limited over-cap path must activate saturation throttle marker" "round-limited over-cap path must activate saturation throttle marker"
); );
} }
@ -2206,8 +2163,7 @@ fn stress_auth_probe_overcap_churn_does_not_starve_high_threat_sentinel_bucket()
((step >> 8) & 0xff) as u8, ((step >> 8) & 0xff) as u8,
(step & 0xff) as u8, (step & 0xff) as u8,
)); ));
auth_probe_record_failure_with_state_in( auth_probe_record_failure_with_state_in(shared.as_ref(),
shared.as_ref(),
&state, &state,
newcomer, newcomer,
base_now + Duration::from_millis(step as u64 + 1), base_now + Duration::from_millis(step as u64 + 1),
@ -2270,8 +2226,7 @@ fn light_fuzz_auth_probe_overcap_eviction_prefers_less_threatening_entries() {
((round >> 8) & 0xff) as u8, ((round >> 8) & 0xff) as u8,
(round & 0xff) as u8, (round & 0xff) as u8,
)); ));
auth_probe_record_failure_with_state_in( auth_probe_record_failure_with_state_in(shared.as_ref(),
shared.as_ref(),
&state, &state,
newcomer, newcomer,
now + Duration::from_millis(round as u64 + 1), now + Duration::from_millis(round as u64 + 1),
@ -3150,10 +3105,7 @@ async fn saturation_grace_boundary_still_admits_valid_tls_before_exhaustion() {
matches!(result, HandshakeResult::Success(_)), matches!(result, HandshakeResult::Success(_)),
"valid TLS should still pass while peer remains within saturation grace budget" "valid TLS should still pass while peer remains within saturation grace budget"
); );
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), None);
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()),
None
);
} }
#[tokio::test] #[tokio::test]
@ -3219,10 +3171,7 @@ async fn saturation_grace_exhaustion_blocks_valid_tls_until_backoff_expires() {
matches!(allowed, HandshakeResult::Success(_)), matches!(allowed, HandshakeResult::Success(_)),
"valid TLS should recover after peer-specific pre-auth backoff has elapsed" "valid TLS should recover after peer-specific pre-auth backoff has elapsed"
); );
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()), None);
auth_probe_fail_streak_for_testing_in_shared(shared.as_ref(), peer.ip()),
None
);
} }
#[tokio::test] #[tokio::test]

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
use rand::SeedableRng;
use rand::rngs::StdRng; use rand::rngs::StdRng;
use rand::SeedableRng;
fn seeded_rng(seed: u64) -> StdRng { fn seeded_rng(seed: u64) -> StdRng {
StdRng::seed_from_u64(seed) StdRng::seed_from_u64(seed)
@ -57,10 +57,7 @@ fn masking_lognormal_degenerate_floor_eq_ceiling_returns_floor() {
let mut rng = seeded_rng(99); let mut rng = seeded_rng(99);
for _ in 0..100 { for _ in 0..100 {
let val = sample_lognormal_percentile_bounded(1000, 1000, &mut rng); let val = sample_lognormal_percentile_bounded(1000, 1000, &mut rng);
assert_eq!( assert_eq!(val, 1000, "floor == ceiling must always return exactly that value");
val, 1000,
"floor == ceiling must always return exactly that value"
);
} }
} }

View File

@ -7,22 +7,13 @@ fn middle_relay_baseline_public_api_idle_roundtrip_contract() {
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001));
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7001)
);
clear_relay_idle_candidate_for_testing(shared.as_ref(), 7001); clear_relay_idle_candidate_for_testing(shared.as_ref(), 7001);
assert_ne!( assert_ne!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7001)
);
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7001));
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7001));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7001)
);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
} }
@ -35,12 +26,7 @@ fn middle_relay_baseline_public_api_desync_window_contract() {
let key = 0xDEAD_BEEF_0000_0001u64; let key = 0xDEAD_BEEF_0000_0001u64;
let t0 = Instant::now(); let t0 = Instant::now();
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, false, t0));
shared.as_ref(),
key,
false,
t0
));
assert!(!should_emit_full_desync_for_testing( assert!(!should_emit_full_desync_for_testing(
shared.as_ref(), shared.as_ref(),
key, key,
@ -49,12 +35,7 @@ fn middle_relay_baseline_public_api_desync_window_contract() {
)); ));
let t1 = t0 + DESYNC_DEDUP_WINDOW + Duration::from_millis(10); let t1 = t0 + DESYNC_DEDUP_WINDOW + Duration::from_millis(10);
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, false, t1));
shared.as_ref(),
key,
false,
t1
));
clear_desync_dedup_for_testing_in_shared(shared.as_ref()); clear_desync_dedup_for_testing_in_shared(shared.as_ref());
} }

View File

@ -13,12 +13,7 @@ fn desync_all_full_bypass_does_not_initialize_or_grow_dedup_cache() {
for i in 0..20_000u64 { for i in 0..20_000u64 {
assert!( assert!(
should_emit_full_desync_for_testing( should_emit_full_desync_for_testing(shared.as_ref(), 0xD35E_D000_0000_0000u64 ^ i, true, now),
shared.as_ref(),
0xD35E_D000_0000_0000u64 ^ i,
true,
now
),
"desync_all_full path must always emit" "desync_all_full path must always emit"
); );
} }
@ -42,12 +37,7 @@ fn desync_all_full_bypass_keeps_existing_dedup_entries_unchanged() {
let now = Instant::now(); let now = Instant::now();
for i in 0..2048u64 { for i in 0..2048u64 {
assert!( assert!(
should_emit_full_desync_for_testing( should_emit_full_desync_for_testing(shared.as_ref(), 0xF011_F000_0000_0000u64 ^ i, true, now),
shared.as_ref(),
0xF011_F000_0000_0000u64 ^ i,
true,
now
),
"desync_all_full must bypass suppression and dedup refresh" "desync_all_full must bypass suppression and dedup refresh"
); );
} }
@ -78,8 +68,7 @@ fn edge_all_full_burst_does_not_poison_later_false_path_tracking() {
let now = Instant::now(); let now = Instant::now();
for i in 0..8192u64 { for i in 0..8192u64 {
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(shared.as_ref(),
shared.as_ref(),
0xABCD_0000_0000_0000 ^ i, 0xABCD_0000_0000_0000 ^ i,
true, true,
now now
@ -113,12 +102,7 @@ fn adversarial_mixed_sequence_true_steps_never_change_cache_len() {
let flag_all_full = (seed & 0x1) == 1; let flag_all_full = (seed & 0x1) == 1;
let key = 0x7000_0000_0000_0000u64 ^ i ^ seed; let key = 0x7000_0000_0000_0000u64 ^ i ^ seed;
let before = desync_dedup_len_for_testing(shared.as_ref()); let before = desync_dedup_len_for_testing(shared.as_ref());
let _ = should_emit_full_desync_for_testing( let _ = should_emit_full_desync_for_testing(shared.as_ref(), key, flag_all_full, Instant::now());
shared.as_ref(),
key,
flag_all_full,
Instant::now(),
);
let after = desync_dedup_len_for_testing(shared.as_ref()); let after = desync_dedup_len_for_testing(shared.as_ref());
if flag_all_full { if flag_all_full {
@ -140,12 +124,7 @@ fn light_fuzz_all_full_mode_always_emits_and_stays_bounded() {
seed ^= seed >> 9; seed ^= seed >> 9;
seed ^= seed << 8; seed ^= seed << 8;
let key = seed ^ 0x55AA_55AA_55AA_55AAu64; let key = seed ^ 0x55AA_55AA_55AA_55AAu64;
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(shared.as_ref(), key, true, Instant::now()));
shared.as_ref(),
key,
true,
Instant::now()
));
} }
let after = desync_dedup_len_for_testing(shared.as_ref()); let after = desync_dedup_len_for_testing(shared.as_ref());

View File

@ -366,42 +366,23 @@ fn pressure_evicts_oldest_idle_candidate_with_deterministic_ordering() {
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 10)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 10));
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 11)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 11));
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(10));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(10)
);
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
let mut seen_for_newer = 0u64; let mut seen_for_newer = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing( !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 11, &mut seen_for_newer, &stats),
shared.as_ref(),
11,
&mut seen_for_newer,
&stats
),
"newer idle candidate must not be evicted while older candidate exists" "newer idle candidate must not be evicted while older candidate exists"
); );
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(10));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(10)
);
let mut seen_for_oldest = 0u64; let mut seen_for_oldest = 0u64;
assert!( assert!(
maybe_evict_idle_candidate_on_pressure_for_testing( maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 10, &mut seen_for_oldest, &stats),
shared.as_ref(),
10,
&mut seen_for_oldest,
&stats
),
"oldest idle candidate must be evicted first under pressure" "oldest idle candidate must be evicted first under pressure"
); );
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(11));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(11)
);
assert_eq!(stats.get_relay_pressure_evict_total(), 1); assert_eq!(stats.get_relay_pressure_evict_total(), 1);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
@ -421,10 +402,7 @@ fn pressure_does_not_evict_without_new_pressure_signal() {
"without new pressure signal, candidate must stay" "without new pressure signal, candidate must stay"
); );
assert_eq!(stats.get_relay_pressure_evict_total(), 0); assert_eq!(stats.get_relay_pressure_evict_total(), 0);
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(21));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(21)
);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
} }
@ -437,10 +415,7 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() {
let mut seen_per_conn = std::collections::HashMap::new(); let mut seen_per_conn = std::collections::HashMap::new();
for conn_id in 1000u64..1064u64 { for conn_id in 1000u64..1064u64 {
assert!(mark_relay_idle_candidate_for_testing( assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), conn_id));
shared.as_ref(),
conn_id
));
seen_per_conn.insert(conn_id, 0u64); seen_per_conn.insert(conn_id, 0u64);
} }
@ -451,12 +426,7 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() {
.get(&expected) .get(&expected)
.expect("per-conn pressure cursor must exist"); .expect("per-conn pressure cursor must exist");
assert!( assert!(
maybe_evict_idle_candidate_on_pressure_for_testing( maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), expected, &mut seen, &stats),
shared.as_ref(),
expected,
&mut seen,
&stats
),
"expected conn_id {expected} must be evicted next by deterministic FIFO ordering" "expected conn_id {expected} must be evicted next by deterministic FIFO ordering"
); );
seen_per_conn.insert(expected, seen); seen_per_conn.insert(expected, seen);
@ -466,10 +436,7 @@ fn stress_pressure_eviction_preserves_fifo_across_many_candidates() {
} else { } else {
Some(expected + 1) Some(expected + 1)
}; };
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), next);
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
next
);
} }
assert_eq!(stats.get_relay_pressure_evict_total(), 64); assert_eq!(stats.get_relay_pressure_evict_total(), 64);
@ -493,24 +460,9 @@ fn blackhat_single_pressure_event_must_not_evict_more_than_one_candidate() {
// Single pressure event should authorize at most one eviction globally. // Single pressure event should authorize at most one eviction globally.
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
let evicted_301 = maybe_evict_idle_candidate_on_pressure_for_testing( let evicted_301 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 301, &mut seen_301, &stats);
shared.as_ref(), let evicted_302 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 302, &mut seen_302, &stats);
301, let evicted_303 = maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 303, &mut seen_303, &stats);
&mut seen_301,
&stats,
);
let evicted_302 = maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
302,
&mut seen_302,
&stats,
);
let evicted_303 = maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
303,
&mut seen_303,
&stats,
);
let evicted_total = [evicted_301, evicted_302, evicted_303] let evicted_total = [evicted_301, evicted_302, evicted_303]
.iter() .iter()
@ -540,22 +492,12 @@ fn blackhat_pressure_counter_must_track_global_budget_not_per_session_cursor() {
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
assert!( assert!(
maybe_evict_idle_candidate_on_pressure_for_testing( maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 401, &mut seen_oldest, &stats),
shared.as_ref(),
401,
&mut seen_oldest,
&stats
),
"oldest candidate must consume pressure budget first" "oldest candidate must consume pressure budget first"
); );
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing( !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 402, &mut seen_next, &stats),
shared.as_ref(),
402,
&mut seen_next,
&stats
),
"next candidate must not consume the same pressure budget" "next candidate must not consume the same pressure budget"
); );
@ -580,12 +522,7 @@ fn blackhat_stale_pressure_before_idle_mark_must_not_trigger_eviction() {
let mut seen = 0u64; let mut seen = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing( !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 501, &mut seen, &stats),
shared.as_ref(),
501,
&mut seen,
&stats
),
"stale pressure (before soft-idle mark) must not evict newly marked candidate" "stale pressure (before soft-idle mark) must not evict newly marked candidate"
); );
@ -608,24 +545,9 @@ fn blackhat_stale_pressure_must_not_evict_any_of_newly_marked_batch() {
let mut seen_513 = 0u64; let mut seen_513 = 0u64;
let evicted = [ let evicted = [
maybe_evict_idle_candidate_on_pressure_for_testing( maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 511, &mut seen_511, &stats),
shared.as_ref(), maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 512, &mut seen_512, &stats),
511, maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 513, &mut seen_513, &stats),
&mut seen_511,
&stats,
),
maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
512,
&mut seen_512,
&stats,
),
maybe_evict_idle_candidate_on_pressure_for_testing(
shared.as_ref(),
513,
&mut seen_513,
&stats,
),
] ]
.iter() .iter()
.filter(|value| **value) .filter(|value| **value)
@ -650,12 +572,7 @@ fn blackhat_stale_pressure_seen_without_candidates_must_be_globally_invalidated(
// Session A observed pressure while there were no candidates. // Session A observed pressure while there were no candidates.
let mut seen_a = 0u64; let mut seen_a = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing( !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 999_001, &mut seen_a, &stats),
shared.as_ref(),
999_001,
&mut seen_a,
&stats
),
"no candidate existed, so no eviction is possible" "no candidate existed, so no eviction is possible"
); );
@ -663,12 +580,7 @@ fn blackhat_stale_pressure_seen_without_candidates_must_be_globally_invalidated(
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 521)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 521));
let mut seen_b = 0u64; let mut seen_b = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing( !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 521, &mut seen_b, &stats),
shared.as_ref(),
521,
&mut seen_b,
&stats
),
"once pressure is observed with empty candidate set, it must not be replayed later" "once pressure is observed with empty candidate set, it must not be replayed later"
); );
@ -688,12 +600,7 @@ fn blackhat_stale_pressure_must_not_survive_candidate_churn() {
let mut seen = 0u64; let mut seen = 0u64;
assert!( assert!(
!maybe_evict_idle_candidate_on_pressure_for_testing( !maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), 532, &mut seen, &stats),
shared.as_ref(),
532,
&mut seen,
&stats
),
"stale pressure must not survive clear+remark churn cycles" "stale pressure must not survive clear+remark churn cycles"
); );
@ -756,10 +663,7 @@ async fn integration_race_single_pressure_event_allows_at_most_one_eviction_unde
let mut seen_per_session = vec![0u64; sessions]; let mut seen_per_session = vec![0u64; sessions];
for conn_id in &conn_ids { for conn_id in &conn_ids {
assert!(mark_relay_idle_candidate_for_testing( assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id));
shared.as_ref(),
*conn_id
));
} }
for round in 0..rounds { for round in 0..rounds {
@ -772,12 +676,8 @@ async fn integration_race_single_pressure_event_allows_at_most_one_eviction_unde
let stats = stats.clone(); let stats = stats.clone();
let shared = shared.clone(); let shared = shared.clone();
joins.push(tokio::spawn(async move { joins.push(tokio::spawn(async move {
let evicted = maybe_evict_idle_candidate_on_pressure_for_testing( let evicted =
shared.as_ref(), maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), conn_id, &mut seen, stats.as_ref());
conn_id,
&mut seen,
stats.as_ref(),
);
(idx, conn_id, seen, evicted) (idx, conn_id, seen, evicted)
})); }));
} }
@ -829,10 +729,7 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
let mut seen_per_session = vec![0u64; sessions]; let mut seen_per_session = vec![0u64; sessions];
for conn_id in &conn_ids { for conn_id in &conn_ids {
assert!(mark_relay_idle_candidate_for_testing( assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id));
shared.as_ref(),
*conn_id
));
} }
let mut expected_total_evictions = 0u64; let mut expected_total_evictions = 0u64;
@ -854,12 +751,8 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
let stats = stats.clone(); let stats = stats.clone();
let shared = shared.clone(); let shared = shared.clone();
joins.push(tokio::spawn(async move { joins.push(tokio::spawn(async move {
let evicted = maybe_evict_idle_candidate_on_pressure_for_testing( let evicted =
shared.as_ref(), maybe_evict_idle_candidate_on_pressure_for_testing(shared.as_ref(), conn_id, &mut seen, stats.as_ref());
conn_id,
&mut seen,
stats.as_ref(),
);
(idx, conn_id, seen, evicted) (idx, conn_id, seen, evicted)
})); }));
} }
@ -881,10 +774,7 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
"round {round}: empty candidate phase must not allow stale-pressure eviction" "round {round}: empty candidate phase must not allow stale-pressure eviction"
); );
for conn_id in &conn_ids { for conn_id in &conn_ids {
assert!(mark_relay_idle_candidate_for_testing( assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), *conn_id));
shared.as_ref(),
*conn_id
));
} }
} else { } else {
assert!( assert!(
@ -893,10 +783,7 @@ async fn integration_race_burst_pressure_with_churn_preserves_empty_set_invalida
); );
if let Some(conn_id) = evicted_conn { if let Some(conn_id) = evicted_conn {
expected_total_evictions = expected_total_evictions.saturating_add(1); expected_total_evictions = expected_total_evictions.saturating_add(1);
assert!(mark_relay_idle_candidate_for_testing( assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), conn_id));
shared.as_ref(),
conn_id
));
} }
} }
} }

View File

@ -25,10 +25,7 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account
// Helper lock must recover from poison, reset stale state, and continue. // Helper lock must recover from poison, reset stale state, and continue.
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42));
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(42));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(42)
);
let before = relay_pressure_event_seq_for_testing(shared.as_ref()); let before = relay_pressure_event_seq_for_testing(shared.as_ref());
note_relay_pressure_event_for_testing(shared.as_ref()); note_relay_pressure_event_for_testing(shared.as_ref());
@ -57,17 +54,11 @@ fn clear_state_helper_must_reset_poisoned_registry_for_deterministic_fifo_tests(
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), None);
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
None
);
assert_eq!(relay_pressure_event_seq_for_testing(shared.as_ref()), 0); assert_eq!(relay_pressure_event_seq_for_testing(shared.as_ref()), 0);
assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7)); assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 7));
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(shared.as_ref()), Some(7));
oldest_relay_idle_candidate_for_testing(shared.as_ref()),
Some(7)
);
clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref());
} }

View File

@ -1,10 +1,10 @@
use crate::proxy::client::handle_client_stream_with_shared;
use crate::proxy::handshake::{ use crate::proxy::handshake::{
auth_probe_fail_streak_for_testing_in_shared, auth_probe_is_throttled_for_testing_in_shared, auth_probe_fail_streak_for_testing_in_shared, auth_probe_is_throttled_for_testing_in_shared,
auth_probe_record_failure_for_testing, clear_auth_probe_state_for_testing_in_shared, auth_probe_record_failure_for_testing, clear_auth_probe_state_for_testing_in_shared,
clear_unknown_sni_warn_state_for_testing_in_shared, clear_warned_secrets_for_testing_in_shared, clear_unknown_sni_warn_state_for_testing_in_shared, clear_warned_secrets_for_testing_in_shared,
should_emit_unknown_sni_warn_for_testing_in_shared, warned_secrets_for_testing_in_shared, should_emit_unknown_sni_warn_for_testing_in_shared, warned_secrets_for_testing_in_shared,
}; };
use crate::proxy::client::handle_client_stream_with_shared;
use crate::proxy::middle_relay::{ use crate::proxy::middle_relay::{
clear_desync_dedup_for_testing_in_shared, clear_relay_idle_candidate_for_testing, clear_desync_dedup_for_testing_in_shared, clear_relay_idle_candidate_for_testing,
clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing, clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing,
@ -81,10 +81,7 @@ fn new_client_harness() -> ClientHarness {
} }
} }
async fn drive_invalid_mtproto_handshake( async fn drive_invalid_mtproto_handshake(shared: Arc<ProxySharedState>, peer: std::net::SocketAddr) {
shared: Arc<ProxySharedState>,
peer: std::net::SocketAddr,
) {
let harness = new_client_harness(); let harness = new_client_harness();
let (server_side, mut client_side) = duplex(4096); let (server_side, mut client_side) = duplex(4096);
let invalid = [0u8; 64]; let invalid = [0u8; 64];
@ -111,10 +108,7 @@ async fn drive_invalid_mtproto_handshake(
.write_all(&invalid) .write_all(&invalid)
.await .await
.expect("failed to write invalid handshake"); .expect("failed to write invalid handshake");
client_side client_side.shutdown().await.expect("failed to shutdown client");
.shutdown()
.await
.expect("failed to shutdown client");
let _ = tokio::time::timeout(Duration::from_secs(3), task) let _ = tokio::time::timeout(Duration::from_secs(3), task)
.await .await
.expect("client task timed out") .expect("client task timed out")
@ -134,10 +128,7 @@ fn proxy_shared_state_two_instances_do_not_share_auth_probe_state() {
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip),
Some(1) Some(1)
); );
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), None);
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
None
);
} }
#[test] #[test]
@ -148,18 +139,8 @@ fn proxy_shared_state_two_instances_do_not_share_desync_dedup() {
let now = Instant::now(); let now = Instant::now();
let key = 0xA5A5_u64; let key = 0xA5A5_u64;
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now));
a.as_ref(), assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now));
key,
false,
now
));
assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
} }
#[test] #[test]
@ -169,10 +150,7 @@ fn proxy_shared_state_two_instances_do_not_share_idle_registry() {
clear_relay_idle_pressure_state_for_testing_in_shared(a.as_ref()); clear_relay_idle_pressure_state_for_testing_in_shared(a.as_ref());
assert!(mark_relay_idle_candidate_for_testing(a.as_ref(), 111)); assert!(mark_relay_idle_candidate_for_testing(a.as_ref(), 111));
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), Some(111));
oldest_relay_idle_candidate_for_testing(a.as_ref()),
Some(111)
);
assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), None); assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), None);
} }
@ -190,10 +168,7 @@ fn proxy_shared_state_reset_in_one_instance_does_not_affect_another() {
auth_probe_record_failure_for_testing(b.as_ref(), ip_b, now); auth_probe_record_failure_for_testing(b.as_ref(), ip_b, now);
clear_auth_probe_state_for_testing_in_shared(a.as_ref()); clear_auth_probe_state_for_testing_in_shared(a.as_ref());
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None);
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a),
None
);
assert_eq!( assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b), auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b),
Some(1) Some(1)
@ -216,14 +191,8 @@ fn proxy_shared_state_parallel_auth_probe_updates_stay_per_instance() {
auth_probe_record_failure_for_testing(b.as_ref(), ip, now + Duration::from_millis(1)); auth_probe_record_failure_for_testing(b.as_ref(), ip, now + Duration::from_millis(1));
} }
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(5));
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), Some(3));
Some(5)
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
Some(3)
);
} }
#[tokio::test] #[tokio::test]
@ -348,14 +317,8 @@ fn proxy_shared_state_auth_saturation_does_not_bleed_across_instances() {
auth_probe_record_failure_for_testing(a.as_ref(), ip, future_now); auth_probe_record_failure_for_testing(a.as_ref(), ip, future_now);
} }
assert!(auth_probe_is_throttled_for_testing_in_shared( assert!(auth_probe_is_throttled_for_testing_in_shared(a.as_ref(), ip));
a.as_ref(), assert!(!auth_probe_is_throttled_for_testing_in_shared(b.as_ref(), ip));
ip
));
assert!(!auth_probe_is_throttled_for_testing_in_shared(
b.as_ref(),
ip
));
} }
#[test] #[test]
@ -385,10 +348,7 @@ fn proxy_shared_state_poison_clear_in_one_instance_does_not_affect_other_instanc
clear_auth_probe_state_for_testing_in_shared(a.as_ref()); clear_auth_probe_state_for_testing_in_shared(a.as_ref());
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a), None);
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip_a),
None
);
assert_eq!( assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b), auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip_b),
Some(1), Some(1),
@ -503,10 +463,7 @@ fn proxy_shared_state_warned_secret_clear_in_one_instance_does_not_clear_other()
clear_warned_secrets_for_testing_in_shared(a.as_ref()); clear_warned_secrets_for_testing_in_shared(a.as_ref());
clear_warned_secrets_for_testing_in_shared(b.as_ref()); clear_warned_secrets_for_testing_in_shared(b.as_ref());
let key = ( let key = ("clear-isolation-user".to_string(), "invalid_length".to_string());
"clear-isolation-user".to_string(),
"invalid_length".to_string(),
);
{ {
let warned_a = warned_secrets_for_testing_in_shared(a.as_ref()); let warned_a = warned_secrets_for_testing_in_shared(a.as_ref());
let mut guard_a = warned_a let mut guard_a = warned_a
@ -551,24 +508,14 @@ fn proxy_shared_state_desync_duplicate_suppression_is_instance_scoped() {
let now = Instant::now(); let now = Instant::now();
let key = 0xBEEF_0000_0000_0001u64; let key = 0xBEEF_0000_0000_0001u64;
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now));
a.as_ref(),
key,
false,
now
));
assert!(!should_emit_full_desync_for_testing( assert!(!should_emit_full_desync_for_testing(
a.as_ref(), a.as_ref(),
key, key,
false, false,
now + Duration::from_millis(1) now + Duration::from_millis(1)
)); ));
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now));
b.as_ref(),
key,
false,
now
));
} }
#[test] #[test]
@ -580,18 +527,8 @@ fn proxy_shared_state_desync_clear_in_one_instance_does_not_clear_other() {
let now = Instant::now(); let now = Instant::now();
let key = 0xCAFE_0000_0000_0001u64; let key = 0xCAFE_0000_0000_0001u64;
assert!(should_emit_full_desync_for_testing( assert!(should_emit_full_desync_for_testing(a.as_ref(), key, false, now));
a.as_ref(), assert!(should_emit_full_desync_for_testing(b.as_ref(), key, false, now));
key,
false,
now
));
assert!(should_emit_full_desync_for_testing(
b.as_ref(),
key,
false,
now
));
clear_desync_dedup_for_testing_in_shared(a.as_ref()); clear_desync_dedup_for_testing_in_shared(a.as_ref());
@ -621,10 +558,7 @@ fn proxy_shared_state_idle_candidate_clear_in_one_instance_does_not_affect_other
clear_relay_idle_candidate_for_testing(a.as_ref(), 1001); clear_relay_idle_candidate_for_testing(a.as_ref(), 1001);
assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), None); assert_eq!(oldest_relay_idle_candidate_for_testing(a.as_ref()), None);
assert_eq!( assert_eq!(oldest_relay_idle_candidate_for_testing(b.as_ref()), Some(2002));
oldest_relay_idle_candidate_for_testing(b.as_ref()),
Some(2002)
);
} }
#[test] #[test]

View File

@ -1,17 +1,16 @@
use crate::proxy::handshake::{ use crate::proxy::handshake::{
auth_probe_fail_streak_for_testing_in_shared, auth_probe_record_failure_for_testing, auth_probe_fail_streak_for_testing_in_shared, auth_probe_record_failure_for_testing,
clear_auth_probe_state_for_testing_in_shared, clear_auth_probe_state_for_testing_in_shared, clear_unknown_sni_warn_state_for_testing_in_shared,
clear_unknown_sni_warn_state_for_testing_in_shared,
should_emit_unknown_sni_warn_for_testing_in_shared, should_emit_unknown_sni_warn_for_testing_in_shared,
}; };
use crate::proxy::middle_relay::{ use crate::proxy::middle_relay::{
clear_desync_dedup_for_testing_in_shared, clear_desync_dedup_for_testing_in_shared, clear_relay_idle_pressure_state_for_testing_in_shared,
clear_relay_idle_pressure_state_for_testing_in_shared, mark_relay_idle_candidate_for_testing, mark_relay_idle_candidate_for_testing, oldest_relay_idle_candidate_for_testing,
oldest_relay_idle_candidate_for_testing, should_emit_full_desync_for_testing, should_emit_full_desync_for_testing,
}; };
use crate::proxy::shared_state::ProxySharedState; use crate::proxy::shared_state::ProxySharedState;
use rand::RngExt;
use rand::SeedableRng; use rand::SeedableRng;
use rand::RngExt;
use rand::rngs::StdRng; use rand::rngs::StdRng;
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc; use std::sync::Arc;
@ -100,14 +99,8 @@ async fn proxy_shared_state_dual_instance_same_ip_high_contention_no_counter_ble
handle.await.expect("task join failed"); handle.await.expect("task join failed");
} }
assert_eq!( assert_eq!(auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), Some(64));
auth_probe_fail_streak_for_testing_in_shared(a.as_ref(), ip), assert_eq!(auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip), Some(64));
Some(64)
);
assert_eq!(
auth_probe_fail_streak_for_testing_in_shared(b.as_ref(), ip),
Some(64)
);
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
@ -190,7 +183,12 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed()
clear_auth_probe_state_for_testing_in_shared(shared_a.as_ref()); clear_auth_probe_state_for_testing_in_shared(shared_a.as_ref());
clear_auth_probe_state_for_testing_in_shared(shared_b.as_ref()); clear_auth_probe_state_for_testing_in_shared(shared_b.as_ref());
let ip = IpAddr::V4(Ipv4Addr::new(198, 51, 100, rng.random_range(1_u8..=250_u8))); let ip = IpAddr::V4(Ipv4Addr::new(
198,
51,
100,
rng.random_range(1_u8..=250_u8),
));
let workers = rng.random_range(16_usize..=48_usize); let workers = rng.random_range(16_usize..=48_usize);
let rounds = rng.random_range(4_usize..=10_usize); let rounds = rng.random_range(4_usize..=10_usize);
@ -212,11 +210,7 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed()
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
start_a.wait().await; start_a.wait().await;
for _ in 0..a_ops { for _ in 0..a_ops {
auth_probe_record_failure_for_testing( auth_probe_record_failure_for_testing(shared_a.as_ref(), ip, Instant::now());
shared_a.as_ref(),
ip,
Instant::now(),
);
} }
})); }));
@ -225,11 +219,7 @@ async fn proxy_shared_state_seed_matrix_concurrency_isolation_no_counter_bleed()
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
start_b.wait().await; start_b.wait().await;
for _ in 0..b_ops { for _ in 0..b_ops {
auth_probe_record_failure_for_testing( auth_probe_record_failure_for_testing(shared_b.as_ref(), ip, Instant::now());
shared_b.as_ref(),
ip,
Instant::now(),
);
} }
})); }));
} }

View File

@ -69,10 +69,7 @@ async fn relay_baseline_activity_timeout_fires_after_inactivity() {
.expect("relay must complete after inactivity timeout") .expect("relay must complete after inactivity timeout")
.expect("relay task must not panic"); .expect("relay task must not panic");
assert!( assert!(done.is_ok(), "relay must return Ok(()) after inactivity timeout");
done.is_ok(),
"relay must return Ok(()) after inactivity timeout"
);
} }
#[tokio::test] #[tokio::test]
@ -158,10 +155,7 @@ async fn relay_baseline_bidirectional_bytes_counted_symmetrically() {
.expect("relay task must not panic"); .expect("relay task must not panic");
assert!(done.is_ok()); assert!(done.is_ok());
assert_eq!( assert_eq!(stats.get_user_total_octets(user), (c2s.len() + s2c.len()) as u64);
stats.get_user_total_octets(user),
(c2s.len() + s2c.len()) as u64
);
} }
#[tokio::test] #[tokio::test]
@ -228,10 +222,7 @@ async fn relay_baseline_broken_pipe_midtransfer_returns_error() {
match done { match done {
Err(ProxyError::Io(err)) => { Err(ProxyError::Io(err)) => {
assert!( assert!(
matches!( matches!(err.kind(), io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset),
err.kind(),
io::ErrorKind::BrokenPipe | io::ErrorKind::ConnectionReset
),
"expected BrokenPipe/ConnectionReset, got {:?}", "expected BrokenPipe/ConnectionReset, got {:?}",
err.kind() err.kind()
); );

View File

@ -1,6 +1,6 @@
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use rand::SeedableRng;
use rand::rngs::StdRng; use rand::rngs::StdRng;
use rand::SeedableRng;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
@ -18,10 +18,7 @@ mod tests {
let arc = Arc::<AtomicUsize>::from_raw(data.cast::<AtomicUsize>()); let arc = Arc::<AtomicUsize>::from_raw(data.cast::<AtomicUsize>());
let cloned = Arc::clone(&arc); let cloned = Arc::clone(&arc);
let _ = Arc::into_raw(arc); let _ = Arc::into_raw(arc);
RawWaker::new( RawWaker::new(Arc::into_raw(cloned).cast::<()>(), &WAKE_COUNTER_WAKER_VTABLE)
Arc::into_raw(cloned).cast::<()>(),
&WAKE_COUNTER_WAKER_VTABLE,
)
} }
unsafe fn wake_counter_wake(data: *const ()) { unsafe fn wake_counter_wake(data: *const ()) {

View File

@ -1593,15 +1593,13 @@ impl Stats {
self.conntrack_delete_success_total.load(Ordering::Relaxed) self.conntrack_delete_success_total.load(Ordering::Relaxed)
} }
pub fn get_conntrack_delete_not_found_total(&self) -> u64 { pub fn get_conntrack_delete_not_found_total(&self) -> u64 {
self.conntrack_delete_not_found_total self.conntrack_delete_not_found_total.load(Ordering::Relaxed)
.load(Ordering::Relaxed)
} }
pub fn get_conntrack_delete_error_total(&self) -> u64 { pub fn get_conntrack_delete_error_total(&self) -> u64 {
self.conntrack_delete_error_total.load(Ordering::Relaxed) self.conntrack_delete_error_total.load(Ordering::Relaxed)
} }
pub fn get_conntrack_close_event_drop_total(&self) -> u64 { pub fn get_conntrack_close_event_drop_total(&self) -> u64 {
self.conntrack_close_event_drop_total self.conntrack_close_event_drop_total.load(Ordering::Relaxed)
.load(Ordering::Relaxed)
} }
pub fn get_me_keepalive_sent(&self) -> u64 { pub fn get_me_keepalive_sent(&self) -> u64 {
self.me_keepalive_sent.load(Ordering::Relaxed) self.me_keepalive_sent.load(Ordering::Relaxed)