mirror of https://github.com/telemt/telemt.git
Compare commits
2 Commits
9c4ac98ad6
...
1d0fa57490
| Author | SHA1 | Date |
|---|---|---|
|
|
1d0fa57490 | |
|
|
c2f16a343a |
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.3.36"
|
version = "3.3.35"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,9 @@
|
||||||
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
|
***Löst Probleme, bevor andere überhaupt wissen, dass sie existieren*** / ***It solves problems before others even realize they exist***
|
||||||
|
|
||||||
### [**Telemt Chat in Telegram**](https://t.me/telemtrs)
|
### [**Telemt Chat in Telegram**](https://t.me/telemtrs)
|
||||||
|
#### Fixed TLS ClientHello is now available in Telegram Desktop starting from version 6.7.2: to work with EE-MTProxy, please update your client;
|
||||||
|
#### Fixed TLS ClientHello for Telegram Android Client is available in [our chat](https://t.me/telemtrs/30234/36441); official releases for Android and iOS are "work in progress";
|
||||||
|
|
||||||
|
|
||||||
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
|
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as:
|
||||||
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)
|
- [ME Pool + Reader/Writer + Registry + Refill + Adaptive Floor + Trio-State + Generation Lifecycle](https://github.com/telemt/telemt/blob/main/docs/model/MODEL.en.md)
|
||||||
|
|
|
||||||
|
|
@ -26,15 +26,6 @@ pub struct UserIpTracker {
|
||||||
cleanup_drain_lock: Arc<AsyncMutex<()>>,
|
cleanup_drain_lock: Arc<AsyncMutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
|
||||||
pub struct UserIpTrackerMemoryStats {
|
|
||||||
pub active_users: usize,
|
|
||||||
pub recent_users: usize,
|
|
||||||
pub active_entries: usize,
|
|
||||||
pub recent_entries: usize,
|
|
||||||
pub cleanup_queue_len: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UserIpTracker {
|
impl UserIpTracker {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|
@ -150,13 +141,6 @@ impl UserIpTracker {
|
||||||
|
|
||||||
let mut active_ips = self.active_ips.write().await;
|
let mut active_ips = self.active_ips.write().await;
|
||||||
let mut recent_ips = self.recent_ips.write().await;
|
let mut recent_ips = self.recent_ips.write().await;
|
||||||
let window = *self.limit_window.read().await;
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
for user_recent in recent_ips.values_mut() {
|
|
||||||
Self::prune_recent(user_recent, now, window);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut users =
|
let mut users =
|
||||||
Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
|
Vec::<String>::with_capacity(active_ips.len().saturating_add(recent_ips.len()));
|
||||||
users.extend(active_ips.keys().cloned());
|
users.extend(active_ips.keys().cloned());
|
||||||
|
|
@ -182,26 +166,6 @@ impl UserIpTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn memory_stats(&self) -> UserIpTrackerMemoryStats {
|
|
||||||
let cleanup_queue_len = self
|
|
||||||
.cleanup_queue
|
|
||||||
.lock()
|
|
||||||
.unwrap_or_else(|poisoned| poisoned.into_inner())
|
|
||||||
.len();
|
|
||||||
let active_ips = self.active_ips.read().await;
|
|
||||||
let recent_ips = self.recent_ips.read().await;
|
|
||||||
let active_entries = active_ips.values().map(HashMap::len).sum();
|
|
||||||
let recent_entries = recent_ips.values().map(HashMap::len).sum();
|
|
||||||
|
|
||||||
UserIpTrackerMemoryStats {
|
|
||||||
active_users: active_ips.len(),
|
|
||||||
recent_users: recent_ips.len(),
|
|
||||||
active_entries,
|
|
||||||
recent_entries,
|
|
||||||
cleanup_queue_len,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn set_limit_policy(&self, mode: UserMaxUniqueIpsMode, window_secs: u64) {
|
pub async fn set_limit_policy(&self, mode: UserMaxUniqueIpsMode, window_secs: u64) {
|
||||||
{
|
{
|
||||||
let mut current_mode = self.limit_mode.write().await;
|
let mut current_mode = self.limit_mode.write().await;
|
||||||
|
|
@ -487,7 +451,6 @@ impl Default for UserIpTracker {
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||||
use std::sync::atomic::Ordering;
|
|
||||||
|
|
||||||
fn test_ipv4(oct1: u8, oct2: u8, oct3: u8, oct4: u8) -> IpAddr {
|
fn test_ipv4(oct1: u8, oct2: u8, oct3: u8, oct4: u8) -> IpAddr {
|
||||||
IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
|
IpAddr::V4(Ipv4Addr::new(oct1, oct2, oct3, oct4))
|
||||||
|
|
@ -801,54 +764,4 @@ mod tests {
|
||||||
tokio::time::sleep(Duration::from_millis(1100)).await;
|
tokio::time::sleep(Duration::from_millis(1100)).await;
|
||||||
assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
|
assert!(tracker.check_and_add("test_user", ip2).await.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_memory_stats_reports_queue_and_entry_counts() {
|
|
||||||
let tracker = UserIpTracker::new();
|
|
||||||
tracker.set_user_limit("test_user", 4).await;
|
|
||||||
let ip1 = test_ipv4(10, 2, 0, 1);
|
|
||||||
let ip2 = test_ipv4(10, 2, 0, 2);
|
|
||||||
|
|
||||||
tracker.check_and_add("test_user", ip1).await.unwrap();
|
|
||||||
tracker.check_and_add("test_user", ip2).await.unwrap();
|
|
||||||
tracker.enqueue_cleanup("test_user".to_string(), ip1);
|
|
||||||
|
|
||||||
let snapshot = tracker.memory_stats().await;
|
|
||||||
assert_eq!(snapshot.active_users, 1);
|
|
||||||
assert_eq!(snapshot.recent_users, 1);
|
|
||||||
assert_eq!(snapshot.active_entries, 2);
|
|
||||||
assert_eq!(snapshot.recent_entries, 2);
|
|
||||||
assert_eq!(snapshot.cleanup_queue_len, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_compact_prunes_stale_recent_entries() {
|
|
||||||
let tracker = UserIpTracker::new();
|
|
||||||
tracker
|
|
||||||
.set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 1)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let stale_user = "stale-user".to_string();
|
|
||||||
let stale_ip = test_ipv4(10, 3, 0, 1);
|
|
||||||
{
|
|
||||||
let mut recent_ips = tracker.recent_ips.write().await;
|
|
||||||
recent_ips
|
|
||||||
.entry(stale_user.clone())
|
|
||||||
.or_insert_with(HashMap::new)
|
|
||||||
.insert(stale_ip, Instant::now() - Duration::from_secs(5));
|
|
||||||
}
|
|
||||||
|
|
||||||
tracker.last_compact_epoch_secs.store(0, Ordering::Relaxed);
|
|
||||||
tracker
|
|
||||||
.check_and_add("trigger-user", test_ipv4(10, 3, 0, 2))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let recent_ips = tracker.recent_ips.read().await;
|
|
||||||
let stale_exists = recent_ips
|
|
||||||
.get(&stale_user)
|
|
||||||
.map(|ips| ips.contains_key(&stale_ip))
|
|
||||||
.unwrap_or(false);
|
|
||||||
assert!(!stale_exists);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
102
src/metrics.rs
102
src/metrics.rs
|
|
@ -293,27 +293,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_buffer_pool_buffers_total Snapshot of pooled and allocated buffers"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_buffer_pool_buffers_total gauge");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_buffer_pool_buffers_total{{kind=\"pooled\"}} {}",
|
|
||||||
stats.get_buffer_pool_pooled_gauge()
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_buffer_pool_buffers_total{{kind=\"allocated\"}} {}",
|
|
||||||
stats.get_buffer_pool_allocated_gauge()
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_buffer_pool_buffers_total{{kind=\"in_use\"}} {}",
|
|
||||||
stats.get_buffer_pool_in_use_gauge()
|
|
||||||
);
|
|
||||||
|
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_connections_total Total accepted connections"
|
"# HELP telemt_connections_total Total accepted connections"
|
||||||
|
|
@ -962,39 +941,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_me_c2me_enqueue_events_total ME client->ME enqueue outcomes"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_me_c2me_enqueue_events_total counter");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_me_c2me_enqueue_events_total{{event=\"full\"}} {}",
|
|
||||||
if me_allows_normal {
|
|
||||||
stats.get_me_c2me_send_full_total()
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_me_c2me_enqueue_events_total{{event=\"high_water\"}} {}",
|
|
||||||
if me_allows_normal {
|
|
||||||
stats.get_me_c2me_send_high_water_total()
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_me_c2me_enqueue_events_total{{event=\"timeout\"}} {}",
|
|
||||||
if me_allows_normal {
|
|
||||||
stats.get_me_c2me_send_timeout_total()
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
|
"# HELP telemt_me_d2c_batches_total Total DC->Client flush batches"
|
||||||
|
|
@ -2544,48 +2490,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
if user_enabled { 0 } else { 1 }
|
if user_enabled { 0 } else { 1 }
|
||||||
);
|
);
|
||||||
|
|
||||||
let ip_memory = ip_tracker.memory_stats().await;
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_ip_tracker_users Number of users tracked by IP limiter state"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_ip_tracker_users gauge");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_ip_tracker_users{{scope=\"active\"}} {}",
|
|
||||||
ip_memory.active_users
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_ip_tracker_users{{scope=\"recent\"}} {}",
|
|
||||||
ip_memory.recent_users
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_ip_tracker_entries Number of IP entries tracked by limiter state"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_ip_tracker_entries gauge");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_ip_tracker_entries{{scope=\"active\"}} {}",
|
|
||||||
ip_memory.active_entries
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_ip_tracker_entries{{scope=\"recent\"}} {}",
|
|
||||||
ip_memory.recent_entries
|
|
||||||
);
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"# HELP telemt_ip_tracker_cleanup_queue_len Deferred disconnect cleanup queue length"
|
|
||||||
);
|
|
||||||
let _ = writeln!(out, "# TYPE telemt_ip_tracker_cleanup_queue_len gauge");
|
|
||||||
let _ = writeln!(
|
|
||||||
out,
|
|
||||||
"telemt_ip_tracker_cleanup_queue_len {}",
|
|
||||||
ip_memory.cleanup_queue_len
|
|
||||||
);
|
|
||||||
|
|
||||||
if user_enabled {
|
if user_enabled {
|
||||||
for entry in stats.iter_user_stats() {
|
for entry in stats.iter_user_stats() {
|
||||||
let user = entry.key();
|
let user = entry.key();
|
||||||
|
|
@ -2824,9 +2728,6 @@ mod tests {
|
||||||
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
|
assert!(output.contains("telemt_user_unique_ips_recent_window{user=\"alice\"} 1"));
|
||||||
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
|
assert!(output.contains("telemt_user_unique_ips_limit{user=\"alice\"} 4"));
|
||||||
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
|
assert!(output.contains("telemt_user_unique_ips_utilization{user=\"alice\"} 0.250000"));
|
||||||
assert!(output.contains("telemt_ip_tracker_users{scope=\"active\"} 1"));
|
|
||||||
assert!(output.contains("telemt_ip_tracker_entries{scope=\"active\"} 1"));
|
|
||||||
assert!(output.contains("telemt_ip_tracker_cleanup_queue_len 0"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
@ -2898,9 +2799,6 @@ mod tests {
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_recent_window gauge"));
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_limit gauge"));
|
||||||
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
|
assert!(output.contains("# TYPE telemt_user_unique_ips_utilization gauge"));
|
||||||
assert!(output.contains("# TYPE telemt_ip_tracker_users gauge"));
|
|
||||||
assert!(output.contains("# TYPE telemt_ip_tracker_entries gauge"));
|
|
||||||
assert!(output.contains("# TYPE telemt_ip_tracker_cleanup_queue_len gauge"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
||||||
|
|
@ -276,7 +276,6 @@ where
|
||||||
stats.increment_user_connects(user);
|
stats.increment_user_connects(user);
|
||||||
let _direct_connection_lease = stats.acquire_direct_connection_lease();
|
let _direct_connection_lease = stats.acquire_direct_connection_lease();
|
||||||
|
|
||||||
let buffer_pool_trim = Arc::clone(&buffer_pool);
|
|
||||||
let relay_result = relay_bidirectional(
|
let relay_result = relay_bidirectional(
|
||||||
client_reader,
|
client_reader,
|
||||||
client_writer,
|
client_writer,
|
||||||
|
|
@ -322,13 +321,6 @@ where
|
||||||
Err(e) => debug!(user = %user, error = %e, "Direct relay ended with error"),
|
Err(e) => debug!(user = %user, error = %e, "Direct relay ended with error"),
|
||||||
}
|
}
|
||||||
|
|
||||||
buffer_pool_trim.trim_to(buffer_pool_trim.max_buffers().min(64));
|
|
||||||
let pool_snapshot = buffer_pool_trim.stats();
|
|
||||||
stats.set_buffer_pool_gauges(
|
|
||||||
pool_snapshot.pooled,
|
|
||||||
pool_snapshot.allocated,
|
|
||||||
pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled),
|
|
||||||
);
|
|
||||||
relay_result
|
relay_result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -645,14 +645,11 @@ async fn enqueue_c2me_command(
|
||||||
tx: &mpsc::Sender<C2MeCommand>,
|
tx: &mpsc::Sender<C2MeCommand>,
|
||||||
cmd: C2MeCommand,
|
cmd: C2MeCommand,
|
||||||
send_timeout: Option<Duration>,
|
send_timeout: Option<Duration>,
|
||||||
stats: &Stats,
|
|
||||||
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
) -> std::result::Result<(), mpsc::error::SendError<C2MeCommand>> {
|
||||||
match tx.try_send(cmd) {
|
match tx.try_send(cmd) {
|
||||||
Ok(()) => Ok(()),
|
Ok(()) => Ok(()),
|
||||||
Err(mpsc::error::TrySendError::Closed(cmd)) => Err(mpsc::error::SendError(cmd)),
|
Err(mpsc::error::TrySendError::Closed(cmd)) => Err(mpsc::error::SendError(cmd)),
|
||||||
Err(mpsc::error::TrySendError::Full(cmd)) => {
|
Err(mpsc::error::TrySendError::Full(cmd)) => {
|
||||||
stats.increment_me_c2me_send_full_total();
|
|
||||||
stats.increment_me_c2me_send_high_water_total();
|
|
||||||
note_relay_pressure_event();
|
note_relay_pressure_event();
|
||||||
// Cooperative yield reduces burst catch-up when the per-conn queue is near saturation.
|
// Cooperative yield reduces burst catch-up when the per-conn queue is near saturation.
|
||||||
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
if tx.capacity() <= C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS {
|
||||||
|
|
@ -661,10 +658,7 @@ async fn enqueue_c2me_command(
|
||||||
let reserve_result = match send_timeout {
|
let reserve_result = match send_timeout {
|
||||||
Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await {
|
Some(send_timeout) => match timeout(send_timeout, tx.reserve()).await {
|
||||||
Ok(result) => result,
|
Ok(result) => result,
|
||||||
Err(_) => {
|
Err(_) => return Err(mpsc::error::SendError(cmd)),
|
||||||
stats.increment_me_c2me_send_timeout_total();
|
|
||||||
return Err(mpsc::error::SendError(cmd));
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
None => tx.reserve().await,
|
None => tx.reserve().await,
|
||||||
};
|
};
|
||||||
|
|
@ -673,10 +667,7 @@ async fn enqueue_c2me_command(
|
||||||
permit.send(cmd);
|
permit.send(cmd);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => Err(mpsc::error::SendError(cmd)),
|
||||||
stats.increment_me_c2me_send_timeout_total();
|
|
||||||
Err(mpsc::error::SendError(cmd))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -850,23 +841,11 @@ where
|
||||||
let me_writer = tokio::spawn(async move {
|
let me_writer = tokio::spawn(async move {
|
||||||
let mut writer = crypto_writer;
|
let mut writer = crypto_writer;
|
||||||
let mut frame_buf = Vec::with_capacity(16 * 1024);
|
let mut frame_buf = Vec::with_capacity(16 * 1024);
|
||||||
let shrink_threshold = d2c_flush_policy.frame_buf_shrink_threshold_bytes;
|
|
||||||
|
|
||||||
fn shrink_session_vec(buf: &mut Vec<u8>, threshold: usize) {
|
|
||||||
if buf.capacity() > threshold {
|
|
||||||
buf.clear();
|
|
||||||
buf.shrink_to(threshold);
|
|
||||||
} else {
|
|
||||||
buf.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
msg = me_rx_task.recv() => {
|
msg = me_rx_task.recv() => {
|
||||||
let Some(first) = msg else {
|
let Some(first) = msg else {
|
||||||
debug!(conn_id, "ME channel closed");
|
debug!(conn_id, "ME channel closed");
|
||||||
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
|
||||||
return Err(ProxyError::Proxy("ME connection lost".into()));
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -922,7 +901,6 @@ where
|
||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -984,7 +962,6 @@ where
|
||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1050,7 +1027,6 @@ where
|
||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1115,7 +1091,6 @@ where
|
||||||
batch_bytes,
|
batch_bytes,
|
||||||
flush_duration_us,
|
flush_duration_us,
|
||||||
);
|
);
|
||||||
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1123,7 +1098,6 @@ where
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(conn_id, "ME channel closed");
|
debug!(conn_id, "ME channel closed");
|
||||||
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
|
||||||
return Err(ProxyError::Proxy("ME connection lost".into()));
|
return Err(ProxyError::Proxy("ME connection lost".into()));
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
|
@ -1173,7 +1147,6 @@ where
|
||||||
}
|
}
|
||||||
_ = &mut stop_rx => {
|
_ = &mut stop_rx => {
|
||||||
debug!(conn_id, "ME writer stop signal");
|
debug!(conn_id, "ME writer stop signal");
|
||||||
shrink_session_vec(&mut frame_buf, shrink_threshold);
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1199,13 +1172,7 @@ where
|
||||||
user = %user,
|
user = %user,
|
||||||
"Middle-relay pressure eviction for idle-candidate session"
|
"Middle-relay pressure eviction for idle-candidate session"
|
||||||
);
|
);
|
||||||
let _ = enqueue_c2me_command(
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
||||||
&c2me_tx,
|
|
||||||
C2MeCommand::Close,
|
|
||||||
c2me_send_timeout,
|
|
||||||
stats.as_ref(),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
main_result = Err(ProxyError::Proxy(
|
main_result = Err(ProxyError::Proxy(
|
||||||
"middle-relay session evicted under pressure (idle-candidate)".to_string(),
|
"middle-relay session evicted under pressure (idle-candidate)".to_string(),
|
||||||
));
|
));
|
||||||
|
|
@ -1224,13 +1191,7 @@ where
|
||||||
"Cutover affected middle session, closing client connection"
|
"Cutover affected middle session, closing client connection"
|
||||||
);
|
);
|
||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
let _ = enqueue_c2me_command(
|
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout).await;
|
||||||
&c2me_tx,
|
|
||||||
C2MeCommand::Close,
|
|
||||||
c2me_send_timeout,
|
|
||||||
stats.as_ref(),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
main_result = Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -1292,7 +1253,6 @@ where
|
||||||
&c2me_tx,
|
&c2me_tx,
|
||||||
C2MeCommand::Data { payload, flags },
|
C2MeCommand::Data { payload, flags },
|
||||||
c2me_send_timeout,
|
c2me_send_timeout,
|
||||||
stats.as_ref(),
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.is_err()
|
.is_err()
|
||||||
|
|
@ -1304,12 +1264,8 @@ where
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
debug!(conn_id, "Client EOF");
|
debug!(conn_id, "Client EOF");
|
||||||
client_closed = true;
|
client_closed = true;
|
||||||
let _ = enqueue_c2me_command(
|
let _ =
|
||||||
&c2me_tx,
|
enqueue_c2me_command(&c2me_tx, C2MeCommand::Close, c2me_send_timeout)
|
||||||
C2MeCommand::Close,
|
|
||||||
c2me_send_timeout,
|
|
||||||
stats.as_ref(),
|
|
||||||
)
|
|
||||||
.await;
|
.await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -1361,13 +1317,6 @@ where
|
||||||
);
|
);
|
||||||
clear_relay_idle_candidate(conn_id);
|
clear_relay_idle_candidate(conn_id);
|
||||||
me_pool.registry().unregister(conn_id).await;
|
me_pool.registry().unregister(conn_id).await;
|
||||||
buffer_pool.trim_to(buffer_pool.max_buffers().min(64));
|
|
||||||
let pool_snapshot = buffer_pool.stats();
|
|
||||||
stats.set_buffer_pool_gauges(
|
|
||||||
pool_snapshot.pooled,
|
|
||||||
pool_snapshot.allocated,
|
|
||||||
pool_snapshot.allocated.saturating_sub(pool_snapshot.pooled),
|
|
||||||
);
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,4 @@
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::stats::Stats;
|
|
||||||
use crate::stream::BufferPool;
|
use crate::stream::BufferPool;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
@ -120,7 +119,6 @@ async fn c2me_channel_full_path_yields_then_sends() {
|
||||||
.expect("priming queue with one frame must succeed");
|
.expect("priming queue with one frame must succeed");
|
||||||
|
|
||||||
let tx2 = tx.clone();
|
let tx2 = tx.clone();
|
||||||
let stats = Stats::default();
|
|
||||||
let producer = tokio::spawn(async move {
|
let producer = tokio::spawn(async move {
|
||||||
enqueue_c2me_command(
|
enqueue_c2me_command(
|
||||||
&tx2,
|
&tx2,
|
||||||
|
|
@ -129,7 +127,6 @@ async fn c2me_channel_full_path_yields_then_sends() {
|
||||||
flags: 2,
|
flags: 2,
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
&stats,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -200,14 +200,6 @@ pub struct Stats {
|
||||||
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
|
me_d2c_flush_duration_us_bucket_1001_5000: AtomicU64,
|
||||||
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
|
me_d2c_flush_duration_us_bucket_5001_20000: AtomicU64,
|
||||||
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
|
me_d2c_flush_duration_us_bucket_gt_20000: AtomicU64,
|
||||||
// Buffer pool gauges
|
|
||||||
buffer_pool_pooled_gauge: AtomicU64,
|
|
||||||
buffer_pool_allocated_gauge: AtomicU64,
|
|
||||||
buffer_pool_in_use_gauge: AtomicU64,
|
|
||||||
// C2ME enqueue observability
|
|
||||||
me_c2me_send_full_total: AtomicU64,
|
|
||||||
me_c2me_send_high_water_total: AtomicU64,
|
|
||||||
me_c2me_send_timeout_total: AtomicU64,
|
|
||||||
me_d2c_batch_timeout_armed_total: AtomicU64,
|
me_d2c_batch_timeout_armed_total: AtomicU64,
|
||||||
me_d2c_batch_timeout_fired_total: AtomicU64,
|
me_d2c_batch_timeout_fired_total: AtomicU64,
|
||||||
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
|
me_writer_pick_sorted_rr_success_try_total: AtomicU64,
|
||||||
|
|
@ -1422,37 +1414,6 @@ impl Stats {
|
||||||
.store(value, Ordering::Relaxed);
|
.store(value, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_buffer_pool_gauges(&self, pooled: usize, allocated: usize, in_use: usize) {
|
|
||||||
if self.telemetry_me_allows_normal() {
|
|
||||||
self.buffer_pool_pooled_gauge
|
|
||||||
.store(pooled as u64, Ordering::Relaxed);
|
|
||||||
self.buffer_pool_allocated_gauge
|
|
||||||
.store(allocated as u64, Ordering::Relaxed);
|
|
||||||
self.buffer_pool_in_use_gauge
|
|
||||||
.store(in_use as u64, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn increment_me_c2me_send_full_total(&self) {
|
|
||||||
if self.telemetry_me_allows_normal() {
|
|
||||||
self.me_c2me_send_full_total.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn increment_me_c2me_send_high_water_total(&self) {
|
|
||||||
if self.telemetry_me_allows_normal() {
|
|
||||||
self.me_c2me_send_high_water_total
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn increment_me_c2me_send_timeout_total(&self) {
|
|
||||||
if self.telemetry_me_allows_normal() {
|
|
||||||
self.me_c2me_send_timeout_total
|
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn increment_me_floor_cap_block_total(&self) {
|
pub fn increment_me_floor_cap_block_total(&self) {
|
||||||
if self.telemetry_me_allows_normal() {
|
if self.telemetry_me_allows_normal() {
|
||||||
self.me_floor_cap_block_total
|
self.me_floor_cap_block_total
|
||||||
|
|
@ -1819,30 +1780,6 @@ impl Stats {
|
||||||
self.me_d2c_flush_duration_us_bucket_gt_20000
|
self.me_d2c_flush_duration_us_bucket_gt_20000
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_buffer_pool_pooled_gauge(&self) -> u64 {
|
|
||||||
self.buffer_pool_pooled_gauge.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_buffer_pool_allocated_gauge(&self) -> u64 {
|
|
||||||
self.buffer_pool_allocated_gauge.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_buffer_pool_in_use_gauge(&self) -> u64 {
|
|
||||||
self.buffer_pool_in_use_gauge.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_me_c2me_send_full_total(&self) -> u64 {
|
|
||||||
self.me_c2me_send_full_total.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_me_c2me_send_high_water_total(&self) -> u64 {
|
|
||||||
self.me_c2me_send_high_water_total.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_me_c2me_send_timeout_total(&self) -> u64 {
|
|
||||||
self.me_c2me_send_timeout_total.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
|
pub fn get_me_d2c_batch_timeout_armed_total(&self) -> u64 {
|
||||||
self.me_d2c_batch_timeout_armed_total
|
self.me_d2c_batch_timeout_armed_total
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
|
|
@ -2234,8 +2171,6 @@ impl ReplayShard {
|
||||||
|
|
||||||
fn cleanup(&mut self, now: Instant, window: Duration) {
|
fn cleanup(&mut self, now: Instant, window: Duration) {
|
||||||
if window.is_zero() {
|
if window.is_zero() {
|
||||||
self.cache.clear();
|
|
||||||
self.queue.clear();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let cutoff = now.checked_sub(window).unwrap_or(now);
|
let cutoff = now.checked_sub(window).unwrap_or(now);
|
||||||
|
|
@ -2257,22 +2192,13 @@ impl ReplayShard {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check(&mut self, key: &[u8], now: Instant, window: Duration) -> bool {
|
fn check(&mut self, key: &[u8], now: Instant, window: Duration) -> bool {
|
||||||
if window.is_zero() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
self.cleanup(now, window);
|
self.cleanup(now, window);
|
||||||
// key is &[u8], resolves Q=[u8] via Box<[u8]>: Borrow<[u8]>
|
// key is &[u8], resolves Q=[u8] via Box<[u8]>: Borrow<[u8]>
|
||||||
self.cache.get(key).is_some()
|
self.cache.get(key).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add(&mut self, key: &[u8], now: Instant, window: Duration) {
|
fn add(&mut self, key: &[u8], now: Instant, window: Duration) {
|
||||||
if window.is_zero() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
self.cleanup(now, window);
|
self.cleanup(now, window);
|
||||||
if self.cache.peek(key).is_some() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let seq = self.next_seq();
|
let seq = self.next_seq();
|
||||||
let boxed_key: Box<[u8]> = key.into();
|
let boxed_key: Box<[u8]> = key.into();
|
||||||
|
|
@ -2415,7 +2341,7 @@ impl ReplayChecker {
|
||||||
let interval = if self.window.as_secs() > 60 {
|
let interval = if self.window.as_secs() > 60 {
|
||||||
Duration::from_secs(30)
|
Duration::from_secs(30)
|
||||||
} else {
|
} else {
|
||||||
Duration::from_secs((self.window.as_secs().max(1) / 2).max(1))
|
Duration::from_secs(self.window.as_secs().max(1) / 2)
|
||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
@ -2627,20 +2553,6 @@ mod tests {
|
||||||
assert!(!checker.check_handshake(b"expire"));
|
assert!(!checker.check_handshake(b"expire"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_replay_checker_zero_window_does_not_retain_entries() {
|
|
||||||
let checker = ReplayChecker::new(100, Duration::ZERO);
|
|
||||||
|
|
||||||
for _ in 0..1_000 {
|
|
||||||
assert!(!checker.check_handshake(b"no-retain"));
|
|
||||||
checker.add_handshake(b"no-retain");
|
|
||||||
}
|
|
||||||
|
|
||||||
let stats = checker.stats();
|
|
||||||
assert_eq!(stats.total_entries, 0);
|
|
||||||
assert_eq!(stats.total_queue_len, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_replay_checker_stats() {
|
fn test_replay_checker_stats() {
|
||||||
let checker = ReplayChecker::new(100, Duration::from_secs(60));
|
let checker = ReplayChecker::new(100, Duration::from_secs(60));
|
||||||
|
|
|
||||||
|
|
@ -35,10 +35,6 @@ pub struct BufferPool {
|
||||||
misses: AtomicUsize,
|
misses: AtomicUsize,
|
||||||
/// Number of successful reuses
|
/// Number of successful reuses
|
||||||
hits: AtomicUsize,
|
hits: AtomicUsize,
|
||||||
/// Number of non-standard buffers replaced with a fresh default-sized buffer
|
|
||||||
replaced_nonstandard: AtomicUsize,
|
|
||||||
/// Number of buffers dropped because the pool queue was full
|
|
||||||
dropped_pool_full: AtomicUsize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferPool {
|
impl BufferPool {
|
||||||
|
|
@ -56,8 +52,6 @@ impl BufferPool {
|
||||||
allocated: AtomicUsize::new(0),
|
allocated: AtomicUsize::new(0),
|
||||||
misses: AtomicUsize::new(0),
|
misses: AtomicUsize::new(0),
|
||||||
hits: AtomicUsize::new(0),
|
hits: AtomicUsize::new(0),
|
||||||
replaced_nonstandard: AtomicUsize::new(0),
|
|
||||||
dropped_pool_full: AtomicUsize::new(0),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -97,36 +91,17 @@ impl BufferPool {
|
||||||
|
|
||||||
/// Return a buffer to the pool
|
/// Return a buffer to the pool
|
||||||
fn return_buffer(&self, mut buffer: BytesMut) {
|
fn return_buffer(&self, mut buffer: BytesMut) {
|
||||||
const MAX_RETAINED_BUFFER_FACTOR: usize = 2;
|
// Clear the buffer but keep capacity
|
||||||
|
|
||||||
// Clear the buffer but keep capacity.
|
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
let max_retained_capacity = self
|
|
||||||
.buffer_size
|
|
||||||
.saturating_mul(MAX_RETAINED_BUFFER_FACTOR)
|
|
||||||
.max(self.buffer_size);
|
|
||||||
|
|
||||||
// Keep only near-default capacities in the pool. Oversized buffers keep
|
// Only return if we haven't exceeded max and buffer is right size
|
||||||
// RSS elevated for hours under churn; replace them with default-sized
|
if buffer.capacity() >= self.buffer_size {
|
||||||
// buffers before re-pooling.
|
// Try to push to pool, if full just drop
|
||||||
if buffer.capacity() < self.buffer_size || buffer.capacity() > max_retained_capacity {
|
let _ = self.buffers.push(buffer);
|
||||||
self.replaced_nonstandard.fetch_add(1, Ordering::Relaxed);
|
|
||||||
buffer = BytesMut::with_capacity(self.buffer_size);
|
|
||||||
}
|
}
|
||||||
|
// If buffer was dropped (pool full), decrement allocated
|
||||||
// Try to return into the queue; if full, drop and update accounting.
|
// Actually we don't decrement here because the buffer might have been
|
||||||
if self.buffers.push(buffer).is_err() {
|
// grown beyond our size - we just let it go
|
||||||
self.dropped_pool_full.fetch_add(1, Ordering::Relaxed);
|
|
||||||
self.decrement_allocated();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn decrement_allocated(&self) {
|
|
||||||
let _ = self
|
|
||||||
.allocated
|
|
||||||
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
|
|
||||||
Some(current.saturating_sub(1))
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get pool statistics
|
/// Get pool statistics
|
||||||
|
|
@ -138,8 +113,6 @@ impl BufferPool {
|
||||||
buffer_size: self.buffer_size,
|
buffer_size: self.buffer_size,
|
||||||
hits: self.hits.load(Ordering::Relaxed),
|
hits: self.hits.load(Ordering::Relaxed),
|
||||||
misses: self.misses.load(Ordering::Relaxed),
|
misses: self.misses.load(Ordering::Relaxed),
|
||||||
replaced_nonstandard: self.replaced_nonstandard.load(Ordering::Relaxed),
|
|
||||||
dropped_pool_full: self.dropped_pool_full.load(Ordering::Relaxed),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -148,41 +121,6 @@ impl BufferPool {
|
||||||
self.buffer_size
|
self.buffer_size
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Maximum number of buffers the pool will retain.
|
|
||||||
pub fn max_buffers(&self) -> usize {
|
|
||||||
self.max_buffers
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Current number of pooled buffers.
|
|
||||||
pub fn pooled(&self) -> usize {
|
|
||||||
self.buffers.len()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Total buffers allocated (pooled + checked out).
|
|
||||||
pub fn allocated(&self) -> usize {
|
|
||||||
self.allocated.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Best-effort number of buffers currently checked out.
|
|
||||||
pub fn in_use(&self) -> usize {
|
|
||||||
self.allocated().saturating_sub(self.pooled())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Trim pooled buffers down to a target count.
|
|
||||||
pub fn trim_to(&self, target_pooled: usize) {
|
|
||||||
let target = target_pooled.min(self.max_buffers);
|
|
||||||
loop {
|
|
||||||
if self.buffers.len() <= target {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if self.buffers.pop().is_some() {
|
|
||||||
self.decrement_allocated();
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Preallocate buffers to fill the pool
|
/// Preallocate buffers to fill the pool
|
||||||
pub fn preallocate(&self, count: usize) {
|
pub fn preallocate(&self, count: usize) {
|
||||||
let to_alloc = count.min(self.max_buffers);
|
let to_alloc = count.min(self.max_buffers);
|
||||||
|
|
@ -222,10 +160,6 @@ pub struct PoolStats {
|
||||||
pub hits: usize,
|
pub hits: usize,
|
||||||
/// Number of cache misses (new allocation)
|
/// Number of cache misses (new allocation)
|
||||||
pub misses: usize,
|
pub misses: usize,
|
||||||
/// Number of non-standard buffers replaced during return
|
|
||||||
pub replaced_nonstandard: usize,
|
|
||||||
/// Number of buffers dropped because the pool queue was full
|
|
||||||
pub dropped_pool_full: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PoolStats {
|
impl PoolStats {
|
||||||
|
|
@ -251,7 +185,6 @@ pub struct PooledBuffer {
|
||||||
impl PooledBuffer {
|
impl PooledBuffer {
|
||||||
/// Take the inner buffer, preventing return to pool
|
/// Take the inner buffer, preventing return to pool
|
||||||
pub fn take(mut self) -> BytesMut {
|
pub fn take(mut self) -> BytesMut {
|
||||||
self.pool.decrement_allocated();
|
|
||||||
self.buffer.take().unwrap()
|
self.buffer.take().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -431,25 +364,6 @@ mod tests {
|
||||||
|
|
||||||
let stats = pool.stats();
|
let stats = pool.stats();
|
||||||
assert_eq!(stats.pooled, 0);
|
assert_eq!(stats.pooled, 0);
|
||||||
assert_eq!(stats.allocated, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_pool_replaces_oversized_buffers() {
|
|
||||||
let pool = Arc::new(BufferPool::with_config(1024, 10));
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut buf = pool.get();
|
|
||||||
buf.reserve(8192);
|
|
||||||
assert!(buf.capacity() > 2048);
|
|
||||||
}
|
|
||||||
|
|
||||||
let stats = pool.stats();
|
|
||||||
assert_eq!(stats.replaced_nonstandard, 1);
|
|
||||||
assert_eq!(stats.pooled, 1);
|
|
||||||
|
|
||||||
let buf = pool.get();
|
|
||||||
assert!(buf.capacity() <= 2048);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue