Merge remote-tracking branch 'upstream/main' into test/main-into-flow-sec

# Conflicts:
#	Cargo.toml
#	src/api/model.rs
#	src/api/runtime_stats.rs
#	src/transport/middle_proxy/health.rs
#	src/transport/middle_proxy/health_regression_tests.rs
#	src/transport/middle_proxy/pool_status.rs
This commit is contained in:
David Osipov 2026-03-19 23:48:40 +04:00
commit 7416829e89
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
21 changed files with 396 additions and 294 deletions

2
Cargo.lock generated
View File

@ -2131,7 +2131,7 @@ dependencies = [
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.3.20" version = "3.3.25"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.20" version = "3.3.25"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -38,6 +38,7 @@ USER telemt
EXPOSE 443 EXPOSE 443
EXPOSE 9090 EXPOSE 9090
EXPOSE 9091
ENTRYPOINT ["/app/telemt"] ENTRYPOINT ["/app/telemt"]
CMD ["config.toml"] CMD ["config.toml"]

View File

@ -7,6 +7,7 @@ services:
ports: ports:
- "443:443" - "443:443"
- "127.0.0.1:9090:9090" - "127.0.0.1:9090:9090"
- "127.0.0.1:9091:9091"
# Allow caching 'proxy-secret' in read-only container # Allow caching 'proxy-secret' in read-only container
working_dir: /run/telemt working_dir: /run/telemt
volumes: volumes:

View File

@ -181,6 +181,8 @@ docker compose down
docker build -t telemt:local . docker build -t telemt:local .
docker run --name telemt --restart unless-stopped \ docker run --name telemt --restart unless-stopped \
-p 443:443 \ -p 443:443 \
-p 9090:9090 \
-p 9091:9091 \
-e RUST_LOG=info \ -e RUST_LOG=info \
-v "$PWD/config.toml:/app/config.toml:ro" \ -v "$PWD/config.toml:/app/config.toml:ro" \
--read-only \ --read-only \

View File

@ -183,6 +183,8 @@ docker compose down
docker build -t telemt:local . docker build -t telemt:local .
docker run --name telemt --restart unless-stopped \ docker run --name telemt --restart unless-stopped \
-p 443:443 \ -p 443:443 \
-p 9090:9090 \
-p 9091:9091 \
-e RUST_LOG=info \ -e RUST_LOG=info \
-v "$PWD/config.toml:/app/config.toml:ro" \ -v "$PWD/config.toml:/app/config.toml:ro" \
--read-only \ --read-only \

View File

@ -1,154 +1,190 @@
#!/bin/sh #!/bin/sh
set -eu set -eu
# --- Global Configurations ---
REPO="${REPO:-telemt/telemt}" REPO="${REPO:-telemt/telemt}"
BIN_NAME="${BIN_NAME:-telemt}" BIN_NAME="${BIN_NAME:-telemt}"
INSTALL_DIR="${INSTALL_DIR:-/bin}" INSTALL_DIR="${INSTALL_DIR:-/bin}"
CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}" CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}"
CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}" CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}"
WORK_DIR="${WORK_DIR:-/opt/telemt}" WORK_DIR="${WORK_DIR:-/opt/telemt}"
TLS_DOMAIN="${TLS_DOMAIN:-petrovich.ru}"
SERVICE_NAME="telemt" SERVICE_NAME="telemt"
TEMP_DIR="" TEMP_DIR=""
SUDO="" SUDO=""
CONFIG_PARENT_DIR=""
SERVICE_START_FAILED=0
# --- Argument Parsing ---
ACTION="install" ACTION="install"
TARGET_VERSION="${VERSION:-latest}" TARGET_VERSION="${VERSION:-latest}"
while [ $# -gt 0 ]; do while [ $# -gt 0 ]; do
case "$1" in case "$1" in
-h|--help) -h|--help) ACTION="help"; shift ;;
ACTION="help"
shift
;;
uninstall|--uninstall) uninstall|--uninstall)
[ "$ACTION" != "purge" ] && ACTION="uninstall" if [ "$ACTION" != "purge" ]; then ACTION="uninstall"; fi
shift shift ;;
;; purge|--purge) ACTION="purge"; shift ;;
--purge) install|--install) ACTION="install"; shift ;;
ACTION="purge" -*) printf '[ERROR] Unknown option: %s\n' "$1" >&2; exit 1 ;;
shift
;;
install|--install)
ACTION="install"
shift
;;
-*)
printf '[ERROR] Unknown option: %s\n' "$1" >&2
exit 1
;;
*) *)
if [ "$ACTION" = "install" ]; then if [ "$ACTION" = "install" ]; then TARGET_VERSION="$1"
TARGET_VERSION="$1" else printf '[WARNING] Ignoring extra argument: %s\n' "$1" >&2; fi
fi shift ;;
shift
;;
esac esac
done done
# --- Core Functions --- say() {
say() { printf '[INFO] %s\n' "$*"; } if [ "$#" -eq 0 ] || [ -z "${1:-}" ]; then
printf '\n'
else
printf '[INFO] %s\n' "$*"
fi
}
die() { printf '[ERROR] %s\n' "$*" >&2; exit 1; } die() { printf '[ERROR] %s\n' "$*" >&2; exit 1; }
write_root() { $SUDO sh -c 'cat > "$1"' _ "$1"; }
cleanup() { cleanup() {
if [ -n "${TEMP_DIR:-}" ] && [ -d "$TEMP_DIR" ]; then if [ -n "${TEMP_DIR:-}" ] && [ -d "$TEMP_DIR" ]; then
rm -rf -- "$TEMP_DIR" rm -rf -- "$TEMP_DIR"
fi fi
} }
trap cleanup EXIT INT TERM trap cleanup EXIT INT TERM
show_help() { show_help() {
say "Usage: $0 [version | install | uninstall | --purge | --help]" say "Usage: $0 [ <version> | install | uninstall | purge | --help ]"
say " version Install specific version (e.g. 1.0.0, default: latest)" say " <version> Install specific version (e.g. 3.3.15, default: latest)"
say " uninstall Remove the binary and service (keeps config)" say " install Install the latest version"
say " --purge Remove everything including configuration" say " uninstall Remove the binary and service (keeps config and user)"
say " purge Remove everything including configuration, data, and user"
exit 0 exit 0
} }
user_exists() { check_os_entity() {
if command -v getent >/dev/null 2>&1; then if command -v getent >/dev/null 2>&1; then getent "$1" "$2" >/dev/null 2>&1
getent passwd "$1" >/dev/null 2>&1 else grep -q "^${2}:" "/etc/$1" 2>/dev/null; fi
}
normalize_path() {
printf '%s\n' "$1" | tr -s '/' | sed 's|/$||; s|^$|/|'
}
get_realpath() {
path_in="$1"
case "$path_in" in /*) ;; *) path_in="$(pwd)/$path_in" ;; esac
if command -v realpath >/dev/null 2>&1; then
if realpath_out="$(realpath -m "$path_in" 2>/dev/null)"; then
printf '%s\n' "$realpath_out"
return
fi
fi
if command -v readlink >/dev/null 2>&1; then
resolved_path="$(readlink -f "$path_in" 2>/dev/null || true)"
if [ -n "$resolved_path" ]; then
printf '%s\n' "$resolved_path"
return
fi
fi
d="${path_in%/*}"; b="${path_in##*/}"
if [ -z "$d" ]; then d="/"; fi
if [ "$d" = "$path_in" ]; then d="/"; b="$path_in"; fi
if [ -d "$d" ]; then
abs_d="$(cd "$d" >/dev/null 2>&1 && pwd || true)"
if [ -n "$abs_d" ]; then
if [ "$b" = "." ] || [ -z "$b" ]; then printf '%s\n' "$abs_d"
elif [ "$abs_d" = "/" ]; then printf '/%s\n' "$b"
else printf '%s/%s\n' "$abs_d" "$b"; fi
else
normalize_path "$path_in"
fi
else else
grep -q "^${1}:" /etc/passwd 2>/dev/null normalize_path "$path_in"
fi fi
} }
group_exists() { get_svc_mgr() {
if command -v getent >/dev/null 2>&1; then if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then echo "systemd"
getent group "$1" >/dev/null 2>&1 elif command -v rc-service >/dev/null 2>&1; then echo "openrc"
else else echo "none"; fi
grep -q "^${1}:" /etc/group 2>/dev/null
fi
} }
verify_common() { verify_common() {
[ -z "$BIN_NAME" ] && die "BIN_NAME cannot be empty." [ -n "$BIN_NAME" ] || die "BIN_NAME cannot be empty."
[ -z "$INSTALL_DIR" ] && die "INSTALL_DIR cannot be empty." [ -n "$INSTALL_DIR" ] || die "INSTALL_DIR cannot be empty."
[ -z "$CONFIG_DIR" ] && die "CONFIG_DIR cannot be empty." [ -n "$CONFIG_DIR" ] || die "CONFIG_DIR cannot be empty."
[ -n "$CONFIG_FILE" ] || die "CONFIG_FILE cannot be empty."
case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}${CONFIG_FILE}" in
*[!a-zA-Z0-9_./-]*) die "Invalid characters in paths. Only alphanumeric, _, ., -, and / allowed." ;;
esac
case "$TARGET_VERSION" in *[!a-zA-Z0-9_.-]*) die "Invalid characters in version." ;; esac
case "$BIN_NAME" in *[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME." ;; esac
INSTALL_DIR="$(get_realpath "$INSTALL_DIR")"
CONFIG_DIR="$(get_realpath "$CONFIG_DIR")"
WORK_DIR="$(get_realpath "$WORK_DIR")"
CONFIG_FILE="$(get_realpath "$CONFIG_FILE")"
CONFIG_PARENT_DIR="${CONFIG_FILE%/*}"
if [ -z "$CONFIG_PARENT_DIR" ]; then CONFIG_PARENT_DIR="/"; fi
if [ "$CONFIG_PARENT_DIR" = "$CONFIG_FILE" ]; then CONFIG_PARENT_DIR="."; fi
if [ "$(id -u)" -eq 0 ]; then if [ "$(id -u)" -eq 0 ]; then
SUDO="" SUDO=""
else else
if ! command -v sudo >/dev/null 2>&1; then command -v sudo >/dev/null 2>&1 || die "This script requires root or sudo. Neither found."
die "This script requires root or sudo. Neither found."
fi
SUDO="sudo" SUDO="sudo"
say "sudo is available. Caching credentials..." if ! sudo -n true 2>/dev/null; then
if ! sudo -v; then if ! [ -t 0 ]; then
die "Failed to cache sudo credentials" die "sudo requires a password, but no TTY detected. Aborting to prevent hang."
fi
fi fi
fi fi
case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}" in if [ -n "$SUDO" ]; then
*[!a-zA-Z0-9_./-]*) if $SUDO sh -c '[ -d "$1" ]' _ "$CONFIG_FILE"; then
die "Invalid characters in path variables. Only alphanumeric, _, ., -, and / are allowed." die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory."
;; fi
esac elif [ -d "$CONFIG_FILE" ]; then
die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory."
case "$BIN_NAME" in fi
*[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME: $BIN_NAME" ;;
esac
for path in "$CONFIG_DIR" "$WORK_DIR"; do
check_path="$path"
while [ "$check_path" != "/" ] && [ "${check_path%"/"}" != "$check_path" ]; do
check_path="${check_path%"/"}"
done
[ -z "$check_path" ] && check_path="/"
for path in "$CONFIG_DIR" "$CONFIG_PARENT_DIR" "$WORK_DIR"; do
check_path="$(get_realpath "$path")"
case "$check_path" in case "$check_path" in
/|/bin|/sbin|/usr|/usr/bin|/usr/local|/etc|/opt|/var|/home|/root|/tmp) /|/bin|/sbin|/usr|/usr/bin|/usr/sbin|/usr/local|/usr/local/bin|/usr/local/sbin|/usr/local/etc|/usr/local/share|/etc|/var|/var/lib|/var/log|/var/run|/home|/root|/tmp|/lib|/lib64|/opt|/run|/boot|/dev|/sys|/proc)
die "Safety check failed: '$path' is a critical system directory." die "Safety check failed: '$path' (resolved to '$check_path') is a critical system directory." ;;
;;
esac esac
done done
for cmd in uname grep find rm chown chmod mv head mktemp; do check_install_dir="$(get_realpath "$INSTALL_DIR")"
case "$check_install_dir" in
/|/etc|/var|/home|/root|/tmp|/usr|/usr/local|/opt|/boot|/dev|/sys|/proc|/run)
die "Safety check failed: INSTALL_DIR '$INSTALL_DIR' is a critical system directory." ;;
esac
for cmd in id uname grep find rm chown chmod mv mktemp mkdir tr dd sed ps head sleep cat tar gzip rmdir; do
command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd" command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd"
done done
} }
verify_install_deps() { verify_install_deps() {
if ! command -v curl >/dev/null 2>&1 && ! command -v wget >/dev/null 2>&1; then command -v curl >/dev/null 2>&1 || command -v wget >/dev/null 2>&1 || die "Neither curl nor wget is installed."
die "Neither curl nor wget is installed."
fi
command -v tar >/dev/null 2>&1 || die "Required command not found: tar"
command -v gzip >/dev/null 2>&1 || die "Required command not found: gzip"
command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install" command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install"
if ! command -v setcap >/dev/null 2>&1; then if ! command -v setcap >/dev/null 2>&1; then
say "setcap is missing. Installing required capability tools..."
if command -v apk >/dev/null 2>&1; then if command -v apk >/dev/null 2>&1; then
$SUDO apk add --no-cache libcap || die "Failed to install libcap" $SUDO apk add --no-cache libcap-utils >/dev/null 2>&1 || $SUDO apk add --no-cache libcap >/dev/null 2>&1 || true
elif command -v apt-get >/dev/null 2>&1; then elif command -v apt-get >/dev/null 2>&1; then
$SUDO apt-get update -qq && $SUDO apt-get install -y -qq libcap2-bin || die "Failed to install libcap2-bin" $SUDO apt-get update -q >/dev/null 2>&1 || true
elif command -v dnf >/dev/null 2>&1 || command -v yum >/dev/null 2>&1; then $SUDO apt-get install -y -q libcap2-bin >/dev/null 2>&1 || true
$SUDO ${YUM_CMD:-yum} install -y -q libcap || die "Failed to install libcap" elif command -v dnf >/dev/null 2>&1; then $SUDO dnf install -y -q libcap >/dev/null 2>&1 || true
else elif command -v yum >/dev/null 2>&1; then $SUDO yum install -y -q libcap >/dev/null 2>&1 || true
die "Cannot install 'setcap'. Package manager not found. Please install libcap manually."
fi fi
fi fi
} }
@ -163,122 +199,96 @@ detect_arch() {
} }
detect_libc() { detect_libc() {
if command -v ldd >/dev/null 2>&1 && ldd --version 2>&1 | grep -qi musl; then
echo "musl"; return 0
fi
if grep -q '^ID=alpine' /etc/os-release 2>/dev/null || grep -q '^ID="alpine"' /etc/os-release 2>/dev/null; then
echo "musl"; return 0
fi
for f in /lib/ld-musl-*.so.* /lib64/ld-musl-*.so.*; do for f in /lib/ld-musl-*.so.* /lib64/ld-musl-*.so.*; do
if [ -e "$f" ]; then if [ -e "$f" ]; then echo "musl"; return 0; fi
echo "musl"; return 0
fi
done done
if grep -qE '^ID="?alpine"?' /etc/os-release 2>/dev/null; then echo "musl"; return 0; fi
if command -v ldd >/dev/null 2>&1 && (ldd --version 2>&1 || true) | grep -qi musl; then echo "musl"; return 0; fi
echo "gnu" echo "gnu"
} }
fetch_file() { fetch_file() {
fetch_url="$1" if command -v curl >/dev/null 2>&1; then curl -fsSL "$1" -o "$2"
fetch_out="$2" else wget -q -O "$2" "$1"; fi
if command -v curl >/dev/null 2>&1; then
curl -fsSL "$fetch_url" -o "$fetch_out" || return 1
elif command -v wget >/dev/null 2>&1; then
wget -qO "$fetch_out" "$fetch_url" || return 1
else
die "curl or wget required"
fi
} }
ensure_user_group() { ensure_user_group() {
nologin_bin="/bin/false" nologin_bin="$(command -v nologin 2>/dev/null || command -v false 2>/dev/null || echo /bin/false)"
cmd_nologin="$(command -v nologin 2>/dev/null || true)" if ! check_os_entity group telemt; then
if [ -n "$cmd_nologin" ] && [ -x "$cmd_nologin" ]; then if command -v groupadd >/dev/null 2>&1; then $SUDO groupadd -r telemt
nologin_bin="$cmd_nologin" elif command -v addgroup >/dev/null 2>&1; then $SUDO addgroup -S telemt
else else die "Cannot create group"; fi
for bin in /sbin/nologin /usr/sbin/nologin; do
if [ -x "$bin" ]; then
nologin_bin="$bin"
break
fi
done
fi fi
if ! group_exists telemt; then if ! check_os_entity passwd telemt; then
if command -v groupadd >/dev/null 2>&1; then
$SUDO groupadd -r telemt || die "Failed to create group via groupadd"
elif command -v addgroup >/dev/null 2>&1; then
$SUDO addgroup -S telemt || die "Failed to create group via addgroup"
else
die "Cannot create group: neither groupadd nor addgroup found"
fi
fi
if ! user_exists telemt; then
if command -v useradd >/dev/null 2>&1; then if command -v useradd >/dev/null 2>&1; then
$SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt || die "Failed to create user via useradd" $SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt
elif command -v adduser >/dev/null 2>&1; then elif command -v adduser >/dev/null 2>&1; then
$SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt || die "Failed to create user via adduser" if adduser --help 2>&1 | grep -q -- '-S'; then
else $SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt
die "Cannot create user: neither useradd nor adduser found" else
fi $SUDO adduser --system --home "$WORK_DIR" --shell "$nologin_bin" --no-create-home --ingroup telemt --disabled-password telemt
fi
else die "Cannot create user"; fi
fi fi
} }
setup_dirs() { setup_dirs() {
say "Setting up directories..." $SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" "$CONFIG_PARENT_DIR" || die "Failed to create directories"
$SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" || die "Failed to create directories"
$SUDO chown telemt:telemt "$WORK_DIR" || die "Failed to set owner on WORK_DIR" $SUDO chown telemt:telemt "$WORK_DIR" && $SUDO chmod 750 "$WORK_DIR"
$SUDO chmod 750 "$WORK_DIR" || die "Failed to set permissions on WORK_DIR" $SUDO chown root:telemt "$CONFIG_DIR" && $SUDO chmod 750 "$CONFIG_DIR"
if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then
$SUDO chown root:telemt "$CONFIG_PARENT_DIR" && $SUDO chmod 750 "$CONFIG_PARENT_DIR"
fi
} }
stop_service() { stop_service() {
say "Stopping service if running..." svc="$(get_svc_mgr)"
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then if [ "$svc" = "systemd" ] && systemctl is-active --quiet "$SERVICE_NAME" 2>/dev/null; then
$SUDO systemctl stop "$SERVICE_NAME" 2>/dev/null || true $SUDO systemctl stop "$SERVICE_NAME" 2>/dev/null || true
elif command -v rc-service >/dev/null 2>&1; then elif [ "$svc" = "openrc" ] && rc-service "$SERVICE_NAME" status >/dev/null 2>&1; then
$SUDO rc-service "$SERVICE_NAME" stop 2>/dev/null || true $SUDO rc-service "$SERVICE_NAME" stop 2>/dev/null || true
fi fi
} }
install_binary() { install_binary() {
bin_src="$1" bin_src="$1"; bin_dst="$2"
bin_dst="$2" if [ -e "$INSTALL_DIR" ] && [ ! -d "$INSTALL_DIR" ]; then
die "'$INSTALL_DIR' is not a directory."
fi
$SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory" $SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory"
if command -v install >/dev/null 2>&1; then if command -v install >/dev/null 2>&1; then
$SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary" $SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary"
else else
$SUDO rm -f "$bin_dst" $SUDO rm -f "$bin_dst" 2>/dev/null || true
$SUDO cp "$bin_src" "$bin_dst" || die "Failed to copy binary" $SUDO cp "$bin_src" "$bin_dst" && $SUDO chmod 0755 "$bin_dst" || die "Failed to copy binary"
$SUDO chmod 0755 "$bin_dst" || die "Failed to set permissions"
fi fi
if [ ! -x "$bin_dst" ]; then $SUDO sh -c '[ -x "$1" ]' _ "$bin_dst" || die "Binary not executable: $bin_dst"
die "Failed to install binary or it is not executable: $bin_dst"
fi
say "Granting network bind capabilities to bind port 443..." if command -v setcap >/dev/null 2>&1; then
if ! $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null; then $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null || true
say "[WARNING] Failed to apply setcap. The service will NOT be able to open port 443!"
say "[WARNING] This usually happens inside unprivileged Docker/LXC containers."
fi fi
} }
generate_secret() { generate_secret() {
if command -v openssl >/dev/null 2>&1; then secret="$(command -v openssl >/dev/null 2>&1 && openssl rand -hex 16 2>/dev/null || true)"
secret="$(openssl rand -hex 16 2>/dev/null)" && [ -n "$secret" ] && { echo "$secret"; return 0; } if [ -z "$secret" ] || [ "${#secret}" -ne 32 ]; then
if command -v od >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | od -An -tx1 | tr -d ' \n')"
elif command -v hexdump >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | hexdump -e '1/1 "%02x"')"
elif command -v xxd >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | xxd -p | tr -d '\n')"
fi
fi fi
if command -v xxd >/dev/null 2>&1; then if [ "${#secret}" -eq 32 ]; then echo "$secret"; else return 1; fi
secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | xxd -p | tr -d '\n')" && [ -n "$secret" ] && { echo "$secret"; return 0; }
fi
secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | od -An -tx1 | tr -d ' \n')" && [ -n "$secret" ] && { echo "$secret"; return 0; }
return 1
} }
generate_config_content() { generate_config_content() {
escaped_tls_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')"
cat <<EOF cat <<EOF
[general] [general]
use_middle_proxy = false use_middle_proxy = false
@ -297,7 +307,7 @@ listen = "127.0.0.1:9091"
whitelist = ["127.0.0.1/32"] whitelist = ["127.0.0.1/32"]
[censorship] [censorship]
tls_domain = "petrovich.ru" tls_domain = "${escaped_tls_domain}"
[access.users] [access.users]
hello = "$1" hello = "$1"
@ -305,44 +315,38 @@ EOF
} }
install_config() { install_config() {
config_exists=0
if [ -n "$SUDO" ]; then if [ -n "$SUDO" ]; then
$SUDO sh -c "[ -f '$CONFIG_FILE' ]" 2>/dev/null && config_exists=1 || true if $SUDO sh -c '[ -f "$1" ]' _ "$CONFIG_FILE"; then
else say " -> Config already exists at $CONFIG_FILE. Skipping creation."
[ -f "$CONFIG_FILE" ] && config_exists=1 || true return 0
fi fi
elif [ -f "$CONFIG_FILE" ]; then
if [ "$config_exists" -eq 1 ]; then say " -> Config already exists at $CONFIG_FILE. Skipping creation."
say "Config already exists, skipping generation."
return 0 return 0
fi fi
toml_secret="$(generate_secret)" || die "Failed to generate secret" toml_secret="$(generate_secret)" || die "Failed to generate secret."
say "Creating config at $CONFIG_FILE..."
tmp_conf="$(mktemp "${TEMP_DIR:-/tmp}/telemt_conf.XXXXXX")" || die "Failed to create temp config" generate_config_content "$toml_secret" | write_root "$CONFIG_FILE" || die "Failed to install config"
generate_config_content "$toml_secret" > "$tmp_conf" || die "Failed to write temp config" $SUDO chown root:telemt "$CONFIG_FILE" && $SUDO chmod 640 "$CONFIG_FILE"
$SUDO mv "$tmp_conf" "$CONFIG_FILE" || die "Failed to install config file" say " -> Config created successfully."
$SUDO chown root:telemt "$CONFIG_FILE" || die "Failed to set owner" say " -> Generated secret for default user 'hello': $toml_secret"
$SUDO chmod 640 "$CONFIG_FILE" || die "Failed to set config permissions"
say "Secret for user 'hello': $toml_secret"
} }
generate_systemd_content() { generate_systemd_content() {
cat <<EOF cat <<EOF
[Unit] [Unit]
Description=Telemt Proxy Service Description=Telemt
After=network-online.target After=network-online.target
Wants=network-online.target
[Service] [Service]
Type=simple Type=simple
User=telemt User=telemt
Group=telemt Group=telemt
WorkingDirectory=$WORK_DIR WorkingDirectory=$WORK_DIR
ExecStart=${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE} ExecStart="${INSTALL_DIR}/${BIN_NAME}" "${CONFIG_FILE}"
Restart=on-failure Restart=on-failure
LimitNOFILE=65536 LimitNOFILE=65536
AmbientCapabilities=CAP_NET_BIND_SERVICE AmbientCapabilities=CAP_NET_BIND_SERVICE
@ -370,156 +374,183 @@ EOF
} }
install_service() { install_service() {
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then svc="$(get_svc_mgr)"
say "Installing systemd service..." if [ "$svc" = "systemd" ]; then
tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.service.XXXXXX")" || die "Failed to create temp service" generate_systemd_content | write_root "/etc/systemd/system/${SERVICE_NAME}.service"
generate_systemd_content > "$tmp_svc" || die "Failed to generate service content" $SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service" && $SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service"
$SUDO mv "$tmp_svc" "/etc/systemd/system/${SERVICE_NAME}.service" || die "Failed to move service file" $SUDO systemctl daemon-reload || true
$SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service" $SUDO systemctl enable "$SERVICE_NAME" || true
$SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service"
if ! $SUDO systemctl start "$SERVICE_NAME"; then
say "[WARNING] Failed to start service"
SERVICE_START_FAILED=1
fi
elif [ "$svc" = "openrc" ]; then
generate_openrc_content | write_root "/etc/init.d/${SERVICE_NAME}"
$SUDO chown root:root "/etc/init.d/${SERVICE_NAME}" && $SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}"
$SUDO systemctl daemon-reload || die "Failed to reload systemd" $SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || true
$SUDO systemctl enable "$SERVICE_NAME" || die "Failed to enable service"
$SUDO systemctl start "$SERVICE_NAME" || die "Failed to start service" if ! $SUDO rc-service "$SERVICE_NAME" start 2>/dev/null; then
say "[WARNING] Failed to start service"
elif command -v rc-update >/dev/null 2>&1; then SERVICE_START_FAILED=1
say "Installing OpenRC service..." fi
tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.init.XXXXXX")" || die "Failed to create temp file"
generate_openrc_content > "$tmp_svc" || die "Failed to generate init content"
$SUDO mv "$tmp_svc" "/etc/init.d/${SERVICE_NAME}" || die "Failed to move service file"
$SUDO chown root:root "/etc/init.d/${SERVICE_NAME}"
$SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}"
$SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || die "Failed to register service"
$SUDO rc-service "$SERVICE_NAME" start 2>/dev/null || die "Failed to start OpenRC service"
else else
say "No service manager found. You can start it manually with:" cmd="\"${INSTALL_DIR}/${BIN_NAME}\" \"${CONFIG_FILE}\""
if [ -n "$SUDO" ]; then if [ -n "$SUDO" ]; then
say " sudo -u telemt ${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}" say " -> Service manager not found. Start manually: sudo -u telemt $cmd"
else else
say " su -s /bin/sh telemt -c '${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}'" say " -> Service manager not found. Start manually: su -s /bin/sh telemt -c '$cmd'"
fi fi
fi fi
} }
kill_user_procs() { kill_user_procs() {
say "Ensuring $BIN_NAME processes are killed..." if command -v pkill >/dev/null 2>&1; then
$SUDO pkill -u telemt "$BIN_NAME" 2>/dev/null || true
if pkill_cmd="$(command -v pkill 2>/dev/null)"; then
$SUDO "$pkill_cmd" -u telemt "$BIN_NAME" 2>/dev/null || true
sleep 1 sleep 1
$SUDO "$pkill_cmd" -9 -u telemt "$BIN_NAME" 2>/dev/null || true $SUDO pkill -9 -u telemt "$BIN_NAME" 2>/dev/null || true
elif killall_cmd="$(command -v killall 2>/dev/null)"; then else
$SUDO "$killall_cmd" "$BIN_NAME" 2>/dev/null || true if command -v pgrep >/dev/null 2>&1; then
sleep 1 pids="$(pgrep -u telemt 2>/dev/null || true)"
$SUDO "$killall_cmd" -9 "$BIN_NAME" 2>/dev/null || true else
pids="$(ps -u telemt -o pid= 2>/dev/null || true)"
fi
if [ -n "$pids" ]; then
for pid in $pids; do
case "$pid" in ''|*[!0-9]*) continue ;; *) $SUDO kill "$pid" 2>/dev/null || true ;; esac
done
sleep 1
for pid in $pids; do
case "$pid" in ''|*[!0-9]*) continue ;; *) $SUDO kill -9 "$pid" 2>/dev/null || true ;; esac
done
fi
fi fi
} }
uninstall() { uninstall() {
purge_data=0 say "Starting uninstallation of $BIN_NAME..."
[ "$ACTION" = "purge" ] && purge_data=1
say "Uninstalling $BIN_NAME..." say ">>> Stage 1: Stopping services"
stop_service stop_service
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then say ">>> Stage 2: Removing service configuration"
svc="$(get_svc_mgr)"
if [ "$svc" = "systemd" ]; then
$SUDO systemctl disable "$SERVICE_NAME" 2>/dev/null || true $SUDO systemctl disable "$SERVICE_NAME" 2>/dev/null || true
$SUDO rm -f "/etc/systemd/system/${SERVICE_NAME}.service" $SUDO rm -f "/etc/systemd/system/${SERVICE_NAME}.service"
$SUDO systemctl daemon-reload || true $SUDO systemctl daemon-reload 2>/dev/null || true
elif command -v rc-update >/dev/null 2>&1; then elif [ "$svc" = "openrc" ]; then
$SUDO rc-update del "$SERVICE_NAME" 2>/dev/null || true $SUDO rc-update del "$SERVICE_NAME" 2>/dev/null || true
$SUDO rm -f "/etc/init.d/${SERVICE_NAME}" $SUDO rm -f "/etc/init.d/${SERVICE_NAME}"
fi fi
say ">>> Stage 3: Terminating user processes"
kill_user_procs kill_user_procs
say ">>> Stage 4: Removing binary"
$SUDO rm -f "${INSTALL_DIR}/${BIN_NAME}" $SUDO rm -f "${INSTALL_DIR}/${BIN_NAME}"
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true if [ "$ACTION" = "purge" ]; then
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true say ">>> Stage 5: Purging configuration, data, and user"
if [ "$purge_data" -eq 1 ]; then
say "Purging configuration and data..."
$SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR" $SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR"
$SUDO rm -f "$CONFIG_FILE"
if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then
$SUDO rmdir "$CONFIG_PARENT_DIR" 2>/dev/null || true
fi
$SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true
$SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true
else else
say "Note: Configuration in $CONFIG_DIR was kept. Run with '--purge' to remove it." say "Note: Configuration and user kept. Run with 'purge' to remove completely."
fi fi
say "Uninstallation complete." printf '\n====================================================================\n'
printf ' UNINSTALLATION COMPLETE\n'
printf '====================================================================\n\n'
exit 0 exit 0
} }
# ============================================================================
# Main Entry Point
# ============================================================================
case "$ACTION" in case "$ACTION" in
help) help) show_help ;;
show_help uninstall|purge) verify_common; uninstall ;;
;;
uninstall|purge)
verify_common
uninstall
;;
install) install)
say "Starting installation..." say "Starting installation of $BIN_NAME (Version: $TARGET_VERSION)"
verify_common
verify_install_deps
ARCH="$(detect_arch)" say ">>> Stage 1: Verifying environment and dependencies"
LIBC="$(detect_libc)" verify_common; verify_install_deps
say "Detected system: $ARCH-linux-$LIBC"
if [ "$TARGET_VERSION" != "latest" ]; then
TARGET_VERSION="${TARGET_VERSION#v}"
fi
ARCH="$(detect_arch)"; LIBC="$(detect_libc)"
FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz" FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
FILE_NAME="$(printf '%s' "$FILE_NAME" | tr -d ' \t\n\r')"
if [ "$TARGET_VERSION" = "latest" ]; then if [ "$TARGET_VERSION" = "latest" ]; then
DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}" DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}"
else else
DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}" DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}"
fi fi
TEMP_DIR="$(mktemp -d)" || die "Failed to create temp directory" say ">>> Stage 2: Downloading archive"
TEMP_DIR="$(mktemp -d)" || die "Temp directory creation failed"
if [ -z "$TEMP_DIR" ] || [ ! -d "$TEMP_DIR" ]; then if [ -z "$TEMP_DIR" ] || [ ! -d "$TEMP_DIR" ]; then
die "Temp directory creation failed" die "Temp directory is invalid or was not created"
fi fi
say "Downloading from $DL_URL..." fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}" || die "Download failed"
fetch_file "$DL_URL" "${TEMP_DIR}/archive.tar.gz" || die "Download failed (check version or network)"
gzip -dc "${TEMP_DIR}/archive.tar.gz" | tar -xf - -C "$TEMP_DIR" || die "Extraction failed" say ">>> Stage 3: Extracting archive"
if ! gzip -dc "${TEMP_DIR}/${FILE_NAME}" | tar -xf - -C "$TEMP_DIR" 2>/dev/null; then
die "Extraction failed (downloaded archive might be invalid or 404)."
fi
EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1)" EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1 || true)"
[ -z "$EXTRACTED_BIN" ] && die "Binary '$BIN_NAME' not found in archive" [ -n "$EXTRACTED_BIN" ] || die "Binary '$BIN_NAME' not found in archive"
ensure_user_group say ">>> Stage 4: Setting up environment (User, Group, Directories)"
setup_dirs ensure_user_group; setup_dirs; stop_service
stop_service
say ">>> Stage 5: Installing binary"
say "Installing binary..."
install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}" install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}"
say ">>> Stage 6: Generating configuration"
install_config install_config
say ">>> Stage 7: Installing and starting service"
install_service install_service
say "" if [ "${SERVICE_START_FAILED:-0}" -eq 1 ]; then
say "=============================================" printf '\n====================================================================\n'
say "Installation complete!" printf ' INSTALLATION COMPLETED WITH WARNINGS\n'
say "=============================================" printf '====================================================================\n\n'
if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then printf 'The service was installed but failed to start automatically.\n'
say "To check the logs, run:" printf 'Please check the logs to determine the issue.\n\n'
say " journalctl -u $SERVICE_NAME -f"
say ""
fi
say "To get user connection links, run:"
if command -v jq >/dev/null 2>&1; then
say " curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | \"User: \\(.username)\\n\\(.links.tls[0] // empty)\"'"
else else
say " curl -s http://127.0.0.1:9091/v1/users" printf '\n====================================================================\n'
say " (Note: Install 'jq' package to see the links nicely formatted)" printf ' INSTALLATION SUCCESS\n'
printf '====================================================================\n\n'
fi fi
svc="$(get_svc_mgr)"
if [ "$svc" = "systemd" ]; then
printf 'To check the status of your proxy service, run:\n'
printf ' systemctl status %s\n\n' "$SERVICE_NAME"
elif [ "$svc" = "openrc" ]; then
printf 'To check the status of your proxy service, run:\n'
printf ' rc-service %s status\n\n' "$SERVICE_NAME"
fi
printf 'To get your user connection links (for Telegram), run:\n'
if command -v jq >/dev/null 2>&1; then
printf ' curl -s http://127.0.0.1:9091/v1/users | jq -r '\''.data[] | "User: \\(.username)\\n\\(.links.tls[0] // empty)\\n"'\''\n'
else
printf ' curl -s http://127.0.0.1:9091/v1/users\n'
printf ' (Tip: Install '\''jq'\'' for a much cleaner output)\n'
fi
printf '\n====================================================================\n'
;; ;;
esac esac

View File

@ -198,6 +198,7 @@ desync_all_full = false
update_every = 43200 update_every = 43200
hardswap = false hardswap = false
me_pool_drain_ttl_secs = 90 me_pool_drain_ttl_secs = 90
me_instadrain = false
me_pool_min_fresh_ratio = 0.8 me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120 me_reinit_drain_timeout_secs = 120

View File

@ -615,6 +615,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
90 90
} }
pub(crate) fn default_me_instadrain() -> bool {
false
}
pub(crate) fn default_me_pool_drain_threshold() -> u64 { pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128 128
} }

View File

@ -54,6 +54,7 @@ pub struct HotFields {
pub me_reinit_coalesce_window_ms: u64, pub me_reinit_coalesce_window_ms: u64,
pub hardswap: bool, pub hardswap: bool,
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
pub me_instadrain: bool,
pub me_pool_drain_threshold: u64, pub me_pool_drain_threshold: u64,
pub me_pool_min_fresh_ratio: f32, pub me_pool_min_fresh_ratio: f32,
pub me_reinit_drain_timeout_secs: u64, pub me_reinit_drain_timeout_secs: u64,
@ -136,6 +137,7 @@ impl HotFields {
me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms,
hardswap: cfg.general.hardswap, hardswap: cfg.general.hardswap,
me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs,
me_instadrain: cfg.general.me_instadrain,
me_pool_drain_threshold: cfg.general.me_pool_drain_threshold, me_pool_drain_threshold: cfg.general.me_pool_drain_threshold,
me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio, me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio,
me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs,
@ -431,6 +433,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms;
cfg.general.hardswap = new.general.hardswap; cfg.general.hardswap = new.general.hardswap;
cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs;
cfg.general.me_instadrain = new.general.me_instadrain;
cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold; cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold;
cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio; cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio;
cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs; cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs;
@ -805,6 +808,12 @@ fn log_changes(
old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs, old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs,
); );
} }
if old_hot.me_instadrain != new_hot.me_instadrain {
info!(
"config reload: me_instadrain: {} → {}",
old_hot.me_instadrain, new_hot.me_instadrain,
);
}
if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold { if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold {
info!( info!(

View File

@ -812,6 +812,10 @@ pub struct GeneralConfig {
#[serde(default = "default_me_pool_drain_ttl_secs")] #[serde(default = "default_me_pool_drain_ttl_secs")]
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
/// Force-remove any draining writer on the next cleanup tick, regardless of age/deadline.
#[serde(default = "default_me_instadrain")]
pub me_instadrain: bool,
/// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches. /// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches.
/// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior. /// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior.
#[serde(default = "default_me_pool_drain_threshold")] #[serde(default = "default_me_pool_drain_threshold")]
@ -1020,6 +1024,7 @@ impl Default for GeneralConfig {
me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(), me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(),
proxy_secret_len_max: default_proxy_secret_len_max(), proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_instadrain: default_me_instadrain(),
me_pool_drain_threshold: default_me_pool_drain_threshold(), me_pool_drain_threshold: default_me_pool_drain_threshold(),
me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(), me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(),
me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(), me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(),

View File

@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool(
config.general.me_adaptive_floor_max_warm_writers_global, config.general.me_adaptive_floor_max_warm_writers_global,
config.general.hardswap, config.general.hardswap,
config.general.me_pool_drain_ttl_secs, config.general.me_pool_drain_ttl_secs,
config.general.me_instadrain,
config.general.me_pool_drain_threshold, config.general.me_pool_drain_threshold,
config.general.effective_me_pool_force_close_secs(), config.general.effective_me_pool_force_close_secs(),
config.general.me_pool_min_fresh_ratio, config.general.me_pool_min_fresh_ratio,
@ -335,6 +336,13 @@ pub(crate) async fn initialize_me_pool(
) )
.await; .await;
}); });
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break; break;
} }
Err(e) => { Err(e) => {
@ -402,6 +410,13 @@ pub(crate) async fn initialize_me_pool(
) )
.await; .await;
}); });
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break Some(pool); break Some(pool);
} }

View File

@ -1065,6 +1065,7 @@ async fn make_me_pool_for_abort_test(stats: Arc<Stats>) -> Arc<MePool> {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.effective_me_pool_force_close_secs(), general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio, general.me_pool_min_fresh_ratio,

View File

@ -298,6 +298,7 @@ async fn run_update_cycle(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_threshold,
cfg.general.effective_me_pool_force_close_secs(), cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio, cfg.general.me_pool_min_fresh_ratio,
@ -525,6 +526,7 @@ pub async fn me_config_updater(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_threshold,
cfg.general.effective_me_pool_force_close_secs(), cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio, cfg.general.me_pool_min_fresh_ratio,

View File

@ -28,6 +28,7 @@ const HEALTH_RECONNECT_BUDGET_MAX: usize = 128;
const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct DcFloorPlanEntry { struct DcFloorPlanEntry {
@ -114,6 +115,17 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
} }
} }
pub async fn me_drain_timeout_enforcer(pool: Arc<MePool>) {
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
loop {
tokio::time::sleep(Duration::from_secs(
HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS,
))
.await;
reap_draining_writers(&pool, &mut drain_warn_next_allowed).await;
}
}
pub(super) async fn reap_draining_writers( pub(super) async fn reap_draining_writers(
pool: &Arc<MePool>, pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>, warn_next_allowed: &mut HashMap<u64, Instant>,
@ -1482,6 +1494,7 @@ mod tests {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.effective_me_pool_force_close_secs(), general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio, general.me_pool_min_fresh_ratio,

View File

@ -82,6 +82,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.effective_me_pool_force_close_secs(), general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio, general.me_pool_min_fresh_ratio,
@ -238,7 +239,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(600).saturating_add(writer_id), now_epoch_secs.saturating_sub(20),
1, 1,
0, 0,
) )
@ -254,7 +255,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
} }
assert_eq!(writer_count(&pool).await, threshold as usize); assert_eq!(writer_count(&pool).await, threshold as usize);
assert_eq!(sorted_writer_ids(&pool).await, vec![58, 59, 60]); assert_eq!(sorted_writer_ids(&pool).await, vec![1, 2, 3]);
} }
#[tokio::test] #[tokio::test]

View File

@ -80,6 +80,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.effective_me_pool_force_close_secs(), general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio, general.me_pool_min_fresh_ratio,

View File

@ -73,6 +73,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.effective_me_pool_force_close_secs(), general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio, general.me_pool_min_fresh_ratio,
@ -187,8 +188,14 @@ async fn set_writer_draining(pool: &Arc<MePool>, writer_id: u64, draining: bool)
async fn reap_draining_writers_drops_warn_state_for_removed_writer() { async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
let pool = make_pool(128).await; let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let conn_ids = let conn_ids = insert_draining_writer(
insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await; &pool,
7,
now_epoch_secs.saturating_sub(180),
1,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed).await;
@ -251,17 +258,17 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() {
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_limits_closes_per_health_tick() { async fn reap_draining_writers_limits_closes_per_health_tick() {
let pool = make_pool(128).await; let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget(); let close_budget = health_drain_close_budget();
let writer_total = close_budget.saturating_add(19); let writer_total = close_budget.saturating_add(20);
for writer_id in 1..=writer_total as u64 { for writer_id in 1..=writer_total as u64 {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 1,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
@ -400,8 +407,8 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 0,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
@ -428,7 +435,7 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(200).saturating_add(writer_id), now_epoch_secs.saturating_sub(20),
1, 1,
0, 0,
) )
@ -462,26 +469,26 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
let pool = make_pool(128).await; let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget(); let close_budget = health_drain_close_budget();
for writer_id in 1..=close_budget as u64 { for writer_id in 1..=close_budget.saturating_add(1) as u64 {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 1,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
let empty_writer_id = close_budget as u64 + 1; let empty_writer_id = close_budget.saturating_add(2) as u64;
insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); assert_eq!(current_writer_ids(&pool).await, vec![1, empty_writer_id]);
} }
#[tokio::test] #[tokio::test]

View File

@ -32,7 +32,7 @@ mod send_adversarial_tests;
use bytes::Bytes; use bytes::Bytes;
pub use health::me_health_monitor; pub use health::{me_drain_timeout_enforcer, me_health_monitor};
#[allow(unused_imports)] #[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily}; pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool; pub use pool::MePool;

View File

@ -172,6 +172,7 @@ pub struct MePool {
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>, pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>, pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_instadrain: AtomicBool,
pub(super) me_pool_drain_threshold: AtomicU64, pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
@ -273,6 +274,7 @@ impl MePool {
me_adaptive_floor_max_warm_writers_global: u32, me_adaptive_floor_max_warm_writers_global: u32,
hardswap: bool, hardswap: bool,
me_pool_drain_ttl_secs: u64, me_pool_drain_ttl_secs: u64,
me_instadrain: bool,
me_pool_drain_threshold: u64, me_pool_drain_threshold: u64,
me_pool_force_close_secs: u64, me_pool_force_close_secs: u64,
me_pool_min_fresh_ratio: f32, me_pool_min_fresh_ratio: f32,
@ -450,6 +452,7 @@ impl MePool {
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_instadrain: AtomicBool::new(me_instadrain),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold), me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
@ -497,6 +500,7 @@ impl MePool {
&self, &self,
hardswap: bool, hardswap: bool,
drain_ttl_secs: u64, drain_ttl_secs: u64,
instadrain: bool,
pool_drain_threshold: u64, pool_drain_threshold: u64,
force_close_secs: u64, force_close_secs: u64,
min_fresh_ratio: f32, min_fresh_ratio: f32,
@ -536,6 +540,7 @@ impl MePool {
self.hardswap.store(hardswap, Ordering::Relaxed); self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs self.me_pool_drain_ttl_secs
.store(drain_ttl_secs, Ordering::Relaxed); .store(drain_ttl_secs, Ordering::Relaxed);
self.me_instadrain.store(instadrain, Ordering::Relaxed);
self.me_pool_drain_threshold self.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed); .store(pool_drain_threshold, Ordering::Relaxed);
self.me_pool_force_close_secs self.me_pool_force_close_secs

View File

@ -75,6 +75,7 @@ async fn make_pool() -> (Arc<MePool>, Arc<SecureRandom>) {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.effective_me_pool_force_close_secs(), general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio, general.me_pool_min_fresh_ratio,