mirror of
https://github.com/telemt/telemt.git
synced 2026-05-22 19:51:43 +03:00
Compare commits
4 Commits
4d9e835fa2
...
flow
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9abaf9006c | ||
|
|
231f04a810 | ||
|
|
b32daf79bc | ||
|
|
f668759c05 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.4.11"
|
version = "3.4.12"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aes",
|
"aes",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.4.11"
|
version = "3.4.12"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
|||||||
@@ -41,11 +41,12 @@ pub(super) async fn reserve_user_quota_with_yield(
|
|||||||
return Err(MiddleQuotaReserveError::DeadlineExceeded);
|
return Err(MiddleQuotaReserveError::DeadlineExceeded);
|
||||||
}
|
}
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
|
biased;
|
||||||
_ = cancel.cancelled() => {
|
_ = cancel.cancelled() => {
|
||||||
stats.increment_quota_acquire_cancelled_total();
|
stats.increment_quota_acquire_cancelled_total();
|
||||||
return Err(MiddleQuotaReserveError::Cancelled);
|
return Err(MiddleQuotaReserveError::Cancelled);
|
||||||
}
|
}
|
||||||
|
_ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {}
|
||||||
}
|
}
|
||||||
backoff_rounds = backoff_rounds.saturating_add(1);
|
backoff_rounds = backoff_rounds.saturating_add(1);
|
||||||
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
|
if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS {
|
||||||
@@ -128,11 +129,12 @@ pub(super) async fn wait_for_traffic_budget_or_cancel(
|
|||||||
return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
|
return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded);
|
||||||
}
|
}
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = tokio::time::sleep(next_refill_delay()) => {}
|
biased;
|
||||||
_ = cancel.cancelled() => {
|
_ = cancel.cancelled() => {
|
||||||
stats.increment_flow_wait_middle_rate_limit_cancelled_total();
|
stats.increment_flow_wait_middle_rate_limit_cancelled_total();
|
||||||
return Err(ProxyError::TrafficBudgetWaitCancelled);
|
return Err(ProxyError::TrafficBudgetWaitCancelled);
|
||||||
}
|
}
|
||||||
|
_ = tokio::time::sleep(next_refill_delay()) => {}
|
||||||
}
|
}
|
||||||
let wait_ms = wait_started_at
|
let wait_ms = wait_started_at
|
||||||
.elapsed()
|
.elapsed()
|
||||||
|
|||||||
@@ -52,6 +52,8 @@ async fn writer_command_loop(
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = cancel.cancelled() => return Ok(()),
|
||||||
cmd = rx.recv() => {
|
cmd = rx.recv() => {
|
||||||
match cmd {
|
match cmd {
|
||||||
Some(WriterCommand::Data(payload)) => {
|
Some(WriterCommand::Data(payload)) => {
|
||||||
@@ -69,7 +71,6 @@ async fn writer_command_loop(
|
|||||||
Some(WriterCommand::Close) | None => return Ok(()),
|
Some(WriterCommand::Close) | None => return Ok(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = cancel.cancelled() => return Ok(()),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -108,6 +109,7 @@ async fn ping_loop(
|
|||||||
Duration::from_secs(wait)
|
Duration::from_secs(wait)
|
||||||
};
|
};
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_ping_token.cancelled() => return,
|
_ = cancel_ping_token.cancelled() => return,
|
||||||
_ = tokio::time::sleep(startup_jitter) => {}
|
_ = tokio::time::sleep(startup_jitter) => {}
|
||||||
}
|
}
|
||||||
@@ -131,6 +133,7 @@ async fn ping_loop(
|
|||||||
Duration::from_secs(secs)
|
Duration::from_secs(secs)
|
||||||
};
|
};
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_ping_token.cancelled() => return,
|
_ = cancel_ping_token.cancelled() => return,
|
||||||
_ = tokio::time::sleep(wait) => {}
|
_ = tokio::time::sleep(wait) => {}
|
||||||
}
|
}
|
||||||
@@ -191,6 +194,7 @@ async fn rpc_proxy_req_signal_loop(
|
|||||||
};
|
};
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_signal.cancelled() => return,
|
_ = cancel_signal.cancelled() => return,
|
||||||
_ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {}
|
_ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {}
|
||||||
}
|
}
|
||||||
@@ -207,6 +211,7 @@ async fn rpc_proxy_req_signal_loop(
|
|||||||
};
|
};
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased;
|
||||||
_ = cancel_signal.cancelled() => return,
|
_ = cancel_signal.cancelled() => return,
|
||||||
_ = tokio::time::sleep(wait) => {}
|
_ = tokio::time::sleep(wait) => {}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -269,12 +269,13 @@ pub(crate) async fn reader_loop(
|
|||||||
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
|
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
|
||||||
let mut retry_only = false;
|
let mut retry_only = false;
|
||||||
let n = tokio::select! {
|
let n = tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = cancel.cancelled() => return Ok(()),
|
||||||
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
|
res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?,
|
||||||
_ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
|
_ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => {
|
||||||
retry_only = true;
|
retry_only = true;
|
||||||
0usize
|
0usize
|
||||||
},
|
},
|
||||||
_ = cancel.cancelled() => return Ok(()),
|
|
||||||
};
|
};
|
||||||
if retry_only {
|
if retry_only {
|
||||||
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
|
let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed);
|
||||||
|
|||||||
Reference in New Issue
Block a user