Shrink Session Vec

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-31 19:29:47 +03:00
parent e594d6f079
commit 729ffa0fcd
No known key found for this signature in database
1 changed files with 18 additions and 0 deletions

View File

@ -841,11 +841,23 @@ where
let me_writer = tokio::spawn(async move { let me_writer = tokio::spawn(async move {
let mut writer = crypto_writer; let mut writer = crypto_writer;
let mut frame_buf = Vec::with_capacity(16 * 1024); let mut frame_buf = Vec::with_capacity(16 * 1024);
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
fn shrink_session_vec(buf: &mut Vec<u8>, threshold: usize) {
if buf.capacity() > threshold {
buf.clear();
buf.shrink_to(threshold);
} else {
buf.clear();
}
}
loop { loop {
tokio::select! { tokio::select! {
msg = me_rx_task.recv() => { msg = me_rx_task.recv() => {
let Some(first) = msg else { let Some(first) = msg else {
debug!(conn_id, "ME channel closed"); debug!(conn_id, "ME channel closed");
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Err(ProxyError::Proxy("ME connection lost".into())); return Err(ProxyError::Proxy("ME connection lost".into()));
}; };
@ -901,6 +913,7 @@ where
batch_bytes, batch_bytes,
flush_duration_us, flush_duration_us,
); );
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Ok(()); return Ok(());
} }
} }
@ -962,6 +975,7 @@ where
batch_bytes, batch_bytes,
flush_duration_us, flush_duration_us,
); );
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Ok(()); return Ok(());
} }
} }
@ -1027,6 +1041,7 @@ where
batch_bytes, batch_bytes,
flush_duration_us, flush_duration_us,
); );
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Ok(()); return Ok(());
} }
} }
@ -1091,6 +1106,7 @@ where
batch_bytes, batch_bytes,
flush_duration_us, flush_duration_us,
); );
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Ok(()); return Ok(());
} }
} }
@ -1098,6 +1114,7 @@ where
} }
Ok(None) => { Ok(None) => {
debug!(conn_id, "ME channel closed"); debug!(conn_id, "ME channel closed");
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Err(ProxyError::Proxy("ME connection lost".into())); return Err(ProxyError::Proxy("ME connection lost".into()));
} }
Err(_) => { Err(_) => {
@ -1147,6 +1164,7 @@ where
} }
_ = &mut stop_rx => { _ = &mut stop_rx => {
debug!(conn_id, "ME writer stop signal"); debug!(conn_id, "ME writer stop signal");
shrink_session_vec(&mut frame_buf, shrink_threshold);
return Ok(()); return Ok(());
} }
} }