mirror of https://github.com/telemt/telemt.git
ME Writer stuck-up in draining-state fixes
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
049db1196f
commit
6f9aef7bb4
|
|
@ -1692,6 +1692,57 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
|||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_close_signal_drop_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_close_signal_drop_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_writer_close_signal_channel_full_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_writer_close_signal_channel_full_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_draining_writers_reap_progress_total {}",
|
||||
if me_allows_normal {
|
||||
stats.get_me_draining_writers_reap_progress_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
|
||||
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
|
||||
let _ = writeln!(
|
||||
|
|
@ -2124,6 +2175,13 @@ mod tests {
|
|||
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
|
||||
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
|
||||
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
|
||||
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
|
||||
assert!(output.contains(
|
||||
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
|
||||
));
|
||||
assert!(output.contains(
|
||||
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
|
||||
));
|
||||
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
|
||||
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
|
||||
assert!(output.contains(
|
||||
|
|
|
|||
|
|
@ -123,6 +123,9 @@ pub struct Stats {
|
|||
pool_drain_soft_evict_total: AtomicU64,
|
||||
pool_drain_soft_evict_writer_total: AtomicU64,
|
||||
pool_stale_pick_total: AtomicU64,
|
||||
me_writer_close_signal_drop_total: AtomicU64,
|
||||
me_writer_close_signal_channel_full_total: AtomicU64,
|
||||
me_draining_writers_reap_progress_total: AtomicU64,
|
||||
me_writer_removed_total: AtomicU64,
|
||||
me_writer_removed_unexpected_total: AtomicU64,
|
||||
me_refill_triggered_total: AtomicU64,
|
||||
|
|
@ -734,6 +737,24 @@ impl Stats {
|
|||
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_close_signal_drop_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_writer_close_signal_drop_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_close_signal_channel_full_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_writer_close_signal_channel_full_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_draining_writers_reap_progress_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_draining_writers_reap_progress_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_writer_removed_total(&self) {
|
||||
if self.telemetry_me_allows_debug() {
|
||||
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
|
||||
|
|
@ -1259,6 +1280,17 @@ impl Stats {
|
|||
pub fn get_pool_stale_pick_total(&self) -> u64 {
|
||||
self.pool_stale_pick_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
|
||||
self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 {
|
||||
self.me_writer_close_signal_channel_full_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 {
|
||||
self.me_draining_writers_reap_progress_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_writer_removed_total(&self) -> u64 {
|
||||
self.me_writer_removed_total.load(Ordering::Relaxed)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -314,6 +314,8 @@ pub(super) async fn reap_draining_writers(
|
|||
}
|
||||
pool.stats.increment_pool_force_close_total();
|
||||
pool.remove_writer_and_close_clients(writer_id).await;
|
||||
pool.stats
|
||||
.increment_me_draining_writers_reap_progress_total();
|
||||
closed_total = closed_total.saturating_add(1);
|
||||
}
|
||||
for writer_id in empty_writer_ids {
|
||||
|
|
@ -324,6 +326,8 @@ pub(super) async fn reap_draining_writers(
|
|||
continue;
|
||||
}
|
||||
pool.remove_writer_and_close_clients(writer_id).await;
|
||||
pool.stats
|
||||
.increment_me_draining_writers_reap_progress_total();
|
||||
closed_total = closed_total.saturating_add(1);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ use std::sync::Arc;
|
|||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
|
|
@ -209,6 +210,89 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
|
|||
assert_eq!(current_writer_ids(&pool).await, vec![3]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
|
||||
let pool = make_pool(128).await;
|
||||
let now_epoch_secs = MePool::now_epoch_secs();
|
||||
|
||||
let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
|
||||
assert!(
|
||||
blocked_tx
|
||||
.try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
|
||||
.is_ok()
|
||||
);
|
||||
let blocked_rx_guard = tokio::spawn(async move {
|
||||
let _hold_rx = blocked_rx;
|
||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
});
|
||||
|
||||
let blocked_writer_id = 90u64;
|
||||
let blocked_writer = MeWriter {
|
||||
id: blocked_writer_id,
|
||||
addr: SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::LOCALHOST),
|
||||
4500 + blocked_writer_id as u16,
|
||||
),
|
||||
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
|
||||
writer_dc: 2,
|
||||
generation: 1,
|
||||
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
|
||||
created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
|
||||
tx: blocked_tx.clone(),
|
||||
cancel: CancellationToken::new(),
|
||||
degraded: Arc::new(AtomicBool::new(false)),
|
||||
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
|
||||
draining: Arc::new(AtomicBool::new(true)),
|
||||
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
|
||||
now_epoch_secs.saturating_sub(120),
|
||||
)),
|
||||
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
|
||||
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
|
||||
};
|
||||
pool.writers.write().await.push(blocked_writer);
|
||||
pool.registry
|
||||
.register_writer(blocked_writer_id, blocked_tx)
|
||||
.await;
|
||||
pool.conn_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
|
||||
|
||||
let mut warn_next_allowed = HashMap::new();
|
||||
let mut soft_evict_next_allowed = HashMap::new();
|
||||
|
||||
let reap_res = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
|
||||
)
|
||||
.await;
|
||||
blocked_rx_guard.abort();
|
||||
|
||||
assert!(reap_res.is_ok(), "reap should not block on close signal");
|
||||
assert!(current_writer_ids(&pool).await.is_empty());
|
||||
assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
|
||||
assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
|
||||
assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
|
||||
let activity = pool.registry.writer_activity_snapshot().await;
|
||||
assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
|
||||
assert!(!activity.bound_clients_by_writer.contains_key(&91));
|
||||
let (probe_conn_id, _rx) = pool.registry.register().await;
|
||||
assert!(
|
||||
!pool.registry
|
||||
.bind_writer(
|
||||
probe_conn_id,
|
||||
blocked_writer_id,
|
||||
ConnMeta {
|
||||
target_dc: 2,
|
||||
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
|
||||
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
|
||||
proto_flags: 0,
|
||||
},
|
||||
)
|
||||
.await
|
||||
);
|
||||
let _ = pool.registry.unregister(probe_conn_id).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
|
||||
let pool = make_pool(2).await;
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ use bytes::Bytes;
|
|||
use bytes::BytesMut;
|
||||
use rand::Rng;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
|
|
@ -525,6 +526,11 @@ impl MePool {
|
|||
self.conn_count.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
// State invariant:
|
||||
// - writer is removed from `self.writers` (pool visibility),
|
||||
// - writer is removed from registry routing/binding maps via `writer_lost`.
|
||||
// The close command below is only a best-effort accelerator for task shutdown.
|
||||
// Cleanup progress must never depend on command-channel availability.
|
||||
let conns = self.registry.writer_lost(writer_id).await;
|
||||
{
|
||||
let mut tracker = self.ping_tracker.lock().await;
|
||||
|
|
@ -532,7 +538,25 @@ impl MePool {
|
|||
}
|
||||
self.rtt_stats.lock().await.remove(&writer_id);
|
||||
if let Some(tx) = close_tx {
|
||||
let _ = tx.send(WriterCommand::Close).await;
|
||||
match tx.try_send(WriterCommand::Close) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(_)) => {
|
||||
self.stats.increment_me_writer_close_signal_drop_total();
|
||||
self.stats
|
||||
.increment_me_writer_close_signal_channel_full_total();
|
||||
debug!(
|
||||
writer_id,
|
||||
"Skipping close signal for removed writer: command channel is full"
|
||||
);
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
self.stats.increment_me_writer_close_signal_drop_total();
|
||||
debug!(
|
||||
writer_id,
|
||||
"Skipping close signal for removed writer: command channel is closed"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if trigger_refill
|
||||
&& let Some(addr) = removed_addr
|
||||
|
|
|
|||
Loading…
Reference in New Issue