mirror of
https://github.com/telemt/telemt.git
synced 2026-06-25 20:31:11 +03:00
Teardown Monitoring in API
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
@@ -16,6 +16,9 @@ use crate::config::MeBindStaleMode;
|
||||
use crate::crypto::SecureRandom;
|
||||
use crate::error::{ProxyError, Result};
|
||||
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
|
||||
use crate::stats::{
|
||||
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason,
|
||||
};
|
||||
|
||||
use super::codec::{RpcWriter, WriterCommand};
|
||||
use super::pool::{MePool, MeWriter, WriterContour};
|
||||
@@ -28,7 +31,7 @@ const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
|
||||
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum WriterTeardownMode {
|
||||
enum WriterRemoveGuardMode {
|
||||
Any,
|
||||
DrainingOnly,
|
||||
}
|
||||
@@ -49,9 +52,16 @@ impl MePool {
|
||||
|
||||
for writer_id in closed_writer_ids {
|
||||
if self.registry.is_writer_empty(writer_id).await {
|
||||
let _ = self.remove_writer_only(writer_id).await;
|
||||
let _ = self
|
||||
.remove_writer_only(writer_id, MeWriterTeardownReason::PruneClosedWriter)
|
||||
.await;
|
||||
} else {
|
||||
let _ = self.remove_writer_and_close_clients(writer_id).await;
|
||||
let _ = self
|
||||
.remove_writer_and_close_clients(
|
||||
writer_id,
|
||||
MeWriterTeardownReason::PruneClosedWriter,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -173,7 +183,11 @@ impl MePool {
|
||||
.is_ok()
|
||||
{
|
||||
if let Some(pool) = pool_writer_task.upgrade() {
|
||||
pool.remove_writer_and_close_clients(writer_id).await;
|
||||
pool.remove_writer_and_close_clients(
|
||||
writer_id,
|
||||
MeWriterTeardownReason::WriterTaskExit,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
cancel_wr.cancel();
|
||||
}
|
||||
@@ -264,7 +278,11 @@ impl MePool {
|
||||
.is_ok()
|
||||
{
|
||||
if let Some(pool) = pool.upgrade() {
|
||||
pool.remove_writer_and_close_clients(writer_id).await;
|
||||
pool.remove_writer_and_close_clients(
|
||||
writer_id,
|
||||
MeWriterTeardownReason::ReaderExit,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
// Fallback for shutdown races: make writer task exit quickly so stale
|
||||
// channels are observable by periodic prune.
|
||||
@@ -372,7 +390,11 @@ impl MePool {
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
pool.remove_writer_and_close_clients(writer_id).await;
|
||||
pool.remove_writer_and_close_clients(
|
||||
writer_id,
|
||||
MeWriterTeardownReason::PingSendFail,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -465,7 +487,11 @@ impl MePool {
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
pool.remove_writer_and_close_clients(writer_id).await;
|
||||
pool.remove_writer_and_close_clients(
|
||||
writer_id,
|
||||
MeWriterTeardownReason::SignalSendFail,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -499,7 +525,11 @@ impl MePool {
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
pool.remove_writer_and_close_clients(writer_id).await;
|
||||
pool.remove_writer_and_close_clients(
|
||||
writer_id,
|
||||
MeWriterTeardownReason::SignalSendFail,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -512,25 +542,48 @@ impl MePool {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
|
||||
pub(crate) async fn remove_writer_and_close_clients(
|
||||
self: &Arc<Self>,
|
||||
writer_id: u64,
|
||||
reason: MeWriterTeardownReason,
|
||||
) -> bool {
|
||||
// Full client cleanup now happens inside `registry.writer_lost` to keep
|
||||
// writer reap/remove paths strictly non-blocking per connection.
|
||||
let _ = self
|
||||
.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
|
||||
.await;
|
||||
self.remove_writer_with_mode(
|
||||
writer_id,
|
||||
reason,
|
||||
MeWriterTeardownMode::Normal,
|
||||
WriterRemoveGuardMode::Any,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(super) async fn remove_draining_writer_hard_detach(
|
||||
self: &Arc<Self>,
|
||||
writer_id: u64,
|
||||
reason: MeWriterTeardownReason,
|
||||
) -> bool {
|
||||
self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly)
|
||||
.await
|
||||
self.remove_writer_with_mode(
|
||||
writer_id,
|
||||
reason,
|
||||
MeWriterTeardownMode::HardDetach,
|
||||
WriterRemoveGuardMode::DrainingOnly,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> bool {
|
||||
self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
|
||||
.await
|
||||
async fn remove_writer_only(
|
||||
self: &Arc<Self>,
|
||||
writer_id: u64,
|
||||
reason: MeWriterTeardownReason,
|
||||
) -> bool {
|
||||
self.remove_writer_with_mode(
|
||||
writer_id,
|
||||
reason,
|
||||
MeWriterTeardownMode::Normal,
|
||||
WriterRemoveGuardMode::Any,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
// Authoritative teardown primitive shared by normal cleanup and watchdog path.
|
||||
@@ -542,8 +595,13 @@ impl MePool {
|
||||
async fn remove_writer_with_mode(
|
||||
self: &Arc<Self>,
|
||||
writer_id: u64,
|
||||
mode: WriterTeardownMode,
|
||||
reason: MeWriterTeardownReason,
|
||||
mode: MeWriterTeardownMode,
|
||||
guard_mode: WriterRemoveGuardMode,
|
||||
) -> bool {
|
||||
let started_at = Instant::now();
|
||||
self.stats
|
||||
.increment_me_writer_teardown_attempt_total(reason, mode);
|
||||
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
|
||||
let mut removed_addr: Option<SocketAddr> = None;
|
||||
let mut removed_dc: Option<i32> = None;
|
||||
@@ -553,9 +611,12 @@ impl MePool {
|
||||
{
|
||||
let mut ws = self.writers.write().await;
|
||||
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
|
||||
if matches!(mode, WriterTeardownMode::DrainingOnly)
|
||||
if matches!(guard_mode, WriterRemoveGuardMode::DrainingOnly)
|
||||
&& !ws[pos].draining.load(Ordering::Relaxed)
|
||||
{
|
||||
self.stats.increment_me_writer_teardown_noop_total();
|
||||
self.stats
|
||||
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
|
||||
return false;
|
||||
}
|
||||
let w = ws.remove(pos);
|
||||
@@ -595,6 +656,9 @@ impl MePool {
|
||||
self.stats.increment_me_writer_close_signal_drop_total();
|
||||
self.stats
|
||||
.increment_me_writer_close_signal_channel_full_total();
|
||||
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
|
||||
MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
|
||||
);
|
||||
debug!(
|
||||
writer_id,
|
||||
"Skipping close signal for removed writer: command channel is full"
|
||||
@@ -602,6 +666,9 @@ impl MePool {
|
||||
}
|
||||
Err(TrySendError::Closed(_)) => {
|
||||
self.stats.increment_me_writer_close_signal_drop_total();
|
||||
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
|
||||
MeWriterCleanupSideEffectStep::CloseSignalChannelClosed,
|
||||
);
|
||||
debug!(
|
||||
writer_id,
|
||||
"Skipping close signal for removed writer: command channel is closed"
|
||||
@@ -619,6 +686,13 @@ impl MePool {
|
||||
self.trigger_immediate_refill_for_dc(addr, writer_dc);
|
||||
}
|
||||
}
|
||||
if removed {
|
||||
self.stats.increment_me_writer_teardown_success_total(mode);
|
||||
} else {
|
||||
self.stats.increment_me_writer_teardown_noop_total();
|
||||
}
|
||||
self.stats
|
||||
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
|
||||
removed
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user