Compare commits

..

4 Commits

Author SHA1 Message Date
Alexey
9abaf9006c Prioritize Cancellation in MP select paths 2026-05-22 16:47:54 +03:00
Alexey
231f04a810 Bump 2026-05-22 11:00:41 +03:00
Alexey
b32daf79bc Update Cargo.lock 2026-05-22 11:00:18 +03:00
Alexey
f668759c05 API Fixes + Exclusive Mask + Startup Speed-up + IDN + Decomposing hot-path modules: merge pull request #796 from telemt/flow
API Fixes + Exclusive Mask + Startup Speed-up + IDN + Decomposing hot-path modules
2026-05-22 10:55:26 +03:00
5 changed files with 14 additions and 6 deletions

2
Cargo.lock generated
View File

@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "telemt"
version = "3.4.11"
version = "3.4.12"
dependencies = [
"aes",
"anyhow",

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.4.11"
version = "3.4.12"
edition = "2024"
[features]

View File

@@ -41,11 +41,12 @@ pub(super) async fn reserve_user_quota_with_yield(
return Err(MiddleQuotaReserveError::DeadlineExceeded);
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
biased;
_ = cancel.cancelled() => {
stats.increment_quota_acquire_cancelled_total();
return Err(MiddleQuotaReserveError::Cancelled);
}
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
}
backoff_rounds = backoff_rounds.saturating_add(1);
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
@@ -128,11 +129,12 @@ pub(super) async fn wait_for_traffic_budget_or_cancel(
return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
}
tokio::select! {
_ = tokio::time::sleep(next_refill_delay()) => {}
biased;
_ = cancel.cancelled() => {
stats.increment_flow_wait_middle_rate_limit_cancelled_total();
return Err(ProxyError::TrafficBudgetWaitCancelled);
}
_ = tokio::time::sleep(next_refill_delay()) => {}
}
let wait_ms = wait_started_at
.elapsed()

View File

@@ -52,6 +52,8 @@ async fn writer_command_loop(
) -> Result<()> {
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => return Ok(()),
cmd = rx.recv() => {
match cmd {
Some(WriterCommand::Data(payload)) => {
@@ -69,7 +71,6 @@ async fn writer_command_loop(
Some(WriterCommand::Close) | None => return Ok(()),
}
}
_ = cancel.cancelled() => return Ok(()),
}
}
}
@@ -108,6 +109,7 @@ async fn ping_loop(
Duration::from_secs(wait)
};
tokio::select! {
biased;
_ = cancel_ping_token.cancelled() => return,
_ = tokio::time::sleep(startup_jitter) => {}
}
@@ -131,6 +133,7 @@ async fn ping_loop(
Duration::from_secs(secs)
};
tokio::select! {
biased;
_ = cancel_ping_token.cancelled() => return,
_ = tokio::time::sleep(wait) => {}
}
@@ -191,6 +194,7 @@ async fn rpc_proxy_req_signal_loop(
};
tokio::select! {
biased;
_ = cancel_signal.cancelled() => return,
_ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {}
}
@@ -207,6 +211,7 @@ async fn rpc_proxy_req_signal_loop(
};
tokio::select! {
biased;
_ = cancel_signal.cancelled() => return,
_ = tokio::time::sleep(wait) => {}
}

View File

@@ -269,12 +269,13 @@ pub(crate) async fn reader_loop(
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
let mut retry_only = false;
let n = tokio::select! {
biased;
_ = cancel.cancelled() => return Ok(()),
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
_ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
retry_only = true;
0usize
},
_ = cancel.cancelled() => return Ok(()),
};
if retry_only {
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);