mirror of https://github.com/telemt/telemt.git
Cleanup-path as non-blocking
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
6f9aef7bb4
commit
3279f6d46a
|
|
@ -612,6 +612,11 @@ impl ProxyConfig {
|
||||||
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
|
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
if config.general.me_route_backpressure_base_timeout_ms > 5000 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.me_route_backpressure_high_timeout_ms
|
if config.general.me_route_backpressure_high_timeout_ms
|
||||||
< config.general.me_route_backpressure_base_timeout_ms
|
< config.general.me_route_backpressure_base_timeout_ms
|
||||||
|
|
@ -620,6 +625,11 @@ impl ProxyConfig {
|
||||||
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
|
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
if config.general.me_route_backpressure_high_timeout_ms > 5000 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
|
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
|
|
@ -1624,6 +1634,47 @@ mod tests {
|
||||||
let _ = std::fs::remove_file(path_valid);
|
let _ = std::fs::remove_file(path_valid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_route_backpressure_base_timeout_ms = 5001
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_route_backpressure_base_timeout_ms = 100
|
||||||
|
me_route_backpressure_high_timeout_ms = 5001
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
|
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
|
||||||
let toml = r#"
|
let toml = r#"
|
||||||
|
|
|
||||||
|
|
@ -492,11 +492,9 @@ impl MePool {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
|
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
|
||||||
let conns = self.remove_writer_only(writer_id).await;
|
// Full client cleanup now happens inside `registry.writer_lost` to keep
|
||||||
for bound in conns {
|
// writer reap/remove paths strictly non-blocking per connection.
|
||||||
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
|
let _ = self.remove_writer_only(writer_id).await;
|
||||||
let _ = self.registry.unregister(bound.conn_id).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
|
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ use bytes::{Bytes, BytesMut};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::sync::{Mutex, mpsc};
|
use tokio::sync::{Mutex, mpsc};
|
||||||
|
use tokio::sync::mpsc::error::TrySendError;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{debug, trace, warn};
|
use tracing::{debug, trace, warn};
|
||||||
|
|
||||||
|
|
@ -173,12 +174,12 @@ pub(crate) async fn reader_loop(
|
||||||
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
|
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
|
||||||
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
|
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
|
||||||
debug!(cid, "RPC_CLOSE_EXT from ME");
|
debug!(cid, "RPC_CLOSE_EXT from ME");
|
||||||
reg.route(cid, MeResponse::Close).await;
|
let _ = reg.route_nowait(cid, MeResponse::Close).await;
|
||||||
reg.unregister(cid).await;
|
reg.unregister(cid).await;
|
||||||
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
|
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
|
||||||
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
|
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
|
||||||
debug!(cid, "RPC_CLOSE_CONN from ME");
|
debug!(cid, "RPC_CLOSE_CONN from ME");
|
||||||
reg.route(cid, MeResponse::Close).await;
|
let _ = reg.route_nowait(cid, MeResponse::Close).await;
|
||||||
reg.unregister(cid).await;
|
reg.unregister(cid).await;
|
||||||
} else if pt == RPC_PING_U32 && body.len() >= 8 {
|
} else if pt == RPC_PING_U32 && body.len() >= 8 {
|
||||||
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
||||||
|
|
@ -186,13 +187,15 @@ pub(crate) async fn reader_loop(
|
||||||
let mut pong = Vec::with_capacity(12);
|
let mut pong = Vec::with_capacity(12);
|
||||||
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
|
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
|
||||||
pong.extend_from_slice(&ping_id.to_le_bytes());
|
pong.extend_from_slice(&ping_id.to_le_bytes());
|
||||||
if tx
|
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
|
||||||
.send(WriterCommand::DataAndFlush(Bytes::from(pong)))
|
Ok(()) => {}
|
||||||
.await
|
Err(TrySendError::Full(_)) => {
|
||||||
.is_err()
|
debug!(ping_id, "PONG dropped: writer command channel is full");
|
||||||
{
|
}
|
||||||
warn!("PONG send failed");
|
Err(TrySendError::Closed(_)) => {
|
||||||
break;
|
warn!("PONG send failed: writer channel closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
|
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
|
||||||
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
||||||
|
|
@ -232,6 +235,13 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
|
||||||
let mut p = Vec::with_capacity(12);
|
let mut p = Vec::with_capacity(12);
|
||||||
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
|
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
|
||||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||||
|
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
||||||
let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await;
|
Ok(()) => {}
|
||||||
|
Err(TrySendError::Full(_)) => {
|
||||||
|
debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
|
||||||
|
}
|
||||||
|
Err(TrySendError::Closed(_)) => {
|
||||||
|
debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -169,6 +169,7 @@ impl ConnRegistry {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
|
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
|
||||||
let tx = {
|
let tx = {
|
||||||
let inner = self.inner.read().await;
|
let inner = self.inner.read().await;
|
||||||
|
|
@ -445,30 +446,38 @@ impl ConnRegistry {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
|
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
|
||||||
let mut inner = self.inner.write().await;
|
let mut close_txs = Vec::<mpsc::Sender<MeResponse>>::new();
|
||||||
inner.writers.remove(&writer_id);
|
|
||||||
inner.last_meta_for_writer.remove(&writer_id);
|
|
||||||
inner.writer_idle_since_epoch_secs.remove(&writer_id);
|
|
||||||
let conns = inner
|
|
||||||
.conns_for_writer
|
|
||||||
.remove(&writer_id)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let mut out = Vec::new();
|
let mut out = Vec::new();
|
||||||
for conn_id in conns {
|
{
|
||||||
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
|
let mut inner = self.inner.write().await;
|
||||||
continue;
|
inner.writers.remove(&writer_id);
|
||||||
}
|
inner.last_meta_for_writer.remove(&writer_id);
|
||||||
inner.writer_for_conn.remove(&conn_id);
|
inner.writer_idle_since_epoch_secs.remove(&writer_id);
|
||||||
if let Some(m) = inner.meta.get(&conn_id) {
|
let conns = inner
|
||||||
out.push(BoundConn {
|
.conns_for_writer
|
||||||
conn_id,
|
.remove(&writer_id)
|
||||||
meta: m.clone(),
|
.unwrap_or_default()
|
||||||
});
|
.into_iter()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
for conn_id in conns {
|
||||||
|
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
inner.writer_for_conn.remove(&conn_id);
|
||||||
|
if let Some(client_tx) = inner.map.remove(&conn_id) {
|
||||||
|
close_txs.push(client_tx);
|
||||||
|
}
|
||||||
|
if let Some(meta) = inner.meta.remove(&conn_id) {
|
||||||
|
out.push(BoundConn { conn_id, meta });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for client_tx in close_txs {
|
||||||
|
let _ = client_tx.try_send(MeResponse::Close);
|
||||||
|
}
|
||||||
|
|
||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -491,6 +500,7 @@ impl ConnRegistry {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use super::ConnMeta;
|
use super::ConnMeta;
|
||||||
use super::ConnRegistry;
|
use super::ConnRegistry;
|
||||||
|
|
@ -663,6 +673,39 @@ mod tests {
|
||||||
assert!(registry.is_writer_empty(20).await);
|
assert!(registry.is_writer_empty(20).await);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() {
|
||||||
|
let registry = ConnRegistry::new();
|
||||||
|
let (conn_id, mut rx) = registry.register().await;
|
||||||
|
let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
|
||||||
|
registry.register_writer(10, writer_tx).await;
|
||||||
|
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
registry
|
||||||
|
.bind_writer(
|
||||||
|
conn_id,
|
||||||
|
10,
|
||||||
|
ConnMeta {
|
||||||
|
target_dc: 2,
|
||||||
|
client_addr: addr,
|
||||||
|
our_addr: addr,
|
||||||
|
proto_flags: 0,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
);
|
||||||
|
|
||||||
|
let lost = registry.writer_lost(10).await;
|
||||||
|
assert_eq!(lost.len(), 1);
|
||||||
|
assert_eq!(lost[0].conn_id, conn_id);
|
||||||
|
assert!(registry.get_writer(conn_id).await.is_none());
|
||||||
|
assert!(registry.get_meta(conn_id).await.is_none());
|
||||||
|
assert_eq!(registry.unregister(conn_id).await, None);
|
||||||
|
let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
|
||||||
|
assert!(matches!(close, Ok(Some(MeResponse::Close))));
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn bind_writer_rejects_unregistered_writer() {
|
async fn bind_writer_rejects_unregistered_writer() {
|
||||||
let registry = ConnRegistry::new();
|
let registry = ConnRegistry::new();
|
||||||
|
|
|
||||||
|
|
@ -643,13 +643,19 @@ impl MePool {
|
||||||
let mut p = Vec::with_capacity(12);
|
let mut p = Vec::with_capacity(12);
|
||||||
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
|
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
|
||||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||||
if w.tx
|
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
||||||
.send(WriterCommand::DataAndFlush(Bytes::from(p)))
|
Ok(()) => {}
|
||||||
.await
|
Err(TrySendError::Full(_)) => {
|
||||||
.is_err()
|
debug!(
|
||||||
{
|
conn_id,
|
||||||
debug!("ME close write failed");
|
writer_id = w.writer_id,
|
||||||
self.remove_writer_and_close_clients(w.writer_id).await;
|
"ME close skipped: writer command channel is full"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(TrySendError::Closed(_)) => {
|
||||||
|
debug!("ME close write failed");
|
||||||
|
self.remove_writer_and_close_clients(w.writer_id).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
debug!(conn_id, "ME close skipped (writer missing)");
|
debug!(conn_id, "ME close skipped (writer missing)");
|
||||||
|
|
@ -666,8 +672,12 @@ impl MePool {
|
||||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||||
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(TrySendError::Full(cmd)) => {
|
Err(TrySendError::Full(_)) => {
|
||||||
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
|
debug!(
|
||||||
|
conn_id,
|
||||||
|
writer_id = w.writer_id,
|
||||||
|
"ME close_conn skipped: writer command channel is full"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Err(TrySendError::Closed(_)) => {
|
Err(TrySendError::Closed(_)) => {
|
||||||
debug!(conn_id, "ME close_conn skipped: writer channel closed");
|
debug!(conn_id, "ME close_conn skipped: writer channel closed");
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue