mirror of
https://github.com/telemt/telemt.git
synced 2026-04-15 01:24:09 +03:00
Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e7bdc80956 | ||
|
|
d641137537 | ||
|
|
4fd22b3219 | ||
|
|
fca0e3f619 | ||
|
|
9401c46727 | ||
|
|
6b3697ee87 | ||
|
|
c08160600e | ||
|
|
cd5c60ce1e | ||
|
|
ae1c97e27a | ||
|
|
cfee7de66b | ||
|
|
c942c492ad | ||
|
|
0e4be43b2b | ||
|
|
7eb2b60855 | ||
|
|
373ae3281e | ||
|
|
178630e3bf | ||
|
|
67f307cd43 | ||
|
|
ca2eaa9ead | ||
|
|
3c78daea0c | ||
|
|
d2baa8e721 | ||
|
|
a0cf4b4713 | ||
|
|
1bd249b0a9 | ||
|
|
2f47ec5797 | ||
|
|
80f3661b8e |
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "telemt"
|
||||
version = "3.3.9"
|
||||
version = "3.3.11"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
# === General Settings ===
|
||||
[general]
|
||||
use_middle_proxy = false
|
||||
use_middle_proxy = true
|
||||
# Global ad_tag fallback when user has no per-user tag in [access.user_ad_tags]
|
||||
# ad_tag = "00000000000000000000000000000000"
|
||||
# Per-user ad_tag in [access.user_ad_tags] (32 hex from @MTProxybot)
|
||||
|
||||
138
install.sh
138
install.sh
@@ -1,73 +1,93 @@
|
||||
sudo bash -c '
|
||||
set -e
|
||||
#!/bin/sh
|
||||
set -eu
|
||||
|
||||
# --- Проверка на существующую установку ---
|
||||
if systemctl list-unit-files | grep -q telemt.service; then
|
||||
# --- РЕЖИМ ОБНОВЛЕНИЯ ---
|
||||
echo "--- Обнаружена существующая установка Telemt. Запускаю обновление... ---"
|
||||
REPO="${REPO:-telemt/telemt}"
|
||||
BIN_NAME="${BIN_NAME:-telemt}"
|
||||
VERSION="${1:-${VERSION:-latest}}"
|
||||
INSTALL_DIR="${INSTALL_DIR:-/usr/local/bin}"
|
||||
|
||||
echo "[*] Остановка службы telemt..."
|
||||
systemctl stop telemt || true # Игнорируем ошибку, если служба уже остановлена
|
||||
say() {
|
||||
printf '%s\n' "$*"
|
||||
}
|
||||
|
||||
echo "[1/2] Скачивание последней версии Telemt..."
|
||||
wget -qO- "https://github.com/telemt/telemt/releases/latest/download/telemt-$(uname -m)-linux-$(ldd --version 2>&1 | grep -iq musl && echo musl || echo gnu).tar.gz" | tar -xz
|
||||
die() {
|
||||
printf 'Error: %s\n' "$*" >&2
|
||||
exit 1
|
||||
}
|
||||
|
||||
echo "[1/2] Замена исполняемого файла в /usr/local/bin..."
|
||||
mv telemt /usr/local/bin/telemt
|
||||
chmod +x /usr/local/bin/telemt
|
||||
need_cmd() {
|
||||
command -v "$1" >/dev/null 2>&1 || die "required command not found: $1"
|
||||
}
|
||||
|
||||
echo "[2/2] Запуск службы..."
|
||||
systemctl start telemt
|
||||
detect_arch() {
|
||||
arch="$(uname -m)"
|
||||
case "$arch" in
|
||||
x86_64|amd64) printf 'x86_64\n' ;;
|
||||
aarch64|arm64) printf 'aarch64\n' ;;
|
||||
*) die "unsupported architecture: $arch" ;;
|
||||
esac
|
||||
}
|
||||
|
||||
echo "--- Обновление Telemt успешно завершено! ---"
|
||||
echo
|
||||
echo "Для проверки статуса службы выполните:"
|
||||
echo " systemctl status telemt"
|
||||
detect_libc() {
|
||||
case "$(ldd --version 2>&1 || true)" in
|
||||
*musl*) printf 'musl\n' ;;
|
||||
*) printf 'gnu\n' ;;
|
||||
esac
|
||||
}
|
||||
|
||||
else
|
||||
# --- РЕЖИМ НОВОЙ УСТАНОВКИ ---
|
||||
echo "--- Начало автоматической установки Telemt ---"
|
||||
fetch_to_stdout() {
|
||||
url="$1"
|
||||
if command -v curl >/dev/null 2>&1; then
|
||||
curl -fsSL "$url"
|
||||
elif command -v wget >/dev/null 2>&1; then
|
||||
wget -qO- "$url"
|
||||
else
|
||||
die "neither curl nor wget is installed"
|
||||
fi
|
||||
}
|
||||
|
||||
# Шаг 1: Скачивание и установка бинарного файла
|
||||
echo "[1/5] Скачивание последней версии Telemt..."
|
||||
wget -qO- "https://github.com/telemt/telemt/releases/latest/download/telemt-$(uname -m)-linux-$(ldd --version 2>&1 | grep -iq musl && echo musl || echo gnu).tar.gz" | tar -xz
|
||||
install_binary() {
|
||||
src="$1"
|
||||
dst="$2"
|
||||
|
||||
echo "[1/5] Перемещение исполняемого файла в /usr/local/bin и установка прав..."
|
||||
mv telemt /usr/local/bin/telemt
|
||||
chmod +x /usr/local/bin/telemt
|
||||
if [ -w "$INSTALL_DIR" ] || { [ ! -e "$INSTALL_DIR" ] && [ -w "$(dirname "$INSTALL_DIR")" ]; }; then
|
||||
mkdir -p "$INSTALL_DIR"
|
||||
install -m 0755 "$src" "$dst"
|
||||
elif command -v sudo >/dev/null 2>&1; then
|
||||
sudo mkdir -p "$INSTALL_DIR"
|
||||
sudo install -m 0755 "$src" "$dst"
|
||||
else
|
||||
die "cannot write to $INSTALL_DIR and sudo is not available"
|
||||
fi
|
||||
}
|
||||
|
||||
# Шаг 2: Генерация секрета
|
||||
echo "[2/5] Генерация секретного ключа..."
|
||||
SECRET=$(openssl rand -hex 16)
|
||||
need_cmd uname
|
||||
need_cmd tar
|
||||
need_cmd mktemp
|
||||
need_cmd grep
|
||||
need_cmd install
|
||||
|
||||
# Шаг 3: Создание файла конфигурации
|
||||
echo "[3/5] Создание файла конфигурации /etc/telemt.toml..."
|
||||
printf "# === General Settings ===\n[general]\n[general.modes]\nclassic = false\nsecure = false\ntls = true\n\n# === Anti-Censorship & Masking ===\n[censorship]\n# !!! ВАЖНО: Замените на ваш домен или домен, который вы хотите использовать для маскировки !!!\ntls_domain = \"petrovich.ru\"\n\n[access.users]\nhello = \"%s\"\n" "$SECRET" > /etc/telemt.toml
|
||||
ARCH="$(detect_arch)"
|
||||
LIBC="$(detect_libc)"
|
||||
|
||||
# Шаг 4: Создание службы Systemd
|
||||
echo "[4/5] Создание службы systemd..."
|
||||
printf "[Unit]\nDescription=Telemt Proxy\nAfter=network.target\n\n[Service]\nType=simple\nExecStart=/usr/local/bin/telemt /etc/telemt.toml\nRestart=on-failure\nRestartSec=5\nLimitNOFILE=65536\n\n[Install]\nWantedBy=multi-user.target\n" > /etc/systemd/system/telemt.service
|
||||
case "$VERSION" in
|
||||
latest)
|
||||
URL="https://github.com/$REPO/releases/latest/download/${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
|
||||
;;
|
||||
*)
|
||||
URL="https://github.com/$REPO/releases/download/${VERSION}/${BIN_NAME}-${ARCH}-linux-${LIBC}.tar.gz"
|
||||
;;
|
||||
esac
|
||||
|
||||
# Шаг 5: Запуск службы
|
||||
echo "[5/5] Перезагрузка systemd, запуск и включение службы telemt..."
|
||||
systemctl daemon-reload
|
||||
systemctl start telemt
|
||||
systemctl enable telemt
|
||||
TMPDIR="$(mktemp -d)"
|
||||
trap 'rm -rf "$TMPDIR"' EXIT INT TERM
|
||||
|
||||
echo "--- Установка и запуск Telemt успешно завершены! ---"
|
||||
echo
|
||||
echo "ВАЖНАЯ ИНФОРМАЦИЯ:"
|
||||
echo "==================="
|
||||
echo "1. Вам НЕОБХОДИМО отредактировать файл /etc/telemt.toml и заменить '\''petrovich.ru'\'' на другой домен"
|
||||
echo " с помощью команды:"
|
||||
echo " nano /etc/telemt.toml"
|
||||
echo " После редактирования файла перезапустите службу командой:"
|
||||
echo " sudo systemctl restart telemt"
|
||||
echo
|
||||
echo "2. Для проверки статуса службы выполните команду:"
|
||||
echo " systemctl status telemt"
|
||||
echo
|
||||
echo "3. Для получения ссылок на подключение выполните команду:"
|
||||
echo " journalctl -u telemt -n -g '\''links'\'' --no-pager -o cat | tac"
|
||||
fi
|
||||
'
|
||||
say "Installing $BIN_NAME ($VERSION) for $ARCH-linux-$LIBC..."
|
||||
fetch_to_stdout "$URL" | tar -xzf - -C "$TMPDIR"
|
||||
|
||||
[ -f "$TMPDIR/$BIN_NAME" ] || die "archive did not contain $BIN_NAME"
|
||||
|
||||
install_binary "$TMPDIR/$BIN_NAME" "$INSTALL_DIR/$BIN_NAME"
|
||||
|
||||
say "Installed: $INSTALL_DIR/$BIN_NAME"
|
||||
"$INSTALL_DIR/$BIN_NAME" --version 2>/dev/null || true
|
||||
|
||||
@@ -266,6 +266,7 @@ pub(super) struct MeWritersData {
|
||||
pub(super) struct DcStatus {
|
||||
pub(super) dc: i16,
|
||||
pub(super) endpoints: Vec<String>,
|
||||
pub(super) endpoint_writers: Vec<DcEndpointWriters>,
|
||||
pub(super) available_endpoints: usize,
|
||||
pub(super) available_pct: f64,
|
||||
pub(super) required_writers: usize,
|
||||
@@ -279,6 +280,12 @@ pub(super) struct DcStatus {
|
||||
pub(super) load: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub(super) struct DcEndpointWriters {
|
||||
pub(super) endpoint: String,
|
||||
pub(super) active_writers: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub(super) struct DcStatusData {
|
||||
pub(super) middle_proxy_enabled: bool,
|
||||
@@ -354,6 +361,8 @@ pub(super) struct MinimalMeRuntimeData {
|
||||
pub(super) me_single_endpoint_outage_backoff_max_ms: u64,
|
||||
pub(super) me_single_endpoint_shadow_rotate_every_secs: u64,
|
||||
pub(super) me_deterministic_writer_sort: bool,
|
||||
pub(super) me_writer_pick_mode: &'static str,
|
||||
pub(super) me_writer_pick_sample_size: u8,
|
||||
pub(super) me_socks_kdf_policy: &'static str,
|
||||
pub(super) quarantined_endpoints_total: usize,
|
||||
pub(super) quarantined_endpoints: Vec<MinimalQuarantineData>,
|
||||
|
||||
@@ -7,10 +7,10 @@ use crate::transport::UpstreamRouteKind;
|
||||
|
||||
use super::ApiShared;
|
||||
use super::model::{
|
||||
DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary, MinimalAllData,
|
||||
MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData, MinimalQuarantineData,
|
||||
UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData, ZeroAllData,
|
||||
ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
|
||||
DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary,
|
||||
MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData,
|
||||
MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData,
|
||||
ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
|
||||
ZeroUpstreamData,
|
||||
};
|
||||
|
||||
@@ -346,6 +346,14 @@ async fn get_minimal_payload_cached(
|
||||
.into_iter()
|
||||
.map(|value| value.to_string())
|
||||
.collect(),
|
||||
endpoint_writers: entry
|
||||
.endpoint_writers
|
||||
.into_iter()
|
||||
.map(|coverage| DcEndpointWriters {
|
||||
endpoint: coverage.endpoint.to_string(),
|
||||
active_writers: coverage.active_writers,
|
||||
})
|
||||
.collect(),
|
||||
available_endpoints: entry.available_endpoints,
|
||||
available_pct: entry.available_pct,
|
||||
required_writers: entry.required_writers,
|
||||
@@ -422,6 +430,8 @@ async fn get_minimal_payload_cached(
|
||||
me_single_endpoint_shadow_rotate_every_secs: runtime
|
||||
.me_single_endpoint_shadow_rotate_every_secs,
|
||||
me_deterministic_writer_sort: runtime.me_deterministic_writer_sort,
|
||||
me_writer_pick_mode: runtime.me_writer_pick_mode,
|
||||
me_writer_pick_sample_size: runtime.me_writer_pick_sample_size,
|
||||
me_socks_kdf_policy: runtime.me_socks_kdf_policy,
|
||||
quarantined_endpoints_total: runtime.quarantined_endpoints.len(),
|
||||
quarantined_endpoints: runtime
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::config::{MeFloorMode, ProxyConfig, UserMaxUniqueIpsMode};
|
||||
use crate::config::{MeFloorMode, MeWriterPickMode, ProxyConfig, UserMaxUniqueIpsMode};
|
||||
|
||||
use super::ApiShared;
|
||||
use super::runtime_init::build_runtime_startup_summary;
|
||||
@@ -78,6 +78,8 @@ pub(super) struct EffectiveMiddleProxyLimits {
|
||||
pub(super) reconnect_backoff_base_ms: u64,
|
||||
pub(super) reconnect_backoff_cap_ms: u64,
|
||||
pub(super) reconnect_fast_retry_count: u32,
|
||||
pub(super) writer_pick_mode: &'static str,
|
||||
pub(super) writer_pick_sample_size: u8,
|
||||
pub(super) me2dc_fallback: bool,
|
||||
}
|
||||
|
||||
@@ -237,6 +239,8 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
|
||||
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
|
||||
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
|
||||
reconnect_fast_retry_count: cfg.general.me_reconnect_fast_retry_count,
|
||||
writer_pick_mode: me_writer_pick_mode_label(cfg.general.me_writer_pick_mode),
|
||||
writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
|
||||
me2dc_fallback: cfg.general.me2dc_fallback,
|
||||
},
|
||||
user_ip_policy: EffectiveUserIpPolicyLimits {
|
||||
@@ -274,3 +278,10 @@ fn me_floor_mode_label(mode: MeFloorMode) -> &'static str {
|
||||
MeFloorMode::Adaptive => "adaptive",
|
||||
}
|
||||
}
|
||||
|
||||
fn me_writer_pick_mode_label(mode: MeWriterPickMode) -> &'static str {
|
||||
match mode {
|
||||
MeWriterPickMode::SortedRr => "sorted_rr",
|
||||
MeWriterPickMode::P2c => "p2c",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,14 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64;
|
||||
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64;
|
||||
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256;
|
||||
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256;
|
||||
const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 1024;
|
||||
const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 512;
|
||||
const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 256;
|
||||
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
|
||||
const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
|
||||
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
|
||||
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
|
||||
const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
|
||||
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
|
||||
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
|
||||
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
|
||||
@@ -296,6 +304,38 @@ pub(crate) fn default_me_adaptive_floor_max_warm_writers_global() -> u32 {
|
||||
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_writer_cmd_channel_capacity() -> usize {
|
||||
DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_route_channel_capacity() -> usize {
|
||||
DEFAULT_ME_ROUTE_CHANNEL_CAPACITY
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_c2me_channel_capacity() -> usize {
|
||||
DEFAULT_ME_C2ME_CHANNEL_CAPACITY
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_writer_pick_sample_size() -> u8 {
|
||||
DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_health_interval_ms_unhealthy() -> u64 {
|
||||
DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_health_interval_ms_healthy() -> u64 {
|
||||
DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_admission_poll_ms() -> u64 {
|
||||
DEFAULT_ME_ADMISSION_POLL_MS
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_warn_rate_limit_ms() -> u64 {
|
||||
DEFAULT_ME_WARN_RATE_LIMIT_MS
|
||||
}
|
||||
|
||||
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
||||
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
||||
}
|
||||
|
||||
@@ -29,7 +29,10 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher};
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::config::{LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel};
|
||||
use crate::config::{
|
||||
LogLevel, MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy, MeTelemetryLevel,
|
||||
MeWriterPickMode,
|
||||
};
|
||||
use super::load::ProxyConfig;
|
||||
|
||||
// ── Hot fields ────────────────────────────────────────────────────────────────
|
||||
@@ -57,6 +60,8 @@ pub struct HotFields {
|
||||
pub me_bind_stale_ttl_secs: u64,
|
||||
pub me_secret_atomic_snapshot: bool,
|
||||
pub me_deterministic_writer_sort: bool,
|
||||
pub me_writer_pick_mode: MeWriterPickMode,
|
||||
pub me_writer_pick_sample_size: u8,
|
||||
pub me_single_endpoint_shadow_writers: u8,
|
||||
pub me_single_endpoint_outage_mode_enabled: bool,
|
||||
pub me_single_endpoint_outage_disable_quarantine: bool,
|
||||
@@ -91,6 +96,10 @@ pub struct HotFields {
|
||||
pub me_route_backpressure_base_timeout_ms: u64,
|
||||
pub me_route_backpressure_high_timeout_ms: u64,
|
||||
pub me_route_backpressure_high_watermark_pct: u8,
|
||||
pub me_health_interval_ms_unhealthy: u64,
|
||||
pub me_health_interval_ms_healthy: u64,
|
||||
pub me_admission_poll_ms: u64,
|
||||
pub me_warn_rate_limit_ms: u64,
|
||||
pub users: std::collections::HashMap<String, String>,
|
||||
pub user_ad_tags: std::collections::HashMap<String, String>,
|
||||
pub user_max_tcp_conns: std::collections::HashMap<String, usize>,
|
||||
@@ -126,6 +135,8 @@ impl HotFields {
|
||||
me_bind_stale_ttl_secs: cfg.general.me_bind_stale_ttl_secs,
|
||||
me_secret_atomic_snapshot: cfg.general.me_secret_atomic_snapshot,
|
||||
me_deterministic_writer_sort: cfg.general.me_deterministic_writer_sort,
|
||||
me_writer_pick_mode: cfg.general.me_writer_pick_mode,
|
||||
me_writer_pick_sample_size: cfg.general.me_writer_pick_sample_size,
|
||||
me_single_endpoint_shadow_writers: cfg.general.me_single_endpoint_shadow_writers,
|
||||
me_single_endpoint_outage_mode_enabled: cfg
|
||||
.general
|
||||
@@ -192,6 +203,10 @@ impl HotFields {
|
||||
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
||||
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
||||
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
||||
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
|
||||
me_health_interval_ms_healthy: cfg.general.me_health_interval_ms_healthy,
|
||||
me_admission_poll_ms: cfg.general.me_admission_poll_ms,
|
||||
me_warn_rate_limit_ms: cfg.general.me_warn_rate_limit_ms,
|
||||
users: cfg.access.users.clone(),
|
||||
user_ad_tags: cfg.access.user_ad_tags.clone(),
|
||||
user_max_tcp_conns: cfg.access.user_max_tcp_conns.clone(),
|
||||
@@ -284,6 +299,8 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
||||
cfg.general.me_bind_stale_ttl_secs = new.general.me_bind_stale_ttl_secs;
|
||||
cfg.general.me_secret_atomic_snapshot = new.general.me_secret_atomic_snapshot;
|
||||
cfg.general.me_deterministic_writer_sort = new.general.me_deterministic_writer_sort;
|
||||
cfg.general.me_writer_pick_mode = new.general.me_writer_pick_mode;
|
||||
cfg.general.me_writer_pick_sample_size = new.general.me_writer_pick_sample_size;
|
||||
cfg.general.me_single_endpoint_shadow_writers = new.general.me_single_endpoint_shadow_writers;
|
||||
cfg.general.me_single_endpoint_outage_mode_enabled =
|
||||
new.general.me_single_endpoint_outage_mode_enabled;
|
||||
@@ -335,6 +352,10 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
||||
new.general.me_route_backpressure_high_timeout_ms;
|
||||
cfg.general.me_route_backpressure_high_watermark_pct =
|
||||
new.general.me_route_backpressure_high_watermark_pct;
|
||||
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
|
||||
cfg.general.me_health_interval_ms_healthy = new.general.me_health_interval_ms_healthy;
|
||||
cfg.general.me_admission_poll_ms = new.general.me_admission_poll_ms;
|
||||
cfg.general.me_warn_rate_limit_ms = new.general.me_warn_rate_limit_ms;
|
||||
|
||||
cfg.access.users = new.access.users.clone();
|
||||
cfg.access.user_ad_tags = new.access.user_ad_tags.clone();
|
||||
@@ -671,11 +692,15 @@ fn log_changes(
|
||||
}
|
||||
if old_hot.me_secret_atomic_snapshot != new_hot.me_secret_atomic_snapshot
|
||||
|| old_hot.me_deterministic_writer_sort != new_hot.me_deterministic_writer_sort
|
||||
|| old_hot.me_writer_pick_mode != new_hot.me_writer_pick_mode
|
||||
|| old_hot.me_writer_pick_sample_size != new_hot.me_writer_pick_sample_size
|
||||
{
|
||||
info!(
|
||||
"config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={}",
|
||||
"config reload: me_runtime_flags: secret_atomic_snapshot={} deterministic_sort={} writer_pick_mode={:?} writer_pick_sample_size={}",
|
||||
new_hot.me_secret_atomic_snapshot,
|
||||
new_hot.me_deterministic_writer_sort
|
||||
new_hot.me_deterministic_writer_sort,
|
||||
new_hot.me_writer_pick_mode,
|
||||
new_hot.me_writer_pick_sample_size,
|
||||
);
|
||||
}
|
||||
if old_hot.me_single_endpoint_shadow_writers != new_hot.me_single_endpoint_shadow_writers
|
||||
@@ -796,12 +821,21 @@ fn log_changes(
|
||||
!= new_hot.me_route_backpressure_high_timeout_ms
|
||||
|| old_hot.me_route_backpressure_high_watermark_pct
|
||||
!= new_hot.me_route_backpressure_high_watermark_pct
|
||||
|| old_hot.me_health_interval_ms_unhealthy
|
||||
!= new_hot.me_health_interval_ms_unhealthy
|
||||
|| old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy
|
||||
|| old_hot.me_admission_poll_ms != new_hot.me_admission_poll_ms
|
||||
|| old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms
|
||||
{
|
||||
info!(
|
||||
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%",
|
||||
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms",
|
||||
new_hot.me_route_backpressure_base_timeout_ms,
|
||||
new_hot.me_route_backpressure_high_timeout_ms,
|
||||
new_hot.me_route_backpressure_high_watermark_pct,
|
||||
new_hot.me_health_interval_ms_unhealthy,
|
||||
new_hot.me_health_interval_ms_healthy,
|
||||
new_hot.me_admission_poll_ms,
|
||||
new_hot.me_warn_rate_limit_ms,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -285,6 +285,48 @@ impl ProxyConfig {
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_writer_cmd_channel_capacity == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_writer_cmd_channel_capacity must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_route_channel_capacity == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_route_channel_capacity must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_c2me_channel_capacity == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_c2me_channel_capacity must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_health_interval_ms_unhealthy == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_health_interval_ms_unhealthy must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_health_interval_ms_healthy == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_health_interval_ms_healthy must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_admission_poll_ms == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_admission_poll_ms must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_warn_rate_limit_ms == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_warn_rate_limit_ms must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.access.user_max_unique_ips_window_secs == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"access.user_max_unique_ips_window_secs must be > 0".to_string(),
|
||||
@@ -477,6 +519,12 @@ impl ProxyConfig {
|
||||
));
|
||||
}
|
||||
|
||||
if !(2..=4).contains(&config.general.me_writer_pick_sample_size) {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_writer_pick_sample_size must be within [2, 4]".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_route_inline_recovery_attempts == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_route_inline_recovery_attempts must be > 0".to_string(),
|
||||
|
||||
@@ -212,6 +212,32 @@ impl MeRouteNoWriterMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Middle-End writer selection mode for new client bindings.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum MeWriterPickMode {
|
||||
SortedRr,
|
||||
#[default]
|
||||
P2c,
|
||||
}
|
||||
|
||||
impl MeWriterPickMode {
|
||||
pub fn as_u8(self) -> u8 {
|
||||
match self {
|
||||
MeWriterPickMode::SortedRr => 0,
|
||||
MeWriterPickMode::P2c => 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_u8(raw: u8) -> Self {
|
||||
match raw {
|
||||
0 => MeWriterPickMode::SortedRr,
|
||||
1 => MeWriterPickMode::P2c,
|
||||
_ => MeWriterPickMode::P2c,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-user unique source IP limit mode.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@@ -420,6 +446,18 @@ pub struct GeneralConfig {
|
||||
#[serde(default = "default_rpc_proxy_req_every")]
|
||||
pub rpc_proxy_req_every: u64,
|
||||
|
||||
/// Capacity of per-ME writer command channel.
|
||||
#[serde(default = "default_me_writer_cmd_channel_capacity")]
|
||||
pub me_writer_cmd_channel_capacity: usize,
|
||||
|
||||
/// Capacity of per-connection ME response route channel.
|
||||
#[serde(default = "default_me_route_channel_capacity")]
|
||||
pub me_route_channel_capacity: usize,
|
||||
|
||||
/// Capacity of per-client command queue from client reader to ME sender task.
|
||||
#[serde(default = "default_me_c2me_channel_capacity")]
|
||||
pub me_c2me_channel_capacity: usize,
|
||||
|
||||
/// Max pending ciphertext buffer per client writer (bytes).
|
||||
/// Controls FakeTLS backpressure vs throughput.
|
||||
#[serde(default = "default_crypto_pending_buffer")]
|
||||
@@ -620,6 +658,22 @@ pub struct GeneralConfig {
|
||||
#[serde(default = "default_me_route_backpressure_high_watermark_pct")]
|
||||
pub me_route_backpressure_high_watermark_pct: u8,
|
||||
|
||||
/// Health monitor interval in milliseconds while writer coverage is degraded.
|
||||
#[serde(default = "default_me_health_interval_ms_unhealthy")]
|
||||
pub me_health_interval_ms_unhealthy: u64,
|
||||
|
||||
/// Health monitor interval in milliseconds while writer coverage is stable.
|
||||
#[serde(default = "default_me_health_interval_ms_healthy")]
|
||||
pub me_health_interval_ms_healthy: u64,
|
||||
|
||||
/// Poll interval in milliseconds for conditional-admission state checks.
|
||||
#[serde(default = "default_me_admission_poll_ms")]
|
||||
pub me_admission_poll_ms: u64,
|
||||
|
||||
/// Cooldown for repetitive ME warning logs in milliseconds.
|
||||
#[serde(default = "default_me_warn_rate_limit_ms")]
|
||||
pub me_warn_rate_limit_ms: u64,
|
||||
|
||||
/// ME route behavior when no writer is immediately available.
|
||||
#[serde(default)]
|
||||
pub me_route_no_writer_mode: MeRouteNoWriterMode,
|
||||
@@ -754,6 +808,14 @@ pub struct GeneralConfig {
|
||||
#[serde(default = "default_me_deterministic_writer_sort")]
|
||||
pub me_deterministic_writer_sort: bool,
|
||||
|
||||
/// Writer selection mode for ME route bind path.
|
||||
#[serde(default)]
|
||||
pub me_writer_pick_mode: MeWriterPickMode,
|
||||
|
||||
/// Number of candidates sampled by writer picker in `p2c` mode.
|
||||
#[serde(default = "default_me_writer_pick_sample_size")]
|
||||
pub me_writer_pick_sample_size: u8,
|
||||
|
||||
/// Enable NTP drift check at startup.
|
||||
#[serde(default = "default_ntp_check")]
|
||||
pub ntp_check: bool,
|
||||
@@ -796,6 +858,9 @@ impl Default for GeneralConfig {
|
||||
me_keepalive_jitter_secs: default_keepalive_jitter(),
|
||||
me_keepalive_payload_random: default_true(),
|
||||
rpc_proxy_req_every: default_rpc_proxy_req_every(),
|
||||
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
|
||||
me_route_channel_capacity: default_me_route_channel_capacity(),
|
||||
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
|
||||
me_warmup_stagger_enabled: default_true(),
|
||||
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
|
||||
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
|
||||
@@ -837,6 +902,10 @@ impl Default for GeneralConfig {
|
||||
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
|
||||
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
|
||||
me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(),
|
||||
me_health_interval_ms_unhealthy: default_me_health_interval_ms_unhealthy(),
|
||||
me_health_interval_ms_healthy: default_me_health_interval_ms_healthy(),
|
||||
me_admission_poll_ms: default_me_admission_poll_ms(),
|
||||
me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(),
|
||||
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
|
||||
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
|
||||
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
|
||||
@@ -877,6 +946,8 @@ impl Default for GeneralConfig {
|
||||
me_reinit_trigger_channel: default_me_reinit_trigger_channel(),
|
||||
me_reinit_coalesce_window_ms: default_me_reinit_coalesce_window_ms(),
|
||||
me_deterministic_writer_sort: default_me_deterministic_writer_sort(),
|
||||
me_writer_pick_mode: MeWriterPickMode::default(),
|
||||
me_writer_pick_sample_size: default_me_writer_pick_sample_size(),
|
||||
ntp_check: default_ntp_check(),
|
||||
ntp_servers: default_ntp_servers(),
|
||||
auto_degradation_enabled: default_true(),
|
||||
|
||||
21
src/main.rs
21
src/main.rs
@@ -1047,10 +1047,17 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
config.general.me_bind_stale_ttl_secs,
|
||||
config.general.me_secret_atomic_snapshot,
|
||||
config.general.me_deterministic_writer_sort,
|
||||
config.general.me_writer_pick_mode,
|
||||
config.general.me_writer_pick_sample_size,
|
||||
config.general.me_socks_kdf_policy,
|
||||
config.general.me_writer_cmd_channel_capacity,
|
||||
config.general.me_route_channel_capacity,
|
||||
config.general.me_route_backpressure_base_timeout_ms,
|
||||
config.general.me_route_backpressure_high_timeout_ms,
|
||||
config.general.me_route_backpressure_high_watermark_pct,
|
||||
config.general.me_health_interval_ms_unhealthy,
|
||||
config.general.me_health_interval_ms_healthy,
|
||||
config.general.me_warn_rate_limit_ms,
|
||||
config.general.me_route_no_writer_mode,
|
||||
config.general.me_route_no_writer_wait_ms,
|
||||
config.general.me_route_inline_recovery_attempts,
|
||||
@@ -1784,11 +1791,24 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
let pool_for_gate = pool.clone();
|
||||
let admission_tx_gate = admission_tx.clone();
|
||||
let mut config_rx_gate = config_rx.clone();
|
||||
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
|
||||
tokio::spawn(async move {
|
||||
let mut gate_open = initial_open;
|
||||
let mut open_streak = if initial_open { 1u32 } else { 0u32 };
|
||||
let mut close_streak = if initial_open { 0u32 } else { 1u32 };
|
||||
loop {
|
||||
tokio::select! {
|
||||
changed = config_rx_gate.changed() => {
|
||||
if changed.is_err() {
|
||||
break;
|
||||
}
|
||||
let cfg = config_rx_gate.borrow_and_update().clone();
|
||||
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
|
||||
continue;
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
|
||||
}
|
||||
let ready = pool_for_gate.admission_ready_conditional_cast().await;
|
||||
if ready {
|
||||
open_streak = open_streak.saturating_add(1);
|
||||
@@ -1813,7 +1833,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||
);
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
|
||||
129
src/metrics.rs
129
src/metrics.rs
@@ -689,6 +689,135 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_writer_pick_total ME writer-pick outcomes by mode and result"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_writer_pick_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_try\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_sorted_rr_success_try_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"success_fallback\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_sorted_rr_success_fallback_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"full\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_sorted_rr_full_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"closed\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_sorted_rr_closed_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"sorted_rr\",result=\"no_candidate\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_sorted_rr_no_candidate_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_try\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_p2c_success_try_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"success_fallback\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_p2c_success_fallback_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"full\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_p2c_full_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"closed\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_p2c_closed_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_total{{mode=\"p2c\",result=\"no_candidate\"}} {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_p2c_no_candidate_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_writer_pick_blocking_fallback_total ME writer-pick blocking fallback attempts"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# TYPE telemt_me_writer_pick_blocking_fallback_total counter"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_blocking_fallback_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_blocking_fallback_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_writer_pick_mode_switch_total Writer-pick mode switches via runtime updates"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_writer_pick_mode_switch_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_pick_mode_switch_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_pick_mode_switch_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes"
|
||||
|
||||
@@ -27,7 +27,7 @@ enum C2MeCommand {
|
||||
|
||||
const DESYNC_DEDUP_WINDOW: Duration = Duration::from_secs(60);
|
||||
const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
|
||||
const C2ME_CHANNEL_CAPACITY: usize = 1024;
|
||||
const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128;
|
||||
const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64;
|
||||
const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32;
|
||||
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
|
||||
@@ -271,7 +271,11 @@ where
|
||||
|
||||
let frame_limit = config.general.max_client_frame;
|
||||
|
||||
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(C2ME_CHANNEL_CAPACITY);
|
||||
let c2me_channel_capacity = config
|
||||
.general
|
||||
.me_c2me_channel_capacity
|
||||
.max(C2ME_CHANNEL_CAPACITY_FALLBACK);
|
||||
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
|
||||
let me_pool_c2me = me_pool.clone();
|
||||
let effective_tag = effective_tag;
|
||||
let c2me_sender = tokio::spawn(async move {
|
||||
|
||||
147
src/stats/mod.rs
147
src/stats/mod.rs
@@ -16,7 +16,7 @@ use std::collections::hash_map::DefaultHasher;
|
||||
use std::collections::VecDeque;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::config::MeTelemetryLevel;
|
||||
use crate::config::{MeTelemetryLevel, MeWriterPickMode};
|
||||
use self::telemetry::TelemetryPolicy;
|
||||
|
||||
// ============= Stats =============
|
||||
@@ -95,6 +95,18 @@ pub struct Stats {
|
||||
me_route_drop_queue_full: AtomicU64,
|
||||
me_route_drop_queue_full_base: AtomicU64,
|
||||
me_route_drop_queue_full_high: AtomicU64,
|
||||
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
|
||||
me_writer_pick_sorted_rr_success_fallback_total: AtomicU64,
|
||||
me_writer_pick_sorted_rr_full_total: AtomicU64,
|
||||
me_writer_pick_sorted_rr_closed_total: AtomicU64,
|
||||
me_writer_pick_sorted_rr_no_candidate_total: AtomicU64,
|
||||
me_writer_pick_p2c_success_try_total: AtomicU64,
|
||||
me_writer_pick_p2c_success_fallback_total: AtomicU64,
|
||||
me_writer_pick_p2c_full_total: AtomicU64,
|
||||
me_writer_pick_p2c_closed_total: AtomicU64,
|
||||
me_writer_pick_p2c_no_candidate_total: AtomicU64,
|
||||
me_writer_pick_blocking_fallback_total: AtomicU64,
|
||||
me_writer_pick_mode_switch_total: AtomicU64,
|
||||
me_socks_kdf_strict_reject: AtomicU64,
|
||||
me_socks_kdf_compat_fallback: AtomicU64,
|
||||
secure_padding_invalid: AtomicU64,
|
||||
@@ -497,6 +509,93 @@ impl Stats {
|
||||
self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_pick_success_try_total(&self, mode: MeWriterPickMode) {
|
||||
if !self.telemetry_me_allows_normal() {
|
||||
return;
|
||||
}
|
||||
match mode {
|
||||
MeWriterPickMode::SortedRr => {
|
||||
self.me_writer_pick_sorted_rr_success_try_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
MeWriterPickMode::P2c => {
|
||||
self.me_writer_pick_p2c_success_try_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_pick_success_fallback_total(&self, mode: MeWriterPickMode) {
|
||||
if !self.telemetry_me_allows_normal() {
|
||||
return;
|
||||
}
|
||||
match mode {
|
||||
MeWriterPickMode::SortedRr => {
|
||||
self.me_writer_pick_sorted_rr_success_fallback_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
MeWriterPickMode::P2c => {
|
||||
self.me_writer_pick_p2c_success_fallback_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_pick_full_total(&self, mode: MeWriterPickMode) {
|
||||
if !self.telemetry_me_allows_normal() {
|
||||
return;
|
||||
}
|
||||
match mode {
|
||||
MeWriterPickMode::SortedRr => {
|
||||
self.me_writer_pick_sorted_rr_full_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
MeWriterPickMode::P2c => {
|
||||
self.me_writer_pick_p2c_full_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_pick_closed_total(&self, mode: MeWriterPickMode) {
|
||||
if !self.telemetry_me_allows_normal() {
|
||||
return;
|
||||
}
|
||||
match mode {
|
||||
MeWriterPickMode::SortedRr => {
|
||||
self.me_writer_pick_sorted_rr_closed_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
MeWriterPickMode::P2c => {
|
||||
self.me_writer_pick_p2c_closed_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_pick_no_candidate_total(&self, mode: MeWriterPickMode) {
|
||||
if !self.telemetry_me_allows_normal() {
|
||||
return;
|
||||
}
|
||||
match mode {
|
||||
MeWriterPickMode::SortedRr => {
|
||||
self.me_writer_pick_sorted_rr_no_candidate_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
MeWriterPickMode::P2c => {
|
||||
self.me_writer_pick_p2c_no_candidate_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_pick_blocking_fallback_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_writer_pick_blocking_fallback_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_pick_mode_switch_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_writer_pick_mode_switch_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_socks_kdf_strict_reject(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed);
|
||||
@@ -1001,6 +1100,52 @@ impl Stats {
|
||||
pub fn get_me_route_drop_queue_full_high(&self) -> u64 {
|
||||
self.me_route_drop_queue_full_high.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_sorted_rr_success_try_total(&self) -> u64 {
|
||||
self.me_writer_pick_sorted_rr_success_try_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_sorted_rr_success_fallback_total(&self) -> u64 {
|
||||
self.me_writer_pick_sorted_rr_success_fallback_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_sorted_rr_full_total(&self) -> u64 {
|
||||
self.me_writer_pick_sorted_rr_full_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_sorted_rr_closed_total(&self) -> u64 {
|
||||
self.me_writer_pick_sorted_rr_closed_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_sorted_rr_no_candidate_total(&self) -> u64 {
|
||||
self.me_writer_pick_sorted_rr_no_candidate_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_p2c_success_try_total(&self) -> u64 {
|
||||
self.me_writer_pick_p2c_success_try_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_p2c_success_fallback_total(&self) -> u64 {
|
||||
self.me_writer_pick_p2c_success_fallback_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_p2c_full_total(&self) -> u64 {
|
||||
self.me_writer_pick_p2c_full_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_p2c_closed_total(&self) -> u64 {
|
||||
self.me_writer_pick_p2c_closed_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_p2c_no_candidate_total(&self) -> u64 {
|
||||
self.me_writer_pick_p2c_no_candidate_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_blocking_fallback_total(&self) -> u64 {
|
||||
self.me_writer_pick_blocking_fallback_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_pick_mode_switch_total(&self) -> u64 {
|
||||
self.me_writer_pick_mode_switch_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_socks_kdf_strict_reject(&self) -> u64 {
|
||||
self.me_socks_kdf_strict_reject.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
@@ -306,6 +306,8 @@ async fn run_update_cycle(
|
||||
cfg.general.me_bind_stale_ttl_secs,
|
||||
cfg.general.me_secret_atomic_snapshot,
|
||||
cfg.general.me_deterministic_writer_sort,
|
||||
cfg.general.me_writer_pick_mode,
|
||||
cfg.general.me_writer_pick_sample_size,
|
||||
cfg.general.me_single_endpoint_shadow_writers,
|
||||
cfg.general.me_single_endpoint_outage_mode_enabled,
|
||||
cfg.general.me_single_endpoint_outage_disable_quarantine,
|
||||
@@ -325,6 +327,9 @@ async fn run_update_cycle(
|
||||
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
|
||||
cfg.general.me_adaptive_floor_max_active_writers_global,
|
||||
cfg.general.me_adaptive_floor_max_warm_writers_global,
|
||||
cfg.general.me_health_interval_ms_unhealthy,
|
||||
cfg.general.me_health_interval_ms_healthy,
|
||||
cfg.general.me_warn_rate_limit_ms,
|
||||
);
|
||||
|
||||
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
||||
@@ -527,6 +532,8 @@ pub async fn me_config_updater(
|
||||
cfg.general.me_bind_stale_ttl_secs,
|
||||
cfg.general.me_secret_atomic_snapshot,
|
||||
cfg.general.me_deterministic_writer_sort,
|
||||
cfg.general.me_writer_pick_mode,
|
||||
cfg.general.me_writer_pick_sample_size,
|
||||
cfg.general.me_single_endpoint_shadow_writers,
|
||||
cfg.general.me_single_endpoint_outage_mode_enabled,
|
||||
cfg.general.me_single_endpoint_outage_disable_quarantine,
|
||||
@@ -546,6 +553,9 @@ pub async fn me_config_updater(
|
||||
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
|
||||
cfg.general.me_adaptive_floor_max_active_writers_global,
|
||||
cfg.general.me_adaptive_floor_max_warm_writers_global,
|
||||
cfg.general.me_health_interval_ms_unhealthy,
|
||||
cfg.general.me_health_interval_ms_healthy,
|
||||
cfg.general.me_warn_rate_limit_ms,
|
||||
);
|
||||
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
||||
if new_secs == update_every_secs {
|
||||
|
||||
@@ -13,7 +13,6 @@ use crate::network::IpFamily;
|
||||
|
||||
use super::MePool;
|
||||
|
||||
const HEALTH_INTERVAL_SECS: u64 = 1;
|
||||
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
|
||||
#[allow(dead_code)]
|
||||
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
|
||||
@@ -62,11 +61,18 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||
let mut degraded_interval = true;
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
||||
let interval = if degraded_interval {
|
||||
pool.health_interval_unhealthy()
|
||||
} else {
|
||||
pool.health_interval_healthy()
|
||||
};
|
||||
tokio::time::sleep(interval).await;
|
||||
pool.prune_closed_writers().await;
|
||||
reap_draining_writers(&pool).await;
|
||||
check_family(
|
||||
let v4_degraded = check_family(
|
||||
IpFamily::V4,
|
||||
&pool,
|
||||
&rng,
|
||||
@@ -80,9 +86,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
&mut idle_refresh_next_attempt,
|
||||
&mut adaptive_idle_since,
|
||||
&mut adaptive_recover_until,
|
||||
&mut floor_warn_next_allowed,
|
||||
)
|
||||
.await;
|
||||
check_family(
|
||||
let v6_degraded = check_family(
|
||||
IpFamily::V6,
|
||||
&pool,
|
||||
&rng,
|
||||
@@ -96,8 +103,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||
&mut idle_refresh_next_attempt,
|
||||
&mut adaptive_idle_since,
|
||||
&mut adaptive_recover_until,
|
||||
&mut floor_warn_next_allowed,
|
||||
)
|
||||
.await;
|
||||
degraded_interval = v4_degraded || v6_degraded;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,15 +146,18 @@ async fn check_family(
|
||||
idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
) {
|
||||
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
) -> bool {
|
||||
let enabled = match family {
|
||||
IpFamily::V4 => pool.decision.ipv4_me,
|
||||
IpFamily::V6 => pool.decision.ipv6_me,
|
||||
};
|
||||
if !enabled {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
let mut family_degraded = false;
|
||||
|
||||
let mut dc_endpoints = HashMap::<i32, Vec<SocketAddr>>::new();
|
||||
let map_guard = match family {
|
||||
IpFamily::V4 => pool.proxy_map_v4.read().await,
|
||||
@@ -234,6 +246,7 @@ async fn check_family(
|
||||
.sum::<usize>();
|
||||
|
||||
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
|
||||
family_degraded = true;
|
||||
if single_endpoint_outage.insert(key) {
|
||||
pool.stats.increment_me_single_endpoint_outage_enter_total();
|
||||
warn!(
|
||||
@@ -310,6 +323,7 @@ async fn check_family(
|
||||
continue;
|
||||
}
|
||||
let missing = required - alive;
|
||||
family_degraded = true;
|
||||
|
||||
let now = Instant::now();
|
||||
if reconnect_budget == 0 {
|
||||
@@ -438,15 +452,23 @@ async fn check_family(
|
||||
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
||||
next_attempt.insert(key, now + wait);
|
||||
if pool.is_runtime_ready() {
|
||||
warn!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
alive = now_alive,
|
||||
required,
|
||||
endpoint_count = endpoints.len(),
|
||||
backoff_ms = next_ms,
|
||||
"DC writer floor is below required level, scheduled reconnect"
|
||||
);
|
||||
let warn_cooldown = pool.warn_rate_limit_duration();
|
||||
if should_emit_rate_limited_warn(
|
||||
floor_warn_next_allowed,
|
||||
key,
|
||||
now,
|
||||
warn_cooldown,
|
||||
) {
|
||||
warn!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
alive = now_alive,
|
||||
required,
|
||||
endpoint_count = endpoints.len(),
|
||||
backoff_ms = next_ms,
|
||||
"DC writer floor is below required level, scheduled reconnect"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
dc = %dc,
|
||||
@@ -463,6 +485,8 @@ async fn check_family(
|
||||
*v = v.saturating_sub(1);
|
||||
}
|
||||
}
|
||||
|
||||
family_degraded
|
||||
}
|
||||
|
||||
fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
|
||||
@@ -474,6 +498,23 @@ fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
|
||||
.clamp(HEALTH_RECONNECT_BUDGET_MIN, HEALTH_RECONNECT_BUDGET_MAX)
|
||||
}
|
||||
|
||||
fn should_emit_rate_limited_warn(
|
||||
next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
|
||||
key: (i32, IpFamily),
|
||||
now: Instant,
|
||||
cooldown: Duration,
|
||||
) -> bool {
|
||||
let Some(ready_at) = next_allowed.get(&key).copied() else {
|
||||
next_allowed.insert(key, now + cooldown);
|
||||
return true;
|
||||
};
|
||||
if now >= ready_at {
|
||||
next_allowed.insert(key, now + cooldown);
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
fn adaptive_floor_class_min(
|
||||
pool: &Arc<MePool>,
|
||||
endpoint_count: usize,
|
||||
|
||||
@@ -7,7 +7,9 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy};
|
||||
use crate::config::{
|
||||
MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode,
|
||||
};
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::network::IpFamily;
|
||||
use crate::network::probe::NetworkDecision;
|
||||
@@ -39,6 +41,7 @@ pub struct MeWriter {
|
||||
pub tx: mpsc::Sender<WriterCommand>,
|
||||
pub cancel: CancellationToken,
|
||||
pub degraded: Arc<AtomicBool>,
|
||||
pub rtt_ema_ms_x10: Arc<AtomicU32>,
|
||||
pub draining: Arc<AtomicBool>,
|
||||
pub draining_started_at_epoch_secs: Arc<AtomicU64>,
|
||||
pub drain_deadline_epoch_secs: Arc<AtomicU64>,
|
||||
@@ -103,6 +106,7 @@ pub struct MePool {
|
||||
pub(super) me_keepalive_jitter: Duration,
|
||||
pub(super) me_keepalive_payload_random: bool,
|
||||
pub(super) rpc_proxy_req_every_secs: AtomicU64,
|
||||
pub(super) writer_cmd_channel_capacity: usize,
|
||||
pub(super) me_warmup_stagger_enabled: bool,
|
||||
pub(super) me_warmup_step_delay: Duration,
|
||||
pub(super) me_warmup_step_jitter: Duration,
|
||||
@@ -176,13 +180,19 @@ pub struct MePool {
|
||||
pub(super) me_bind_stale_ttl_secs: AtomicU64,
|
||||
pub(super) secret_atomic_snapshot: AtomicBool,
|
||||
pub(super) me_deterministic_writer_sort: AtomicBool,
|
||||
pub(super) me_writer_pick_mode: AtomicU8,
|
||||
pub(super) me_writer_pick_sample_size: AtomicU8,
|
||||
pub(super) me_socks_kdf_policy: AtomicU8,
|
||||
pub(super) me_route_no_writer_mode: AtomicU8,
|
||||
pub(super) me_route_no_writer_wait: Duration,
|
||||
pub(super) me_route_inline_recovery_attempts: u32,
|
||||
pub(super) me_route_inline_recovery_wait: Duration,
|
||||
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
|
||||
pub(super) me_health_interval_ms_healthy: AtomicU64,
|
||||
pub(super) me_warn_rate_limit_ms: AtomicU64,
|
||||
pub(super) runtime_ready: AtomicBool,
|
||||
pool_size: usize,
|
||||
pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -269,17 +279,28 @@ impl MePool {
|
||||
me_bind_stale_ttl_secs: u64,
|
||||
me_secret_atomic_snapshot: bool,
|
||||
me_deterministic_writer_sort: bool,
|
||||
me_writer_pick_mode: MeWriterPickMode,
|
||||
me_writer_pick_sample_size: u8,
|
||||
me_socks_kdf_policy: MeSocksKdfPolicy,
|
||||
me_writer_cmd_channel_capacity: usize,
|
||||
me_route_channel_capacity: usize,
|
||||
me_route_backpressure_base_timeout_ms: u64,
|
||||
me_route_backpressure_high_timeout_ms: u64,
|
||||
me_route_backpressure_high_watermark_pct: u8,
|
||||
me_health_interval_ms_unhealthy: u64,
|
||||
me_health_interval_ms_healthy: u64,
|
||||
me_warn_rate_limit_ms: u64,
|
||||
me_route_no_writer_mode: MeRouteNoWriterMode,
|
||||
me_route_no_writer_wait_ms: u64,
|
||||
me_route_inline_recovery_attempts: u32,
|
||||
me_route_inline_recovery_wait_ms: u64,
|
||||
) -> Arc<Self> {
|
||||
let endpoint_dc_map = Self::build_endpoint_dc_map_from_maps(&proxy_map_v4, &proxy_map_v6);
|
||||
let registry = Arc::new(ConnRegistry::new());
|
||||
let preferred_endpoints_by_dc =
|
||||
Self::build_preferred_endpoints_by_dc(&decision, &proxy_map_v4, &proxy_map_v6);
|
||||
let registry = Arc::new(ConnRegistry::with_route_channel_capacity(
|
||||
me_route_channel_capacity,
|
||||
));
|
||||
registry.update_route_backpressure_policy(
|
||||
me_route_backpressure_base_timeout_ms,
|
||||
me_route_backpressure_high_timeout_ms,
|
||||
@@ -326,6 +347,7 @@ impl MePool {
|
||||
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
|
||||
me_keepalive_payload_random,
|
||||
rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs),
|
||||
writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1),
|
||||
me_warmup_stagger_enabled,
|
||||
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
|
||||
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
|
||||
@@ -435,12 +457,18 @@ impl MePool {
|
||||
me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs),
|
||||
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
|
||||
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
|
||||
me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()),
|
||||
me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)),
|
||||
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
|
||||
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
|
||||
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
||||
me_route_inline_recovery_attempts,
|
||||
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
|
||||
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
|
||||
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
|
||||
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
|
||||
runtime_ready: AtomicBool::new(false),
|
||||
preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -470,6 +498,8 @@ impl MePool {
|
||||
bind_stale_ttl_secs: u64,
|
||||
secret_atomic_snapshot: bool,
|
||||
deterministic_writer_sort: bool,
|
||||
writer_pick_mode: MeWriterPickMode,
|
||||
writer_pick_sample_size: u8,
|
||||
single_endpoint_shadow_writers: u8,
|
||||
single_endpoint_outage_mode_enabled: bool,
|
||||
single_endpoint_outage_disable_quarantine: bool,
|
||||
@@ -489,6 +519,9 @@ impl MePool {
|
||||
adaptive_floor_max_warm_writers_per_core: u16,
|
||||
adaptive_floor_max_active_writers_global: u32,
|
||||
adaptive_floor_max_warm_writers_global: u32,
|
||||
me_health_interval_ms_unhealthy: u64,
|
||||
me_health_interval_ms_healthy: u64,
|
||||
me_warn_rate_limit_ms: u64,
|
||||
) {
|
||||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
||||
self.me_pool_drain_ttl_secs
|
||||
@@ -513,6 +546,14 @@ impl MePool {
|
||||
.store(secret_atomic_snapshot, Ordering::Relaxed);
|
||||
self.me_deterministic_writer_sort
|
||||
.store(deterministic_writer_sort, Ordering::Relaxed);
|
||||
let previous_writer_pick_mode = self.writer_pick_mode();
|
||||
self.me_writer_pick_mode
|
||||
.store(writer_pick_mode.as_u8(), Ordering::Relaxed);
|
||||
self.me_writer_pick_sample_size
|
||||
.store(writer_pick_sample_size.clamp(2, 4), Ordering::Relaxed);
|
||||
if previous_writer_pick_mode != writer_pick_mode {
|
||||
self.stats.increment_me_writer_pick_mode_switch_total();
|
||||
}
|
||||
self.me_single_endpoint_shadow_writers
|
||||
.store(single_endpoint_shadow_writers, Ordering::Relaxed);
|
||||
self.me_single_endpoint_outage_mode_enabled
|
||||
@@ -564,6 +605,12 @@ impl MePool {
|
||||
.store(adaptive_floor_max_active_writers_global, Ordering::Relaxed);
|
||||
self.me_adaptive_floor_max_warm_writers_global
|
||||
.store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed);
|
||||
self.me_health_interval_ms_unhealthy
|
||||
.store(me_health_interval_ms_unhealthy.max(1), Ordering::Relaxed);
|
||||
self.me_health_interval_ms_healthy
|
||||
.store(me_health_interval_ms_healthy.max(1), Ordering::Relaxed);
|
||||
self.me_warn_rate_limit_ms
|
||||
.store(me_warn_rate_limit_ms.max(1), Ordering::Relaxed);
|
||||
if previous_floor_mode != floor_mode {
|
||||
self.stats.increment_me_floor_mode_switch_total();
|
||||
match (previous_floor_mode, floor_mode) {
|
||||
@@ -664,6 +711,16 @@ impl MePool {
|
||||
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode {
|
||||
MeWriterPickMode::from_u8(self.me_writer_pick_mode.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub(super) fn writer_pick_sample_size(&self) -> usize {
|
||||
self.me_writer_pick_sample_size
|
||||
.load(Ordering::Relaxed)
|
||||
.clamp(2, 4) as usize
|
||||
}
|
||||
|
||||
pub(super) fn required_writers_for_dc(&self, endpoint_count: usize) -> usize {
|
||||
if endpoint_count == 0 {
|
||||
return 0;
|
||||
@@ -1042,6 +1099,62 @@ impl MePool {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_preferred_endpoints_by_dc(
|
||||
decision: &NetworkDecision,
|
||||
map_v4: &HashMap<i32, Vec<(IpAddr, u16)>>,
|
||||
map_v6: &HashMap<i32, Vec<(IpAddr, u16)>>,
|
||||
) -> HashMap<i32, Vec<SocketAddr>> {
|
||||
let mut out = HashMap::<i32, Vec<SocketAddr>>::new();
|
||||
let mut dcs = HashSet::<i32>::new();
|
||||
dcs.extend(map_v4.keys().copied());
|
||||
dcs.extend(map_v6.keys().copied());
|
||||
|
||||
for dc in dcs {
|
||||
let v4 = map_v4
|
||||
.get(&dc)
|
||||
.map(|items| {
|
||||
items
|
||||
.iter()
|
||||
.map(|(ip, port)| SocketAddr::new(*ip, *port))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
let v6 = map_v6
|
||||
.get(&dc)
|
||||
.map(|items| {
|
||||
items
|
||||
.iter()
|
||||
.map(|(ip, port)| SocketAddr::new(*ip, *port))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut selected = if decision.effective_multipath {
|
||||
let mut both = Vec::<SocketAddr>::with_capacity(v4.len().saturating_add(v6.len()));
|
||||
if decision.prefer_ipv6() {
|
||||
both.extend(v6.iter().copied());
|
||||
both.extend(v4.iter().copied());
|
||||
} else {
|
||||
both.extend(v4.iter().copied());
|
||||
both.extend(v6.iter().copied());
|
||||
}
|
||||
both
|
||||
} else if decision.prefer_ipv6() {
|
||||
if !v6.is_empty() { v6 } else { v4 }
|
||||
} else if !v4.is_empty() {
|
||||
v4
|
||||
} else {
|
||||
v6
|
||||
};
|
||||
|
||||
selected.sort_unstable();
|
||||
selected.dedup();
|
||||
out.insert(dc, selected);
|
||||
}
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
fn build_endpoint_dc_map_from_maps(
|
||||
map_v4: &HashMap<i32, Vec<(IpAddr, u16)>>,
|
||||
map_v6: &HashMap<i32, Vec<(IpAddr, u16)>>,
|
||||
@@ -1064,6 +1177,25 @@ impl MePool {
|
||||
let map_v4 = self.proxy_map_v4.read().await.clone();
|
||||
let map_v6 = self.proxy_map_v6.read().await.clone();
|
||||
let rebuilt = Self::build_endpoint_dc_map_from_maps(&map_v4, &map_v6);
|
||||
let preferred = Self::build_preferred_endpoints_by_dc(&self.decision, &map_v4, &map_v6);
|
||||
*self.endpoint_dc_map.write().await = rebuilt;
|
||||
*self.preferred_endpoints_by_dc.write().await = preferred;
|
||||
}
|
||||
|
||||
pub(super) async fn preferred_endpoints_for_dc(&self, dc: i32) -> Vec<SocketAddr> {
|
||||
let guard = self.preferred_endpoints_by_dc.read().await;
|
||||
guard.get(&dc).cloned().unwrap_or_default()
|
||||
}
|
||||
|
||||
pub(super) fn health_interval_unhealthy(&self) -> Duration {
|
||||
Duration::from_millis(self.me_health_interval_ms_unhealthy.load(Ordering::Relaxed).max(1))
|
||||
}
|
||||
|
||||
pub(super) fn health_interval_healthy(&self) -> Duration {
|
||||
Duration::from_millis(self.me_health_interval_ms_healthy.load(Ordering::Relaxed).max(1))
|
||||
}
|
||||
|
||||
pub(super) fn warn_rate_limit_duration(&self) -> Duration {
|
||||
Duration::from_millis(self.me_warn_rate_limit_ms.load(Ordering::Relaxed).max(1))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
@@ -113,10 +113,35 @@ impl MePool {
|
||||
contour: WriterContour,
|
||||
allow_coverage_override: bool,
|
||||
) -> bool {
|
||||
let candidates = self.connectable_endpoints(endpoints).await;
|
||||
let mut candidates = self.connectable_endpoints(endpoints).await;
|
||||
if candidates.is_empty() {
|
||||
return false;
|
||||
}
|
||||
if candidates.len() > 1 {
|
||||
let mut active_by_endpoint = HashMap::<SocketAddr, usize>::new();
|
||||
let ws = self.writers.read().await;
|
||||
for writer in ws.iter() {
|
||||
if writer.draining.load(Ordering::Relaxed) {
|
||||
continue;
|
||||
}
|
||||
if writer.writer_dc != dc {
|
||||
continue;
|
||||
}
|
||||
if !matches!(
|
||||
super::pool::WriterContour::from_u8(
|
||||
writer.contour.load(Ordering::Relaxed),
|
||||
),
|
||||
super::pool::WriterContour::Active
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
if candidates.contains(&writer.addr) {
|
||||
*active_by_endpoint.entry(writer.addr).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
drop(ws);
|
||||
candidates.sort_by_key(|addr| (active_by_endpoint.get(addr).copied().unwrap_or(0), *addr));
|
||||
}
|
||||
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len();
|
||||
for offset in 0..candidates.len() {
|
||||
let idx = (start + offset) % candidates.len();
|
||||
|
||||
@@ -25,6 +25,7 @@ pub(crate) struct MeApiWriterStatusSnapshot {
|
||||
pub(crate) struct MeApiDcStatusSnapshot {
|
||||
pub dc: i16,
|
||||
pub endpoints: Vec<SocketAddr>,
|
||||
pub endpoint_writers: Vec<MeApiDcEndpointWriterSnapshot>,
|
||||
pub available_endpoints: usize,
|
||||
pub available_pct: f64,
|
||||
pub required_writers: usize,
|
||||
@@ -38,6 +39,12 @@ pub(crate) struct MeApiDcStatusSnapshot {
|
||||
pub load: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct MeApiDcEndpointWriterSnapshot {
|
||||
pub endpoint: SocketAddr,
|
||||
pub active_writers: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct MeApiStatusSnapshot {
|
||||
pub generated_at_epoch_secs: u64,
|
||||
@@ -118,6 +125,8 @@ pub(crate) struct MeApiRuntimeSnapshot {
|
||||
pub me_single_endpoint_outage_backoff_max_ms: u64,
|
||||
pub me_single_endpoint_shadow_rotate_every_secs: u64,
|
||||
pub me_deterministic_writer_sort: bool,
|
||||
pub me_writer_pick_mode: &'static str,
|
||||
pub me_writer_pick_sample_size: u8,
|
||||
pub me_socks_kdf_policy: &'static str,
|
||||
pub quarantined_endpoints: Vec<MeApiQuarantinedEndpointSnapshot>,
|
||||
pub network_path: Vec<MeApiDcPathSnapshot>,
|
||||
@@ -338,6 +347,16 @@ impl MePool {
|
||||
|
||||
dcs.push(MeApiDcStatusSnapshot {
|
||||
dc,
|
||||
endpoint_writers: endpoints
|
||||
.iter()
|
||||
.map(|endpoint| MeApiDcEndpointWriterSnapshot {
|
||||
endpoint: *endpoint,
|
||||
active_writers: live_writers_by_dc_endpoint
|
||||
.get(&(dc, *endpoint))
|
||||
.copied()
|
||||
.unwrap_or(0),
|
||||
})
|
||||
.collect(),
|
||||
endpoints: endpoints.into_iter().collect(),
|
||||
available_endpoints: dc_available_endpoints,
|
||||
available_pct: ratio_pct(dc_available_endpoints, endpoint_count),
|
||||
@@ -522,6 +541,8 @@ impl MePool {
|
||||
me_deterministic_writer_sort: self
|
||||
.me_deterministic_writer_sort
|
||||
.load(Ordering::Relaxed),
|
||||
me_writer_pick_mode: writer_pick_mode_label(self.writer_pick_mode()),
|
||||
me_writer_pick_sample_size: self.writer_pick_sample_size() as u8,
|
||||
me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()),
|
||||
quarantined_endpoints,
|
||||
network_path,
|
||||
@@ -570,6 +591,13 @@ fn bind_stale_mode_label(mode: MeBindStaleMode) -> &'static str {
|
||||
}
|
||||
}
|
||||
|
||||
fn writer_pick_mode_label(mode: crate::config::MeWriterPickMode) -> &'static str {
|
||||
match mode {
|
||||
crate::config::MeWriterPickMode::SortedRr => "sorted_rr",
|
||||
crate::config::MeWriterPickMode::P2c => "p2c",
|
||||
}
|
||||
}
|
||||
|
||||
fn socks_kdf_policy_label(policy: MeSocksKdfPolicy) -> &'static str {
|
||||
match policy {
|
||||
MeSocksKdfPolicy::Strict => "strict",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::io::ErrorKind;
|
||||
|
||||
@@ -128,11 +128,12 @@ impl MePool {
|
||||
let contour = Arc::new(AtomicU8::new(contour.as_u8()));
|
||||
let cancel = CancellationToken::new();
|
||||
let degraded = Arc::new(AtomicBool::new(false));
|
||||
let rtt_ema_ms_x10 = Arc::new(AtomicU32::new(0));
|
||||
let draining = Arc::new(AtomicBool::new(false));
|
||||
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
|
||||
let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0));
|
||||
let allow_drain_fallback = Arc::new(AtomicBool::new(false));
|
||||
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
|
||||
let (tx, mut rx) = mpsc::channel::<WriterCommand>(self.writer_cmd_channel_capacity);
|
||||
let mut rpc_writer = RpcWriter {
|
||||
writer: hs.wr,
|
||||
key: hs.write_key,
|
||||
@@ -169,6 +170,7 @@ impl MePool {
|
||||
tx: tx.clone(),
|
||||
cancel: cancel.clone(),
|
||||
degraded: degraded.clone(),
|
||||
rtt_ema_ms_x10: rtt_ema_ms_x10.clone(),
|
||||
draining: draining.clone(),
|
||||
draining_started_at_epoch_secs: draining_started_at_epoch_secs.clone(),
|
||||
drain_deadline_epoch_secs: drain_deadline_epoch_secs.clone(),
|
||||
@@ -222,6 +224,7 @@ impl MePool {
|
||||
stats_reader,
|
||||
writer_id,
|
||||
degraded.clone(),
|
||||
rtt_ema_ms_x10.clone(),
|
||||
cancel_reader_token.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::time::Instant;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
@@ -34,6 +34,7 @@ pub(crate) async fn reader_loop(
|
||||
stats: Arc<Stats>,
|
||||
_writer_id: u64,
|
||||
degraded: Arc<AtomicBool>,
|
||||
writer_rtt_ema_ms_x10: Arc<AtomicU32>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<()> {
|
||||
let mut raw = enc_leftover;
|
||||
@@ -208,6 +209,8 @@ pub(crate) async fn reader_loop(
|
||||
}
|
||||
let degraded_now = entry.1 > entry.0 * 2.0;
|
||||
degraded.store(degraded_now, Ordering::Relaxed);
|
||||
writer_rtt_ema_ms_x10
|
||||
.store((entry.1 * 10.0).clamp(0.0, u32::MAX as f64) as u32, Ordering::Relaxed);
|
||||
trace!(writer_id = wid, rtt_ms = rtt, ema_ms = entry.1, base_ms = entry.0, degraded = degraded_now, "ME RTT sample");
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -9,7 +9,6 @@ use tokio::sync::mpsc::error::TrySendError;
|
||||
use super::codec::WriterCommand;
|
||||
use super::MeResponse;
|
||||
|
||||
const ROUTE_CHANNEL_CAPACITY: usize = 4096;
|
||||
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
|
||||
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
|
||||
const ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT: u8 = 80;
|
||||
@@ -78,6 +77,7 @@ impl RegistryInner {
|
||||
pub struct ConnRegistry {
|
||||
inner: RwLock<RegistryInner>,
|
||||
next_id: AtomicU64,
|
||||
route_channel_capacity: usize,
|
||||
route_backpressure_base_timeout_ms: AtomicU64,
|
||||
route_backpressure_high_timeout_ms: AtomicU64,
|
||||
route_backpressure_high_watermark_pct: AtomicU8,
|
||||
@@ -91,11 +91,12 @@ impl ConnRegistry {
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
pub fn new() -> Self {
|
||||
pub fn with_route_channel_capacity(route_channel_capacity: usize) -> Self {
|
||||
let start = rand::random::<u64>() | 1;
|
||||
Self {
|
||||
inner: RwLock::new(RegistryInner::new()),
|
||||
next_id: AtomicU64::new(start),
|
||||
route_channel_capacity: route_channel_capacity.max(1),
|
||||
route_backpressure_base_timeout_ms: AtomicU64::new(
|
||||
ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS,
|
||||
),
|
||||
@@ -108,6 +109,11 @@ impl ConnRegistry {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn new() -> Self {
|
||||
Self::with_route_channel_capacity(4096)
|
||||
}
|
||||
|
||||
pub fn update_route_backpressure_policy(
|
||||
&self,
|
||||
base_timeout_ms: u64,
|
||||
@@ -127,7 +133,7 @@ impl ConnRegistry {
|
||||
|
||||
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
|
||||
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
|
||||
let (tx, rx) = mpsc::channel(ROUTE_CHANNEL_CAPACITY);
|
||||
let (tx, rx) = mpsc::channel(self.route_channel_capacity);
|
||||
self.inner.write().await.map.insert(id, tx);
|
||||
(id, rx)
|
||||
}
|
||||
@@ -179,11 +185,11 @@ impl ConnRegistry {
|
||||
.route_backpressure_high_watermark_pct
|
||||
.load(Ordering::Relaxed)
|
||||
.clamp(1, 100);
|
||||
let used = ROUTE_CHANNEL_CAPACITY.saturating_sub(tx.capacity());
|
||||
let used_pct = if ROUTE_CHANNEL_CAPACITY == 0 {
|
||||
let used = self.route_channel_capacity.saturating_sub(tx.capacity());
|
||||
let used_pct = if self.route_channel_capacity == 0 {
|
||||
100
|
||||
} else {
|
||||
(used.saturating_mul(100) / ROUTE_CHANNEL_CAPACITY) as u8
|
||||
(used.saturating_mul(100) / self.route_channel_capacity) as u8
|
||||
};
|
||||
let high_profile = used_pct >= high_watermark_pct;
|
||||
let timeout_ms = if high_profile {
|
||||
|
||||
@@ -9,7 +9,7 @@ use bytes::Bytes;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::config::MeRouteNoWriterMode;
|
||||
use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
|
||||
use crate::error::{ProxyError, Result};
|
||||
use crate::network::IpFamily;
|
||||
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
|
||||
@@ -24,6 +24,10 @@ use super::registry::ConnMeta;
|
||||
const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45;
|
||||
const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55;
|
||||
const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4;
|
||||
const PICK_PENALTY_WARM: u64 = 200;
|
||||
const PICK_PENALTY_DRAINING: u64 = 600;
|
||||
const PICK_PENALTY_STALE: u64 = 300;
|
||||
const PICK_PENALTY_DEGRADED: u64 = 250;
|
||||
|
||||
impl MePool {
|
||||
/// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default.
|
||||
@@ -181,6 +185,7 @@ impl MePool {
|
||||
.await;
|
||||
}
|
||||
if candidate_indices.is_empty() {
|
||||
let pick_mode = self.writer_pick_mode();
|
||||
match no_writer_mode {
|
||||
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
|
||||
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
||||
@@ -196,6 +201,7 @@ impl MePool {
|
||||
if self.wait_for_candidate_until(routed_dc, deadline).await {
|
||||
continue;
|
||||
}
|
||||
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
|
||||
self.stats.increment_me_no_writer_failfast_total();
|
||||
return Err(ProxyError::Proxy(
|
||||
"No ME writers available for target DC in failfast window".into(),
|
||||
@@ -209,10 +215,12 @@ impl MePool {
|
||||
if self.wait_for_candidate_until(routed_dc, deadline).await {
|
||||
continue;
|
||||
}
|
||||
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
|
||||
self.stats.increment_me_no_writer_failfast_total();
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) {
|
||||
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
|
||||
self.stats.increment_me_no_writer_failfast_total();
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
@@ -237,6 +245,7 @@ impl MePool {
|
||||
.await;
|
||||
}
|
||||
if candidate_indices.is_empty() {
|
||||
self.stats.increment_me_writer_pick_no_candidate_total(pick_mode);
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
}
|
||||
@@ -259,6 +268,8 @@ impl MePool {
|
||||
}
|
||||
}
|
||||
hybrid_wait_current = hybrid_wait_step;
|
||||
let pick_mode = self.writer_pick_mode();
|
||||
let pick_sample_size = self.writer_pick_sample_size();
|
||||
let writer_ids: Vec<u64> = candidate_indices
|
||||
.iter()
|
||||
.map(|idx| writers_snapshot[*idx].id)
|
||||
@@ -268,69 +279,84 @@ impl MePool {
|
||||
.writer_idle_since_for_writer_ids(&writer_ids)
|
||||
.await;
|
||||
let now_epoch_secs = Self::now_epoch_secs();
|
||||
|
||||
if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
|
||||
candidate_indices.sort_by(|lhs, rhs| {
|
||||
let left = &writers_snapshot[*lhs];
|
||||
let right = &writers_snapshot[*rhs];
|
||||
let left_key = (
|
||||
self.writer_contour_rank_for_selection(left),
|
||||
(left.generation < self.current_generation()) as usize,
|
||||
left.degraded.load(Ordering::Relaxed) as usize,
|
||||
self.writer_idle_rank_for_selection(
|
||||
left,
|
||||
&writer_idle_since,
|
||||
now_epoch_secs,
|
||||
),
|
||||
Reverse(left.tx.capacity()),
|
||||
left.addr,
|
||||
left.id,
|
||||
);
|
||||
let right_key = (
|
||||
self.writer_contour_rank_for_selection(right),
|
||||
(right.generation < self.current_generation()) as usize,
|
||||
right.degraded.load(Ordering::Relaxed) as usize,
|
||||
self.writer_idle_rank_for_selection(
|
||||
right,
|
||||
&writer_idle_since,
|
||||
now_epoch_secs,
|
||||
),
|
||||
Reverse(right.tx.capacity()),
|
||||
right.addr,
|
||||
right.id,
|
||||
);
|
||||
left_key.cmp(&right_key)
|
||||
});
|
||||
} else {
|
||||
candidate_indices.sort_by_key(|idx| {
|
||||
let w = &writers_snapshot[*idx];
|
||||
let degraded = w.degraded.load(Ordering::Relaxed);
|
||||
let stale = (w.generation < self.current_generation()) as usize;
|
||||
(
|
||||
self.writer_contour_rank_for_selection(w),
|
||||
stale,
|
||||
degraded as usize,
|
||||
self.writer_idle_rank_for_selection(
|
||||
w,
|
||||
&writer_idle_since,
|
||||
now_epoch_secs,
|
||||
),
|
||||
Reverse(w.tx.capacity()),
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
let start = self.rr.fetch_add(1, Ordering::Relaxed) as usize % candidate_indices.len();
|
||||
let ordered_candidate_indices = if pick_mode == MeWriterPickMode::P2c {
|
||||
self.p2c_ordered_candidate_indices(
|
||||
&candidate_indices,
|
||||
&writers_snapshot,
|
||||
&writer_idle_since,
|
||||
now_epoch_secs,
|
||||
start,
|
||||
pick_sample_size,
|
||||
)
|
||||
} else {
|
||||
if self.me_deterministic_writer_sort.load(Ordering::Relaxed) {
|
||||
candidate_indices.sort_by(|lhs, rhs| {
|
||||
let left = &writers_snapshot[*lhs];
|
||||
let right = &writers_snapshot[*rhs];
|
||||
let left_key = (
|
||||
self.writer_contour_rank_for_selection(left),
|
||||
(left.generation < self.current_generation()) as usize,
|
||||
left.degraded.load(Ordering::Relaxed) as usize,
|
||||
self.writer_idle_rank_for_selection(
|
||||
left,
|
||||
&writer_idle_since,
|
||||
now_epoch_secs,
|
||||
),
|
||||
Reverse(left.tx.capacity()),
|
||||
left.addr,
|
||||
left.id,
|
||||
);
|
||||
let right_key = (
|
||||
self.writer_contour_rank_for_selection(right),
|
||||
(right.generation < self.current_generation()) as usize,
|
||||
right.degraded.load(Ordering::Relaxed) as usize,
|
||||
self.writer_idle_rank_for_selection(
|
||||
right,
|
||||
&writer_idle_since,
|
||||
now_epoch_secs,
|
||||
),
|
||||
Reverse(right.tx.capacity()),
|
||||
right.addr,
|
||||
right.id,
|
||||
);
|
||||
left_key.cmp(&right_key)
|
||||
});
|
||||
} else {
|
||||
candidate_indices.sort_by_key(|idx| {
|
||||
let w = &writers_snapshot[*idx];
|
||||
let degraded = w.degraded.load(Ordering::Relaxed);
|
||||
let stale = (w.generation < self.current_generation()) as usize;
|
||||
(
|
||||
self.writer_contour_rank_for_selection(w),
|
||||
stale,
|
||||
degraded as usize,
|
||||
self.writer_idle_rank_for_selection(
|
||||
w,
|
||||
&writer_idle_since,
|
||||
now_epoch_secs,
|
||||
),
|
||||
Reverse(w.tx.capacity()),
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
let mut ordered = Vec::<usize>::with_capacity(candidate_indices.len());
|
||||
for offset in 0..candidate_indices.len() {
|
||||
ordered.push(candidate_indices[(start + offset) % candidate_indices.len()]);
|
||||
}
|
||||
ordered
|
||||
};
|
||||
let mut fallback_blocking_idx: Option<usize> = None;
|
||||
|
||||
for offset in 0..candidate_indices.len() {
|
||||
let idx = candidate_indices[(start + offset) % candidate_indices.len()];
|
||||
for idx in ordered_candidate_indices {
|
||||
let w = &writers_snapshot[idx];
|
||||
if !self.writer_accepts_new_binding(w) {
|
||||
continue;
|
||||
}
|
||||
match w.tx.try_send(WriterCommand::Data(payload.clone())) {
|
||||
Ok(()) => {
|
||||
self.stats.increment_me_writer_pick_success_try_total(pick_mode);
|
||||
self.registry
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
|
||||
.await;
|
||||
@@ -352,6 +378,7 @@ impl MePool {
|
||||
}
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
self.stats.increment_me_writer_pick_closed_total(pick_mode);
|
||||
warn!(writer_id = w.id, "ME writer channel closed");
|
||||
self.remove_writer_and_close_clients(w.id).await;
|
||||
continue;
|
||||
@@ -360,15 +387,20 @@ impl MePool {
|
||||
}
|
||||
|
||||
let Some(blocking_idx) = fallback_blocking_idx else {
|
||||
self.stats.increment_me_writer_pick_full_total(pick_mode);
|
||||
continue;
|
||||
};
|
||||
|
||||
let w = writers_snapshot[blocking_idx].clone();
|
||||
if !self.writer_accepts_new_binding(&w) {
|
||||
self.stats.increment_me_writer_pick_full_total(pick_mode);
|
||||
continue;
|
||||
}
|
||||
self.stats.increment_me_writer_pick_blocking_fallback_total();
|
||||
match w.tx.send(WriterCommand::Data(payload.clone())).await {
|
||||
Ok(()) => {
|
||||
self.stats
|
||||
.increment_me_writer_pick_success_fallback_total(pick_mode);
|
||||
self.registry
|
||||
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
|
||||
.await;
|
||||
@@ -378,6 +410,7 @@ impl MePool {
|
||||
return Ok(());
|
||||
}
|
||||
Err(_) => {
|
||||
self.stats.increment_me_writer_pick_closed_total(pick_mode);
|
||||
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
|
||||
self.remove_writer_and_close_clients(w.id).await;
|
||||
}
|
||||
@@ -480,31 +513,7 @@ impl MePool {
|
||||
}
|
||||
|
||||
async fn endpoint_candidates_for_target_dc(&self, routed_dc: i32) -> Vec<SocketAddr> {
|
||||
let mut preferred = Vec::<SocketAddr>::new();
|
||||
let mut seen = HashSet::<SocketAddr>::new();
|
||||
|
||||
for family in self.family_order() {
|
||||
let map_guard = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await,
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await,
|
||||
};
|
||||
let mut family_selected = Vec::<SocketAddr>::new();
|
||||
if let Some(addrs) = map_guard.get(&routed_dc) {
|
||||
for (ip, port) in addrs {
|
||||
family_selected.push(SocketAddr::new(*ip, *port));
|
||||
}
|
||||
}
|
||||
for addr in family_selected {
|
||||
if seen.insert(addr) {
|
||||
preferred.push(addr);
|
||||
}
|
||||
}
|
||||
if !preferred.is_empty() && !self.decision.effective_multipath {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
preferred
|
||||
self.preferred_endpoints_for_dc(routed_dc).await
|
||||
}
|
||||
|
||||
async fn maybe_trigger_hybrid_recovery(
|
||||
@@ -591,28 +600,7 @@ impl MePool {
|
||||
routed_dc: i32,
|
||||
include_warm: bool,
|
||||
) -> Vec<usize> {
|
||||
let mut preferred = HashSet::<SocketAddr>::new();
|
||||
|
||||
for family in self.family_order() {
|
||||
let map_guard = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await,
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await,
|
||||
};
|
||||
let mut family_selected = Vec::<SocketAddr>::new();
|
||||
if let Some(v) = map_guard.get(&routed_dc) {
|
||||
family_selected.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
|
||||
}
|
||||
for endpoint in family_selected {
|
||||
preferred.insert(endpoint);
|
||||
}
|
||||
|
||||
drop(map_guard);
|
||||
|
||||
if !preferred.is_empty() && !self.decision.effective_multipath {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let preferred = self.preferred_endpoints_for_dc(routed_dc).await;
|
||||
if preferred.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
@@ -622,7 +610,7 @@ impl MePool {
|
||||
if !self.writer_eligible_for_selection(w, include_warm) {
|
||||
continue;
|
||||
}
|
||||
if w.writer_dc == routed_dc && preferred.contains(&w.addr) {
|
||||
if w.writer_dc == routed_dc && preferred.iter().any(|endpoint| *endpoint == w.addr) {
|
||||
out.push(idx);
|
||||
}
|
||||
}
|
||||
@@ -671,4 +659,87 @@ impl MePool {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
fn writer_pick_score(
|
||||
&self,
|
||||
writer: &super::pool::MeWriter,
|
||||
idle_since_by_writer: &HashMap<u64, u64>,
|
||||
now_epoch_secs: u64,
|
||||
) -> u64 {
|
||||
let contour_penalty = match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) {
|
||||
WriterContour::Active => 0,
|
||||
WriterContour::Warm => PICK_PENALTY_WARM,
|
||||
WriterContour::Draining => PICK_PENALTY_DRAINING,
|
||||
};
|
||||
let stale_penalty = if writer.generation < self.current_generation() {
|
||||
PICK_PENALTY_STALE
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let degraded_penalty = if writer.degraded.load(Ordering::Relaxed) {
|
||||
PICK_PENALTY_DEGRADED
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let idle_penalty =
|
||||
(self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs) as u64)
|
||||
* 100;
|
||||
let queue_cap = self.writer_cmd_channel_capacity.max(1) as u64;
|
||||
let queue_remaining = writer.tx.capacity() as u64;
|
||||
let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap));
|
||||
let queue_util_pct = queue_used.saturating_mul(100) / queue_cap;
|
||||
let queue_penalty = queue_util_pct.saturating_mul(4);
|
||||
let rtt_penalty = ((writer.rtt_ema_ms_x10.load(Ordering::Relaxed) as u64).saturating_add(5) / 10)
|
||||
.min(400);
|
||||
|
||||
contour_penalty
|
||||
.saturating_add(stale_penalty)
|
||||
.saturating_add(degraded_penalty)
|
||||
.saturating_add(idle_penalty)
|
||||
.saturating_add(queue_penalty)
|
||||
.saturating_add(rtt_penalty)
|
||||
}
|
||||
|
||||
fn p2c_ordered_candidate_indices(
|
||||
&self,
|
||||
candidate_indices: &[usize],
|
||||
writers_snapshot: &[super::pool::MeWriter],
|
||||
idle_since_by_writer: &HashMap<u64, u64>,
|
||||
now_epoch_secs: u64,
|
||||
start: usize,
|
||||
sample_size: usize,
|
||||
) -> Vec<usize> {
|
||||
let total = candidate_indices.len();
|
||||
if total == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let mut sampled = Vec::<usize>::with_capacity(sample_size.min(total));
|
||||
let mut seen = HashSet::<usize>::with_capacity(total);
|
||||
for offset in 0..sample_size.min(total) {
|
||||
let idx = candidate_indices[(start + offset) % total];
|
||||
if seen.insert(idx) {
|
||||
sampled.push(idx);
|
||||
}
|
||||
}
|
||||
|
||||
sampled.sort_by_key(|idx| {
|
||||
let writer = &writers_snapshot[*idx];
|
||||
(
|
||||
self.writer_pick_score(writer, idle_since_by_writer, now_epoch_secs),
|
||||
writer.addr,
|
||||
writer.id,
|
||||
)
|
||||
});
|
||||
|
||||
let mut ordered = Vec::<usize>::with_capacity(total);
|
||||
ordered.extend(sampled.iter().copied());
|
||||
for offset in 0..total {
|
||||
let idx = candidate_indices[(start + offset) % total];
|
||||
if seen.insert(idx) {
|
||||
ordered.push(idx);
|
||||
}
|
||||
}
|
||||
ordered
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
[Unit]
|
||||
Description=Telemt
|
||||
After=network.target
|
||||
After=network-online.target
|
||||
Wants=network-online.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
WorkingDirectory=/bin
|
||||
WorkingDirectory=/etc/telemt
|
||||
ExecStart=/bin/telemt /etc/telemt.toml
|
||||
Restart=on-failure
|
||||
LimitNOFILE=65536
|
||||
LimitNOFILE=262144
|
||||
TasksMax=8192
|
||||
MemoryAccounting=yes
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
|
||||
Reference in New Issue
Block a user