mirror of https://github.com/telemt/telemt.git
Performance improvements
This commit is contained in:
parent
fb0f75df43
commit
09bdafa718
|
|
@ -129,6 +129,10 @@ pub(crate) fn default_unknown_dc_log_path() -> Option<String> {
|
|||
Some("unknown-dc.txt".to_string())
|
||||
}
|
||||
|
||||
pub(crate) fn default_unknown_dc_file_log_enabled() -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
pub(crate) fn default_pool_size() -> usize {
|
||||
8
|
||||
}
|
||||
|
|
@ -273,6 +277,18 @@ pub(crate) fn default_me_route_backpressure_high_watermark_pct() -> u8 {
|
|||
80
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_route_no_writer_wait_ms() -> u64 {
|
||||
250
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_route_inline_recovery_attempts() -> u32 {
|
||||
3
|
||||
}
|
||||
|
||||
pub(crate) fn default_me_route_inline_recovery_wait_ms() -> u64 {
|
||||
3000
|
||||
}
|
||||
|
||||
pub(crate) fn default_beobachten_minutes() -> u64 {
|
||||
10
|
||||
}
|
||||
|
|
|
|||
|
|
@ -381,6 +381,22 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
|
|||
warned = true;
|
||||
warn!("config reload: general.middle_proxy_pool_size changed; restart required");
|
||||
}
|
||||
if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode
|
||||
|| old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms
|
||||
|| old.general.me_route_inline_recovery_attempts
|
||||
!= new.general.me_route_inline_recovery_attempts
|
||||
|| old.general.me_route_inline_recovery_wait_ms
|
||||
!= new.general.me_route_inline_recovery_wait_ms
|
||||
{
|
||||
warned = true;
|
||||
warn!("config reload: general.me_route_no_writer_* changed; restart required");
|
||||
}
|
||||
if old.general.unknown_dc_log_path != new.general.unknown_dc_log_path
|
||||
|| old.general.unknown_dc_file_log_enabled != new.general.unknown_dc_file_log_enabled
|
||||
{
|
||||
warned = true;
|
||||
warn!("config reload: general.unknown_dc_* changed; restart required");
|
||||
}
|
||||
if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts {
|
||||
warned = true;
|
||||
warn!("config reload: general.me_init_retry_attempts changed; restart required");
|
||||
|
|
|
|||
|
|
@ -410,6 +410,24 @@ impl ProxyConfig {
|
|||
));
|
||||
}
|
||||
|
||||
if !(10..=5000).contains(&config.general.me_route_no_writer_wait_ms) {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_route_no_writer_wait_ms must be within [10, 5000]".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.general.me_route_inline_recovery_attempts == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_route_inline_recovery_attempts must be > 0".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if !(10..=30000).contains(&config.general.me_route_inline_recovery_wait_ms) {
|
||||
return Err(ProxyError::Config(
|
||||
"general.me_route_inline_recovery_wait_ms must be within [10, 30000]".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if config.server.api.request_body_limit_bytes == 0 {
|
||||
return Err(ProxyError::Config(
|
||||
"server.api.request_body_limit_bytes must be > 0".to_string(),
|
||||
|
|
@ -1206,6 +1224,49 @@ mod tests {
|
|||
let _ = std::fs::remove_file(path_valid);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
|
||||
let toml = r#"
|
||||
[general]
|
||||
me_route_no_writer_wait_ms = 5
|
||||
|
||||
[censorship]
|
||||
tls_domain = "example.com"
|
||||
|
||||
[access.users]
|
||||
user = "00000000000000000000000000000000"
|
||||
"#;
|
||||
let dir = std::env::temp_dir();
|
||||
let path = dir.join("telemt_me_route_no_writer_wait_ms_out_of_range_test.toml");
|
||||
std::fs::write(&path, toml).unwrap();
|
||||
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||
assert!(err.contains("general.me_route_no_writer_wait_ms must be within [10, 5000]"));
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn me_route_no_writer_mode_is_parsed() {
|
||||
let toml = r#"
|
||||
[general]
|
||||
me_route_no_writer_mode = "inline_recovery_legacy"
|
||||
|
||||
[censorship]
|
||||
tls_domain = "example.com"
|
||||
|
||||
[access.users]
|
||||
user = "00000000000000000000000000000000"
|
||||
"#;
|
||||
let dir = std::env::temp_dir();
|
||||
let path = dir.join("telemt_me_route_no_writer_mode_parse_test.toml");
|
||||
std::fs::write(&path, toml).unwrap();
|
||||
let cfg = ProxyConfig::load(&path).unwrap();
|
||||
assert_eq!(
|
||||
cfg.general.me_route_no_writer_mode,
|
||||
crate::config::MeRouteNoWriterMode::InlineRecoveryLegacy
|
||||
);
|
||||
let _ = std::fs::remove_file(path);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn me_hardswap_warmup_defaults_are_set() {
|
||||
let toml = r#"
|
||||
|
|
|
|||
|
|
@ -183,6 +183,31 @@ impl MeFloorMode {
|
|||
}
|
||||
}
|
||||
|
||||
/// Middle-End route behavior when no writer is immediately available.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum MeRouteNoWriterMode {
|
||||
#[default]
|
||||
AsyncRecoveryFailfast,
|
||||
InlineRecoveryLegacy,
|
||||
}
|
||||
|
||||
impl MeRouteNoWriterMode {
|
||||
pub fn as_u8(self) -> u8 {
|
||||
match self {
|
||||
MeRouteNoWriterMode::AsyncRecoveryFailfast => 0,
|
||||
MeRouteNoWriterMode::InlineRecoveryLegacy => 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_u8(raw: u8) -> Self {
|
||||
match raw {
|
||||
1 => MeRouteNoWriterMode::InlineRecoveryLegacy,
|
||||
_ => MeRouteNoWriterMode::AsyncRecoveryFailfast,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-user unique source IP limit mode.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
|
|
@ -511,6 +536,10 @@ pub struct GeneralConfig {
|
|||
#[serde(default = "default_unknown_dc_log_path")]
|
||||
pub unknown_dc_log_path: Option<String>,
|
||||
|
||||
/// Enable unknown-DC file logging.
|
||||
#[serde(default = "default_unknown_dc_file_log_enabled")]
|
||||
pub unknown_dc_file_log_enabled: bool,
|
||||
|
||||
#[serde(default)]
|
||||
pub log_level: LogLevel,
|
||||
|
||||
|
|
@ -538,6 +567,22 @@ pub struct GeneralConfig {
|
|||
#[serde(default = "default_me_route_backpressure_high_watermark_pct")]
|
||||
pub me_route_backpressure_high_watermark_pct: u8,
|
||||
|
||||
/// ME route behavior when no writer is immediately available.
|
||||
#[serde(default)]
|
||||
pub me_route_no_writer_mode: MeRouteNoWriterMode,
|
||||
|
||||
/// Maximum wait time in milliseconds for async-recovery failfast mode.
|
||||
#[serde(default = "default_me_route_no_writer_wait_ms")]
|
||||
pub me_route_no_writer_wait_ms: u64,
|
||||
|
||||
/// Number of inline recovery attempts in legacy mode.
|
||||
#[serde(default = "default_me_route_inline_recovery_attempts")]
|
||||
pub me_route_inline_recovery_attempts: u32,
|
||||
|
||||
/// Maximum wait time in milliseconds for inline recovery in legacy mode.
|
||||
#[serde(default = "default_me_route_inline_recovery_wait_ms")]
|
||||
pub me_route_inline_recovery_wait_ms: u64,
|
||||
|
||||
/// [general.links] — proxy link generation overrides.
|
||||
#[serde(default)]
|
||||
pub links: LinksConfig,
|
||||
|
|
@ -719,6 +764,7 @@ impl Default for GeneralConfig {
|
|||
upstream_connect_failfast_hard_errors: default_upstream_connect_failfast_hard_errors(),
|
||||
stun_iface_mismatch_ignore: false,
|
||||
unknown_dc_log_path: default_unknown_dc_log_path(),
|
||||
unknown_dc_file_log_enabled: default_unknown_dc_file_log_enabled(),
|
||||
log_level: LogLevel::Normal,
|
||||
disable_colors: false,
|
||||
telemetry: TelemetryConfig::default(),
|
||||
|
|
@ -726,6 +772,10 @@ impl Default for GeneralConfig {
|
|||
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
|
||||
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
|
||||
me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(),
|
||||
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
|
||||
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
|
||||
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),
|
||||
me_route_inline_recovery_wait_ms: default_me_route_inline_recovery_wait_ms(),
|
||||
links: LinksConfig::default(),
|
||||
crypto_pending_buffer: default_crypto_pending_buffer(),
|
||||
max_client_frame: default_max_client_frame(),
|
||||
|
|
|
|||
|
|
@ -617,6 +617,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
config.general.me_route_backpressure_base_timeout_ms,
|
||||
config.general.me_route_backpressure_high_timeout_ms,
|
||||
config.general.me_route_backpressure_high_watermark_pct,
|
||||
config.general.me_route_no_writer_mode,
|
||||
config.general.me_route_no_writer_wait_ms,
|
||||
config.general.me_route_inline_recovery_attempts,
|
||||
config.general.me_route_inline_recovery_wait_ms,
|
||||
);
|
||||
|
||||
match pool.init(pool_size, &rng).await {
|
||||
|
|
|
|||
|
|
@ -1199,6 +1199,48 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_no_writer_failfast_total ME route failfast errors due to missing writer in bounded wait window"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_no_writer_failfast_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_no_writer_failfast_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_no_writer_failfast_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_async_recovery_trigger_total Async ME recovery trigger attempts from route path"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_async_recovery_trigger_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_async_recovery_trigger_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_async_recovery_trigger_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_inline_recovery_total Legacy inline ME recovery attempts from route path"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_inline_recovery_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_inline_recovery_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_inline_recovery_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let unresolved_writer_losses = if me_allows_normal {
|
||||
stats
|
||||
|
|
@ -1237,6 +1279,29 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||
let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter");
|
||||
let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent");
|
||||
let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_ip_reservation_rollback_total IP reservation rollbacks caused by later limit checks"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_ip_reservation_rollback_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_ip_reservation_rollback_total{{reason=\"tcp_limit\"}} {}",
|
||||
if core_enabled {
|
||||
stats.get_ip_reservation_rollback_tcp_limit_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_ip_reservation_rollback_total{{reason=\"quota_limit\"}} {}",
|
||||
if core_enabled {
|
||||
stats.get_ip_reservation_rollback_quota_limit_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag"
|
||||
|
|
|
|||
|
|
@ -672,42 +672,16 @@ impl RunningClientHandler {
|
|||
R: AsyncRead + Unpin + Send + 'static,
|
||||
W: AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let user = &success.user;
|
||||
let user = success.user.clone();
|
||||
|
||||
if let Err(e) = Self::check_user_limits_static(user, &config, &stats, peer_addr, &ip_tracker).await {
|
||||
if let Err(e) = Self::check_user_limits_static(&user, &config, &stats, peer_addr, &ip_tracker).await {
|
||||
warn!(user = %user, error = %e, "User limit exceeded");
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
// IP Cleanup Guard: автоматически удаляет IP при выходе из scope
|
||||
struct IpCleanupGuard {
|
||||
tracker: Arc<UserIpTracker>,
|
||||
user: String,
|
||||
ip: std::net::IpAddr,
|
||||
}
|
||||
|
||||
impl Drop for IpCleanupGuard {
|
||||
fn drop(&mut self) {
|
||||
let tracker = self.tracker.clone();
|
||||
let user = self.user.clone();
|
||||
let ip = self.ip;
|
||||
tokio::spawn(async move {
|
||||
tracker.remove_ip(&user, ip).await;
|
||||
debug!(user = %user, ip = %ip, "IP cleaned up on disconnect");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let _cleanup = IpCleanupGuard {
|
||||
tracker: ip_tracker,
|
||||
user: user.clone(),
|
||||
ip: peer_addr.ip(),
|
||||
};
|
||||
|
||||
// Decide: middle proxy or direct
|
||||
if config.general.use_middle_proxy {
|
||||
let relay_result = if config.general.use_middle_proxy {
|
||||
if let Some(ref pool) = me_pool {
|
||||
return handle_via_middle_proxy(
|
||||
handle_via_middle_proxy(
|
||||
client_reader,
|
||||
client_writer,
|
||||
success,
|
||||
|
|
@ -718,23 +692,38 @@ impl RunningClientHandler {
|
|||
local_addr,
|
||||
rng,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
} else {
|
||||
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
|
||||
handle_via_direct(
|
||||
client_reader,
|
||||
client_writer,
|
||||
success,
|
||||
upstream_manager,
|
||||
stats,
|
||||
config,
|
||||
buffer_pool,
|
||||
rng,
|
||||
)
|
||||
.await
|
||||
}
|
||||
warn!("use_middle_proxy=true but MePool not initialized, falling back to direct");
|
||||
}
|
||||
} else {
|
||||
// Direct mode (original behavior)
|
||||
handle_via_direct(
|
||||
client_reader,
|
||||
client_writer,
|
||||
success,
|
||||
upstream_manager,
|
||||
stats,
|
||||
config,
|
||||
buffer_pool,
|
||||
rng,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
// Direct mode (original behavior)
|
||||
handle_via_direct(
|
||||
client_reader,
|
||||
client_writer,
|
||||
success,
|
||||
upstream_manager,
|
||||
stats,
|
||||
config,
|
||||
buffer_pool,
|
||||
rng,
|
||||
)
|
||||
.await
|
||||
ip_tracker.remove_ip(&user, peer_addr.ip()).await;
|
||||
relay_result
|
||||
}
|
||||
|
||||
async fn check_user_limits_static(
|
||||
|
|
@ -752,22 +741,32 @@ impl RunningClientHandler {
|
|||
});
|
||||
}
|
||||
|
||||
let mut ip_reserved = false;
|
||||
// IP limit check
|
||||
if let Err(reason) = ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
||||
warn!(
|
||||
user = %user,
|
||||
ip = %peer_addr.ip(),
|
||||
reason = %reason,
|
||||
"IP limit exceeded"
|
||||
);
|
||||
return Err(ProxyError::ConnectionLimitExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
match ip_tracker.check_and_add(user, peer_addr.ip()).await {
|
||||
Ok(()) => {
|
||||
ip_reserved = true;
|
||||
}
|
||||
Err(reason) => {
|
||||
warn!(
|
||||
user = %user,
|
||||
ip = %peer_addr.ip(),
|
||||
reason = %reason,
|
||||
"IP limit exceeded"
|
||||
);
|
||||
return Err(ProxyError::ConnectionLimitExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(limit) = config.access.user_max_tcp_conns.get(user)
|
||||
&& stats.get_user_curr_connects(user) >= *limit as u64
|
||||
{
|
||||
if ip_reserved {
|
||||
ip_tracker.remove_ip(user, peer_addr.ip()).await;
|
||||
stats.increment_ip_reservation_rollback_tcp_limit_total();
|
||||
}
|
||||
return Err(ProxyError::ConnectionLimitExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
|
|
@ -776,6 +775,10 @@ impl RunningClientHandler {
|
|||
if let Some(quota) = config.access.user_data_quota.get(user)
|
||||
&& stats.get_user_total_octets(user) >= *quota
|
||||
{
|
||||
if ip_reserved {
|
||||
ip_tracker.remove_ip(user, peer_addr.ip()).await;
|
||||
stats.increment_ip_reservation_rollback_quota_limit_total();
|
||||
}
|
||||
return Err(ProxyError::DataQuotaExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
|
|
|
|||
|
|
@ -118,10 +118,16 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
|
|||
// Unknown DC requested by client without override: log and fall back.
|
||||
if !config.dc_overrides.contains_key(&dc_key) {
|
||||
warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster");
|
||||
if let Some(path) = &config.general.unknown_dc_log_path
|
||||
&& let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path)
|
||||
if config.general.unknown_dc_file_log_enabled
|
||||
&& let Some(path) = &config.general.unknown_dc_log_path
|
||||
&& let Ok(handle) = tokio::runtime::Handle::try_current()
|
||||
{
|
||||
let _ = writeln!(file, "dc_idx={dc_idx}");
|
||||
let path = path.clone();
|
||||
handle.spawn_blocking(move || {
|
||||
if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
|
||||
let _ = writeln!(file, "dc_idx={dc_idx}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -100,6 +100,11 @@ pub struct Stats {
|
|||
me_refill_failed_total: AtomicU64,
|
||||
me_writer_restored_same_endpoint_total: AtomicU64,
|
||||
me_writer_restored_fallback_total: AtomicU64,
|
||||
me_no_writer_failfast_total: AtomicU64,
|
||||
me_async_recovery_trigger_total: AtomicU64,
|
||||
me_inline_recovery_total: AtomicU64,
|
||||
ip_reservation_rollback_tcp_limit_total: AtomicU64,
|
||||
ip_reservation_rollback_quota_limit_total: AtomicU64,
|
||||
telemetry_core_enabled: AtomicBool,
|
||||
telemetry_user_enabled: AtomicBool,
|
||||
telemetry_me_level: AtomicU8,
|
||||
|
|
@ -522,6 +527,34 @@ impl Stats {
|
|||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_no_writer_failfast_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_no_writer_failfast_total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_async_recovery_trigger_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_async_recovery_trigger_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_inline_recovery_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_inline_recovery_total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_ip_reservation_rollback_tcp_limit_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.ip_reservation_rollback_tcp_limit_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_ip_reservation_rollback_quota_limit_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.ip_reservation_rollback_quota_limit_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_endpoint_quarantine_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_endpoint_quarantine_total
|
||||
|
|
@ -791,6 +824,23 @@ impl Stats {
|
|||
pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
|
||||
self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_no_writer_failfast_total(&self) -> u64 {
|
||||
self.me_no_writer_failfast_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_async_recovery_trigger_total(&self) -> u64 {
|
||||
self.me_async_recovery_trigger_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_inline_recovery_total(&self) -> u64 {
|
||||
self.me_inline_recovery_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_ip_reservation_rollback_tcp_limit_total(&self) -> u64 {
|
||||
self.ip_reservation_rollback_tcp_limit_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_ip_reservation_rollback_quota_limit_total(&self) -> u64 {
|
||||
self.ip_reservation_rollback_quota_limit_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn increment_user_connects(&self, user: &str) {
|
||||
if !self.telemetry_user_enabled() {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|||
use tokio::sync::{Mutex, Notify, RwLock, mpsc};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy};
|
||||
use crate::config::{MeBindStaleMode, MeFloorMode, MeRouteNoWriterMode, MeSocksKdfPolicy};
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::network::IpFamily;
|
||||
use crate::network::probe::NetworkDecision;
|
||||
|
|
@ -145,6 +145,10 @@ pub struct MePool {
|
|||
pub(super) secret_atomic_snapshot: AtomicBool,
|
||||
pub(super) me_deterministic_writer_sort: AtomicBool,
|
||||
pub(super) me_socks_kdf_policy: AtomicU8,
|
||||
pub(super) me_route_no_writer_mode: AtomicU8,
|
||||
pub(super) me_route_no_writer_wait: Duration,
|
||||
pub(super) me_route_inline_recovery_attempts: u32,
|
||||
pub(super) me_route_inline_recovery_wait: Duration,
|
||||
pool_size: usize,
|
||||
}
|
||||
|
||||
|
|
@ -227,6 +231,10 @@ impl MePool {
|
|||
me_route_backpressure_base_timeout_ms: u64,
|
||||
me_route_backpressure_high_timeout_ms: u64,
|
||||
me_route_backpressure_high_watermark_pct: u8,
|
||||
me_route_no_writer_mode: MeRouteNoWriterMode,
|
||||
me_route_no_writer_wait_ms: u64,
|
||||
me_route_inline_recovery_attempts: u32,
|
||||
me_route_inline_recovery_wait_ms: u64,
|
||||
) -> Arc<Self> {
|
||||
let registry = Arc::new(ConnRegistry::new());
|
||||
registry.update_route_backpressure_policy(
|
||||
|
|
@ -343,6 +351,10 @@ impl MePool {
|
|||
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
|
||||
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
|
||||
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
|
||||
me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()),
|
||||
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
|
||||
me_route_inline_recovery_attempts,
|
||||
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,13 +1,14 @@
|
|||
use std::cmp::Reverse;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::config::MeRouteNoWriterMode;
|
||||
use crate::error::{ProxyError, Result};
|
||||
use crate::network::IpFamily;
|
||||
use crate::protocol::constants::RPC_CLOSE_EXT_U32;
|
||||
|
|
@ -49,7 +50,11 @@ impl MePool {
|
|||
our_addr,
|
||||
proto_flags,
|
||||
};
|
||||
let mut emergency_attempts = 0;
|
||||
let no_writer_mode =
|
||||
MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed));
|
||||
let mut no_writer_deadline: Option<Instant> = None;
|
||||
let mut emergency_attempts = 0u32;
|
||||
let mut async_recovery_triggered = false;
|
||||
|
||||
loop {
|
||||
if let Some(current) = self.registry.get_writer(conn_id).await {
|
||||
|
|
@ -74,34 +79,66 @@ impl MePool {
|
|||
let mut writers_snapshot = {
|
||||
let ws = self.writers.read().await;
|
||||
if ws.is_empty() {
|
||||
// Create waiter before recovery attempts so notify_one permits are not missed.
|
||||
let waiter = self.writer_available.notified();
|
||||
drop(ws);
|
||||
for family in self.family_order() {
|
||||
let map = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
||||
};
|
||||
for (_dc, addrs) in map.iter() {
|
||||
for (ip, port) in addrs {
|
||||
let addr = SocketAddr::new(*ip, *port);
|
||||
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
|
||||
self.writer_available.notify_one();
|
||||
match no_writer_mode {
|
||||
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
|
||||
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
||||
Instant::now() + self.me_route_no_writer_wait
|
||||
});
|
||||
if !async_recovery_triggered {
|
||||
let triggered =
|
||||
self.trigger_async_recovery_for_target_dc(target_dc).await;
|
||||
if !triggered {
|
||||
self.trigger_async_recovery_global().await;
|
||||
}
|
||||
async_recovery_triggered = true;
|
||||
}
|
||||
if self.wait_for_writer_until(deadline).await {
|
||||
continue;
|
||||
}
|
||||
self.stats.increment_me_no_writer_failfast_total();
|
||||
return Err(ProxyError::Proxy(
|
||||
"No ME writer available in failfast window".into(),
|
||||
));
|
||||
}
|
||||
MeRouteNoWriterMode::InlineRecoveryLegacy => {
|
||||
self.stats.increment_me_inline_recovery_total();
|
||||
for _ in 0..self.me_route_inline_recovery_attempts.max(1) {
|
||||
for family in self.family_order() {
|
||||
let map = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
||||
};
|
||||
for (_dc, addrs) in &map {
|
||||
for (ip, port) in addrs {
|
||||
let addr = SocketAddr::new(*ip, *port);
|
||||
let _ = self.connect_one(addr, self.rng.as_ref()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
if !self.writers.read().await.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !self.writers.read().await.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() {
|
||||
if !self.writers.read().await.is_empty() {
|
||||
if !self.writers.read().await.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let waiter = self.writer_available.notified();
|
||||
if tokio::time::timeout(self.me_route_inline_recovery_wait, waiter)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
if !self.writers.read().await.is_empty() {
|
||||
continue;
|
||||
}
|
||||
self.stats.increment_me_no_writer_failfast_total();
|
||||
return Err(ProxyError::Proxy(
|
||||
"All ME connections dead (legacy wait timeout)".into(),
|
||||
));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
ws.clone()
|
||||
};
|
||||
|
|
@ -115,46 +152,70 @@ impl MePool {
|
|||
.await;
|
||||
}
|
||||
if candidate_indices.is_empty() {
|
||||
// Emergency connect-on-demand
|
||||
if emergency_attempts >= 3 {
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
emergency_attempts += 1;
|
||||
for family in self.family_order() {
|
||||
let map_guard = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await,
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await,
|
||||
};
|
||||
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
|
||||
let mut shuffled = addrs.clone();
|
||||
shuffled.shuffle(&mut rand::rng());
|
||||
drop(map_guard);
|
||||
for (ip, port) in shuffled {
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
|
||||
break;
|
||||
match no_writer_mode {
|
||||
MeRouteNoWriterMode::AsyncRecoveryFailfast => {
|
||||
let deadline = *no_writer_deadline.get_or_insert_with(|| {
|
||||
Instant::now() + self.me_route_no_writer_wait
|
||||
});
|
||||
if !async_recovery_triggered {
|
||||
let triggered = self.trigger_async_recovery_for_target_dc(target_dc).await;
|
||||
if !triggered {
|
||||
self.trigger_async_recovery_global().await;
|
||||
}
|
||||
async_recovery_triggered = true;
|
||||
}
|
||||
if self.wait_for_candidate_until(target_dc, deadline).await {
|
||||
continue;
|
||||
}
|
||||
self.stats.increment_me_no_writer_failfast_total();
|
||||
return Err(ProxyError::Proxy(
|
||||
"No ME writers available for target DC in failfast window".into(),
|
||||
));
|
||||
}
|
||||
MeRouteNoWriterMode::InlineRecoveryLegacy => {
|
||||
self.stats.increment_me_inline_recovery_total();
|
||||
if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) {
|
||||
self.stats.increment_me_no_writer_failfast_total();
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
emergency_attempts += 1;
|
||||
for family in self.family_order() {
|
||||
let map_guard = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await,
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await,
|
||||
};
|
||||
if let Some(addrs) = map_guard.get(&(target_dc as i32)) {
|
||||
let mut shuffled = addrs.clone();
|
||||
shuffled.shuffle(&mut rand::rng());
|
||||
drop(map_guard);
|
||||
for (ip, port) in shuffled {
|
||||
let addr = SocketAddr::new(ip, port);
|
||||
if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await;
|
||||
let ws2 = self.writers.read().await;
|
||||
writers_snapshot = ws2.clone();
|
||||
drop(ws2);
|
||||
candidate_indices = self
|
||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
||||
.await;
|
||||
if candidate_indices.is_empty() {
|
||||
candidate_indices = self
|
||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
||||
.await;
|
||||
}
|
||||
if !candidate_indices.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100 * emergency_attempts)).await;
|
||||
let ws2 = self.writers.read().await;
|
||||
writers_snapshot = ws2.clone();
|
||||
drop(ws2);
|
||||
candidate_indices = self
|
||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
||||
.await;
|
||||
if candidate_indices.is_empty() {
|
||||
candidate_indices = self
|
||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
||||
.await;
|
||||
}
|
||||
if !candidate_indices.is_empty() {
|
||||
break;
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
}
|
||||
}
|
||||
if candidate_indices.is_empty() {
|
||||
return Err(ProxyError::Proxy("No ME writers available for target DC".into()));
|
||||
}
|
||||
}
|
||||
let writer_idle_since = self.registry.writer_idle_since_snapshot().await;
|
||||
let now_epoch_secs = Self::now_epoch_secs();
|
||||
|
|
@ -275,6 +336,129 @@ impl MePool {
|
|||
}
|
||||
}
|
||||
|
||||
async fn wait_for_writer_until(&self, deadline: Instant) -> bool {
|
||||
let waiter = self.writer_available.notified();
|
||||
if !self.writers.read().await.is_empty() {
|
||||
return true;
|
||||
}
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
return !self.writers.read().await.is_empty();
|
||||
}
|
||||
let timeout = deadline.saturating_duration_since(now);
|
||||
if tokio::time::timeout(timeout, waiter).await.is_ok() {
|
||||
return true;
|
||||
}
|
||||
!self.writers.read().await.is_empty()
|
||||
}
|
||||
|
||||
async fn wait_for_candidate_until(&self, target_dc: i16, deadline: Instant) -> bool {
|
||||
loop {
|
||||
if self.has_candidate_for_target_dc(target_dc).await {
|
||||
return true;
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
return self.has_candidate_for_target_dc(target_dc).await;
|
||||
}
|
||||
|
||||
let remaining = deadline.saturating_duration_since(now);
|
||||
let sleep_for = remaining.min(Duration::from_millis(25));
|
||||
let waiter = self.writer_available.notified();
|
||||
tokio::select! {
|
||||
_ = waiter => {}
|
||||
_ = tokio::time::sleep(sleep_for) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn has_candidate_for_target_dc(&self, target_dc: i16) -> bool {
|
||||
let writers_snapshot = {
|
||||
let ws = self.writers.read().await;
|
||||
if ws.is_empty() {
|
||||
return false;
|
||||
}
|
||||
ws.clone()
|
||||
};
|
||||
let mut candidate_indices = self
|
||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, false)
|
||||
.await;
|
||||
if candidate_indices.is_empty() {
|
||||
candidate_indices = self
|
||||
.candidate_indices_for_dc(&writers_snapshot, target_dc, true)
|
||||
.await;
|
||||
}
|
||||
!candidate_indices.is_empty()
|
||||
}
|
||||
|
||||
async fn trigger_async_recovery_for_target_dc(self: &Arc<Self>, target_dc: i16) -> bool {
|
||||
let endpoints = self.endpoint_candidates_for_target_dc(target_dc).await;
|
||||
if endpoints.is_empty() {
|
||||
return false;
|
||||
}
|
||||
self.stats.increment_me_async_recovery_trigger_total();
|
||||
for addr in endpoints.into_iter().take(8) {
|
||||
self.trigger_immediate_refill(addr);
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
async fn trigger_async_recovery_global(self: &Arc<Self>) {
|
||||
self.stats.increment_me_async_recovery_trigger_total();
|
||||
let mut seen = HashSet::<SocketAddr>::new();
|
||||
for family in self.family_order() {
|
||||
let map = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
||||
};
|
||||
for addrs in map.values() {
|
||||
for (ip, port) in addrs {
|
||||
let addr = SocketAddr::new(*ip, *port);
|
||||
if seen.insert(addr) {
|
||||
self.trigger_immediate_refill(addr);
|
||||
}
|
||||
if seen.len() >= 8 {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn endpoint_candidates_for_target_dc(&self, target_dc: i16) -> Vec<SocketAddr> {
|
||||
let key = target_dc as i32;
|
||||
let mut preferred = Vec::<SocketAddr>::new();
|
||||
let mut seen = HashSet::<SocketAddr>::new();
|
||||
|
||||
for family in self.family_order() {
|
||||
let map = match family {
|
||||
IpFamily::V4 => self.proxy_map_v4.read().await.clone(),
|
||||
IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
|
||||
};
|
||||
let mut lookup_keys = vec![key, key.abs(), -key.abs()];
|
||||
let def = self.default_dc.load(Ordering::Relaxed);
|
||||
if def != 0 {
|
||||
lookup_keys.push(def);
|
||||
}
|
||||
for lookup in lookup_keys {
|
||||
if let Some(addrs) = map.get(&lookup) {
|
||||
for (ip, port) in addrs {
|
||||
let addr = SocketAddr::new(*ip, *port);
|
||||
if seen.insert(addr) {
|
||||
preferred.push(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !preferred.is_empty() && !self.decision.effective_multipath {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
preferred
|
||||
}
|
||||
|
||||
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||
let mut p = Vec::with_capacity(12);
|
||||
|
|
|
|||
Loading…
Reference in New Issue