ME Writer Lifecycle Core

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-25 19:47:41 +03:00
parent 1c3e0d4e46
commit dc6b6d3f9d
No known key found for this signature in database
4 changed files with 38 additions and 24 deletions

View File

@ -275,10 +275,20 @@ pub(super) struct ReinitCore {
pub(super) hardswap: AtomicBool, pub(super) hardswap: AtomicBool,
} }
pub(super) struct WriterLifecycleCore {
pub(super) me_keepalive_enabled: bool,
pub(super) me_keepalive_interval: Duration,
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) rpc_proxy_req_every_secs: AtomicU64,
pub(super) writer_cmd_channel_capacity: usize,
}
#[allow(dead_code)] #[allow(dead_code)]
pub struct MePool { pub struct MePool {
pub(super) routing: Arc<RoutingCore>, pub(super) routing: Arc<RoutingCore>,
pub(super) reinit: Arc<ReinitCore>, pub(super) reinit: Arc<ReinitCore>,
pub(super) writer_lifecycle: Arc<WriterLifecycleCore>,
pub(super) decision: NetworkDecision, pub(super) decision: NetworkDecision,
pub(super) upstream: Option<Arc<UpstreamManager>>, pub(super) upstream: Option<Arc<UpstreamManager>>,
pub(super) rng: Arc<SecureRandom>, pub(super) rng: Arc<SecureRandom>,
@ -297,12 +307,6 @@ pub struct MePool {
pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>, pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>,
pub(super) me_one_retry: u8, pub(super) me_one_retry: u8,
pub(super) me_one_timeout: Duration, pub(super) me_one_timeout: Duration,
pub(super) me_keepalive_enabled: bool,
pub(super) me_keepalive_interval: Duration,
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) rpc_proxy_req_every_secs: AtomicU64,
pub(super) writer_cmd_channel_capacity: usize,
pub(super) me_warmup_stagger_enabled: bool, pub(super) me_warmup_stagger_enabled: bool,
pub(super) me_warmup_step_delay: Duration, pub(super) me_warmup_step_delay: Duration,
pub(super) me_warmup_step_jitter: Duration, pub(super) me_warmup_step_jitter: Duration,
@ -556,6 +560,14 @@ impl MePool {
pending_hardswap_map_hash: AtomicU64::new(0), pending_hardswap_map_hash: AtomicU64::new(0),
hardswap: AtomicBool::new(hardswap), hardswap: AtomicBool::new(hardswap),
}), }),
writer_lifecycle: Arc::new(WriterLifecycleCore {
me_keepalive_enabled,
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs),
writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1),
}),
decision, decision,
upstream, upstream,
rng, rng,
@ -588,12 +600,6 @@ impl MePool {
me_one_retry, me_one_retry,
me_one_timeout: Duration::from_millis(me_one_timeout_ms), me_one_timeout: Duration::from_millis(me_one_timeout_ms),
stats, stats,
me_keepalive_enabled,
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs),
writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1),
me_warmup_stagger_enabled, me_warmup_stagger_enabled,
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),

View File

@ -558,11 +558,14 @@ impl MePool {
adaptive_floor_warm_writers_current: self adaptive_floor_warm_writers_current: self
.me_adaptive_floor_warm_writers_current .me_adaptive_floor_warm_writers_current
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
me_keepalive_enabled: self.me_keepalive_enabled, me_keepalive_enabled: self.writer_lifecycle.me_keepalive_enabled,
me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(), me_keepalive_interval_secs: self.writer_lifecycle.me_keepalive_interval.as_secs(),
me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(), me_keepalive_jitter_secs: self.writer_lifecycle.me_keepalive_jitter.as_secs(),
me_keepalive_payload_random: self.me_keepalive_payload_random, me_keepalive_payload_random: self.writer_lifecycle.me_keepalive_payload_random,
rpc_proxy_req_every_secs: self.rpc_proxy_req_every_secs.load(Ordering::Relaxed), rpc_proxy_req_every_secs: self
.writer_lifecycle
.rpc_proxy_req_every_secs
.load(Ordering::Relaxed),
me_reconnect_max_concurrent_per_dc: self.me_reconnect_max_concurrent_per_dc, me_reconnect_max_concurrent_per_dc: self.me_reconnect_max_concurrent_per_dc,
me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64, me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64,
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,

View File

@ -365,7 +365,9 @@ impl MePool {
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0)); let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0)); let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0));
let allow_drain_fallback = Arc::new(AtomicBool::new(false)); let allow_drain_fallback = Arc::new(AtomicBool::new(false));
let (tx, rx) = mpsc::channel::<WriterCommand>(self.writer_cmd_channel_capacity); let (tx, rx) = mpsc::channel::<WriterCommand>(
self.writer_lifecycle.writer_cmd_channel_capacity,
);
let rpc_writer = RpcWriter { let rpc_writer = RpcWriter {
writer: hs.wr, writer: hs.wr,
key: hs.write_key, key: hs.write_key,
@ -414,11 +416,14 @@ impl MePool {
let tx_reader = tx.clone(); let tx_reader = tx.clone();
let tx_ping = tx.clone(); let tx_ping = tx.clone();
let tx_signal = tx.clone(); let tx_signal = tx.clone();
let keepalive_enabled = self.me_keepalive_enabled; let keepalive_enabled = self.writer_lifecycle.me_keepalive_enabled;
let keepalive_interval = self.me_keepalive_interval; let keepalive_interval = self.writer_lifecycle.me_keepalive_interval;
let keepalive_jitter = self.me_keepalive_jitter; let keepalive_jitter = self.writer_lifecycle.me_keepalive_jitter;
let keepalive_jitter_signal = self.me_keepalive_jitter; let keepalive_jitter_signal = self.writer_lifecycle.me_keepalive_jitter;
let rpc_proxy_req_every_secs = self.rpc_proxy_req_every_secs.load(Ordering::Relaxed); let rpc_proxy_req_every_secs = self
.writer_lifecycle
.rpc_proxy_req_every_secs
.load(Ordering::Relaxed);
let cancel_reader = cancel.clone(); let cancel_reader = cancel.clone();
let cancel_writer = cancel.clone(); let cancel_writer = cancel.clone();
let cancel_ping = cancel.clone(); let cancel_ping = cancel.clone();

View File

@ -857,7 +857,7 @@ impl MePool {
(self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs) (self.writer_idle_rank_for_selection(writer, idle_since_by_writer, now_epoch_secs)
as u64) as u64)
* 100; * 100;
let queue_cap = self.writer_cmd_channel_capacity.max(1) as u64; let queue_cap = self.writer_lifecycle.writer_cmd_channel_capacity.max(1) as u64;
let queue_remaining = writer.tx.capacity() as u64; let queue_remaining = writer.tx.capacity() as u64;
let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap)); let queue_used = queue_cap.saturating_sub(queue_remaining.min(queue_cap));
let queue_util_pct = queue_used.saturating_mul(100) / queue_cap; let queue_util_pct = queue_used.saturating_mul(100) / queue_cap;