From 05c066c676328e7d26a82f462662ce0b960dd4a3 Mon Sep 17 00:00:00 2001 From: TEMAndroid Date: Thu, 19 Mar 2026 15:54:01 +0300 Subject: [PATCH 1/5] fix(docker): expose port 9091 and allow external API access Add 9091 port mapping to compose.yml to make the REST API reachable from outside the container. Previously only port 9090 (metrics) was published, making the documented curl commands non-functional. fixes #434 --- Dockerfile | 1 + docker-compose.yml | 1 + docs/QUICK_START_GUIDE.en.md | 2 ++ docs/QUICK_START_GUIDE.ru.md | 2 ++ 4 files changed, 6 insertions(+) diff --git a/Dockerfile b/Dockerfile index 7abe548..15a4900 100644 --- a/Dockerfile +++ b/Dockerfile @@ -38,6 +38,7 @@ USER telemt EXPOSE 443 EXPOSE 9090 +EXPOSE 9091 ENTRYPOINT ["/app/telemt"] CMD ["config.toml"] diff --git a/docker-compose.yml b/docker-compose.yml index 5866c4d..815ba24 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,6 +7,7 @@ services: ports: - "443:443" - "127.0.0.1:9090:9090" + - "127.0.0.1:9091:9091" # Allow caching 'proxy-secret' in read-only container working_dir: /run/telemt volumes: diff --git a/docs/QUICK_START_GUIDE.en.md b/docs/QUICK_START_GUIDE.en.md index c43a2dc..ffb387f 100644 --- a/docs/QUICK_START_GUIDE.en.md +++ b/docs/QUICK_START_GUIDE.en.md @@ -181,6 +181,8 @@ docker compose down docker build -t telemt:local . docker run --name telemt --restart unless-stopped \ -p 443:443 \ + -p 9090:9090 \ + -p 9091:9091 \ -e RUST_LOG=info \ -v "$PWD/config.toml:/app/config.toml:ro" \ --read-only \ diff --git a/docs/QUICK_START_GUIDE.ru.md b/docs/QUICK_START_GUIDE.ru.md index 35fbd27..e4c5005 100644 --- a/docs/QUICK_START_GUIDE.ru.md +++ b/docs/QUICK_START_GUIDE.ru.md @@ -183,6 +183,8 @@ docker compose down docker build -t telemt:local . docker run --name telemt --restart unless-stopped \ -p 443:443 \ + -p 9090:9090 \ + -p 9091:9091 \ -e RUST_LOG=info \ -v "$PWD/config.toml:/app/config.toml:ro" \ --read-only \ From f655924323a02e337f9fdfc87c826d89145df3af Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 16:15:00 +0300 Subject: [PATCH 2/5] Update health.rs Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 35 +++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 6d0af64..21619c7 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -30,6 +30,11 @@ const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256; +const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1; +#[cfg(not(test))] +const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = true; +#[cfg(test)] +const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = false; #[derive(Debug, Clone)] struct DcFloorPlanEntry { @@ -99,6 +104,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut adaptive_idle_since, &mut adaptive_recover_until, &mut floor_warn_next_allowed, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, ) .await; let v6_degraded = check_family( @@ -116,6 +123,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut adaptive_idle_since, &mut adaptive_recover_until, &mut floor_warn_next_allowed, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, ) .await; degraded_interval = v4_degraded || v6_degraded; @@ -154,6 +163,11 @@ pub(super) async fn reap_draining_writers( } draining_writers.push(writer); } + if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE { + for writer in draining_writers.drain(..) { + force_close_writer_ids.push(writer.id); + } + } if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { draining_writers.sort_by(|left, right| { @@ -299,10 +313,14 @@ pub(super) async fn reap_draining_writers( } } - let close_budget = health_drain_close_budget(); let requested_force_close = force_close_writer_ids.len(); let requested_empty_close = empty_writer_ids.len(); let requested_close_total = requested_force_close.saturating_add(requested_empty_close); + let close_budget = if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE { + requested_close_total + } else { + health_drain_close_budget() + }; let mut closed_writer_ids = HashSet::::new(); let mut closed_total = 0usize; for writer_id in force_close_writer_ids { @@ -396,6 +414,8 @@ async fn check_family( adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, + drain_warn_next_allowed: &mut HashMap, + drain_soft_evict_next_allowed: &mut HashMap, ) -> bool { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -476,8 +496,15 @@ async fn check_family( floor_plan.active_writers_current, floor_plan.warm_writers_current, ); + let mut next_drain_reap_at = Instant::now(); for (dc, endpoints) in dc_endpoints { + if Instant::now() >= next_drain_reap_at { + reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed) + .await; + next_drain_reap_at = Instant::now() + + Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS); + } if endpoints.is_empty() { continue; } @@ -621,6 +648,12 @@ async fn check_family( let mut restored = 0usize; for _ in 0..missing { + if Instant::now() >= next_drain_reap_at { + reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed) + .await; + next_drain_reap_at = Instant::now() + + Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS); + } if reconnect_budget == 0 { break; } From dc3363aa0d05bcbf83453cf567134b144997341d Mon Sep 17 00:00:00 2001 From: Dimasssss Date: Thu, 19 Mar 2026 16:23:32 +0300 Subject: [PATCH 3/5] Update install.sh --- install.sh | 583 ++++++++++++++++++++++++++++------------------------- 1 file changed, 307 insertions(+), 276 deletions(-) diff --git a/install.sh b/install.sh index 330bc3e..90c28f4 100644 --- a/install.sh +++ b/install.sh @@ -1,154 +1,190 @@ #!/bin/sh set -eu -# --- Global Configurations --- REPO="${REPO:-telemt/telemt}" BIN_NAME="${BIN_NAME:-telemt}" INSTALL_DIR="${INSTALL_DIR:-/bin}" CONFIG_DIR="${CONFIG_DIR:-/etc/telemt}" CONFIG_FILE="${CONFIG_FILE:-${CONFIG_DIR}/telemt.toml}" WORK_DIR="${WORK_DIR:-/opt/telemt}" +TLS_DOMAIN="${TLS_DOMAIN:-petrovich.ru}" SERVICE_NAME="telemt" TEMP_DIR="" SUDO="" +CONFIG_PARENT_DIR="" +SERVICE_START_FAILED=0 -# --- Argument Parsing --- ACTION="install" TARGET_VERSION="${VERSION:-latest}" while [ $# -gt 0 ]; do case "$1" in - -h|--help) - ACTION="help" - shift - ;; + -h|--help) ACTION="help"; shift ;; uninstall|--uninstall) - [ "$ACTION" != "purge" ] && ACTION="uninstall" - shift - ;; - --purge) - ACTION="purge" - shift - ;; - install|--install) - ACTION="install" - shift - ;; - -*) - printf '[ERROR] Unknown option: %s\n' "$1" >&2 - exit 1 - ;; + if [ "$ACTION" != "purge" ]; then ACTION="uninstall"; fi + shift ;; + purge|--purge) ACTION="purge"; shift ;; + install|--install) ACTION="install"; shift ;; + -*) printf '[ERROR] Unknown option: %s\n' "$1" >&2; exit 1 ;; *) - if [ "$ACTION" = "install" ]; then - TARGET_VERSION="$1" - fi - shift - ;; + if [ "$ACTION" = "install" ]; then TARGET_VERSION="$1" + else printf '[WARNING] Ignoring extra argument: %s\n' "$1" >&2; fi + shift ;; esac done -# --- Core Functions --- -say() { printf '[INFO] %s\n' "$*"; } +say() { + if [ "$#" -eq 0 ] || [ -z "${1:-}" ]; then + printf '\n' + else + printf '[INFO] %s\n' "$*" + fi +} die() { printf '[ERROR] %s\n' "$*" >&2; exit 1; } +write_root() { $SUDO sh -c 'cat > "$1"' _ "$1"; } + cleanup() { if [ -n "${TEMP_DIR:-}" ] && [ -d "$TEMP_DIR" ]; then rm -rf -- "$TEMP_DIR" fi } - trap cleanup EXIT INT TERM show_help() { - say "Usage: $0 [version | install | uninstall | --purge | --help]" - say " version Install specific version (e.g. 1.0.0, default: latest)" - say " uninstall Remove the binary and service (keeps config)" - say " --purge Remove everything including configuration" + say "Usage: $0 [ | install | uninstall | purge | --help ]" + say " Install specific version (e.g. 3.3.15, default: latest)" + say " install Install the latest version" + say " uninstall Remove the binary and service (keeps config and user)" + say " purge Remove everything including configuration, data, and user" exit 0 } -user_exists() { - if command -v getent >/dev/null 2>&1; then - getent passwd "$1" >/dev/null 2>&1 +check_os_entity() { + if command -v getent >/dev/null 2>&1; then getent "$1" "$2" >/dev/null 2>&1 + else grep -q "^${2}:" "/etc/$1" 2>/dev/null; fi +} + +normalize_path() { + printf '%s\n' "$1" | tr -s '/' | sed 's|/$||; s|^$|/|' +} + +get_realpath() { + path_in="$1" + case "$path_in" in /*) ;; *) path_in="$(pwd)/$path_in" ;; esac + + if command -v realpath >/dev/null 2>&1; then + if realpath_out="$(realpath -m "$path_in" 2>/dev/null)"; then + printf '%s\n' "$realpath_out" + return + fi + fi + + if command -v readlink >/dev/null 2>&1; then + resolved_path="$(readlink -f "$path_in" 2>/dev/null || true)" + if [ -n "$resolved_path" ]; then + printf '%s\n' "$resolved_path" + return + fi + fi + + d="${path_in%/*}"; b="${path_in##*/}" + if [ -z "$d" ]; then d="/"; fi + if [ "$d" = "$path_in" ]; then d="/"; b="$path_in"; fi + + if [ -d "$d" ]; then + abs_d="$(cd "$d" >/dev/null 2>&1 && pwd || true)" + if [ -n "$abs_d" ]; then + if [ "$b" = "." ] || [ -z "$b" ]; then printf '%s\n' "$abs_d" + elif [ "$abs_d" = "/" ]; then printf '/%s\n' "$b" + else printf '%s/%s\n' "$abs_d" "$b"; fi + else + normalize_path "$path_in" + fi else - grep -q "^${1}:" /etc/passwd 2>/dev/null + normalize_path "$path_in" fi } -group_exists() { - if command -v getent >/dev/null 2>&1; then - getent group "$1" >/dev/null 2>&1 - else - grep -q "^${1}:" /etc/group 2>/dev/null - fi +get_svc_mgr() { + if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then echo "systemd" + elif command -v rc-service >/dev/null 2>&1; then echo "openrc" + else echo "none"; fi } verify_common() { - [ -z "$BIN_NAME" ] && die "BIN_NAME cannot be empty." - [ -z "$INSTALL_DIR" ] && die "INSTALL_DIR cannot be empty." - [ -z "$CONFIG_DIR" ] && die "CONFIG_DIR cannot be empty." + [ -n "$BIN_NAME" ] || die "BIN_NAME cannot be empty." + [ -n "$INSTALL_DIR" ] || die "INSTALL_DIR cannot be empty." + [ -n "$CONFIG_DIR" ] || die "CONFIG_DIR cannot be empty." + [ -n "$CONFIG_FILE" ] || die "CONFIG_FILE cannot be empty." + + case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}${CONFIG_FILE}" in + *[!a-zA-Z0-9_./-]*) die "Invalid characters in paths. Only alphanumeric, _, ., -, and / allowed." ;; + esac + + case "$TARGET_VERSION" in *[!a-zA-Z0-9_.-]*) die "Invalid characters in version." ;; esac + case "$BIN_NAME" in *[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME." ;; esac + + INSTALL_DIR="$(get_realpath "$INSTALL_DIR")" + CONFIG_DIR="$(get_realpath "$CONFIG_DIR")" + WORK_DIR="$(get_realpath "$WORK_DIR")" + CONFIG_FILE="$(get_realpath "$CONFIG_FILE")" + + CONFIG_PARENT_DIR="${CONFIG_FILE%/*}" + if [ -z "$CONFIG_PARENT_DIR" ]; then CONFIG_PARENT_DIR="/"; fi + if [ "$CONFIG_PARENT_DIR" = "$CONFIG_FILE" ]; then CONFIG_PARENT_DIR="."; fi if [ "$(id -u)" -eq 0 ]; then SUDO="" else - if ! command -v sudo >/dev/null 2>&1; then - die "This script requires root or sudo. Neither found." - fi + command -v sudo >/dev/null 2>&1 || die "This script requires root or sudo. Neither found." SUDO="sudo" - say "sudo is available. Caching credentials..." - if ! sudo -v; then - die "Failed to cache sudo credentials" + if ! sudo -n true 2>/dev/null; then + if ! [ -t 0 ]; then + die "sudo requires a password, but no TTY detected. Aborting to prevent hang." + fi fi fi - case "${INSTALL_DIR}${CONFIG_DIR}${WORK_DIR}" in - *[!a-zA-Z0-9_./-]*) - die "Invalid characters in path variables. Only alphanumeric, _, ., -, and / are allowed." - ;; - esac - - case "$BIN_NAME" in - *[!a-zA-Z0-9_-]*) die "Invalid characters in BIN_NAME: $BIN_NAME" ;; - esac - - for path in "$CONFIG_DIR" "$WORK_DIR"; do - check_path="$path" - - while [ "$check_path" != "/" ] && [ "${check_path%"/"}" != "$check_path" ]; do - check_path="${check_path%"/"}" - done - [ -z "$check_path" ] && check_path="/" + if [ -n "$SUDO" ]; then + if $SUDO sh -c '[ -d "$1" ]' _ "$CONFIG_FILE"; then + die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory." + fi + elif [ -d "$CONFIG_FILE" ]; then + die "Safety check failed: CONFIG_FILE '$CONFIG_FILE' is a directory." + fi + for path in "$CONFIG_DIR" "$CONFIG_PARENT_DIR" "$WORK_DIR"; do + check_path="$(get_realpath "$path")" case "$check_path" in - /|/bin|/sbin|/usr|/usr/bin|/usr/local|/etc|/opt|/var|/home|/root|/tmp) - die "Safety check failed: '$path' is a critical system directory." - ;; + /|/bin|/sbin|/usr|/usr/bin|/usr/sbin|/usr/local|/usr/local/bin|/usr/local/sbin|/usr/local/etc|/usr/local/share|/etc|/var|/var/lib|/var/log|/var/run|/home|/root|/tmp|/lib|/lib64|/opt|/run|/boot|/dev|/sys|/proc) + die "Safety check failed: '$path' (resolved to '$check_path') is a critical system directory." ;; esac done - for cmd in uname grep find rm chown chmod mv head mktemp; do + check_install_dir="$(get_realpath "$INSTALL_DIR")" + case "$check_install_dir" in + /|/etc|/var|/home|/root|/tmp|/usr|/usr/local|/opt|/boot|/dev|/sys|/proc|/run) + die "Safety check failed: INSTALL_DIR '$INSTALL_DIR' is a critical system directory." ;; + esac + + for cmd in id uname grep find rm chown chmod mv mktemp mkdir tr dd sed ps head sleep cat tar gzip rmdir; do command -v "$cmd" >/dev/null 2>&1 || die "Required command not found: $cmd" done } verify_install_deps() { - if ! command -v curl >/dev/null 2>&1 && ! command -v wget >/dev/null 2>&1; then - die "Neither curl nor wget is installed." - fi - command -v tar >/dev/null 2>&1 || die "Required command not found: tar" - command -v gzip >/dev/null 2>&1 || die "Required command not found: gzip" + command -v curl >/dev/null 2>&1 || command -v wget >/dev/null 2>&1 || die "Neither curl nor wget is installed." command -v cp >/dev/null 2>&1 || command -v install >/dev/null 2>&1 || die "Need cp or install" if ! command -v setcap >/dev/null 2>&1; then - say "setcap is missing. Installing required capability tools..." if command -v apk >/dev/null 2>&1; then - $SUDO apk add --no-cache libcap || die "Failed to install libcap" + $SUDO apk add --no-cache libcap-utils >/dev/null 2>&1 || $SUDO apk add --no-cache libcap >/dev/null 2>&1 || true elif command -v apt-get >/dev/null 2>&1; then - $SUDO apt-get update -qq && $SUDO apt-get install -y -qq libcap2-bin || die "Failed to install libcap2-bin" - elif command -v dnf >/dev/null 2>&1 || command -v yum >/dev/null 2>&1; then - $SUDO ${YUM_CMD:-yum} install -y -q libcap || die "Failed to install libcap" - else - die "Cannot install 'setcap'. Package manager not found. Please install libcap manually." + $SUDO apt-get update -q >/dev/null 2>&1 || true + $SUDO apt-get install -y -q libcap2-bin >/dev/null 2>&1 || true + elif command -v dnf >/dev/null 2>&1; then $SUDO dnf install -y -q libcap >/dev/null 2>&1 || true + elif command -v yum >/dev/null 2>&1; then $SUDO yum install -y -q libcap >/dev/null 2>&1 || true fi fi } @@ -163,122 +199,96 @@ detect_arch() { } detect_libc() { - if command -v ldd >/dev/null 2>&1 && ldd --version 2>&1 | grep -qi musl; then - echo "musl"; return 0 - fi - - if grep -q '^ID=alpine' /etc/os-release 2>/dev/null || grep -q '^ID="alpine"' /etc/os-release 2>/dev/null; then - echo "musl"; return 0 - fi for f in /lib/ld-musl-*.so.* /lib64/ld-musl-*.so.*; do - if [ -e "$f" ]; then - echo "musl"; return 0 - fi + if [ -e "$f" ]; then echo "musl"; return 0; fi done + if grep -qE '^ID="?alpine"?' /etc/os-release 2>/dev/null; then echo "musl"; return 0; fi + if command -v ldd >/dev/null 2>&1 && (ldd --version 2>&1 || true) | grep -qi musl; then echo "musl"; return 0; fi echo "gnu" } fetch_file() { - fetch_url="$1" - fetch_out="$2" - - if command -v curl >/dev/null 2>&1; then - curl -fsSL "$fetch_url" -o "$fetch_out" || return 1 - elif command -v wget >/dev/null 2>&1; then - wget -qO "$fetch_out" "$fetch_url" || return 1 - else - die "curl or wget required" - fi + if command -v curl >/dev/null 2>&1; then curl -fsSL "$1" -o "$2" + else wget -q -O "$2" "$1"; fi } ensure_user_group() { - nologin_bin="/bin/false" + nologin_bin="$(command -v nologin 2>/dev/null || command -v false 2>/dev/null || echo /bin/false)" - cmd_nologin="$(command -v nologin 2>/dev/null || true)" - if [ -n "$cmd_nologin" ] && [ -x "$cmd_nologin" ]; then - nologin_bin="$cmd_nologin" - else - for bin in /sbin/nologin /usr/sbin/nologin; do - if [ -x "$bin" ]; then - nologin_bin="$bin" - break - fi - done + if ! check_os_entity group telemt; then + if command -v groupadd >/dev/null 2>&1; then $SUDO groupadd -r telemt + elif command -v addgroup >/dev/null 2>&1; then $SUDO addgroup -S telemt + else die "Cannot create group"; fi fi - if ! group_exists telemt; then - if command -v groupadd >/dev/null 2>&1; then - $SUDO groupadd -r telemt || die "Failed to create group via groupadd" - elif command -v addgroup >/dev/null 2>&1; then - $SUDO addgroup -S telemt || die "Failed to create group via addgroup" - else - die "Cannot create group: neither groupadd nor addgroup found" - fi - fi - - if ! user_exists telemt; then + if ! check_os_entity passwd telemt; then if command -v useradd >/dev/null 2>&1; then - $SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt || die "Failed to create user via useradd" + $SUDO useradd -r -g telemt -d "$WORK_DIR" -s "$nologin_bin" -c "Telemt Proxy" telemt elif command -v adduser >/dev/null 2>&1; then - $SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt || die "Failed to create user via adduser" - else - die "Cannot create user: neither useradd nor adduser found" - fi + if adduser --help 2>&1 | grep -q -- '-S'; then + $SUDO adduser -S -D -H -h "$WORK_DIR" -s "$nologin_bin" -G telemt telemt + else + $SUDO adduser --system --home "$WORK_DIR" --shell "$nologin_bin" --no-create-home --ingroup telemt --disabled-password telemt + fi + else die "Cannot create user"; fi fi } setup_dirs() { - say "Setting up directories..." - $SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" || die "Failed to create directories" - $SUDO chown telemt:telemt "$WORK_DIR" || die "Failed to set owner on WORK_DIR" - $SUDO chmod 750 "$WORK_DIR" || die "Failed to set permissions on WORK_DIR" + $SUDO mkdir -p "$WORK_DIR" "$CONFIG_DIR" "$CONFIG_PARENT_DIR" || die "Failed to create directories" + + $SUDO chown telemt:telemt "$WORK_DIR" && $SUDO chmod 750 "$WORK_DIR" + $SUDO chown root:telemt "$CONFIG_DIR" && $SUDO chmod 750 "$CONFIG_DIR" + + if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then + $SUDO chown root:telemt "$CONFIG_PARENT_DIR" && $SUDO chmod 750 "$CONFIG_PARENT_DIR" + fi } stop_service() { - say "Stopping service if running..." - if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then + svc="$(get_svc_mgr)" + if [ "$svc" = "systemd" ] && systemctl is-active --quiet "$SERVICE_NAME" 2>/dev/null; then $SUDO systemctl stop "$SERVICE_NAME" 2>/dev/null || true - elif command -v rc-service >/dev/null 2>&1; then + elif [ "$svc" = "openrc" ] && rc-service "$SERVICE_NAME" status >/dev/null 2>&1; then $SUDO rc-service "$SERVICE_NAME" stop 2>/dev/null || true fi } install_binary() { - bin_src="$1" - bin_dst="$2" + bin_src="$1"; bin_dst="$2" + if [ -e "$INSTALL_DIR" ] && [ ! -d "$INSTALL_DIR" ]; then + die "'$INSTALL_DIR' is not a directory." + fi $SUDO mkdir -p "$INSTALL_DIR" || die "Failed to create install directory" if command -v install >/dev/null 2>&1; then $SUDO install -m 0755 "$bin_src" "$bin_dst" || die "Failed to install binary" else - $SUDO rm -f "$bin_dst" - $SUDO cp "$bin_src" "$bin_dst" || die "Failed to copy binary" - $SUDO chmod 0755 "$bin_dst" || die "Failed to set permissions" + $SUDO rm -f "$bin_dst" 2>/dev/null || true + $SUDO cp "$bin_src" "$bin_dst" && $SUDO chmod 0755 "$bin_dst" || die "Failed to copy binary" fi - if [ ! -x "$bin_dst" ]; then - die "Failed to install binary or it is not executable: $bin_dst" - fi + $SUDO sh -c '[ -x "$1" ]' _ "$bin_dst" || die "Binary not executable: $bin_dst" - say "Granting network bind capabilities to bind port 443..." - if ! $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null; then - say "[WARNING] Failed to apply setcap. The service will NOT be able to open port 443!" - say "[WARNING] This usually happens inside unprivileged Docker/LXC containers." + if command -v setcap >/dev/null 2>&1; then + $SUDO setcap cap_net_bind_service=+ep "$bin_dst" 2>/dev/null || true fi } generate_secret() { - if command -v openssl >/dev/null 2>&1; then - secret="$(openssl rand -hex 16 2>/dev/null)" && [ -n "$secret" ] && { echo "$secret"; return 0; } + secret="$(command -v openssl >/dev/null 2>&1 && openssl rand -hex 16 2>/dev/null || true)" + if [ -z "$secret" ] || [ "${#secret}" -ne 32 ]; then + if command -v od >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | od -An -tx1 | tr -d ' \n')" + elif command -v hexdump >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | hexdump -e '1/1 "%02x"')" + elif command -v xxd >/dev/null 2>&1; then secret="$(dd if=/dev/urandom bs=16 count=1 2>/dev/null | xxd -p | tr -d '\n')" + fi fi - if command -v xxd >/dev/null 2>&1; then - secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | xxd -p | tr -d '\n')" && [ -n "$secret" ] && { echo "$secret"; return 0; } - fi - secret="$(dd if=/dev/urandom bs=1 count=16 2>/dev/null | od -An -tx1 | tr -d ' \n')" && [ -n "$secret" ] && { echo "$secret"; return 0; } - return 1 + if [ "${#secret}" -eq 32 ]; then echo "$secret"; else return 1; fi } generate_config_content() { + escaped_tls_domain="$(printf '%s\n' "$TLS_DOMAIN" | tr -d '[:cntrl:]' | sed 's/\\/\\\\/g; s/"/\\"/g')" + cat </dev/null && config_exists=1 || true - else - [ -f "$CONFIG_FILE" ] && config_exists=1 || true - fi - - if [ "$config_exists" -eq 1 ]; then - say "Config already exists, skipping generation." + if $SUDO sh -c '[ -f "$1" ]' _ "$CONFIG_FILE"; then + say " -> Config already exists at $CONFIG_FILE. Skipping creation." + return 0 + fi + elif [ -f "$CONFIG_FILE" ]; then + say " -> Config already exists at $CONFIG_FILE. Skipping creation." return 0 fi - toml_secret="$(generate_secret)" || die "Failed to generate secret" - say "Creating config at $CONFIG_FILE..." + toml_secret="$(generate_secret)" || die "Failed to generate secret." - tmp_conf="$(mktemp "${TEMP_DIR:-/tmp}/telemt_conf.XXXXXX")" || die "Failed to create temp config" - generate_config_content "$toml_secret" > "$tmp_conf" || die "Failed to write temp config" + generate_config_content "$toml_secret" | write_root "$CONFIG_FILE" || die "Failed to install config" + $SUDO chown root:telemt "$CONFIG_FILE" && $SUDO chmod 640 "$CONFIG_FILE" - $SUDO mv "$tmp_conf" "$CONFIG_FILE" || die "Failed to install config file" - $SUDO chown root:telemt "$CONFIG_FILE" || die "Failed to set owner" - $SUDO chmod 640 "$CONFIG_FILE" || die "Failed to set config permissions" - - say "Secret for user 'hello': $toml_secret" + say " -> Config created successfully." + say " -> Generated secret for default user 'hello': $toml_secret" } generate_systemd_content() { cat </dev/null 2>&1 && [ -d /run/systemd/system ]; then - say "Installing systemd service..." - tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.service.XXXXXX")" || die "Failed to create temp service" - generate_systemd_content > "$tmp_svc" || die "Failed to generate service content" + svc="$(get_svc_mgr)" + if [ "$svc" = "systemd" ]; then + generate_systemd_content | write_root "/etc/systemd/system/${SERVICE_NAME}.service" + $SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service" && $SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service" - $SUDO mv "$tmp_svc" "/etc/systemd/system/${SERVICE_NAME}.service" || die "Failed to move service file" - $SUDO chown root:root "/etc/systemd/system/${SERVICE_NAME}.service" - $SUDO chmod 644 "/etc/systemd/system/${SERVICE_NAME}.service" + $SUDO systemctl daemon-reload || true + $SUDO systemctl enable "$SERVICE_NAME" || true + + if ! $SUDO systemctl start "$SERVICE_NAME"; then + say "[WARNING] Failed to start service" + SERVICE_START_FAILED=1 + fi + elif [ "$svc" = "openrc" ]; then + generate_openrc_content | write_root "/etc/init.d/${SERVICE_NAME}" + $SUDO chown root:root "/etc/init.d/${SERVICE_NAME}" && $SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}" - $SUDO systemctl daemon-reload || die "Failed to reload systemd" - $SUDO systemctl enable "$SERVICE_NAME" || die "Failed to enable service" - $SUDO systemctl start "$SERVICE_NAME" || die "Failed to start service" - - elif command -v rc-update >/dev/null 2>&1; then - say "Installing OpenRC service..." - tmp_svc="$(mktemp "${TEMP_DIR:-/tmp}/${SERVICE_NAME}.init.XXXXXX")" || die "Failed to create temp file" - generate_openrc_content > "$tmp_svc" || die "Failed to generate init content" - - $SUDO mv "$tmp_svc" "/etc/init.d/${SERVICE_NAME}" || die "Failed to move service file" - $SUDO chown root:root "/etc/init.d/${SERVICE_NAME}" - $SUDO chmod 0755 "/etc/init.d/${SERVICE_NAME}" - - $SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || die "Failed to register service" - $SUDO rc-service "$SERVICE_NAME" start 2>/dev/null || die "Failed to start OpenRC service" + $SUDO rc-update add "$SERVICE_NAME" default 2>/dev/null || true + + if ! $SUDO rc-service "$SERVICE_NAME" start 2>/dev/null; then + say "[WARNING] Failed to start service" + SERVICE_START_FAILED=1 + fi else - say "No service manager found. You can start it manually with:" - if [ -n "$SUDO" ]; then - say " sudo -u telemt ${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}" - else - say " su -s /bin/sh telemt -c '${INSTALL_DIR}/${BIN_NAME} ${CONFIG_FILE}'" + cmd="\"${INSTALL_DIR}/${BIN_NAME}\" \"${CONFIG_FILE}\"" + if [ -n "$SUDO" ]; then + say " -> Service manager not found. Start manually: sudo -u telemt $cmd" + else + say " -> Service manager not found. Start manually: su -s /bin/sh telemt -c '$cmd'" fi fi } kill_user_procs() { - say "Ensuring $BIN_NAME processes are killed..." - - if pkill_cmd="$(command -v pkill 2>/dev/null)"; then - $SUDO "$pkill_cmd" -u telemt "$BIN_NAME" 2>/dev/null || true + if command -v pkill >/dev/null 2>&1; then + $SUDO pkill -u telemt "$BIN_NAME" 2>/dev/null || true sleep 1 - $SUDO "$pkill_cmd" -9 -u telemt "$BIN_NAME" 2>/dev/null || true - elif killall_cmd="$(command -v killall 2>/dev/null)"; then - $SUDO "$killall_cmd" "$BIN_NAME" 2>/dev/null || true - sleep 1 - $SUDO "$killall_cmd" -9 "$BIN_NAME" 2>/dev/null || true + $SUDO pkill -9 -u telemt "$BIN_NAME" 2>/dev/null || true + else + if command -v pgrep >/dev/null 2>&1; then + pids="$(pgrep -u telemt 2>/dev/null || true)" + else + pids="$(ps -u telemt -o pid= 2>/dev/null || true)" + fi + + if [ -n "$pids" ]; then + for pid in $pids; do + case "$pid" in ''|*[!0-9]*) continue ;; *) $SUDO kill "$pid" 2>/dev/null || true ;; esac + done + sleep 1 + for pid in $pids; do + case "$pid" in ''|*[!0-9]*) continue ;; *) $SUDO kill -9 "$pid" 2>/dev/null || true ;; esac + done + fi fi } uninstall() { - purge_data=0 - [ "$ACTION" = "purge" ] && purge_data=1 + say "Starting uninstallation of $BIN_NAME..." - say "Uninstalling $BIN_NAME..." + say ">>> Stage 1: Stopping services" stop_service - if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then + say ">>> Stage 2: Removing service configuration" + svc="$(get_svc_mgr)" + if [ "$svc" = "systemd" ]; then $SUDO systemctl disable "$SERVICE_NAME" 2>/dev/null || true $SUDO rm -f "/etc/systemd/system/${SERVICE_NAME}.service" - $SUDO systemctl daemon-reload || true - elif command -v rc-update >/dev/null 2>&1; then + $SUDO systemctl daemon-reload 2>/dev/null || true + elif [ "$svc" = "openrc" ]; then $SUDO rc-update del "$SERVICE_NAME" 2>/dev/null || true $SUDO rm -f "/etc/init.d/${SERVICE_NAME}" fi + say ">>> Stage 3: Terminating user processes" kill_user_procs + say ">>> Stage 4: Removing binary" $SUDO rm -f "${INSTALL_DIR}/${BIN_NAME}" - $SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true - $SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true - - if [ "$purge_data" -eq 1 ]; then - say "Purging configuration and data..." + if [ "$ACTION" = "purge" ]; then + say ">>> Stage 5: Purging configuration, data, and user" $SUDO rm -rf "$CONFIG_DIR" "$WORK_DIR" + $SUDO rm -f "$CONFIG_FILE" + if [ "$CONFIG_PARENT_DIR" != "$CONFIG_DIR" ] && [ "$CONFIG_PARENT_DIR" != "." ] && [ "$CONFIG_PARENT_DIR" != "/" ]; then + $SUDO rmdir "$CONFIG_PARENT_DIR" 2>/dev/null || true + fi + $SUDO userdel telemt 2>/dev/null || $SUDO deluser telemt 2>/dev/null || true + $SUDO groupdel telemt 2>/dev/null || $SUDO delgroup telemt 2>/dev/null || true else - say "Note: Configuration in $CONFIG_DIR was kept. Run with '--purge' to remove it." + say "Note: Configuration and user kept. Run with 'purge' to remove completely." fi - - say "Uninstallation complete." + + printf '\n====================================================================\n' + printf ' UNINSTALLATION COMPLETE\n' + printf '====================================================================\n\n' exit 0 } -# ============================================================================ -# Main Entry Point -# ============================================================================ - case "$ACTION" in - help) - show_help - ;; - uninstall|purge) - verify_common - uninstall - ;; + help) show_help ;; + uninstall|purge) verify_common; uninstall ;; install) - say "Starting installation..." - verify_common - verify_install_deps + say "Starting installation of $BIN_NAME (Version: $TARGET_VERSION)" - ARCH="$(detect_arch)" - LIBC="$(detect_libc)" - say "Detected system: $ARCH-linux-$LIBC" + say ">>> Stage 1: Verifying environment and dependencies" + verify_common; verify_install_deps + if [ "$TARGET_VERSION" != "latest" ]; then + TARGET_VERSION="${TARGET_VERSION#v}" + fi + + ARCH="$(detect_arch)"; LIBC="$(detect_libc)" FILE_NAME="${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz" - FILE_NAME="$(printf '%s' "$FILE_NAME" | tr -d ' \t\n\r')" - + if [ "$TARGET_VERSION" = "latest" ]; then DL_URL="https://github.com/${REPO}/releases/latest/download/${FILE_NAME}" - else + else DL_URL="https://github.com/${REPO}/releases/download/${TARGET_VERSION}/${FILE_NAME}" fi - TEMP_DIR="$(mktemp -d)" || die "Failed to create temp directory" + say ">>> Stage 2: Downloading archive" + TEMP_DIR="$(mktemp -d)" || die "Temp directory creation failed" if [ -z "$TEMP_DIR" ] || [ ! -d "$TEMP_DIR" ]; then - die "Temp directory creation failed" + die "Temp directory is invalid or was not created" fi - say "Downloading from $DL_URL..." - fetch_file "$DL_URL" "${TEMP_DIR}/archive.tar.gz" || die "Download failed (check version or network)" + fetch_file "$DL_URL" "${TEMP_DIR}/${FILE_NAME}" || die "Download failed" - gzip -dc "${TEMP_DIR}/archive.tar.gz" | tar -xf - -C "$TEMP_DIR" || die "Extraction failed" + say ">>> Stage 3: Extracting archive" + if ! gzip -dc "${TEMP_DIR}/${FILE_NAME}" | tar -xf - -C "$TEMP_DIR" 2>/dev/null; then + die "Extraction failed (downloaded archive might be invalid or 404)." + fi - EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1)" - [ -z "$EXTRACTED_BIN" ] && die "Binary '$BIN_NAME' not found in archive" + EXTRACTED_BIN="$(find "$TEMP_DIR" -type f -name "$BIN_NAME" -print 2>/dev/null | head -n 1 || true)" + [ -n "$EXTRACTED_BIN" ] || die "Binary '$BIN_NAME' not found in archive" - ensure_user_group - setup_dirs - stop_service - - say "Installing binary..." + say ">>> Stage 4: Setting up environment (User, Group, Directories)" + ensure_user_group; setup_dirs; stop_service + + say ">>> Stage 5: Installing binary" install_binary "$EXTRACTED_BIN" "${INSTALL_DIR}/${BIN_NAME}" - + + say ">>> Stage 6: Generating configuration" install_config + + say ">>> Stage 7: Installing and starting service" install_service - say "" - say "=============================================" - say "Installation complete!" - say "=============================================" - if command -v systemctl >/dev/null 2>&1 && [ -d /run/systemd/system ]; then - say "To check the logs, run:" - say " journalctl -u $SERVICE_NAME -f" - say "" - fi - say "To get user connection links, run:" - if command -v jq >/dev/null 2>&1; then - say " curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | \"User: \\(.username)\\n\\(.links.tls[0] // empty)\"'" + if [ "${SERVICE_START_FAILED:-0}" -eq 1 ]; then + printf '\n====================================================================\n' + printf ' INSTALLATION COMPLETED WITH WARNINGS\n' + printf '====================================================================\n\n' + printf 'The service was installed but failed to start automatically.\n' + printf 'Please check the logs to determine the issue.\n\n' else - say " curl -s http://127.0.0.1:9091/v1/users" - say " (Note: Install 'jq' package to see the links nicely formatted)" + printf '\n====================================================================\n' + printf ' INSTALLATION SUCCESS\n' + printf '====================================================================\n\n' fi + + svc="$(get_svc_mgr)" + if [ "$svc" = "systemd" ]; then + printf 'To check the status of your proxy service, run:\n' + printf ' systemctl status %s\n\n' "$SERVICE_NAME" + elif [ "$svc" = "openrc" ]; then + printf 'To check the status of your proxy service, run:\n' + printf ' rc-service %s status\n\n' "$SERVICE_NAME" + fi + + printf 'To get your user connection links (for Telegram), run:\n' + if command -v jq >/dev/null 2>&1; then + printf ' curl -s http://127.0.0.1:9091/v1/users | jq -r '\''.data[] | "User: \\(.username)\\n\\(.links.tls[0] // empty)\\n"'\''\n' + else + printf ' curl -s http://127.0.0.1:9091/v1/users\n' + printf ' (Tip: Install '\''jq'\'' for a much cleaner output)\n' + fi + + printf '\n====================================================================\n' ;; esac From 8d1faece60a40bc2c2adbebd96cf5975d4bf8037 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:45:17 +0300 Subject: [PATCH 4/5] Instadrain + Hard-remove for long draining-state Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/api/model.rs | 1 + src/api/runtime_stats.rs | 1 + src/cli.rs | 1 + src/config/defaults.rs | 4 + src/config/hot_reload.rs | 9 ++ src/config/types.rs | 5 + src/maestro/me_startup.rs | 15 +++ src/transport/middle_proxy/config_updater.rs | 2 + src/transport/middle_proxy/health.rs | 92 ++++++++++++++----- .../middle_proxy/health_adversarial_tests.rs | 5 +- .../middle_proxy/health_integration_tests.rs | 1 + .../middle_proxy/health_regression_tests.rs | 66 ++++++++++--- src/transport/middle_proxy/mod.rs | 2 +- src/transport/middle_proxy/pool.rs | 5 + src/transport/middle_proxy/pool_status.rs | 2 + 15 files changed, 170 insertions(+), 41 deletions(-) diff --git a/src/api/model.rs b/src/api/model.rs index ac4e297..e98de8b 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -364,6 +364,7 @@ pub(super) struct MinimalMeRuntimeData { pub(super) me_reconnect_backoff_cap_ms: u64, pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_pool_drain_ttl_secs: u64, + pub(super) me_instadrain: bool, pub(super) me_pool_drain_soft_evict_enabled: bool, pub(super) me_pool_drain_soft_evict_grace_secs: u64, pub(super) me_pool_drain_soft_evict_per_writer: u8, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index f8948d1..cdeacc0 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -431,6 +431,7 @@ async fn get_minimal_payload_cached( me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms, me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs, + me_instadrain: runtime.me_instadrain, me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs, me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer, diff --git a/src/cli.rs b/src/cli.rs index a1182a7..5fbd7d5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -198,6 +198,7 @@ desync_all_full = false update_every = 43200 hardswap = false me_pool_drain_ttl_secs = 90 +me_instadrain = false me_pool_min_fresh_ratio = 0.8 me_reinit_drain_timeout_secs = 120 diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 54a53b3..6d74c93 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -613,6 +613,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 { 90 } +pub(crate) fn default_me_instadrain() -> bool { + false +} + pub(crate) fn default_me_pool_drain_threshold() -> u64 { 128 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 7b94999..1315f9c 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -56,6 +56,7 @@ pub struct HotFields { pub me_reinit_coalesce_window_ms: u64, pub hardswap: bool, pub me_pool_drain_ttl_secs: u64, + pub me_instadrain: bool, pub me_pool_drain_threshold: u64, pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_grace_secs: u64, @@ -143,6 +144,7 @@ impl HotFields { me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, hardswap: cfg.general.hardswap, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, + me_instadrain: cfg.general.me_instadrain, me_pool_drain_threshold: cfg.general.me_pool_drain_threshold, me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs, @@ -477,6 +479,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; cfg.general.hardswap = new.general.hardswap; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; + cfg.general.me_instadrain = new.general.me_instadrain; cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold; cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled; cfg.general.me_pool_drain_soft_evict_grace_secs = @@ -869,6 +872,12 @@ fn log_changes( old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs, ); } + if old_hot.me_instadrain != new_hot.me_instadrain { + info!( + "config reload: me_instadrain: {} → {}", + old_hot.me_instadrain, new_hot.me_instadrain, + ); + } if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold { info!( diff --git a/src/config/types.rs b/src/config/types.rs index 047f3c2..ecd051d 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -812,6 +812,10 @@ pub struct GeneralConfig { #[serde(default = "default_me_pool_drain_ttl_secs")] pub me_pool_drain_ttl_secs: u64, + /// Force-remove any draining writer on the next cleanup tick, regardless of age/deadline. + #[serde(default = "default_me_instadrain")] + pub me_instadrain: bool, + /// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches. /// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior. #[serde(default = "default_me_pool_drain_threshold")] @@ -1020,6 +1024,7 @@ impl Default for GeneralConfig { me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(), proxy_secret_len_max: default_proxy_secret_len_max(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), + me_instadrain: default_me_instadrain(), me_pool_drain_threshold: default_me_pool_drain_threshold(), me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(), me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(), diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 827b00c..0b1310a 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool( config.general.me_adaptive_floor_max_warm_writers_global, config.general.hardswap, config.general.me_pool_drain_ttl_secs, + config.general.me_instadrain, config.general.me_pool_drain_threshold, config.general.me_pool_drain_soft_evict_enabled, config.general.me_pool_drain_soft_evict_grace_secs, @@ -342,6 +343,13 @@ pub(crate) async fn initialize_me_pool( ) .await; }); + let pool_drain_enforcer = pool_bg.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_drain_timeout_enforcer( + pool_drain_enforcer, + ) + .await; + }); break; } Err(e) => { @@ -409,6 +417,13 @@ pub(crate) async fn initialize_me_pool( ) .await; }); + let pool_drain_enforcer = pool.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_drain_timeout_enforcer( + pool_drain_enforcer, + ) + .await; + }); break Some(pool); } diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 43a3569..26ec497 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -298,6 +298,7 @@ async fn run_update_cycle( pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, + cfg.general.me_instadrain, cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_grace_secs, @@ -530,6 +531,7 @@ pub async fn me_config_updater( pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, + cfg.general.me_instadrain, cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_grace_secs, diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 21619c7..8b62cff 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -12,6 +12,7 @@ use crate::crypto::SecureRandom; use crate::network::IpFamily; use super::MePool; +use super::pool::MeWriter; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff #[allow(dead_code)] @@ -31,10 +32,7 @@ const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1; -#[cfg(not(test))] -const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = true; -#[cfg(test)] -const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = false; +const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1; #[derive(Debug, Clone)] struct DcFloorPlanEntry { @@ -131,6 +129,55 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c } } +pub async fn me_drain_timeout_enforcer(pool: Arc) { + let mut drain_warn_next_allowed: HashMap = HashMap::new(); + let mut drain_soft_evict_next_allowed: HashMap = HashMap::new(); + loop { + tokio::time::sleep(Duration::from_secs( + HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS, + )) + .await; + reap_draining_writers( + &pool, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, + ) + .await; + } +} + +fn draining_writer_timeout_expired( + pool: &MePool, + writer: &MeWriter, + now_epoch_secs: u64, + drain_ttl_secs: u64, +) -> bool { + if pool + .me_instadrain + .load(std::sync::atomic::Ordering::Relaxed) + { + return true; + } + + let deadline_epoch_secs = writer + .drain_deadline_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if deadline_epoch_secs != 0 { + return now_epoch_secs >= deadline_epoch_secs; + } + + if drain_ttl_secs == 0 { + return false; + } + let drain_started_at_epoch_secs = writer + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if drain_started_at_epoch_secs == 0 { + return false; + } + now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs +} + pub(super) async fn reap_draining_writers( pool: &Arc, warn_next_allowed: &mut HashMap, @@ -146,11 +193,16 @@ pub(super) async fn reap_draining_writers( let activity = pool.registry.writer_activity_snapshot().await; let mut draining_writers = Vec::new(); let mut empty_writer_ids = Vec::::new(); + let mut timeout_expired_writer_ids = Vec::::new(); let mut force_close_writer_ids = Vec::::new(); for writer in writers { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { continue; } + if draining_writer_timeout_expired(pool, &writer, now_epoch_secs, drain_ttl_secs) { + timeout_expired_writer_ids.push(writer.id); + continue; + } if activity .bound_clients_by_writer .get(&writer.id) @@ -163,11 +215,6 @@ pub(super) async fn reap_draining_writers( } draining_writers.push(writer); } - if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE { - for writer in draining_writers.drain(..) { - force_close_writer_ids.push(writer.id); - } - } if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { draining_writers.sort_by(|left, right| { @@ -221,14 +268,6 @@ pub(super) async fn reap_draining_writers( "ME draining writer remains non-empty past drain TTL" ); } - let deadline_epoch_secs = writer - .drain_deadline_epoch_secs - .load(std::sync::atomic::Ordering::Relaxed); - if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs { - warn!(writer_id = writer.id, "Drain timeout, force-closing"); - force_close_writer_ids.push(writer.id); - active_draining_writer_ids.remove(&writer.id); - } } warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id)); @@ -313,15 +352,21 @@ pub(super) async fn reap_draining_writers( } } + let mut closed_writer_ids = HashSet::::new(); + for writer_id in timeout_expired_writer_ids { + if !closed_writer_ids.insert(writer_id) { + continue; + } + pool.stats.increment_pool_force_close_total(); + pool.remove_writer_and_close_clients(writer_id).await; + pool.stats + .increment_me_draining_writers_reap_progress_total(); + } + let requested_force_close = force_close_writer_ids.len(); let requested_empty_close = empty_writer_ids.len(); let requested_close_total = requested_force_close.saturating_add(requested_empty_close); - let close_budget = if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE { - requested_close_total - } else { - health_drain_close_budget() - }; - let mut closed_writer_ids = HashSet::::new(); + let close_budget = health_drain_close_budget(); let mut closed_total = 0usize; for writer_id in force_close_writer_ids { if closed_total >= close_budget { @@ -1581,6 +1626,7 @@ mod tests { general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs index 3f182e4..ae517b3 100644 --- a/src/transport/middle_proxy/health_adversarial_tests.rs +++ b/src/transport/middle_proxy/health_adversarial_tests.rs @@ -81,6 +81,7 @@ async fn make_pool( general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, @@ -213,7 +214,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle insert_draining_writer( &pool, writer_id, - now_epoch_secs.saturating_sub(600).saturating_add(writer_id), + now_epoch_secs.saturating_sub(20), 1, 0, ) @@ -230,7 +231,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle } assert_eq!(writer_count(&pool).await, threshold as usize); - assert_eq!(sorted_writer_ids(&pool).await, vec![58, 59, 60]); + assert_eq!(sorted_writer_ids(&pool).await, vec![1, 2, 3]); } #[tokio::test] diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs index 7f99d2a..fbbffce 100644 --- a/src/transport/middle_proxy/health_integration_tests.rs +++ b/src/transport/middle_proxy/health_integration_tests.rs @@ -80,6 +80,7 @@ async fn make_pool( general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index 565ac74..bcdaf2e 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -74,6 +74,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, @@ -180,8 +181,14 @@ async fn current_writer_ids(pool: &Arc) -> Vec { async fn reap_draining_writers_drops_warn_state_for_removed_writer() { let pool = make_pool(128).await; let now_epoch_secs = MePool::now_epoch_secs(); - let conn_ids = - insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await; + let conn_ids = insert_draining_writer( + &pool, + 7, + now_epoch_secs.saturating_sub(180), + 1, + now_epoch_secs.saturating_add(3_600), + ) + .await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); @@ -331,17 +338,17 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() { #[tokio::test] async fn reap_draining_writers_limits_closes_per_health_tick() { - let pool = make_pool(128).await; + let pool = make_pool(1).await; let now_epoch_secs = MePool::now_epoch_secs(); let close_budget = health_drain_close_budget(); - let writer_total = close_budget.saturating_add(19); + let writer_total = close_budget.saturating_add(20); for writer_id in 1..=writer_total as u64 { insert_draining_writer( &pool, writer_id, now_epoch_secs.saturating_sub(20), 1, - now_epoch_secs.saturating_sub(1), + 0, ) .await; } @@ -364,8 +371,8 @@ async fn reap_draining_writers_backlog_drains_across_ticks() { &pool, writer_id, now_epoch_secs.saturating_sub(20), - 1, - now_epoch_secs.saturating_sub(1), + 0, + 0, ) .await; } @@ -393,7 +400,7 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() { insert_draining_writer( &pool, writer_id, - now_epoch_secs.saturating_sub(200).saturating_add(writer_id), + now_epoch_secs.saturating_sub(20), 1, 0, ) @@ -429,27 +436,27 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr #[tokio::test] async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { - let pool = make_pool(128).await; + let pool = make_pool(1).await; let now_epoch_secs = MePool::now_epoch_secs(); let close_budget = health_drain_close_budget(); - for writer_id in 1..=close_budget as u64 { + for writer_id in 1..=close_budget.saturating_add(1) as u64 { insert_draining_writer( &pool, writer_id, now_epoch_secs.saturating_sub(20), 1, - now_epoch_secs.saturating_sub(1), + 0, ) .await; } - let empty_writer_id = close_budget as u64 + 1; + let empty_writer_id = close_budget.saturating_add(2) as u64; insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; - assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); + assert_eq!(current_writer_ids(&pool).await, vec![1, empty_writer_id]); } #[tokio::test] @@ -571,7 +578,14 @@ async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() { .store(1, Ordering::Relaxed); let now_epoch_secs = MePool::now_epoch_secs(); - insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await; + insert_draining_writer( + &pool, + 77, + now_epoch_secs.saturating_sub(240), + 3, + now_epoch_secs.saturating_add(3_600), + ) + .await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); @@ -595,7 +609,14 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() { .store(60_000, Ordering::Relaxed); let now_epoch_secs = MePool::now_epoch_secs(); - insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await; + insert_draining_writer( + &pool, + 88, + now_epoch_secs.saturating_sub(240), + 3, + now_epoch_secs.saturating_add(3_600), + ) + .await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); @@ -608,6 +629,21 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() { assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); } +#[tokio::test] +async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediately() { + let pool = make_pool(0).await; + pool.me_instadrain.store(true, Ordering::Relaxed); + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer(&pool, 101, now_epoch_secs.saturating_sub(5), 1, 0).await; + insert_draining_writer(&pool, 102, now_epoch_secs.saturating_sub(4), 1, 0).await; + let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + + assert!(current_writer_ids(&pool).await.is_empty()); +} + #[test] fn general_config_default_drain_threshold_remains_enabled() { assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 590c996..26ded29 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -30,7 +30,7 @@ mod health_adversarial_tests; use bytes::Bytes; -pub use health::me_health_monitor; +pub use health::{me_drain_timeout_enforcer, me_health_monitor}; #[allow(unused_imports)] pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily}; pub use pool::MePool; diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index d09f07c..441d41d 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -171,6 +171,7 @@ pub struct MePool { pub(super) endpoint_quarantine: Arc>>, pub(super) kdf_material_fingerprint: Arc>>, pub(super) me_pool_drain_ttl_secs: AtomicU64, + pub(super) me_instadrain: AtomicBool, pub(super) me_pool_drain_threshold: AtomicU64, pub(super) me_pool_drain_soft_evict_enabled: AtomicBool, pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64, @@ -279,6 +280,7 @@ impl MePool { me_adaptive_floor_max_warm_writers_global: u32, hardswap: bool, me_pool_drain_ttl_secs: u64, + me_instadrain: bool, me_pool_drain_threshold: u64, me_pool_drain_soft_evict_enabled: bool, me_pool_drain_soft_evict_grace_secs: u64, @@ -462,6 +464,7 @@ impl MePool { endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), + me_instadrain: AtomicBool::new(me_instadrain), me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold), me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled), me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs), @@ -524,6 +527,7 @@ impl MePool { &self, hardswap: bool, drain_ttl_secs: u64, + instadrain: bool, pool_drain_threshold: u64, pool_drain_soft_evict_enabled: bool, pool_drain_soft_evict_grace_secs: u64, @@ -568,6 +572,7 @@ impl MePool { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs .store(drain_ttl_secs, Ordering::Relaxed); + self.me_instadrain.store(instadrain, Ordering::Relaxed); self.me_pool_drain_threshold .store(pool_drain_threshold, Ordering::Relaxed); self.me_pool_drain_soft_evict_enabled diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 214ee49..5fe45cb 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -126,6 +126,7 @@ pub(crate) struct MeApiRuntimeSnapshot { pub me_reconnect_backoff_cap_ms: u64, pub me_reconnect_fast_retry_count: u32, pub me_pool_drain_ttl_secs: u64, + pub me_instadrain: bool, pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_grace_secs: u64, pub me_pool_drain_soft_evict_per_writer: u8, @@ -583,6 +584,7 @@ impl MePool { me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), + me_instadrain: self.me_instadrain.load(Ordering::Relaxed), me_pool_drain_soft_evict_enabled: self .me_pool_drain_soft_evict_enabled .load(Ordering::Relaxed), From ad8ada33c9c76558bcff2756a09e4cccacd5565f Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 18:24:01 +0300 Subject: [PATCH 5/5] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index d4ef990..2a9cbe3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.24" +version = "3.3.25" edition = "2024" [dependencies]