Compare commits

..

14 Commits

Author SHA1 Message Date
Alexey 67dc1e8d18
Merge pull request #498 from telemt/bump
Update Cargo.toml
2026-03-19 18:25:14 +03:00
Alexey ad8ada33c9
Update Cargo.toml 2026-03-19 18:24:01 +03:00
Alexey bbb201b433
Instadrain + Hard-remove for long draining-state: merge pull request #497 from telemt/flow-stuck-writer
Instadrain + Hard-remove for long draining-state
2026-03-19 18:23:38 +03:00
Alexey 8d1faece60
Instadrain + Hard-remove for long draining-state
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-19 17:45:17 +03:00
Alexey a603505f90
Merge pull request #492 from temandroid/main
fix(docker): expose port 9091 and allow external API access
2026-03-19 17:32:49 +03:00
Alexey f8c42c324f
Merge pull request #494 from Dimasssss/patch-1
Update install.sh
2026-03-19 17:32:05 +03:00
Dimasssss dc3363aa0d
Update install.sh 2026-03-19 16:23:32 +03:00
Alexey f655924323
Update health.rs
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-19 16:15:00 +03:00
TEMAndroid 05c066c676
fix(docker): expose port 9091 and allow external API access
Add 9091 port mapping to compose.yml to make the REST API reachable
from outside the container. Previously only port 9090 (metrics) was
published, making the documented curl commands non-functional.

fixes #434
2026-03-19 15:54:01 +03:00
Alexey 1e000c2e7e
ME Writer stuck-up in draining-state fixes: merge pull request #491 from telemt/flow-stuck-writer
ME Writer stuck-up in draining-state fixes
2026-03-19 14:44:43 +03:00
Alexey fa17e719f6
Merge pull request #490 from telemt/bump
Update Cargo.toml
2026-03-19 14:43:15 +03:00
Alexey ae3ced8e7c
Update Cargo.toml 2026-03-19 14:42:59 +03:00
Alexey 3279f6d46a
Cleanup-path as non-blocking
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-19 14:07:20 +03:00
Alexey 6f9aef7bb4
ME Writer stuck-up in draining-state fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-19 13:08:35 +03:00
28 changed files with 865 additions and 352 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.23" version = "3.3.25"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -38,6 +38,7 @@ USER telemt
EXPOSE 443 EXPOSE 443
EXPOSE 9090 EXPOSE 9090
EXPOSE 9091
ENTRYPOINT ["/app/telemt"] ENTRYPOINT ["/app/telemt"]
CMD ["config.toml"] CMD ["config.toml"]

View File

@ -7,6 +7,7 @@ services:
ports: ports:
- "443:443" - "443:443"
- "127.0.0.1:9090:9090" - "127.0.0.1:9090:9090"
- "127.0.0.1:9091:9091"
# Allow caching 'proxy-secret' in read-only container # Allow caching 'proxy-secret' in read-only container
working_dir: /run/telemt working_dir: /run/telemt
volumes: volumes:

View File

@ -181,6 +181,8 @@ docker compose down
docker build -t telemt:local . docker build -t telemt:local .
docker run --name telemt --restart unless-stopped \ docker run --name telemt --restart unless-stopped \
-p 443:443 \ -p 443:443 \
-p 9090:9090 \
-p 9091:9091 \
-e RUST_LOG=info \ -e RUST_LOG=info \
-v "$PWD/config.toml:/app/config.toml:ro" \ -v "$PWD/config.toml:/app/config.toml:ro" \
--read-only \ --read-only \

View File

@ -183,6 +183,8 @@ docker compose down
docker build -t telemt:local . docker build -t telemt:local .
docker run --name telemt --restart unless-stopped \ docker run --name telemt --restart unless-stopped \
-p 443:443 \ -p 443:443 \
-p 9090:9090 \
-p 9091:9091 \
-e RUST_LOG=info \ -e RUST_LOG=info \
-v "$PWD/config.toml:/app/config.toml:ro" \ -v "$PWD/config.toml:/app/config.toml:ro" \
--read-only \ --read-only \

View File

@ -1,154 +1,190 @@
#!/bin/sh #!/bin/sh
set -eu set -eu
# --- Global Configurations ---
REPO="${REPO:-telemt/telemt}" REPO="${REPO:-telemt/telemt}"
BIN_NAME="${BIN_NAME:-telemt}" BIN_NAME="${BIN_NAME:-telemt}"
INSTALL_DIR="${INSTALL_DIR:-/bin}" INSTALL_DIR="${INSTALL_DIR:-/bin}"
CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}" CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}"
CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}" CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}"
WORK_DIR="${WORK_DIR:-/opt/telemt}" WORK_DIR="${WORK_DIR:-/opt/telemt}"
TLS_DOMAIN="${TLS_DOMAIN:-petrovich.ru}"
SERVICE_NAME="telemt" SERVICE_NAME="telemt"
TEMP_DIR="" TEMP_DIR=""
SUDO="" SUDO=""
CONFIG_PARENT_DIR=""
SERVICE_START_FAILED=0
# --- Argument Parsing ---
ACTION="install" ACTION="install"
TARGET_VERSION="${VERSION:-latest}" TARGET_VERSION="${VERSION:-latest}"
while [ $# -gt 0 ]; do while [ $# -gt 0 ]; do
case "$1" in case "$1" in
-h|--help) -h|--help) ACTION="help"; shift ;;
ACTION="help"
shift
;;
uninstall|--uninstall) uninstall|--uninstall)
[ "$ACTION" != "purge" ] && ACTION="uninstall" if [ "$ACTION" != "purge" ]; then ACTION="uninstall"; fi
shift shift ;;
;; purge|--purge) ACTION="purge"; shift ;;
--purge) install|--install) ACTION="install"; shift ;;
ACTION="purge" -*) printf '[ERROR] Unknown option: %s\n' "$1" >&2; exit 1 ;;
shift
;;
install|--install)
ACTION="install"
shift
;;
-*)
printf '[ERROR] Unknown option: %s\n' "$1" >&2
exit 1
;;
*) *)
if [ "$ACTION" = "install" ]; then if [ "$ACTION" = "install" ]; then TARGET_VERSION="$1"
TARGET_VERSION="$1" else printf '[WARNING] Ignoring extra argument: %s\n' "$1" >&2; fi
fi shift ;;
shift
;;
esac esac
done done
# --- Core Functions --- say() {
say() { printf '[INFO] %s\n' "$*"; } if [ "$#" -eq 0 ] || [ -z "${1:-}" ]; then
printf '\n'
else
printf '[INFO] %s\n' "$*"
fi
}
die() { printf '[ERROR] %s\n' "$*" >&2; exit 1; } die() { printf '[ERROR] %s\n' "$*" >&2; exit 1; }
write_root() { $SUDO sh -c 'cat > "$1"' _ "$1"; }
cleanup() { cleanup() {
if [ -n "${TEMP_DIR:-}" ] && [ -d "$TEMP_DIR" ]; then if [ -n "${TEMP_DIR:-}" ] && [ -d "$TEMP_DIR" ]; then
rm -rf -- "$TEMP_DIR" rm -rf -- "$TEMP_DIR"
fi fi
} }
trap cleanup EXIT INT TERM trap cleanup EXIT INT TERM
show_help() { show_help() {
say "Usage: $0 [version | install | uninstall | --purge | --help]" say "Usage: $0 [ <version> | install | uninstall | purge | --help ]"
say " version Install specific version (e.g. 1.0.0, default: latest)" say " <version> Install specific version (e.g. 3.3.15, default: latest)"
say " uninstall Remove the binary and service (keeps config)" say " install Install the latest version"
say " --purge Remove everything including configuration" say " uninstall Remove the binary and service (keeps config and user)"
say " purge Remove everything including configuration, data, and user"
exit 0 exit 0
} }
user_exists() { check_os_entity() {
if command -v getent >/dev/null 2>&1; then if command -v getent >/dev/null 2>&1; then getent "$1" "$2" >/dev/null 2>&1
getent passwd "$1" >/dev/null 2>&1 else grep -q "^${2}:" "/etc/$1" 2>/dev/null; fi
}
normalize_path() {
printf '%s\n' "$1" | tr -s '/' | sed 's|/$||; s|^$|/|'
}
get_realpath() {
path_in="$1"
case "$path_in" in /*) ;; *) path_in="$(pwd)/$path_in" ;; esac
if command -v realpath >/dev/null 2>&1; then
if realpath_out="$(realpath -m "$path_in" 2>/dev/null)"; then
printf '%s\n' "$realpath_out"
return
fi
fi
if command -v readlink >/dev/null 2>&1; then
resolved_path="$(readlink -f "$path_in" 2>/dev/null || true)"
if [ -n "$resolved_path" ]; then
printf '%s\n' "$resolved_path"
return
fi
fi
d="${path_in%/*}"; b="${path_in##*/}"
if [ -z "$d" ]; then d="/"; fi
if [ "$d" = "$path_in" ]; then d="/"; b="$path_in"; fi
if [ -d "$d" ]; then
abs_d="$(cd "$d" >/dev/null 2>&1 && pwd || true)"
if [ -n "$abs_d" ]; then
if [ "$b" = "." ] || [ -z "$b" ]; then printf '%s\n' "$abs_d"
elif [ "$abs_d" = "/" ]; then printf '/%s\n' "$b"
else printf '%s/%s\n' "$abs_d" "$b"; fi
else
normalize_path "$path_in"
fi
else else
grep -q "^${1}:" /etc/passwd 2>/dev/null normalize_path "$path_in"
fi fi
} }
group_exists() { get_svc_mgr() {
if command -v getent >/dev/null 2>&1; then if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then echo "systemd"
getent group "$1" >/dev/null 2>&1 elif command -v rc-service >/dev/null 2>&1; then echo "openrc"
else else echo "none"; fi
grep -q "^${1}:" /etc/group 2>/dev/null
fi
} }
verify_common() { verify_common() {
[ -z "$BIN_NAME" ] && die "BIN_NAME cannot be empty." [ -n "$BIN_NAME" ] || die "BIN_NAME cannot be empty."
[ -z "$INSTALL_DIR" ] && die "INSTALL_DIR cannot be empty." [ -n "$INSTALL_DIR" ] || die "INSTALL_DIR cannot be empty."
[ -z "$CONFIG_DIR" ] && die "CONFIG_DIR cannot be empty." [ -n "$CONFIG_DIR" ] || die "CONFIG_DIR cannot be empty."
[ -n "$CONFIG_FILE" ] || die "CONFIG_FILE cannot be empty."
case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}${CONFIG_FILE}" in
*[!a-zA-Z0-9_./-]*) die "Invalid characters in paths. Only alphanumeric, _, ., -, and / allowed." ;;
esac
case "$TARGET_VERSION" in *[!a-zA-Z0-9_.-]*) die "Invalid characters in version." ;; esac
case "$BIN_NAME" in *[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME." ;; esac
INSTALL_DIR="$(get_realpath "$INSTALL_DIR")"
CONFIG_DIR="$(get_realpath "$CONFIG_DIR")"
WORK_DIR="$(get_realpath "$WORK_DIR")"
CONFIG_FILE="$(get_realpath "$CONFIG_FILE")"
CONFIG_PARENT_DIR="${CONFIG_FILE%/*}"
if [ -z "$CONFIG_PARENT_DIR" ]; then CONFIG_PARENT_DIR="/"; fi
if [ "$CONFIG_PARENT_DIR" = "$CONFIG_FILE" ]; then CONFIG_PARENT_DIR="."; fi
if [ "$(id -u)" -eq 0 ]; then if [ "$(id -u)" -eq 0 ]; then
SUDO="" SUDO=""
else else
if ! command -v sudo >/dev/null 2>&1; then command -v sudo >/dev/null 2>&1 || die "This script requires root or sudo. Neither found."
die "This script requires root or sudo. Neither found."
fi
SUDO="sudo" SUDO="sudo"
say "sudo is available. Caching credentials..." if ! sudo -n true 2>/dev/null; then
if ! sudo -v; then if ! [ -t 0 ]; then
die "Failed to cache sudo credentials" die "sudo requires a password, but no TTY detected. Aborting to prevent hang."
fi
fi fi
fi fi
case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}" in if [ -n "$SUDO" ]; then
*[!a-zA-Z0-9_./-]*) if $SUDO sh -c '[ -d "$1" ]' _ "$CONFIG_FILE"; then
die "Invalid characters in path variables. Only alphanumeric, _, ., -, and / are allowed." die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory."
;; fi
esac elif [ -d "$CONFIG_FILE" ]; then
die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory."
case "$BIN_NAME" in fi
*[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME: $BIN_NAME" ;;
esac
for path in "$CONFIG_DIR" "$WORK_DIR"; do
check_path="$path"
while [ "$check_path" != "/" ] && [ "${check_path%"/"}" != "$check_path" ]; do
check_path="${check_path%"/"}"
done
[ -z "$check_path" ] && check_path="/"
for path in "$CONFIG_DIR" "$CONFIG_PARENT_DIR" "$WORK_DIR"; do
check_path="$(get_realpath "$path")"
case "$check_path" in case "$check_path" in
/|/bin|/sbin|/usr|/usr/bin|/usr/local|/etc|/opt|/var|/home|/root|/tmp) /|/bin|/sbin|/usr|/usr/bin|/usr/sbin|/usr/local|/usr/local/bin|/usr/local/sbin|/usr/local/etc|/usr/local/share|/etc|/var|/var/lib|/var/log|/var/run|/home|/root|/tmp|/lib|/lib64|/opt|/run|/boot|/dev|/sys|/proc)
die "Safety check failed: '$path' is a critical system directory." die "Safety check failed: '$path' (resolved to '$check_path') is a critical system directory." ;;
;;
esac esac
done done
for cmd in uname grep find rm chown chmod mv head mktemp; do check_install_dir="$(get_realpath "$INSTALL_DIR")"
case "$check_install_dir" in
/|/etc|/var|/home|/root|/tmp|/usr|/usr/local|/opt|/boot|/dev|/sys|/proc|/run)
die "Safety check failed: INSTALL_DIR '$INSTALL_DIR' is a critical system directory." ;;
esac
for cmd in id uname grep find rm chown chmod mv mktemp mkdir tr dd sed ps head sleep cat tar gzip rmdir; do
command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd" command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd"
done done
} }
verify_install_deps() { verify_install_deps() {
if ! command -v curl >/dev/null 2>&1 && ! command -v wget >/dev/null 2>&1; then command -v curl >/dev/null 2>&1 || command -v wget >/dev/null 2>&1 || die "Neither curl nor wget is installed."
die "Neither curl nor wget is installed."
fi
command -v tar >/dev/null 2>&1 || die "Required command not found: tar"
command -v gzip >/dev/null 2>&1 || die "Required command not found: gzip"
command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install" command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install"
if ! command -v setcap >/dev/null 2>&1; then if ! command -v setcap >/dev/null 2>&1; then
say "setcap is missing. Installing required capability tools..."
if command -v apk >/dev/null 2>&1; then if command -v apk >/dev/null 2>&1; then
$SUDO apk add --no-cache libcap || die "Failed to install libcap" $SUDO apk add --no-cache libcap-utils >/dev/null 2>&1 || $SUDO apk add --no-cache libcap >/dev/null 2>&1 || true
elif command -v apt-get >/dev/null 2>&1; then elif command -v apt-get >/dev/null 2>&1; then
$SUDO apt-get update -qq && $SUDO apt-get install -y -qq libcap2-bin || die "Failed to install libcap2-bin" $SUDO apt-get update -q >/dev/null 2>&1 || true
elif command -v dnf >/dev/null 2>&1 || command -v yum >/dev/null 2>&1; then $SUDO apt-get install -y -q libcap2-bin >/dev/null 2>&1 || true
$SUDO ${YUM_CMD:-yum} install -y -q libcap || die "Failed to install libcap" elif command -v dnf >/dev/null 2>&1; then $SUDO dnf install -y -q libcap >/dev/null 2>&1 || true
else elif command -v yum >/dev/null 2>&1; then $SUDO yum install -y -q libcap >/dev/null 2>&1 || true
die "Cannot install 'setcap'. Package manager not found. Please install libcap manually."
fi fi
fi fi
} }
@ -163,122 +199,96 @@ detect_arch() {
} }
detect_libc() { detect_libc() {
if command -v ldd >/dev/null 2>&1 && ldd --version 2>&1 | grep -qi musl; then
echo "musl"; return 0
fi
if grep -q '^ID=alpine' /etc/os-release 2>/dev/null || grep -q '^ID="alpine"' /etc/os-release 2>/dev/null; then
echo "musl"; return 0
fi
for f in /lib/ld-musl-*.so.* /lib64/ld-musl-*.so.*; do for f in /lib/ld-musl-*.so.* /lib64/ld-musl-*.so.*; do
if [ -e "$f" ]; then if [ -e "$f" ]; then echo "musl"; return 0; fi
echo "musl"; return 0
fi
done done
if grep -qE '^ID="?alpine"?' /etc/os-release 2>/dev/null; then echo "musl"; return 0; fi
if command -v ldd >/dev/null 2>&1 && (ldd --version 2>&1 || true) | grep -qi musl; then echo "musl"; return 0; fi
echo "gnu" echo "gnu"
} }
fetch_file() { fetch_file() {
fetch_url="$1" if command -v curl >/dev/null 2>&1; then curl -fsSL "$1" -o "$2"
fetch_out="$2" else wget -q -O "$2" "$1"; fi
if command -v curl >/dev/null 2>&1; then
curl -fsSL "$fetch_url" -o "$fetch_out" || return 1
elif command -v wget >/dev/null 2>&1; then
wget -qO "$fetch_out" "$fetch_url" || return 1
else
die "curl or wget required"
fi
} }
ensure_user_group() { ensure_user_group() {
nologin_bin="/bin/false" nologin_bin="$(command -v nologin 2>/dev/null || command -v false 2>/dev/null || echo /bin/false)"
cmd_nologin="$(command -v nologin 2>/dev/null || true)" if ! check_os_entity group telemt; then
if [ -n "$cmd_nologin" ] && [ -x "$cmd_nologin" ]; then if command -v groupadd >/dev/null 2>&1; then $SUDO groupadd -r telemt
nologin_bin="$cmd_nologin" elif command -v addgroup >/dev/null 2>&1; then $SUDO addgroup -S telemt
else else die "Cannot create group"; fi
for bin in /sbin/nologin /usr/sbin/nologin; do
if [ -x "$bin" ]; then
nologin_bin="$bin"
break
fi
done
fi fi
if ! group_exists telemt; then if ! check_os_entity passwd telemt; then
if command -v groupadd >/dev/null 2>&1; then
$SUDO groupadd -r telemt || die "Failed to create group via groupadd"
elif command -v addgroup >/dev/null 2>&1; then
$SUDO addgroup -S telemt || die "Failed to create group via addgroup"
else
die "Cannot create group: neither groupadd nor addgroup found"
fi
fi
if ! user_exists telemt; then
if command -v useradd >/dev/null 2>&1; then if command -v useradd >/dev/null 2>&1; then
$SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt || die "Failed to create user via useradd" $SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt
elif command -v adduser >/dev/null 2>&1; then elif command -v adduser >/dev/null 2>&1; then
$SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt || die "Failed to create user via adduser" if adduser --help 2>&1 | grep -q -- '-S'; then
else $SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt
die "Cannot create user: neither useradd nor adduser found" else
fi $SUDO adduser --system --home "$WORK_DIR" --shell "$nologin_bin" --no-create-home --ingroup telemt --disabled-password telemt
fi
else die "Cannot create user"; fi
fi fi
} }
setup_dirs() { setup_dirs() {
say "Setting up directories..." $SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" "$CONFIG_PARENT_DIR" || die "Failed to create directories"
$SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" || die "Failed to create directories"
$SUDO chown telemt:telemt "$WORK_DIR" || die "Failed to set owner on WORK_DIR" $SUDO chown telemt:telemt "$WORK_DIR" && $SUDO chmod 750 "$WORK_DIR"
$SUDO chmod 750 "$WORK_DIR" || die "Failed to set permissions on WORK_DIR" $SUDO chown root:telemt "$CONFIG_DIR" && $SUDO chmod 750 "$CONFIG_DIR"
if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then
$SUDO chown root:telemt "$CONFIG_PARENT_DIR" && $SUDO chmod 750 "$CONFIG_PARENT_DIR"
fi
} }
stop_service() { stop_service() {
say "Stopping service if running..." svc="$(get_svc_mgr)"
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then if [ "$svc" = "systemd" ] && systemctl is-active --quiet "$SERVICE_NAME" 2>/dev/null; then
$SUDO systemctl stop "$SERVICE_NAME" 2>/dev/null || true $SUDO systemctl stop "$SERVICE_NAME" 2>/dev/null || true
elif command -v rc-service >/dev/null 2>&1; then elif [ "$svc" = "openrc" ] && rc-service "$SERVICE_NAME" status >/dev/null 2>&1; then
$SUDO rc-service "$SERVICE_NAME" stop 2>/dev/null || true $SUDO rc-service "$SERVICE_NAME" stop 2>/dev/null || true
fi fi
} }
install_binary() { install_binary() {
bin_src="$1" bin_src="$1"; bin_dst="$2"
bin_dst="$2" if [ -e "$INSTALL_DIR" ] && [ ! -d "$INSTALL_DIR" ]; then
die "'$INSTALL_DIR' is not a directory."
fi
$SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory" $SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory"
if command -v install >/dev/null 2>&1; then if command -v install >/dev/null 2>&1; then
$SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary" $SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary"
else else
$SUDO rm -f "$bin_dst" $SUDO rm -f "$bin_dst" 2>/dev/null || true
$SUDO cp "$bin_src" "$bin_dst" || die "Failed to copy binary" $SUDO cp "$bin_src" "$bin_dst" && $SUDO chmod 0755 "$bin_dst" || die "Failed to copy binary"
$SUDO chmod 0755 "$bin_dst" || die "Failed to set permissions"
fi fi
if [ ! -x "$bin_dst" ]; then $SUDO sh -c '[ -x "$1" ]' _ "$bin_dst" || die "Binary not executable: $bin_dst"
die "Failed to install binary or it is not executable: $bin_dst"
fi
say "Granting network bind capabilities to bind port 443..." if command -v setcap >/dev/null 2>&1; then
if ! $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null; then $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null || true
say "[WARNING] Failed to apply setcap. The service will NOT be able to open port 443!"
say "[WARNING] This usually happens inside unprivileged Docker/LXC containers."
fi fi
} }
generate_secret() { generate_secret() {
if command -v openssl >/dev/null 2>&1; then secret="$(command -v openssl >/dev/null 2>&1 && openssl rand -hex 16 2>/dev/null || true)"
secret="$(openssl rand -hex 16 2>/dev/null)" && [ -n "$secret" ] && { echo "$secret"; return 0; } if [ -z "$secret" ] || [ "${#secret}" -ne 32 ]; then
if command -v od >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | od -An -tx1 | tr -d ' \n')"
elif command -v hexdump >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | hexdump -e '1/1 "%02x"')"
elif command -v xxd >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | xxd -p | tr -d '\n')"
fi
fi fi
if command -v xxd >/dev/null 2>&1; then if [ "${#secret}" -eq 32 ]; then echo "$secret"; else return 1; fi
secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | xxd -p | tr -d '\n')" && [ -n "$secret" ] && { echo "$secret"; return 0; }
fi
secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | od -An -tx1 | tr -d ' \n')" && [ -n "$secret" ] && { echo "$secret"; return 0; }
return 1
} }
generate_config_content() { generate_config_content() {
escaped_tls_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')"
cat <<EOF cat <<EOF
[general] [general]
use_middle_proxy = false use_middle_proxy = false
@ -297,7 +307,7 @@ listen = "127.0.0.1:9091"
whitelist = ["127.0.0.1/32"] whitelist = ["127.0.0.1/32"]
[censorship] [censorship]
tls_domain = "petrovich.ru" tls_domain = "${escaped_tls_domain}"
[access.users] [access.users]
hello = "$1" hello = "$1"
@ -305,44 +315,38 @@ EOF
} }
install_config() { install_config() {
config_exists=0
if [ -n "$SUDO" ]; then if [ -n "$SUDO" ]; then
$SUDO sh -c "[ -f '$CONFIG_FILE' ]" 2>/dev/null && config_exists=1 || true if $SUDO sh -c '[ -f "$1" ]' _ "$CONFIG_FILE"; then
else say " -> Config already exists at $CONFIG_FILE. Skipping creation."
[ -f "$CONFIG_FILE" ] && config_exists=1 || true return 0
fi fi
elif [ -f "$CONFIG_FILE" ]; then
if [ "$config_exists" -eq 1 ]; then say " -> Config already exists at $CONFIG_FILE. Skipping creation."
say "Config already exists, skipping generation."
return 0 return 0
fi fi
toml_secret="$(generate_secret)" || die "Failed to generate secret" toml_secret="$(generate_secret)" || die "Failed to generate secret."
say "Creating config at $CONFIG_FILE..."
tmp_conf="$(mktemp "${TEMP_DIR:-/tmp}/telemt_conf.XXXXXX")" || die "Failed to create temp config" generate_config_content "$toml_secret" | write_root "$CONFIG_FILE" || die "Failed to install config"
generate_config_content "$toml_secret" > "$tmp_conf" || die "Failed to write temp config" $SUDO chown root:telemt "$CONFIG_FILE" && $SUDO chmod 640 "$CONFIG_FILE"
$SUDO mv "$tmp_conf" "$CONFIG_FILE" || die "Failed to install config file" say " -> Config created successfully."
$SUDO chown root:telemt "$CONFIG_FILE" || die "Failed to set owner" say " -> Generated secret for default user 'hello': $toml_secret"
$SUDO chmod 640 "$CONFIG_FILE" || die "Failed to set config permissions"
say "Secret for user 'hello': $toml_secret"
} }
generate_systemd_content() { generate_systemd_content() {
cat <<EOF cat <<EOF
[Unit] [Unit]
Description=Telemt Proxy Service Description=Telemt
After=network-online.target After=network-online.target
Wants=network-online.target
[Service] [Service]
Type=simple Type=simple
User=telemt User=telemt
Group=telemt Group=telemt
WorkingDirectory=$WORK_DIR WorkingDirectory=$WORK_DIR
ExecStart=${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE} ExecStart="${INSTALL_DIR}/${BIN_NAME}" "${CONFIG_FILE}"
Restart=on-failure Restart=on-failure
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_BIND_SERVICE AmbientCapabilities=CAP_NET_BIND_SERVICE
@ -370,156 +374,183 @@ EOF
} }
install_service() { install_service() {
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then svc="$(get_svc_mgr)"
say "Installing systemd service..." if [ "$svc" = "systemd" ]; then
tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.service.XXXXXX")" || die "Failed to create temp service" generate_systemd_content | write_root "/etc/systemd/system/${SERVICE_NAME}.service"
generate_systemd_content > "$tmp_svc" || die "Failed to generate service content" $SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service" && $SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service"
$SUDO mv "$tmp_svc" "/etc/systemd/system/${SERVICE_NAME}.service" || die "Failed to move service file" $SUDO systemctl daemon-reload || true
$SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service" $SUDO systemctl enable "$SERVICE_NAME" || true
$SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service"
if ! $SUDO systemctl start "$SERVICE_NAME"; then
say "[WARNING] Failed to start service"
SERVICE_START_FAILED=1
fi
elif [ "$svc" = "openrc" ]; then
generate_openrc_content | write_root "/etc/init.d/${SERVICE_NAME}"
$SUDO chown root:root "/etc/init.d/${SERVICE_NAME}" && $SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}"
$SUDO systemctl daemon-reload || die "Failed to reload systemd" $SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || true
$SUDO systemctl enable "$SERVICE_NAME" || die "Failed to enable service"
$SUDO systemctl start "$SERVICE_NAME" || die "Failed to start service" if ! $SUDO rc-service "$SERVICE_NAME" start 2>/dev/null; then
say "[WARNING] Failed to start service"
elif command -v rc-update >/dev/null 2>&1; then SERVICE_START_FAILED=1
say "Installing OpenRC service..." fi
tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.init.XXXXXX")" || die "Failed to create temp file"
generate_openrc_content > "$tmp_svc" || die "Failed to generate init content"
$SUDO mv "$tmp_svc" "/etc/init.d/${SERVICE_NAME}" || die "Failed to move service file"
$SUDO chown root:root "/etc/init.d/${SERVICE_NAME}"
$SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}"
$SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || die "Failed to register service"
$SUDO rc-service "$SERVICE_NAME" start 2>/dev/null || die "Failed to start OpenRC service"
else else
say "No service manager found. You can start it manually with:" cmd="\"${INSTALL_DIR}/${BIN_NAME}\" \"${CONFIG_FILE}\""
if [ -n "$SUDO" ]; then if [ -n "$SUDO" ]; then
say " sudo -u telemt ${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}" say " -> Service manager not found. Start manually: sudo -u telemt $cmd"
else else
say " su -s /bin/sh telemt -c '${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}'" say " -> Service manager not found. Start manually: su -s /bin/sh telemt -c '$cmd'"
fi fi
fi fi
} }
kill_user_procs() { kill_user_procs() {
say "Ensuring $BIN_NAME processes are killed..." if command -v pkill >/dev/null 2>&1; then
$SUDO pkill -u telemt "$BIN_NAME" 2>/dev/null || true
if pkill_cmd="$(command -v pkill 2>/dev/null)"; then
$SUDO "$pkill_cmd" -u telemt "$BIN_NAME" 2>/dev/null || true
sleep 1 sleep 1
$SUDO "$pkill_cmd" -9 -u telemt "$BIN_NAME" 2>/dev/null || true $SUDO pkill -9 -u telemt "$BIN_NAME" 2>/dev/null || true
elif killall_cmd="$(command -v killall 2>/dev/null)"; then else
$SUDO "$killall_cmd" "$BIN_NAME" 2>/dev/null || true if command -v pgrep >/dev/null 2>&1; then
sleep 1 pids="$(pgrep -u telemt 2>/dev/null || true)"
$SUDO "$killall_cmd" -9 "$BIN_NAME" 2>/dev/null || true else
pids="$(ps -u telemt -o pid= 2>/dev/null || true)"
fi
if [ -n "$pids" ]; then
for pid in $pids; do
case "$pid" in ''|*[!0-9]*) continue ;; *) $SUDO kill "$pid" 2>/dev/null || true ;; esac
done
sleep 1
for pid in $pids; do
case "$pid" in ''|*[!0-9]*) continue ;; *) $SUDO kill -9 "$pid" 2>/dev/null || true ;; esac
done
fi
fi fi
} }
uninstall() { uninstall() {
purge_data=0 say "Starting uninstallation of $BIN_NAME..."
[ "$ACTION" = "purge" ] && purge_data=1
say "Uninstalling $BIN_NAME..." say ">>> Stage 1: Stopping services"
stop_service stop_service
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then say ">>> Stage 2: Removing service configuration"
svc="$(get_svc_mgr)"
if [ "$svc" = "systemd" ]; then
$SUDO systemctl disable "$SERVICE_NAME" 2>/dev/null || true $SUDO systemctl disable "$SERVICE_NAME" 2>/dev/null || true
$SUDO rm -f "/etc/systemd/system/${SERVICE_NAME}.service" $SUDO rm -f "/etc/systemd/system/${SERVICE_NAME}.service"
$SUDO systemctl daemon-reload || true $SUDO systemctl daemon-reload 2>/dev/null || true
elif command -v rc-update >/dev/null 2>&1; then elif [ "$svc" = "openrc" ]; then
$SUDO rc-update del "$SERVICE_NAME" 2>/dev/null || true $SUDO rc-update del "$SERVICE_NAME" 2>/dev/null || true
$SUDO rm -f "/etc/init.d/${SERVICE_NAME}" $SUDO rm -f "/etc/init.d/${SERVICE_NAME}"
fi fi
say ">>> Stage 3: Terminating user processes"
kill_user_procs kill_user_procs
say ">>> Stage 4: Removing binary"
$SUDO rm -f "${INSTALL_DIR}/${BIN_NAME}" $SUDO rm -f "${INSTALL_DIR}/${BIN_NAME}"
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true if [ "$ACTION" = "purge" ]; then
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true say ">>> Stage 5: Purging configuration, data, and user"
if [ "$purge_data" -eq 1 ]; then
say "Purging configuration and data..."
$SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR" $SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR"
$SUDO rm -f "$CONFIG_FILE"
if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then
$SUDO rmdir "$CONFIG_PARENT_DIR" 2>/dev/null || true
fi
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true
else else
say "Note: Configuration in $CONFIG_DIR was kept. Run with '--purge' to remove it." say "Note: Configuration and user kept. Run with 'purge' to remove completely."
fi fi
say "Uninstallation complete." printf '\n====================================================================\n'
printf ' UNINSTALLATION COMPLETE\n'
printf '====================================================================\n\n'
exit 0 exit 0
} }
# ============================================================================
# Main Entry Point
# ============================================================================
case "$ACTION" in case "$ACTION" in
help) help) show_help ;;
show_help uninstall|purge) verify_common; uninstall ;;
;;
uninstall|purge)
verify_common
uninstall
;;
install) install)
say "Starting installation..." say "Starting installation of $BIN_NAME (Version: $TARGET_VERSION)"
verify_common
verify_install_deps
ARCH="$(detect_arch)" say ">>> Stage 1: Verifying environment and dependencies"
LIBC="$(detect_libc)" verify_common; verify_install_deps
say "Detected system: $ARCH-linux-$LIBC"
if [ "$TARGET_VERSION" != "latest" ]; then
TARGET_VERSION="${TARGET_VERSION#v}"
fi
ARCH="$(detect_arch)"; LIBC="$(detect_libc)"
FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz" FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
FILE_NAME="$(printf '%s' "$FILE_NAME" | tr -d ' \t\n\r')"
if [ "$TARGET_VERSION" = "latest" ]; then if [ "$TARGET_VERSION" = "latest" ]; then
DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}" DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}"
else else
DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}" DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}"
fi fi
TEMP_DIR="$(mktemp -d)" || die "Failed to create temp directory" say ">>> Stage 2: Downloading archive"
TEMP_DIR="$(mktemp -d)" || die "Temp directory creation failed"
if [ -z "$TEMP_DIR" ] || [ ! -d "$TEMP_DIR" ]; then if [ -z "$TEMP_DIR" ] || [ ! -d "$TEMP_DIR" ]; then
die "Temp directory creation failed" die "Temp directory is invalid or was not created"
fi fi
say "Downloading from $DL_URL..." fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}" || die "Download failed"
fetch_file "$DL_URL" "${TEMP_DIR}/archive.tar.gz" || die "Download failed (check version or network)"
gzip -dc "${TEMP_DIR}/archive.tar.gz" | tar -xf - -C "$TEMP_DIR" || die "Extraction failed" say ">>> Stage 3: Extracting archive"
if ! gzip -dc "${TEMP_DIR}/${FILE_NAME}" | tar -xf - -C "$TEMP_DIR" 2>/dev/null; then
die "Extraction failed (downloaded archive might be invalid or 404)."
fi
EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1)" EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1 || true)"
[ -z "$EXTRACTED_BIN" ] && die "Binary '$BIN_NAME' not found in archive" [ -n "$EXTRACTED_BIN" ] || die "Binary '$BIN_NAME' not found in archive"
ensure_user_group say ">>> Stage 4: Setting up environment (User, Group, Directories)"
setup_dirs ensure_user_group; setup_dirs; stop_service
stop_service
say ">>> Stage 5: Installing binary"
say "Installing binary..."
install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}" install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}"
say ">>> Stage 6: Generating configuration"
install_config install_config
say ">>> Stage 7: Installing and starting service"
install_service install_service
say "" if [ "${SERVICE_START_FAILED:-0}" -eq 1 ]; then
say "=============================================" printf '\n====================================================================\n'
say "Installation complete!" printf ' INSTALLATION COMPLETED WITH WARNINGS\n'
say "=============================================" printf '====================================================================\n\n'
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then printf 'The service was installed but failed to start automatically.\n'
say "To check the logs, run:" printf 'Please check the logs to determine the issue.\n\n'
say " journalctl -u $SERVICE_NAME -f"
say ""
fi
say "To get user connection links, run:"
if command -v jq >/dev/null 2>&1; then
say " curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | \"User: \\(.username)\\n\\(.links.tls[0] // empty)\"'"
else else
say " curl -s http://127.0.0.1:9091/v1/users" printf '\n====================================================================\n'
say " (Note: Install 'jq' package to see the links nicely formatted)" printf ' INSTALLATION SUCCESS\n'
printf '====================================================================\n\n'
fi fi
svc="$(get_svc_mgr)"
if [ "$svc" = "systemd" ]; then
printf 'To check the status of your proxy service, run:\n'
printf ' systemctl status %s\n\n' "$SERVICE_NAME"
elif [ "$svc" = "openrc" ]; then
printf 'To check the status of your proxy service, run:\n'
printf ' rc-service %s status\n\n' "$SERVICE_NAME"
fi
printf 'To get your user connection links (for Telegram), run:\n'
if command -v jq >/dev/null 2>&1; then
printf ' curl -s http://127.0.0.1:9091/v1/users | jq -r '\''.data[] | "User: \\(.username)\\n\\(.links.tls[0] // empty)\\n"'\''\n'
else
printf ' curl -s http://127.0.0.1:9091/v1/users\n'
printf ' (Tip: Install '\''jq'\'' for a much cleaner output)\n'
fi
printf '\n====================================================================\n'
;; ;;
esac esac

View File

@ -364,6 +364,7 @@ pub(super) struct MinimalMeRuntimeData {
pub(super) me_reconnect_backoff_cap_ms: u64, pub(super) me_reconnect_backoff_cap_ms: u64,
pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_reconnect_fast_retry_count: u32,
pub(super) me_pool_drain_ttl_secs: u64, pub(super) me_pool_drain_ttl_secs: u64,
pub(super) me_instadrain: bool,
pub(super) me_pool_drain_soft_evict_enabled: bool, pub(super) me_pool_drain_soft_evict_enabled: bool,
pub(super) me_pool_drain_soft_evict_grace_secs: u64, pub(super) me_pool_drain_soft_evict_grace_secs: u64,
pub(super) me_pool_drain_soft_evict_per_writer: u8, pub(super) me_pool_drain_soft_evict_per_writer: u8,

View File

@ -431,6 +431,7 @@ async fn get_minimal_payload_cached(
me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms, me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms,
me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count, me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs, me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs,
me_instadrain: runtime.me_instadrain,
me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled,
me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs, me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs,
me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer, me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer,

View File

@ -198,6 +198,7 @@ desync_all_full = false
update_every = 43200 update_every = 43200
hardswap = false hardswap = false
me_pool_drain_ttl_secs = 90 me_pool_drain_ttl_secs = 90
me_instadrain = false
me_pool_min_fresh_ratio = 0.8 me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120 me_reinit_drain_timeout_secs = 120

View File

@ -613,6 +613,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
90 90
} }
pub(crate) fn default_me_instadrain() -> bool {
false
}
pub(crate) fn default_me_pool_drain_threshold() -> u64 { pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128 128
} }

View File

@ -56,6 +56,7 @@ pub struct HotFields {
pub me_reinit_coalesce_window_ms: u64, pub me_reinit_coalesce_window_ms: u64,
pub hardswap: bool, pub hardswap: bool,
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
pub me_instadrain: bool,
pub me_pool_drain_threshold: u64, pub me_pool_drain_threshold: u64,
pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64, pub me_pool_drain_soft_evict_grace_secs: u64,
@ -143,6 +144,7 @@ impl HotFields {
me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms,
hardswap: cfg.general.hardswap, hardswap: cfg.general.hardswap,
me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs,
me_instadrain: cfg.general.me_instadrain,
me_pool_drain_threshold: cfg.general.me_pool_drain_threshold, me_pool_drain_threshold: cfg.general.me_pool_drain_threshold,
me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled,
me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs, me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs,
@ -477,6 +479,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms;
cfg.general.hardswap = new.general.hardswap; cfg.general.hardswap = new.general.hardswap;
cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs;
cfg.general.me_instadrain = new.general.me_instadrain;
cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold; cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold;
cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled; cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled;
cfg.general.me_pool_drain_soft_evict_grace_secs = cfg.general.me_pool_drain_soft_evict_grace_secs =
@ -869,6 +872,12 @@ fn log_changes(
old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs, old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs,
); );
} }
if old_hot.me_instadrain != new_hot.me_instadrain {
info!(
"config reload: me_instadrain: {} → {}",
old_hot.me_instadrain, new_hot.me_instadrain,
);
}
if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold { if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold {
info!( info!(

View File

@ -612,6 +612,11 @@ impl ProxyConfig {
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(), "general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
)); ));
} }
if config.general.me_route_backpressure_base_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms if config.general.me_route_backpressure_high_timeout_ms
< config.general.me_route_backpressure_base_timeout_ms < config.general.me_route_backpressure_base_timeout_ms
@ -620,6 +625,11 @@ impl ProxyConfig {
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(), "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
)); ));
} }
if config.general.me_route_backpressure_high_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
));
}
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) { if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
return Err(ProxyError::Config( return Err(ProxyError::Config(
@ -1624,6 +1634,47 @@ mod tests {
let _ = std::fs::remove_file(path_valid); let _ = std::fs::remove_file(path_valid);
} }
#[test]
fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 100
me_route_backpressure_high_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() { fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
let toml = r#" let toml = r#"

View File

@ -812,6 +812,10 @@ pub struct GeneralConfig {
#[serde(default = "default_me_pool_drain_ttl_secs")] #[serde(default = "default_me_pool_drain_ttl_secs")]
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
/// Force-remove any draining writer on the next cleanup tick, regardless of age/deadline.
#[serde(default = "default_me_instadrain")]
pub me_instadrain: bool,
/// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches. /// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches.
/// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior. /// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior.
#[serde(default = "default_me_pool_drain_threshold")] #[serde(default = "default_me_pool_drain_threshold")]
@ -1020,6 +1024,7 @@ impl Default for GeneralConfig {
me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(), me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(),
proxy_secret_len_max: default_proxy_secret_len_max(), proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_instadrain: default_me_instadrain(),
me_pool_drain_threshold: default_me_pool_drain_threshold(), me_pool_drain_threshold: default_me_pool_drain_threshold(),
me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(), me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(),
me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(), me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(),

View File

@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool(
config.general.me_adaptive_floor_max_warm_writers_global, config.general.me_adaptive_floor_max_warm_writers_global,
config.general.hardswap, config.general.hardswap,
config.general.me_pool_drain_ttl_secs, config.general.me_pool_drain_ttl_secs,
config.general.me_instadrain,
config.general.me_pool_drain_threshold, config.general.me_pool_drain_threshold,
config.general.me_pool_drain_soft_evict_enabled, config.general.me_pool_drain_soft_evict_enabled,
config.general.me_pool_drain_soft_evict_grace_secs, config.general.me_pool_drain_soft_evict_grace_secs,
@ -342,6 +343,13 @@ pub(crate) async fn initialize_me_pool(
) )
.await; .await;
}); });
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break; break;
} }
Err(e) => { Err(e) => {
@ -409,6 +417,13 @@ pub(crate) async fn initialize_me_pool(
) )
.await; .await;
}); });
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break Some(pool); break Some(pool);
} }

View File

@ -1692,6 +1692,57 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
let _ = writeln!(
out,
"telemt_me_writer_close_signal_drop_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_close_signal_channel_full_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_channel_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
);
let _ = writeln!(
out,
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
);
let _ = writeln!(
out,
"telemt_me_draining_writers_reap_progress_total {}",
if me_allows_normal {
stats.get_me_draining_writers_reap_progress_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
let _ = writeln!( let _ = writeln!(
@ -2124,6 +2175,13 @@ mod tests {
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
));
assert!(output.contains(
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
assert!(output.contains( assert!(output.contains(

View File

@ -123,6 +123,9 @@ pub struct Stats {
pool_drain_soft_evict_total: AtomicU64, pool_drain_soft_evict_total: AtomicU64,
pool_drain_soft_evict_writer_total: AtomicU64, pool_drain_soft_evict_writer_total: AtomicU64,
pool_stale_pick_total: AtomicU64, pool_stale_pick_total: AtomicU64,
me_writer_close_signal_drop_total: AtomicU64,
me_writer_close_signal_channel_full_total: AtomicU64,
me_draining_writers_reap_progress_total: AtomicU64,
me_writer_removed_total: AtomicU64, me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64,
me_refill_triggered_total: AtomicU64, me_refill_triggered_total: AtomicU64,
@ -734,6 +737,24 @@ impl Stats {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_writer_close_signal_drop_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_drop_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_close_signal_channel_full_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_channel_full_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_draining_writers_reap_progress_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_draining_writers_reap_progress_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_removed_total(&self) { pub fn increment_me_writer_removed_total(&self) {
if self.telemetry_me_allows_debug() { if self.telemetry_me_allows_debug() {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
@ -1259,6 +1280,17 @@ impl Stats {
pub fn get_pool_stale_pick_total(&self) -> u64 { pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed) self.pool_stale_pick_total.load(Ordering::Relaxed)
} }
pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 {
self.me_writer_close_signal_channel_full_total
.load(Ordering::Relaxed)
}
pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 {
self.me_draining_writers_reap_progress_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_removed_total(&self) -> u64 { pub fn get_me_writer_removed_total(&self) -> u64 {
self.me_writer_removed_total.load(Ordering::Relaxed) self.me_writer_removed_total.load(Ordering::Relaxed)
} }

View File

@ -298,6 +298,7 @@ async fn run_update_cycle(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs, cfg.general.me_pool_drain_soft_evict_grace_secs,
@ -530,6 +531,7 @@ pub async fn me_config_updater(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs, cfg.general.me_pool_drain_soft_evict_grace_secs,

View File

@ -12,6 +12,7 @@ use crate::crypto::SecureRandom;
use crate::network::IpFamily; use crate::network::IpFamily;
use super::MePool; use super::MePool;
use super::pool::MeWriter;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)] #[allow(dead_code)]
@ -30,6 +31,8 @@ const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1;
const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct DcFloorPlanEntry { struct DcFloorPlanEntry {
@ -99,6 +102,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut adaptive_idle_since, &mut adaptive_idle_since,
&mut adaptive_recover_until, &mut adaptive_recover_until,
&mut floor_warn_next_allowed, &mut floor_warn_next_allowed,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
) )
.await; .await;
let v6_degraded = check_family( let v6_degraded = check_family(
@ -116,12 +121,63 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut adaptive_idle_since, &mut adaptive_idle_since,
&mut adaptive_recover_until, &mut adaptive_recover_until,
&mut floor_warn_next_allowed, &mut floor_warn_next_allowed,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
) )
.await; .await;
degraded_interval = v4_degraded || v6_degraded; degraded_interval = v4_degraded || v6_degraded;
} }
} }
pub async fn me_drain_timeout_enforcer(pool: Arc<MePool>) {
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut drain_soft_evict_next_allowed: HashMap<u64, Instant> = HashMap::new();
loop {
tokio::time::sleep(Duration::from_secs(
HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS,
))
.await;
reap_draining_writers(
&pool,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await;
}
}
fn draining_writer_timeout_expired(
pool: &MePool,
writer: &MeWriter,
now_epoch_secs: u64,
drain_ttl_secs: u64,
) -> bool {
if pool
.me_instadrain
.load(std::sync::atomic::Ordering::Relaxed)
{
return true;
}
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 {
return now_epoch_secs >= deadline_epoch_secs;
}
if drain_ttl_secs == 0 {
return false;
}
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if drain_started_at_epoch_secs == 0 {
return false;
}
now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs
}
pub(super) async fn reap_draining_writers( pub(super) async fn reap_draining_writers(
pool: &Arc<MePool>, pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>, warn_next_allowed: &mut HashMap<u64, Instant>,
@ -137,11 +193,16 @@ pub(super) async fn reap_draining_writers(
let activity = pool.registry.writer_activity_snapshot().await; let activity = pool.registry.writer_activity_snapshot().await;
let mut draining_writers = Vec::new(); let mut draining_writers = Vec::new();
let mut empty_writer_ids = Vec::<u64>::new(); let mut empty_writer_ids = Vec::<u64>::new();
let mut timeout_expired_writer_ids = Vec::<u64>::new();
let mut force_close_writer_ids = Vec::<u64>::new(); let mut force_close_writer_ids = Vec::<u64>::new();
for writer in writers { for writer in writers {
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
continue; continue;
} }
if draining_writer_timeout_expired(pool, &writer, now_epoch_secs, drain_ttl_secs) {
timeout_expired_writer_ids.push(writer.id);
continue;
}
if activity if activity
.bound_clients_by_writer .bound_clients_by_writer
.get(&writer.id) .get(&writer.id)
@ -207,14 +268,6 @@ pub(super) async fn reap_draining_writers(
"ME draining writer remains non-empty past drain TTL" "ME draining writer remains non-empty past drain TTL"
); );
} }
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs {
warn!(writer_id = writer.id, "Drain timeout, force-closing");
force_close_writer_ids.push(writer.id);
active_draining_writer_ids.remove(&writer.id);
}
} }
warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id)); warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id));
@ -299,11 +352,21 @@ pub(super) async fn reap_draining_writers(
} }
} }
let close_budget = health_drain_close_budget(); let mut closed_writer_ids = HashSet::<u64>::new();
for writer_id in timeout_expired_writer_ids {
if !closed_writer_ids.insert(writer_id) {
continue;
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
}
let requested_force_close = force_close_writer_ids.len(); let requested_force_close = force_close_writer_ids.len();
let requested_empty_close = empty_writer_ids.len(); let requested_empty_close = empty_writer_ids.len();
let requested_close_total = requested_force_close.saturating_add(requested_empty_close); let requested_close_total = requested_force_close.saturating_add(requested_empty_close);
let mut closed_writer_ids = HashSet::<u64>::new(); let close_budget = health_drain_close_budget();
let mut closed_total = 0usize; let mut closed_total = 0usize;
for writer_id in force_close_writer_ids { for writer_id in force_close_writer_ids {
if closed_total >= close_budget { if closed_total >= close_budget {
@ -314,6 +377,8 @@ pub(super) async fn reap_draining_writers(
} }
pool.stats.increment_pool_force_close_total(); pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
} }
for writer_id in empty_writer_ids { for writer_id in empty_writer_ids {
@ -324,6 +389,8 @@ pub(super) async fn reap_draining_writers(
continue; continue;
} }
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
} }
@ -392,6 +459,8 @@ async fn check_family(
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
drain_warn_next_allowed: &mut HashMap<u64, Instant>,
drain_soft_evict_next_allowed: &mut HashMap<u64, Instant>,
) -> bool { ) -> bool {
let enabled = match family { let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me, IpFamily::V4 => pool.decision.ipv4_me,
@ -472,8 +541,15 @@ async fn check_family(
floor_plan.active_writers_current, floor_plan.active_writers_current,
floor_plan.warm_writers_current, floor_plan.warm_writers_current,
); );
let mut next_drain_reap_at = Instant::now();
for (dc, endpoints) in dc_endpoints { for (dc, endpoints) in dc_endpoints {
if Instant::now() >= next_drain_reap_at {
reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed)
.await;
next_drain_reap_at = Instant::now()
+ Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS);
}
if endpoints.is_empty() { if endpoints.is_empty() {
continue; continue;
} }
@ -617,6 +693,12 @@ async fn check_family(
let mut restored = 0usize; let mut restored = 0usize;
for _ in 0..missing { for _ in 0..missing {
if Instant::now() >= next_drain_reap_at {
reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed)
.await;
next_drain_reap_at = Instant::now()
+ Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS);
}
if reconnect_budget == 0 { if reconnect_budget == 0 {
break; break;
} }
@ -1544,6 +1626,7 @@ mod tests {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,

View File

@ -81,6 +81,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,
@ -213,7 +214,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(600).saturating_add(writer_id), now_epoch_secs.saturating_sub(20),
1, 1,
0, 0,
) )
@ -230,7 +231,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
} }
assert_eq!(writer_count(&pool).await, threshold as usize); assert_eq!(writer_count(&pool).await, threshold as usize);
assert_eq!(sorted_writer_ids(&pool).await, vec![58, 59, 60]); assert_eq!(sorted_writer_ids(&pool).await, vec![1, 2, 3]);
} }
#[tokio::test] #[tokio::test]

View File

@ -80,6 +80,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use bytes::Bytes;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -73,6 +74,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,
@ -179,8 +181,14 @@ async fn current_writer_ids(pool: &Arc<MePool>) -> Vec<u64> {
async fn reap_draining_writers_drops_warn_state_for_removed_writer() { async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
let pool = make_pool(128).await; let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let conn_ids = let conn_ids = insert_draining_writer(
insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await; &pool,
7,
now_epoch_secs.saturating_sub(180),
1,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
@ -209,6 +217,89 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
assert_eq!(current_writer_ids(&pool).await, vec![3]); assert_eq!(current_writer_ids(&pool).await, vec![3]);
} }
#[tokio::test]
async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
assert!(
blocked_tx
.try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
.is_ok()
);
let blocked_rx_guard = tokio::spawn(async move {
let _hold_rx = blocked_rx;
tokio::time::sleep(Duration::from_secs(30)).await;
});
let blocked_writer_id = 90u64;
let blocked_writer = MeWriter {
id: blocked_writer_id,
addr: SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
4500 + blocked_writer_id as u16,
),
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
writer_dc: 2,
generation: 1,
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
tx: blocked_tx.clone(),
cancel: CancellationToken::new(),
degraded: Arc::new(AtomicBool::new(false)),
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
draining: Arc::new(AtomicBool::new(true)),
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
now_epoch_secs.saturating_sub(120),
)),
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
};
pool.writers.write().await.push(blocked_writer);
pool.registry
.register_writer(blocked_writer_id, blocked_tx)
.await;
pool.conn_count.fetch_add(1, Ordering::Relaxed);
insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
let reap_res = tokio::time::timeout(
Duration::from_millis(500),
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
)
.await;
blocked_rx_guard.abort();
assert!(reap_res.is_ok(), "reap should not block on close signal");
assert!(current_writer_ids(&pool).await.is_empty());
assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
let activity = pool.registry.writer_activity_snapshot().await;
assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
assert!(!activity.bound_clients_by_writer.contains_key(&91));
let (probe_conn_id, _rx) = pool.registry.register().await;
assert!(
!pool.registry
.bind_writer(
probe_conn_id,
blocked_writer_id,
ConnMeta {
target_dc: 2,
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
proto_flags: 0,
},
)
.await
);
let _ = pool.registry.unregister(probe_conn_id).await;
}
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
let pool = make_pool(2).await; let pool = make_pool(2).await;
@ -247,17 +338,17 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() {
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_limits_closes_per_health_tick() { async fn reap_draining_writers_limits_closes_per_health_tick() {
let pool = make_pool(128).await; let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget(); let close_budget = health_drain_close_budget();
let writer_total = close_budget.saturating_add(19); let writer_total = close_budget.saturating_add(20);
for writer_id in 1..=writer_total as u64 { for writer_id in 1..=writer_total as u64 {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 1,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
@ -280,8 +371,8 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 0,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
@ -309,7 +400,7 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(200).saturating_add(writer_id), now_epoch_secs.saturating_sub(20),
1, 1,
0, 0,
) )
@ -345,27 +436,27 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
let pool = make_pool(128).await; let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget(); let close_budget = health_drain_close_budget();
for writer_id in 1..=close_budget as u64 { for writer_id in 1..=close_budget.saturating_add(1) as u64 {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 1,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
let empty_writer_id = close_budget as u64 + 1; let empty_writer_id = close_budget.saturating_add(2) as u64;
insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); assert_eq!(current_writer_ids(&pool).await, vec![1, empty_writer_id]);
} }
#[tokio::test] #[tokio::test]
@ -487,7 +578,14 @@ async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() {
.store(1, Ordering::Relaxed); .store(1, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await; insert_draining_writer(
&pool,
77,
now_epoch_secs.saturating_sub(240),
3,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
@ -511,7 +609,14 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
.store(60_000, Ordering::Relaxed); .store(60_000, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await; insert_draining_writer(
&pool,
88,
now_epoch_secs.saturating_sub(240),
3,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
@ -524,6 +629,21 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
} }
#[tokio::test]
async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediately() {
let pool = make_pool(0).await;
pool.me_instadrain.store(true, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 101, now_epoch_secs.saturating_sub(5), 1, 0).await;
insert_draining_writer(&pool, 102, now_epoch_secs.saturating_sub(4), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
}
#[test] #[test]
fn general_config_default_drain_threshold_remains_enabled() { fn general_config_default_drain_threshold_remains_enabled() {
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);

View File

@ -30,7 +30,7 @@ mod health_adversarial_tests;
use bytes::Bytes; use bytes::Bytes;
pub use health::me_health_monitor; pub use health::{me_drain_timeout_enforcer, me_health_monitor};
#[allow(unused_imports)] #[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily}; pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool; pub use pool::MePool;

View File

@ -171,6 +171,7 @@ pub struct MePool {
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>, pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>, pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_instadrain: AtomicBool,
pub(super) me_pool_drain_threshold: AtomicU64, pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_drain_soft_evict_enabled: AtomicBool, pub(super) me_pool_drain_soft_evict_enabled: AtomicBool,
pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64, pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64,
@ -279,6 +280,7 @@ impl MePool {
me_adaptive_floor_max_warm_writers_global: u32, me_adaptive_floor_max_warm_writers_global: u32,
hardswap: bool, hardswap: bool,
me_pool_drain_ttl_secs: u64, me_pool_drain_ttl_secs: u64,
me_instadrain: bool,
me_pool_drain_threshold: u64, me_pool_drain_threshold: u64,
me_pool_drain_soft_evict_enabled: bool, me_pool_drain_soft_evict_enabled: bool,
me_pool_drain_soft_evict_grace_secs: u64, me_pool_drain_soft_evict_grace_secs: u64,
@ -462,6 +464,7 @@ impl MePool {
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_instadrain: AtomicBool::new(me_instadrain),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold), me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled), me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs), me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs),
@ -524,6 +527,7 @@ impl MePool {
&self, &self,
hardswap: bool, hardswap: bool,
drain_ttl_secs: u64, drain_ttl_secs: u64,
instadrain: bool,
pool_drain_threshold: u64, pool_drain_threshold: u64,
pool_drain_soft_evict_enabled: bool, pool_drain_soft_evict_enabled: bool,
pool_drain_soft_evict_grace_secs: u64, pool_drain_soft_evict_grace_secs: u64,
@ -568,6 +572,7 @@ impl MePool {
self.hardswap.store(hardswap, Ordering::Relaxed); self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs self.me_pool_drain_ttl_secs
.store(drain_ttl_secs, Ordering::Relaxed); .store(drain_ttl_secs, Ordering::Relaxed);
self.me_instadrain.store(instadrain, Ordering::Relaxed);
self.me_pool_drain_threshold self.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed); .store(pool_drain_threshold, Ordering::Relaxed);
self.me_pool_drain_soft_evict_enabled self.me_pool_drain_soft_evict_enabled

View File

@ -126,6 +126,7 @@ pub(crate) struct MeApiRuntimeSnapshot {
pub me_reconnect_backoff_cap_ms: u64, pub me_reconnect_backoff_cap_ms: u64,
pub me_reconnect_fast_retry_count: u32, pub me_reconnect_fast_retry_count: u32,
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
pub me_instadrain: bool,
pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64, pub me_pool_drain_soft_evict_grace_secs: u64,
pub me_pool_drain_soft_evict_per_writer: u8, pub me_pool_drain_soft_evict_per_writer: u8,
@ -583,6 +584,7 @@ impl MePool {
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
me_instadrain: self.me_instadrain.load(Ordering::Relaxed),
me_pool_drain_soft_evict_enabled: self me_pool_drain_soft_evict_enabled: self
.me_pool_drain_soft_evict_enabled .me_pool_drain_soft_evict_enabled
.load(Ordering::Relaxed), .load(Ordering::Relaxed),

View File

@ -8,6 +8,7 @@ use bytes::Bytes;
use bytes::BytesMut; use bytes::BytesMut;
use rand::Rng; use rand::Rng;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
@ -491,11 +492,9 @@ impl MePool {
} }
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) { pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
let conns = self.remove_writer_only(writer_id).await; // Full client cleanup now happens inside `registry.writer_lost` to keep
for bound in conns { // writer reap/remove paths strictly non-blocking per connection.
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await; let _ = self.remove_writer_only(writer_id).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
} }
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> { async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
@ -525,6 +524,11 @@ impl MePool {
self.conn_count.fetch_sub(1, Ordering::Relaxed); self.conn_count.fetch_sub(1, Ordering::Relaxed);
} }
} }
// State invariant:
// - writer is removed from `self.writers` (pool visibility),
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await; let conns = self.registry.writer_lost(writer_id).await;
{ {
let mut tracker = self.ping_tracker.lock().await; let mut tracker = self.ping_tracker.lock().await;
@ -532,7 +536,25 @@ impl MePool {
} }
self.rtt_stats.lock().await.remove(&writer_id); self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx { if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await; match tx.try_send(WriterCommand::Close) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats
.increment_me_writer_close_signal_channel_full_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is closed"
);
}
}
} }
if trigger_refill if trigger_refill
&& let Some(addr) = removed_addr && let Some(addr) = removed_addr

View File

@ -8,6 +8,7 @@ use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::{Mutex, mpsc}; use tokio::sync::{Mutex, mpsc};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
@ -173,12 +174,12 @@ pub(crate) async fn reader_loop(
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_EXT from ME"); debug!(cid, "RPC_CLOSE_EXT from ME");
reg.route(cid, MeResponse::Close).await; let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_CONN from ME"); debug!(cid, "RPC_CLOSE_CONN from ME");
reg.route(cid, MeResponse::Close).await; let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
} else if pt == RPC_PING_U32 && body.len() >= 8 { } else if pt == RPC_PING_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@ -186,13 +187,15 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12); let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes()); pong.extend_from_slice(&ping_id.to_le_bytes());
if tx match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
.send(WriterCommand::DataAndFlush(Bytes::from(pong))) Ok(()) => {}
.await Err(TrySendError::Full(_)) => {
.is_err() debug!(ping_id, "PONG dropped: writer command channel is full");
{ }
warn!("PONG send failed"); Err(TrySendError::Closed(_)) => {
break; warn!("PONG send failed: writer channel closed");
break;
}
} }
} else if pt == RPC_PONG_U32 && body.len() >= 8 { } else if pt == RPC_PONG_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@ -232,6 +235,13 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await; Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
}
Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
}
}
} }

View File

@ -169,6 +169,7 @@ impl ConnRegistry {
None None
} }
#[allow(dead_code)]
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult { pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
let tx = { let tx = {
let inner = self.inner.read().await; let inner = self.inner.read().await;
@ -445,30 +446,38 @@ impl ConnRegistry {
} }
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> { pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut inner = self.inner.write().await; let mut close_txs = Vec::<mpsc::Sender<MeResponse>>::new();
inner.writers.remove(&writer_id);
inner.last_meta_for_writer.remove(&writer_id);
inner.writer_idle_since_epoch_secs.remove(&writer_id);
let conns = inner
.conns_for_writer
.remove(&writer_id)
.unwrap_or_default()
.into_iter()
.collect::<Vec<_>>();
let mut out = Vec::new(); let mut out = Vec::new();
for conn_id in conns { {
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { let mut inner = self.inner.write().await;
continue; inner.writers.remove(&writer_id);
} inner.last_meta_for_writer.remove(&writer_id);
inner.writer_for_conn.remove(&conn_id); inner.writer_idle_since_epoch_secs.remove(&writer_id);
if let Some(m) = inner.meta.get(&conn_id) { let conns = inner
out.push(BoundConn { .conns_for_writer
conn_id, .remove(&writer_id)
meta: m.clone(), .unwrap_or_default()
}); .into_iter()
.collect::<Vec<_>>();
for conn_id in conns {
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
continue;
}
inner.writer_for_conn.remove(&conn_id);
if let Some(client_tx) = inner.map.remove(&conn_id) {
close_txs.push(client_tx);
}
if let Some(meta) = inner.meta.remove(&conn_id) {
out.push(BoundConn { conn_id, meta });
}
} }
} }
for client_tx in close_txs {
let _ = client_tx.try_send(MeResponse::Close);
}
out out
} }
@ -491,6 +500,7 @@ impl ConnRegistry {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use super::ConnMeta; use super::ConnMeta;
use super::ConnRegistry; use super::ConnRegistry;
@ -663,6 +673,39 @@ mod tests {
assert!(registry.is_writer_empty(20).await); assert!(registry.is_writer_empty(20).await);
} }
#[tokio::test]
async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() {
let registry = ConnRegistry::new();
let (conn_id, mut rx) = registry.register().await;
let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
let lost = registry.writer_lost(10).await;
assert_eq!(lost.len(), 1);
assert_eq!(lost[0].conn_id, conn_id);
assert!(registry.get_writer(conn_id).await.is_none());
assert!(registry.get_meta(conn_id).await.is_none());
assert_eq!(registry.unregister(conn_id).await, None);
let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
assert!(matches!(close, Ok(Some(MeResponse::Close))));
}
#[tokio::test] #[tokio::test]
async fn bind_writer_rejects_unregistered_writer() { async fn bind_writer_rejects_unregistered_writer() {
let registry = ConnRegistry::new(); let registry = ConnRegistry::new();

View File

@ -643,13 +643,19 @@ impl MePool {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
if w.tx match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
.send(WriterCommand::DataAndFlush(Bytes::from(p))) Ok(()) => {}
.await Err(TrySendError::Full(_)) => {
.is_err() debug!(
{ conn_id,
debug!("ME close write failed"); writer_id = w.writer_id,
self.remove_writer_and_close_clients(w.writer_id).await; "ME close skipped: writer command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
}
} }
} else { } else {
debug!(conn_id, "ME close skipped (writer missing)"); debug!(conn_id, "ME close skipped (writer missing)");
@ -666,8 +672,12 @@ impl MePool {
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {} Ok(()) => {}
Err(TrySendError::Full(cmd)) => { Err(TrySendError::Full(_)) => {
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await; debug!(
conn_id,
writer_id = w.writer_id,
"ME close_conn skipped: writer command channel is full"
);
} }
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn skipped: writer channel closed"); debug!(conn_id, "ME close_conn skipped: writer channel closed");