mirror of
https://github.com/telemt/telemt.git
synced 2026-04-22 21:14:10 +03:00
Fairshare Disabled semantics fix
This commit is contained in:
@@ -138,6 +138,9 @@ impl WorkerFairnessState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn set_backpressure_enabled(&mut self, enabled: bool) {
|
pub(crate) fn set_backpressure_enabled(&mut self, enabled: bool) {
|
||||||
|
if self.config.backpressure_enabled == enabled {
|
||||||
|
return;
|
||||||
|
}
|
||||||
self.config.backpressure_enabled = enabled;
|
self.config.backpressure_enabled = enabled;
|
||||||
self.config.pressure.backpressure_enabled = enabled;
|
self.config.pressure.backpressure_enabled = enabled;
|
||||||
self.evaluate_pressure(Instant::now(), true);
|
self.evaluate_pressure(Instant::now(), true);
|
||||||
|
|||||||
@@ -262,9 +262,8 @@ pub(crate) async fn reader_loop(
|
|||||||
let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed);
|
let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed);
|
||||||
fairness.set_backpressure_enabled(backpressure_enabled);
|
fairness.set_backpressure_enabled(backpressure_enabled);
|
||||||
let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot);
|
let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot);
|
||||||
let fairshare_active = fairshare_enabled || fairness_has_backlog;
|
|
||||||
let mut tmp = [0u8; 65_536];
|
let mut tmp = [0u8; 65_536];
|
||||||
let backlog_retry_enabled = fairshare_active && fairness_has_backlog;
|
let backlog_retry_enabled = fairness_has_backlog;
|
||||||
let backlog_retry_delay =
|
let backlog_retry_delay =
|
||||||
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
|
fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed));
|
||||||
let mut retry_only = false;
|
let mut retry_only = false;
|
||||||
@@ -368,7 +367,7 @@ pub(crate) async fn reader_loop(
|
|||||||
let data = body.slice(12..);
|
let data = body.slice(12..);
|
||||||
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
trace!(cid, flags, len = data.len(), "RPC_PROXY_ANS");
|
||||||
|
|
||||||
if fairshare_active {
|
if fairshare_enabled {
|
||||||
let admission = fairness.enqueue_data(cid, flags, data, Instant::now());
|
let admission = fairness.enqueue_data(cid, flags, data, Instant::now());
|
||||||
if !matches!(admission, AdmissionDecision::Admit) {
|
if !matches!(admission, AdmissionDecision::Admit) {
|
||||||
stats.increment_me_route_drop_queue_full();
|
stats.increment_me_route_drop_queue_full();
|
||||||
|
|||||||
Reference in New Issue
Block a user