mirror of
https://github.com/telemt/telemt.git
synced 2026-05-13 15:21:44 +03:00
Bounded Rate Bursts + Cancel ME Waits
This commit is contained in:
@@ -543,10 +543,17 @@ pub struct GeneralConfig {
|
|||||||
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
|
pub me_d2c_frame_buf_shrink_threshold_bytes: usize,
|
||||||
|
|
||||||
/// Copy buffer size for client->DC direction in direct relay.
|
/// Copy buffer size for client->DC direction in direct relay.
|
||||||
|
///
|
||||||
|
/// This is also the upper bound for one amortized upload rate-limit burst:
|
||||||
|
/// upload debt is settled before the next relay read instead of blocking
|
||||||
|
/// inside the completed read path.
|
||||||
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
|
#[serde(default = "default_direct_relay_copy_buf_c2s_bytes")]
|
||||||
pub direct_relay_copy_buf_c2s_bytes: usize,
|
pub direct_relay_copy_buf_c2s_bytes: usize,
|
||||||
|
|
||||||
/// Copy buffer size for DC->client direction in direct relay.
|
/// Copy buffer size for DC->client direction in direct relay.
|
||||||
|
///
|
||||||
|
/// This bounds one direct download rate-limit grant because writes are
|
||||||
|
/// clipped to the currently available shaper budget.
|
||||||
#[serde(default = "default_direct_relay_copy_buf_s2c_bytes")]
|
#[serde(default = "default_direct_relay_copy_buf_s2c_bytes")]
|
||||||
pub direct_relay_copy_buf_s2c_bytes: usize,
|
pub direct_relay_copy_buf_s2c_bytes: usize,
|
||||||
|
|
||||||
@@ -1891,14 +1898,17 @@ pub struct AccessConfig {
|
|||||||
///
|
///
|
||||||
/// Each entry supports independent upload (`up_bps`) and download
|
/// Each entry supports independent upload (`up_bps`) and download
|
||||||
/// (`down_bps`) ceilings. A value of `0` in one direction means
|
/// (`down_bps`) ceilings. A value of `0` in one direction means
|
||||||
/// "unlimited" for that direction.
|
/// "unlimited" for that direction. Limits are amortized: a relay quantum
|
||||||
|
/// may pass as a bounded burst, and the limiter applies the resulting wait
|
||||||
|
/// before later traffic in the same direction proceeds.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub user_rate_limits: HashMap<String, RateLimitBps>,
|
pub user_rate_limits: HashMap<String, RateLimitBps>,
|
||||||
|
|
||||||
/// Per-CIDR aggregate transport rate limits in bits-per-second.
|
/// Per-CIDR aggregate transport rate limits in bits-per-second.
|
||||||
///
|
///
|
||||||
/// Matching uses longest-prefix-wins semantics. A value of `0` in one
|
/// Matching uses longest-prefix-wins semantics. A value of `0` in one
|
||||||
/// direction means "unlimited" for that direction.
|
/// direction means "unlimited" for that direction. Limits are amortized
|
||||||
|
/// with the same bounded-burst contract as per-user rate limits.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub cidr_rate_limits: HashMap<IpNetwork, RateLimitBps>,
|
pub cidr_rate_limits: HashMap<IpNetwork, RateLimitBps>,
|
||||||
|
|
||||||
|
|||||||
@@ -899,6 +899,32 @@ async fn render_metrics(
|
|||||||
);
|
);
|
||||||
|
|
||||||
let limiter_metrics = shared_state.traffic_limiter.metrics_snapshot();
|
let limiter_metrics = shared_state.traffic_limiter.metrics_snapshot();
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_rate_limiter_burst_bound_bytes Configured upper bound for one direct relay rate-limit burst"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_rate_limiter_burst_bound_bytes gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_rate_limiter_burst_bound_bytes{{direction=\"up\"}} {}",
|
||||||
|
if core_enabled {
|
||||||
|
config.general.direct_relay_copy_buf_c2s_bytes
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_rate_limiter_burst_bound_bytes{{direction=\"down\"}} {}",
|
||||||
|
if core_enabled {
|
||||||
|
config.general.direct_relay_copy_buf_s2c_bytes
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_rate_limiter_throttle_total Traffic limiter throttle events by scope and direction"
|
"# HELP telemt_rate_limiter_throttle_total Traffic limiter throttle events by scope and direction"
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ use std::time::{Duration, Instant};
|
|||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot, watch};
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot, watch};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, info, trace, warn};
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
use crate::config::{ConntrackPressureProfile, ProxyConfig};
|
use crate::config::{ConntrackPressureProfile, ProxyConfig};
|
||||||
@@ -659,12 +660,12 @@ async fn wait_for_traffic_budget(
|
|||||||
lease: Option<&Arc<TrafficLease>>,
|
lease: Option<&Arc<TrafficLease>>,
|
||||||
direction: RateDirection,
|
direction: RateDirection,
|
||||||
bytes: u64,
|
bytes: u64,
|
||||||
) {
|
) -> Result<()> {
|
||||||
if bytes == 0 {
|
if bytes == 0 {
|
||||||
return;
|
return Ok(());
|
||||||
}
|
}
|
||||||
let Some(lease) = lease else {
|
let Some(lease) = lease else {
|
||||||
return;
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut remaining = bytes;
|
let mut remaining = bytes;
|
||||||
@@ -688,6 +689,51 @@ async fn wait_for_traffic_budget(
|
|||||||
wait_ms,
|
wait_ms,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_traffic_budget_or_cancel(
|
||||||
|
lease: Option<&Arc<TrafficLease>>,
|
||||||
|
direction: RateDirection,
|
||||||
|
bytes: u64,
|
||||||
|
cancel: &CancellationToken,
|
||||||
|
) -> Result<()> {
|
||||||
|
if bytes == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let Some(lease) = lease else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut remaining = bytes;
|
||||||
|
while remaining > 0 {
|
||||||
|
let consume = lease.try_consume(direction, remaining);
|
||||||
|
if consume.granted > 0 {
|
||||||
|
remaining = remaining.saturating_sub(consume.granted);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let wait_started_at = Instant::now();
|
||||||
|
tokio::select! {
|
||||||
|
_ = tokio::time::sleep(next_refill_delay()) => {}
|
||||||
|
_ = cancel.cancelled() => {
|
||||||
|
return Err(ProxyError::Proxy("traffic budget wait cancelled".into()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let wait_ms = wait_started_at
|
||||||
|
.elapsed()
|
||||||
|
.as_millis()
|
||||||
|
.min(u128::from(u64::MAX)) as u64;
|
||||||
|
lease.observe_wait_ms(
|
||||||
|
direction,
|
||||||
|
consume.blocked_user,
|
||||||
|
consume.blocked_cidr,
|
||||||
|
wait_ms,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn classify_me_d2c_flush_reason(
|
fn classify_me_d2c_flush_reason(
|
||||||
@@ -1217,12 +1263,14 @@ where
|
|||||||
});
|
});
|
||||||
|
|
||||||
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
|
let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
|
||||||
|
let flow_cancel = CancellationToken::new();
|
||||||
let mut me_rx_task = me_rx;
|
let mut me_rx_task = me_rx;
|
||||||
let stats_clone = stats.clone();
|
let stats_clone = stats.clone();
|
||||||
let rng_clone = rng.clone();
|
let rng_clone = rng.clone();
|
||||||
let user_clone = user.clone();
|
let user_clone = user.clone();
|
||||||
let quota_user_stats_me_writer = quota_user_stats.clone();
|
let quota_user_stats_me_writer = quota_user_stats.clone();
|
||||||
let traffic_lease_me_writer = traffic_lease.clone();
|
let traffic_lease_me_writer = traffic_lease.clone();
|
||||||
|
let flow_cancel_me_writer = flow_cancel.clone();
|
||||||
let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone();
|
let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone();
|
||||||
let bytes_me2c_clone = bytes_me2c.clone();
|
let bytes_me2c_clone = bytes_me2c.clone();
|
||||||
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
|
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
|
||||||
@@ -1268,6 +1316,7 @@ where
|
|||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
traffic_lease_me_writer.as_ref(),
|
||||||
|
&flow_cancel_me_writer,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1329,6 +1378,7 @@ where
|
|||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
traffic_lease_me_writer.as_ref(),
|
||||||
|
&flow_cancel_me_writer,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1393,6 +1443,7 @@ where
|
|||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
traffic_lease_me_writer.as_ref(),
|
||||||
|
&flow_cancel_me_writer,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1459,6 +1510,7 @@ where
|
|||||||
quota_limit,
|
quota_limit,
|
||||||
d2c_flush_policy.quota_soft_overshoot_bytes,
|
d2c_flush_policy.quota_soft_overshoot_bytes,
|
||||||
traffic_lease_me_writer.as_ref(),
|
traffic_lease_me_writer.as_ref(),
|
||||||
|
&flow_cancel_me_writer,
|
||||||
bytes_me2c_clone.as_ref(),
|
bytes_me2c_clone.as_ref(),
|
||||||
conn_id,
|
conn_id,
|
||||||
d2c_flush_policy.ack_flush_immediate,
|
d2c_flush_policy.ack_flush_immediate,
|
||||||
@@ -1654,7 +1706,7 @@ where
|
|||||||
RateDirection::Up,
|
RateDirection::Up,
|
||||||
payload.len() as u64,
|
payload.len() as u64,
|
||||||
)
|
)
|
||||||
.await;
|
.await?;
|
||||||
forensics.bytes_c2me = forensics
|
forensics.bytes_c2me = forensics
|
||||||
.bytes_c2me
|
.bytes_c2me
|
||||||
.saturating_add(payload.len() as u64);
|
.saturating_add(payload.len() as u64);
|
||||||
@@ -1762,6 +1814,7 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
flow_cancel.cancel();
|
||||||
let _ = stop_tx.send(());
|
let _ = stop_tx.send(());
|
||||||
let mut writer_result = match timeout(ME_CHILD_JOIN_TIMEOUT, &mut me_writer).await {
|
let mut writer_result = match timeout(ME_CHILD_JOIN_TIMEOUT, &mut me_writer).await {
|
||||||
Ok(joined) => {
|
Ok(joined) => {
|
||||||
@@ -2337,6 +2390,7 @@ where
|
|||||||
quota_limit,
|
quota_limit,
|
||||||
quota_soft_overshoot_bytes,
|
quota_soft_overshoot_bytes,
|
||||||
None,
|
None,
|
||||||
|
&CancellationToken::new(),
|
||||||
bytes_me2c,
|
bytes_me2c,
|
||||||
conn_id,
|
conn_id,
|
||||||
ack_flush_immediate,
|
ack_flush_immediate,
|
||||||
@@ -2357,6 +2411,7 @@ async fn process_me_writer_response_with_traffic_lease<W>(
|
|||||||
quota_limit: Option<u64>,
|
quota_limit: Option<u64>,
|
||||||
quota_soft_overshoot_bytes: u64,
|
quota_soft_overshoot_bytes: u64,
|
||||||
traffic_lease: Option<&Arc<TrafficLease>>,
|
traffic_lease: Option<&Arc<TrafficLease>>,
|
||||||
|
cancel: &CancellationToken,
|
||||||
bytes_me2c: &AtomicU64,
|
bytes_me2c: &AtomicU64,
|
||||||
conn_id: u64,
|
conn_id: u64,
|
||||||
ack_flush_immediate: bool,
|
ack_flush_immediate: bool,
|
||||||
@@ -2390,7 +2445,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
wait_for_traffic_budget(traffic_lease, RateDirection::Down, data_len).await;
|
wait_for_traffic_budget_or_cancel(
|
||||||
|
traffic_lease,
|
||||||
|
RateDirection::Down,
|
||||||
|
data_len,
|
||||||
|
cancel,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let write_mode =
|
let write_mode =
|
||||||
match write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
|
match write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf)
|
||||||
@@ -2428,7 +2489,8 @@ where
|
|||||||
} else {
|
} else {
|
||||||
trace!(conn_id, confirm, "ME->C quickack");
|
trace!(conn_id, confirm, "ME->C quickack");
|
||||||
}
|
}
|
||||||
wait_for_traffic_budget(traffic_lease, RateDirection::Down, 4).await;
|
wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel)
|
||||||
|
.await?;
|
||||||
write_client_ack(client_writer, proto_tag, confirm).await?;
|
write_client_ack(client_writer, proto_tag, confirm).await?;
|
||||||
stats.increment_me_d2c_ack_frames_total();
|
stats.increment_me_d2c_ack_frames_total();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user