Cleanup Methods for Memory Consistency

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-03-31 18:40:04 +03:00
parent 2df6b8704d
commit ecd6a19246
4 changed files with 221 additions and 10 deletions

View File

@@ -35,6 +35,10 @@ pub struct BufferPool {
misses: AtomicUsize,
/// Number of successful reuses
hits: AtomicUsize,
/// Number of non-standard buffers replaced with a fresh default-sized buffer
replaced_nonstandard: AtomicUsize,
/// Number of buffers dropped because the pool queue was full
dropped_pool_full: AtomicUsize,
}
impl BufferPool {
@@ -52,6 +56,8 @@ impl BufferPool {
allocated: AtomicUsize::new(0),
misses: AtomicUsize::new(0),
hits: AtomicUsize::new(0),
replaced_nonstandard: AtomicUsize::new(0),
dropped_pool_full: AtomicUsize::new(0),
}
}
@@ -91,17 +97,36 @@ impl BufferPool {
/// Return a buffer to the pool
fn return_buffer(&self, mut buffer: BytesMut) {
// Clear the buffer but keep capacity
buffer.clear();
const MAX_RETAINED_BUFFER_FACTOR: usize = 2;
// Only return if we haven't exceeded max and buffer is right size
if buffer.capacity() >= self.buffer_size {
// Try to push to pool, if full just drop
let _ = self.buffers.push(buffer);
// Clear the buffer but keep capacity.
buffer.clear();
let max_retained_capacity = self
.buffer_size
.saturating_mul(MAX_RETAINED_BUFFER_FACTOR)
.max(self.buffer_size);
// Keep only near-default capacities in the pool. Oversized buffers keep
// RSS elevated for hours under churn; replace them with default-sized
// buffers before re-pooling.
if buffer.capacity() < self.buffer_size || buffer.capacity() > max_retained_capacity {
self.replaced_nonstandard.fetch_add(1, Ordering::Relaxed);
buffer = BytesMut::with_capacity(self.buffer_size);
}
// If buffer was dropped (pool full), decrement allocated
// Actually we don't decrement here because the buffer might have been
// grown beyond our size - we just let it go
// Try to return into the queue; if full, drop and update accounting.
if self.buffers.push(buffer).is_err() {
self.dropped_pool_full.fetch_add(1, Ordering::Relaxed);
self.decrement_allocated();
}
}
/// Decrement the allocated-buffer counter, saturating at zero.
///
/// Uses a CAS retry loop so a racing decrement can never underflow the
/// counter below zero.
fn decrement_allocated(&self) {
    let mut observed = self.allocated.load(Ordering::Relaxed);
    loop {
        let next = observed.saturating_sub(1);
        match self.allocated.compare_exchange_weak(
            observed,
            next,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => break,
            // Lost the race (or spurious failure): retry with the fresh value.
            Err(current) => observed = current,
        }
    }
}
/// Get pool statistics
@@ -113,6 +138,8 @@ impl BufferPool {
buffer_size: self.buffer_size,
hits: self.hits.load(Ordering::Relaxed),
misses: self.misses.load(Ordering::Relaxed),
replaced_nonstandard: self.replaced_nonstandard.load(Ordering::Relaxed),
dropped_pool_full: self.dropped_pool_full.load(Ordering::Relaxed),
}
}
@@ -160,6 +187,10 @@ pub struct PoolStats {
pub hits: usize,
/// Number of cache misses (new allocation)
pub misses: usize,
/// Number of non-standard buffers replaced during return
pub replaced_nonstandard: usize,
/// Number of buffers dropped because the pool queue was full
pub dropped_pool_full: usize,
}
impl PoolStats {
@@ -185,6 +216,7 @@ pub struct PooledBuffer {
impl PooledBuffer {
/// Take the inner buffer, preventing return to pool
pub fn take(mut self) -> BytesMut {
    // The buffer permanently leaves the pool, so remove it from the
    // allocation accounting before extracting it.
    self.pool.decrement_allocated();
    let inner = self.buffer.take();
    inner.unwrap()
}
@@ -364,6 +396,25 @@ mod tests {
let stats = pool.stats();
assert_eq!(stats.pooled, 0);
assert_eq!(stats.allocated, 0);
}
#[test]
fn test_pool_replaces_oversized_buffers() {
    let pool = Arc::new(BufferPool::with_config(1024, 10));

    // Grow a pooled buffer well past the retention ceiling (2x default),
    // then let it drop so it flows back through return_buffer().
    {
        let mut oversized = pool.get();
        oversized.reserve(8192);
        assert!(oversized.capacity() > 2048);
    }

    // The oversized buffer must have been swapped for a default-sized one
    // on return, and the replacement re-pooled.
    let stats = pool.stats();
    assert_eq!(stats.replaced_nonstandard, 1);
    assert_eq!(stats.pooled, 1);

    // A subsequent checkout hands back a near-default capacity buffer.
    let reissued = pool.get();
    assert!(reissued.capacity() <= 2048);
}
#[test]