Merge upstream/main into flow-sec rehearsal: resolve config and middle-proxy health conflicts

This commit is contained in:
David Osipov 2026-03-17 18:29:56 +04:00
parent 1357f3cc4c
commit 4808a30185
No known key found for this signature in database
GPG Key ID: 0E55C4A47454E82E
1 changed file with 106 additions and 27 deletions

View File

@ -124,12 +124,12 @@ pub(super) async fn reap_draining_writers(
let drain_threshold = pool
.me_pool_drain_threshold
.load(std::sync::atomic::Ordering::Relaxed);
let writers = pool.writers.read().await.clone();
let activity = pool.registry.writer_activity_snapshot().await;
let mut draining_writers = Vec::new();
let mut draining_writers = Vec::<DrainingWriterSnapshot>::new();
let mut empty_writer_ids = Vec::<u64>::new();
let mut force_close_writer_ids = Vec::<u64>::new();
for writer in writers {
let writers = pool.writers.read().await;
for writer in writers.iter() {
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
continue;
}
@ -143,23 +143,38 @@ pub(super) async fn reap_draining_writers(
empty_writer_ids.push(writer.id);
continue;
}
draining_writers.push(writer);
draining_writers.push(DrainingWriterSnapshot {
id: writer.id,
writer_dc: writer.writer_dc,
addr: writer.addr,
generation: writer.generation,
created_at: writer.created_at,
draining_started_at_epoch_secs: writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed),
drain_deadline_epoch_secs: writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed),
allow_drain_fallback: writer
.allow_drain_fallback
.load(std::sync::atomic::Ordering::Relaxed),
});
}
drop(writers);
if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize {
let overflow = if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize {
draining_writers.len().saturating_sub(drain_threshold as usize)
} else {
0
};
if overflow > 0 {
draining_writers.sort_by(|left, right| {
let left_started = left
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
let right_started = right
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
left_started
.cmp(&right_started)
left.draining_started_at_epoch_secs
.cmp(&right.draining_started_at_epoch_secs)
.then_with(|| left.created_at.cmp(&right.created_at))
.then_with(|| left.id.cmp(&right.id))
});
let overflow = draining_writers.len().saturating_sub(drain_threshold as usize);
warn!(
draining_writers = draining_writers.len(),
me_pool_drain_threshold = drain_threshold,
@ -174,12 +189,9 @@ pub(super) async fn reap_draining_writers(
let mut active_draining_writer_ids = HashSet::with_capacity(draining_writers.len());
for writer in draining_writers {
active_draining_writer_ids.insert(writer.id);
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if drain_ttl_secs > 0
&& drain_started_at_epoch_secs != 0
&& now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs
&& writer.draining_started_at_epoch_secs != 0
&& now_epoch_secs.saturating_sub(writer.draining_started_at_epoch_secs) > drain_ttl_secs
&& should_emit_writer_warn(
warn_next_allowed,
writer.id,
@ -194,14 +206,12 @@ pub(super) async fn reap_draining_writers(
generation = writer.generation,
drain_ttl_secs,
force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed),
allow_drain_fallback = writer.allow_drain_fallback.load(std::sync::atomic::Ordering::Relaxed),
allow_drain_fallback = writer.allow_drain_fallback,
"ME draining writer remains non-empty past drain TTL"
);
}
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs {
if writer.drain_deadline_epoch_secs != 0 && now_epoch_secs >= writer.drain_deadline_epoch_secs
{
warn!(writer_id = writer.id, "Drain timeout, force-closing");
force_close_writer_ids.push(writer.id);
active_draining_writer_ids.remove(&writer.id);
@ -258,6 +268,18 @@ pub(super) fn health_drain_close_budget() -> usize {
.clamp(HEALTH_DRAIN_CLOSE_BUDGET_MIN, HEALTH_DRAIN_CLOSE_BUDGET_MAX)
}
/// Plain-value snapshot of a draining `MeWriter`, captured while the
/// `pool.writers` read lock is held so that sorting and force-close
/// decisions in `reap_draining_writers` can run after the lock is dropped.
///
/// The atomic fields on the live writer are loaded once (Relaxed) at
/// capture time; the snapshot does not observe later changes.
#[derive(Debug, Clone)]
struct DrainingWriterSnapshot {
    // Unique writer id; registry key and final sort tie-breaker.
    id: u64,
    // Destination DC id — presumably; confirm against MeWriter's field docs.
    writer_dc: i32,
    // Socket address of the writer connection.
    addr: SocketAddr,
    // Writer generation at capture time.
    generation: u64,
    // Creation instant; secondary sort key when drain-start times tie.
    created_at: Instant,
    // Epoch seconds when draining started; 0 means "not recorded",
    // in which case the drain-TTL warning is skipped.
    draining_started_at_epoch_secs: u64,
    // Epoch seconds of the hard force-close deadline; 0 means "no deadline".
    drain_deadline_epoch_secs: u64,
    // Whether drain fallback is allowed; included in the TTL warning log.
    allow_drain_fallback: bool,
}
fn should_emit_writer_warn(
next_allowed: &mut HashMap<u64, Instant>,
writer_id: u64,
@ -1391,6 +1413,15 @@ mod tests {
me_pool_drain_threshold,
..GeneralConfig::default()
};
let mut proxy_map_v4 = HashMap::new();
proxy_map_v4.insert(
2,
vec![(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)), 443)],
);
let decision = NetworkDecision {
ipv4_me: true,
..NetworkDecision::default()
};
MePool::new(
None,
vec![1u8; 32],
@ -1402,10 +1433,10 @@ mod tests {
None,
12,
1200,
HashMap::new(),
proxy_map_v4,
HashMap::new(),
None,
NetworkDecision::default(),
decision,
None,
Arc::new(SecureRandom::new()),
Arc::new(Stats::default()),
@ -1516,8 +1547,55 @@ mod tests {
conn_id
}
/// Test helper: pushes a non-draining (live) `MeWriter` into the pool and
/// registers its command channel with the registry, so reap logic sees an
/// active writer that must never be drained or force-closed.
///
/// The writer is synthesized with deterministic values derived from
/// `writer_id` (TEST-NET-3 address 203.0.113.(id+1), port 4000+id), an
/// Active contour, and all drain-related atomics zeroed/false. Also bumps
/// `pool.conn_count` by one.
async fn insert_live_writer(pool: &Arc<MePool>, writer_id: u64, writer_dc: i32) {
    // The receive half is dropped immediately; sends would fail, which is
    // fine for tests that never dispatch commands to this writer.
    let (tx, _writer_rx) = mpsc::channel::<WriterCommand>(8);
    let writer = MeWriter {
        id: writer_id,
        // NOTE(review): assumes test writer_ids stay small — `writer_id as u8`
        // truncates above 255 and `4000 + writer_id as u16` can overflow.
        addr: SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(203, 0, 113, (writer_id as u8).saturating_add(1))),
            4000 + writer_id as u16,
        ),
        source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
        writer_dc,
        generation: 2,
        contour: Arc::new(AtomicU8::new(WriterContour::Active.as_u8())),
        created_at: Instant::now(),
        tx: tx.clone(),
        cancel: CancellationToken::new(),
        degraded: Arc::new(AtomicBool::new(false)),
        rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
        // draining = false: reap_draining_writers skips this writer entirely.
        draining: Arc::new(AtomicBool::new(false)),
        draining_started_at_epoch_secs: Arc::new(AtomicU64::new(0)),
        drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
        allow_drain_fallback: Arc::new(AtomicBool::new(false)),
    };
    pool.writers.write().await.push(writer);
    pool.registry.register_writer(writer_id, tx).await;
    pool.conn_count.fetch_add(1, Ordering::Relaxed);
}
/// With a drain threshold of 2 and three draining writers, only the writer
/// whose drain started earliest (id 10) must be force-closed; the live
/// writer (id 1) and the two newer draining writers (20, 30) survive, and
/// only the force-closed writer's connection is evicted from the registry.
#[tokio::test]
async fn reap_draining_writers_force_closes_oldest_over_threshold() {
    let pool = make_pool(2).await;
    insert_live_writer(&pool, 1, 2).await;

    let now = MePool::now_epoch_secs();
    // Drain start times: writer 10 is oldest (30s ago), then 20, then 30.
    let oldest_conn = insert_draining_writer(&pool, 10, now.saturating_sub(30)).await;
    let mid_conn = insert_draining_writer(&pool, 20, now.saturating_sub(20)).await;
    let newest_conn = insert_draining_writer(&pool, 30, now.saturating_sub(10)).await;

    let mut warn_next_allowed = HashMap::new();
    reap_draining_writers(&pool, &mut warn_next_allowed).await;

    // Collect surviving writer ids under a short-lived read guard.
    let guard = pool.writers.read().await;
    let mut surviving: Vec<u64> = guard.iter().map(|w| w.id).collect();
    drop(guard);
    surviving.sort_unstable();
    assert_eq!(surviving, vec![1, 20, 30]);

    // The oldest draining writer's connection is gone from the registry;
    // the other two draining connections remain routable.
    assert!(pool.registry.get_writer(oldest_conn).await.is_none());
    assert_eq!(pool.registry.get_writer(mid_conn).await.unwrap().writer_id, 20);
    assert_eq!(pool.registry.get_writer(newest_conn).await.unwrap().writer_id, 30);
}
#[tokio::test]
async fn reap_draining_writers_force_closes_overflow_without_replacement() {
let pool = make_pool(2).await;
let now_epoch_secs = MePool::now_epoch_secs();
let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await;
@ -1527,7 +1605,8 @@ mod tests {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
let mut writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
writer_ids.sort_unstable();
assert_eq!(writer_ids, vec![20, 30]);
assert!(pool.registry.get_writer(conn_a).await.is_none());
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);