mirror of
https://github.com/telemt/telemt.git
synced 2026-05-13 15:21:44 +03:00
Expose Quota Contention + Cleanup fallback metrics
This commit is contained in:
@@ -739,6 +739,52 @@ async fn render_metrics(
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_quota_refund_bytes_total Reserved quota bytes returned before commit"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_quota_refund_bytes_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_quota_refund_bytes_total {}",
|
||||
if core_enabled {
|
||||
stats.get_quota_refund_bytes_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_quota_contention_total Quota reservation CAS contention events"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_quota_contention_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_quota_contention_total {}",
|
||||
if core_enabled {
|
||||
stats.get_quota_contention_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_quota_contention_timeout_total Quota reservations that hit the bounded contention budget"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# TYPE telemt_quota_contention_timeout_total counter"
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_quota_contention_timeout_total {}",
|
||||
if core_enabled {
|
||||
stats.get_quota_contention_timeout_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_conntrack_control_state Runtime conntrack control state flags"
|
||||
@@ -1955,6 +2001,34 @@ async fn render_metrics(
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_child_join_timeout_total Middle relay child tasks that did not join before cleanup deadline"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_child_join_timeout_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_child_join_timeout_total {}",
|
||||
if core_enabled {
|
||||
stats.get_me_child_join_timeout_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"# HELP telemt_me_child_abort_total Middle relay child tasks aborted after bounded cleanup timeout"
|
||||
);
|
||||
let _ = writeln!(out, "# TYPE telemt_me_child_abort_total counter");
|
||||
let _ = writeln!(
|
||||
out,
|
||||
"telemt_me_child_abort_total {}",
|
||||
if core_enabled {
|
||||
stats.get_me_child_abort_total()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
);
|
||||
|
||||
let _ = writeln!(
|
||||
out,
|
||||
|
||||
@@ -624,6 +624,7 @@ async fn reserve_user_quota_with_yield(
|
||||
user_stats: &UserStats,
|
||||
bytes: u64,
|
||||
limit: u64,
|
||||
stats: &Stats,
|
||||
) -> std::result::Result<u64, QuotaReserveError> {
|
||||
let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS;
|
||||
let mut backoff_rounds = 0usize;
|
||||
@@ -634,7 +635,10 @@ async fn reserve_user_quota_with_yield(
|
||||
Err(QuotaReserveError::LimitExceeded) => {
|
||||
return Err(QuotaReserveError::LimitExceeded);
|
||||
}
|
||||
Err(QuotaReserveError::Contended) => std::hint::spin_loop(),
|
||||
Err(QuotaReserveError::Contended) => {
|
||||
stats.increment_quota_contention_total();
|
||||
std::hint::spin_loop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -642,6 +646,7 @@ async fn reserve_user_quota_with_yield(
|
||||
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
|
||||
backoff_rounds = backoff_rounds.saturating_add(1);
|
||||
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
|
||||
stats.increment_quota_contention_timeout_total();
|
||||
return Err(QuotaReserveError::Contended);
|
||||
}
|
||||
backoff_ms = backoff_ms
|
||||
@@ -1656,19 +1661,28 @@ where
|
||||
if let (Some(limit), Some(user_stats)) =
|
||||
(quota_limit, quota_user_stats.as_deref())
|
||||
{
|
||||
if reserve_user_quota_with_yield(
|
||||
match reserve_user_quota_with_yield(
|
||||
user_stats,
|
||||
payload.len() as u64,
|
||||
limit,
|
||||
stats.as_ref(),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
Ok(_) => {}
|
||||
Err(QuotaReserveError::LimitExceeded) => {
|
||||
main_result = Err(ProxyError::DataQuotaExceeded {
|
||||
user: user.clone(),
|
||||
});
|
||||
break;
|
||||
}
|
||||
Err(QuotaReserveError::Contended) => {
|
||||
main_result = Err(ProxyError::Proxy(
|
||||
"ME C->ME quota reservation contended".into(),
|
||||
));
|
||||
break;
|
||||
}
|
||||
}
|
||||
stats.add_user_octets_from_handle(user_stats, payload.len() as u64);
|
||||
} else {
|
||||
stats.add_user_octets_from(&user, payload.len() as u64);
|
||||
@@ -1741,6 +1755,8 @@ where
|
||||
joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}"))))
|
||||
}
|
||||
Err(_) => {
|
||||
stats.increment_me_child_join_timeout_total();
|
||||
stats.increment_me_child_abort_total();
|
||||
c2me_sender.abort();
|
||||
Err(ProxyError::Proxy("ME sender join timeout".into()))
|
||||
}
|
||||
@@ -1752,6 +1768,8 @@ where
|
||||
joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}"))))
|
||||
}
|
||||
Err(_) => {
|
||||
stats.increment_me_child_join_timeout_total();
|
||||
stats.increment_me_child_abort_total();
|
||||
me_writer.abort();
|
||||
Err(ProxyError::Proxy("ME writer join timeout".into()))
|
||||
}
|
||||
@@ -2357,15 +2375,20 @@ where
|
||||
let data_len = data.len() as u64;
|
||||
if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) {
|
||||
let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes);
|
||||
if reserve_user_quota_with_yield(user_stats, data_len, soft_limit)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats).await {
|
||||
Ok(_) => {}
|
||||
Err(QuotaReserveError::LimitExceeded) => {
|
||||
stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite);
|
||||
return Err(ProxyError::DataQuotaExceeded {
|
||||
user: user.to_string(),
|
||||
});
|
||||
}
|
||||
Err(QuotaReserveError::Contended) => {
|
||||
return Err(ProxyError::Proxy(
|
||||
"ME D->C quota reservation contended".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
wait_for_traffic_budget(traffic_lease, RateDirection::Down, data_len).await;
|
||||
|
||||
|
||||
@@ -471,13 +471,16 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
||||
this.quota_exceeded.store(true, Ordering::Release);
|
||||
return Poll::Ready(Err(quota_io_error()));
|
||||
}
|
||||
Err(crate::stats::QuotaReserveError::Contended) => {}
|
||||
Err(crate::stats::QuotaReserveError::Contended) => {
|
||||
this.stats.increment_quota_contention_total();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if reserved_read_bytes == 0 {
|
||||
reserve_rounds = reserve_rounds.saturating_add(1);
|
||||
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
|
||||
this.stats.increment_quota_contention_timeout_total();
|
||||
if this.arm_quota_wait(cx).is_pending() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
@@ -514,10 +517,12 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
||||
match read_result {
|
||||
Poll::Ready(Ok(n)) => {
|
||||
if reserved_read_bytes > n as u64 {
|
||||
let refund_bytes = reserved_read_bytes - n as u64;
|
||||
refund_reserved_quota_bytes(
|
||||
this.user_stats.as_ref(),
|
||||
reserved_read_bytes - n as u64,
|
||||
refund_bytes,
|
||||
);
|
||||
this.stats.add_quota_refund_bytes_total(refund_bytes);
|
||||
}
|
||||
if n > 0 {
|
||||
let n_to_charge = n as u64;
|
||||
@@ -565,12 +570,14 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
||||
Poll::Pending => {
|
||||
if reserved_read_bytes > 0 {
|
||||
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes);
|
||||
this.stats.add_quota_refund_bytes_total(reserved_read_bytes);
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
Poll::Ready(Err(err)) => {
|
||||
if reserved_read_bytes > 0 {
|
||||
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes);
|
||||
this.stats.add_quota_refund_bytes_total(reserved_read_bytes);
|
||||
}
|
||||
Poll::Ready(Err(err))
|
||||
}
|
||||
@@ -655,6 +662,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
||||
break;
|
||||
}
|
||||
Err(crate::stats::QuotaReserveError::Contended) => {
|
||||
this.stats.increment_quota_contention_total();
|
||||
saw_contention = true;
|
||||
}
|
||||
}
|
||||
@@ -663,6 +671,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
||||
if reserved_bytes == 0 {
|
||||
reserve_rounds = reserve_rounds.saturating_add(1);
|
||||
if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS {
|
||||
this.stats.increment_quota_contention_timeout_total();
|
||||
if let Some(lease) = this.traffic_lease.as_ref() {
|
||||
lease.refund(RateDirection::Down, shaper_reserved_bytes);
|
||||
}
|
||||
@@ -690,10 +699,12 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
||||
match Pin::new(&mut this.inner).poll_write(cx, write_buf) {
|
||||
Poll::Ready(Ok(n)) => {
|
||||
if reserved_bytes > n as u64 {
|
||||
let refund_bytes = reserved_bytes - n as u64;
|
||||
refund_reserved_quota_bytes(
|
||||
this.user_stats.as_ref(),
|
||||
reserved_bytes - n as u64,
|
||||
refund_bytes,
|
||||
);
|
||||
this.stats.add_quota_refund_bytes_total(refund_bytes);
|
||||
}
|
||||
if shaper_reserved_bytes > n as u64
|
||||
&& let Some(lease) = this.traffic_lease.as_ref()
|
||||
@@ -744,6 +755,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
||||
Poll::Ready(Err(err)) => {
|
||||
if reserved_bytes > 0 {
|
||||
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
|
||||
this.stats.add_quota_refund_bytes_total(reserved_bytes);
|
||||
}
|
||||
if shaper_reserved_bytes > 0
|
||||
&& let Some(lease) = this.traffic_lease.as_ref()
|
||||
@@ -755,6 +767,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
||||
Poll::Pending => {
|
||||
if reserved_bytes > 0 {
|
||||
refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes);
|
||||
this.stats.add_quota_refund_bytes_total(reserved_bytes);
|
||||
}
|
||||
if shaper_reserved_bytes > 0
|
||||
&& let Some(lease) = this.traffic_lease.as_ref()
|
||||
|
||||
@@ -274,8 +274,13 @@ pub struct Stats {
|
||||
me_inline_recovery_total: AtomicU64,
|
||||
ip_reservation_rollback_tcp_limit_total: AtomicU64,
|
||||
ip_reservation_rollback_quota_limit_total: AtomicU64,
|
||||
quota_refund_bytes_total: AtomicU64,
|
||||
quota_contention_total: AtomicU64,
|
||||
quota_contention_timeout_total: AtomicU64,
|
||||
quota_write_fail_bytes_total: AtomicU64,
|
||||
quota_write_fail_events_total: AtomicU64,
|
||||
me_child_join_timeout_total: AtomicU64,
|
||||
me_child_abort_total: AtomicU64,
|
||||
telemetry_core_enabled: AtomicBool,
|
||||
telemetry_user_enabled: AtomicBool,
|
||||
telemetry_me_level: AtomicU8,
|
||||
@@ -1437,6 +1442,23 @@ impl Stats {
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn add_quota_refund_bytes_total(&self, bytes: u64) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.quota_refund_bytes_total
|
||||
.fetch_add(bytes, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_quota_contention_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.quota_contention_total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_quota_contention_timeout_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.quota_contention_timeout_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn add_quota_write_fail_bytes_total(&self, bytes: u64) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.quota_write_fail_bytes_total
|
||||
@@ -1449,6 +1471,17 @@ impl Stats {
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_child_join_timeout_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.me_child_join_timeout_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_child_abort_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.me_child_abort_total.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_me_endpoint_quarantine_total(&self) {
|
||||
if self.telemetry_me_allows_normal() {
|
||||
self.me_endpoint_quarantine_total
|
||||
@@ -2283,12 +2316,28 @@ impl Stats {
|
||||
self.ip_reservation_rollback_quota_limit_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_quota_refund_bytes_total(&self) -> u64 {
|
||||
self.quota_refund_bytes_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_quota_contention_total(&self) -> u64 {
|
||||
self.quota_contention_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_quota_contention_timeout_total(&self) -> u64 {
|
||||
self.quota_contention_timeout_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_quota_write_fail_bytes_total(&self) -> u64 {
|
||||
self.quota_write_fail_bytes_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_quota_write_fail_events_total(&self) -> u64 {
|
||||
self.quota_write_fail_events_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_child_join_timeout_total(&self) -> u64 {
|
||||
self.me_child_join_timeout_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_me_child_abort_total(&self) -> u64 {
|
||||
self.me_child_abort_total.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn increment_user_connects(&self, user: &str) {
|
||||
if !self.telemetry_user_enabled() {
|
||||
|
||||
Reference in New Issue
Block a user