diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 9ebc412..e617a43 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -275,10 +275,20 @@ pub(super) struct ReinitCore { pub(super) hardswap: AtomicBool, } +pub(super) struct WriterLifecycleCore { + pub(super) me_keepalive_enabled: bool, + pub(super) me_keepalive_interval: Duration, + pub(super) me_keepalive_jitter: Duration, + pub(super) me_keepalive_payload_random: bool, + pub(super) rpc_proxy_req_every_secs: AtomicU64, + pub(super) writer_cmd_channel_capacity: usize, +} + #[allow(dead_code)] pub struct MePool { pub(super) routing: Arc, pub(super) reinit: Arc, + pub(super) writer_lifecycle: Arc, pub(super) decision: NetworkDecision, pub(super) upstream: Option>, pub(super) rng: Arc, @@ -297,12 +307,6 @@ pub struct MePool { pub(super) stun_backoff_until: Arc>>, pub(super) me_one_retry: u8, pub(super) me_one_timeout: Duration, - pub(super) me_keepalive_enabled: bool, - pub(super) me_keepalive_interval: Duration, - pub(super) me_keepalive_jitter: Duration, - pub(super) me_keepalive_payload_random: bool, - pub(super) rpc_proxy_req_every_secs: AtomicU64, - pub(super) writer_cmd_channel_capacity: usize, pub(super) me_warmup_stagger_enabled: bool, pub(super) me_warmup_step_delay: Duration, pub(super) me_warmup_step_jitter: Duration, @@ -556,6 +560,14 @@ impl MePool { pending_hardswap_map_hash: AtomicU64::new(0), hardswap: AtomicBool::new(hardswap), }), + writer_lifecycle: Arc::new(WriterLifecycleCore { + me_keepalive_enabled, + me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), + me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), + me_keepalive_payload_random, + rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs), + writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1), + }), decision, upstream, rng, @@ -588,12 +600,6 @@ impl MePool { me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), stats, - me_keepalive_enabled, - me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), - me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), - me_keepalive_payload_random, - rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs), - writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1), me_warmup_stagger_enabled, me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index e636b34..7ee4422 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -558,11 +558,14 @@ impl MePool { adaptive_floor_warm_writers_current: self .me_adaptive_floor_warm_writers_current .load(Ordering::Relaxed), - me_keepalive_enabled: self.me_keepalive_enabled, - me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(), - me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(), - me_keepalive_payload_random: self.me_keepalive_payload_random, - rpc_proxy_req_every_secs: self.rpc_proxy_req_every_secs.load(Ordering::Relaxed), + me_keepalive_enabled: self.writer_lifecycle.me_keepalive_enabled, + me_keepalive_interval_secs: self.writer_lifecycle.me_keepalive_interval.as_secs(), + me_keepalive_jitter_secs: self.writer_lifecycle.me_keepalive_jitter.as_secs(), + me_keepalive_payload_random: self.writer_lifecycle.me_keepalive_payload_random, + rpc_proxy_req_every_secs: self + .writer_lifecycle + .rpc_proxy_req_every_secs + .load(Ordering::Relaxed), me_reconnect_max_concurrent_per_dc: self.me_reconnect_max_concurrent_per_dc, me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64, me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index d2d7420..f697257 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -365,7 +365,9 @@ impl MePool { let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0)); let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0)); let allow_drain_fallback = Arc::new(AtomicBool::new(false)); - let (tx, rx) = mpsc::channel::(self.writer_cmd_channel_capacity); + let (tx, rx) = mpsc::channel::( + self.writer_lifecycle.writer_cmd_channel_capacity, + ); let rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, @@ -414,11 +416,14 @@ impl MePool { let tx_reader = tx.clone(); let tx_ping = tx.clone(); let tx_signal = tx.clone(); - let keepalive_enabled = self.me_keepalive_enabled; - let keepalive_interval = self.me_keepalive_interval; - let keepalive_jitter = self.me_keepalive_jitter; - let keepalive_jitter_signal = self.me_keepalive_jitter; - let rpc_proxy_req_every_secs = self.rpc_proxy_req_every_secs.load(Ordering::Relaxed); + let keepalive_enabled = self.writer_lifecycle.me_keepalive_enabled; + let keepalive_interval = self.writer_lifecycle.me_keepalive_interval; + let keepalive_jitter = self.writer_lifecycle.me_keepalive_jitter; + let keepalive_jitter_signal = self.writer_lifecycle.me_keepalive_jitter; + let rpc_proxy_req_every_secs = self + .writer_lifecycle + .rpc_proxy_req_every_secs + .load(Ordering::Relaxed); let cancel_reader = cancel.clone(); let cancel_writer = cancel.clone(); let cancel_ping = cancel.clone(); diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index d38775f..80fd2ea 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -857,7 +857,7 @@ impl MePool { (self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs) as u64) * 100; - let queue_cap = self.writer_cmd_channel_capacity.max(1) as u64; + let queue_cap = self.writer_lifecycle.writer_cmd_channel_capacity.max(1) as u64; let queue_remaining = writer.tx.capacity() as u64; let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap)); let queue_util_pct = queue_used.saturating_mul(100) / queue_cap;