diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index aebc628..01833fc 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -841,11 +841,23 @@ where let me_writer = tokio::spawn(async move { let mut writer = crypto_writer; let mut frame_buf = Vec::with_capacity(16 * 1024); + let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes; + + fn shrink_session_vec(buf: &mut Vec, threshold: usize) { + if buf.capacity() > threshold { + buf.clear(); + buf.shrink_to(threshold); + } else { + buf.clear(); + } + } + loop { tokio::select! { msg = me_rx_task.recv() => { let Some(first) = msg else { debug!(conn_id, "ME channel closed"); + shrink_session_vec(&mut frame_buf, shrink_threshold); return Err(ProxyError::Proxy("ME connection lost".into())); }; @@ -901,6 +913,7 @@ where batch_bytes, flush_duration_us, ); + shrink_session_vec(&mut frame_buf, shrink_threshold); return Ok(()); } } @@ -962,6 +975,7 @@ where batch_bytes, flush_duration_us, ); + shrink_session_vec(&mut frame_buf, shrink_threshold); return Ok(()); } } @@ -1027,6 +1041,7 @@ where batch_bytes, flush_duration_us, ); + shrink_session_vec(&mut frame_buf, shrink_threshold); return Ok(()); } } @@ -1091,6 +1106,7 @@ where batch_bytes, flush_duration_us, ); + shrink_session_vec(&mut frame_buf, shrink_threshold); return Ok(()); } } @@ -1098,6 +1114,7 @@ where } Ok(None) => { debug!(conn_id, "ME channel closed"); + shrink_session_vec(&mut frame_buf, shrink_threshold); return Err(ProxyError::Proxy("ME connection lost".into())); } Err(_) => { @@ -1147,6 +1164,7 @@ where } _ = &mut stop_rx => { debug!(conn_id, "ME writer stop signal"); + shrink_session_vec(&mut frame_buf, shrink_threshold); return Ok(()); } }