Mirror of https://github.com/telemt/telemt.git

Merge pull request #209 from telemt/flow

ME Pool + ME Hotpath + ME Buffers tuning

Commit 869d1429ac
@@ -49,19 +49,32 @@ impl SecureRandom {
         }
     }
 
-    /// Generate random bytes
-    pub fn bytes(&self, len: usize) -> Vec<u8> {
+    /// Fill a caller-provided buffer with random bytes.
+    pub fn fill(&self, out: &mut [u8]) {
         let mut inner = self.inner.lock();
         const CHUNK_SIZE: usize = 512;
 
-        while inner.buffer.len() < len {
-            let mut chunk = vec![0u8; CHUNK_SIZE];
-            inner.rng.fill_bytes(&mut chunk);
-            inner.cipher.apply(&mut chunk);
-            inner.buffer.extend_from_slice(&chunk);
+        let mut written = 0usize;
+        while written < out.len() {
+            if inner.buffer.is_empty() {
+                let mut chunk = vec![0u8; CHUNK_SIZE];
+                inner.rng.fill_bytes(&mut chunk);
+                inner.cipher.apply(&mut chunk);
+                inner.buffer.extend_from_slice(&chunk);
+            }
+
+            let take = (out.len() - written).min(inner.buffer.len());
+            out[written..written + take].copy_from_slice(&inner.buffer[..take]);
+            inner.buffer.drain(..take);
+            written += take;
         }
+    }
 
-        inner.buffer.drain(..len).collect()
+    /// Generate random bytes
+    pub fn bytes(&self, len: usize) -> Vec<u8> {
+        let mut out = vec![0u8; len];
+        self.fill(&mut out);
+        out
     }
 
     /// Generate random number in range [0, max)
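
Note on the hunk above: bytes() is now a thin wrapper over fill(), so hot paths can hand in storage they already own instead of allocating a Vec on every call. A minimal sketch of the two call shapes, using rand::rng() as a stand-in for SecureRandom (the names below are illustrative, not from this repo):

    use rand::RngCore;

    fn main() {
        // rand::rng() plays the role of SecureRandom here.
        let mut rng = rand::rng();

        // Old shape: every call allocates a fresh Vec.
        let mut padding = vec![0u8; 64];
        rng.fill_bytes(&mut padding);

        // New shape: the caller owns the storage and can reuse it across frames.
        let mut buf = [0u8; 64];
        rng.fill_bytes(&mut buf);
    }
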
src/main.rs (20 lines changed)

@@ -265,7 +265,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
     }
 
     // Connection concurrency limit
-    let _max_connections = Arc::new(Semaphore::new(10_000));
+    let max_connections = Arc::new(Semaphore::new(10_000));
 
     if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
         warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
 
@@ -844,6 +844,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await
             let me_pool = me_pool.clone();
             let tls_cache = tls_cache.clone();
             let ip_tracker = ip_tracker.clone();
+            let max_connections_unix = max_connections.clone();
 
             tokio::spawn(async move {
                 let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1));
 
@@ -851,6 +852,13 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await
                 loop {
                     match unix_listener.accept().await {
                         Ok((stream, _)) => {
+                            let permit = match max_connections_unix.clone().acquire_owned().await {
+                                Ok(permit) => permit,
+                                Err(_) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                            };
                             let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                             let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16));
 
@@ -866,6 +874,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await
                             let proxy_protocol_enabled = config.server.proxy_protocol;
 
                             tokio::spawn(async move {
+                                let _permit = permit;
                                 if let Err(e) = crate::proxy::client::handle_client_stream(
                                     stream, fake_peer, config, stats,
                                     upstream_manager, replay_checker, buffer_pool, rng,
 
@@ -933,11 +942,19 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await
             let me_pool = me_pool.clone();
             let tls_cache = tls_cache.clone();
             let ip_tracker = ip_tracker.clone();
+            let max_connections_tcp = max_connections.clone();
 
             tokio::spawn(async move {
                 loop {
                     match listener.accept().await {
                         Ok((stream, peer_addr)) => {
+                            let permit = match max_connections_tcp.clone().acquire_owned().await {
+                                Ok(permit) => permit,
+                                Err(_) => {
+                                    error!("Connection limiter is closed");
+                                    break;
+                                }
+                            };
                             let config = config_rx.borrow_and_update().clone();
                             let stats = stats.clone();
                             let upstream_manager = upstream_manager.clone();
 
@@ -950,6 +967,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await
                             let proxy_protocol_enabled = listener_proxy_protocol;
 
                             tokio::spawn(async move {
+                                let _permit = permit;
                                 if let Err(e) = ClientHandler::new(
                                     stream,
                                     peer_addr,
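
Both accept loops (unix socket and TCP) now follow the same owned-permit pattern: acquire a permit from the shared Semaphore before spawning, move it into the connection task, and let it drop when the task ends. A self-contained sketch of that pattern, with the listener details omitted:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        let max_connections = Arc::new(Semaphore::new(10_000));

        for _ in 0..3 {
            // acquire_owned() ties the permit's lifetime to the task, not to this loop.
            let permit = max_connections.clone().acquire_owned().await.unwrap();
            tokio::spawn(async move {
                let _permit = permit; // held for the whole connection
                // ... handle the connection ...
            }); // the permit is released when the task finishes
        }
    }
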
@@ -95,6 +95,7 @@ where
     let user_clone = user.clone();
     let me_writer = tokio::spawn(async move {
         let mut writer = crypto_writer;
+        let mut frame_buf = Vec::with_capacity(16 * 1024);
         loop {
             tokio::select! {
                 msg = me_rx_task.recv() => {
 
@@ -102,7 +103,15 @@ where
                     Some(MeResponse::Data { flags, data }) => {
                         trace!(conn_id, bytes = data.len(), flags, "ME->C data");
                         stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
-                        write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?;
+                        write_client_payload(
+                            &mut writer,
+                            proto_tag,
+                            flags,
+                            &data,
+                            rng_clone.as_ref(),
+                            &mut frame_buf,
+                        )
+                        .await?;
 
                         // Drain all immediately queued ME responses and flush once.
                         while let Ok(next) = me_rx_task.try_recv() {
 
@@ -116,6 +125,7 @@ where
                                 flags,
                                 &data,
                                 rng_clone.as_ref(),
+                                &mut frame_buf,
                             ).await?;
                         }
                         MeResponse::Ack(confirm) => {
 
@@ -363,6 +373,7 @@ async fn write_client_payload<W>(
     flags: u32,
     data: &[u8],
     rng: &SecureRandom,
+    frame_buf: &mut Vec<u8>,
 ) -> Result<()>
 where
     W: AsyncWrite + Unpin + Send + 'static,
 
@@ -384,7 +395,8 @@ where
             if quickack {
                 first |= 0x80;
             }
-            let mut frame_buf = Vec::with_capacity(1 + data.len());
+            frame_buf.clear();
+            frame_buf.reserve(1 + data.len());
             frame_buf.push(first);
             frame_buf.extend_from_slice(data);
             client_writer
 
@@ -397,7 +409,8 @@ where
                 first |= 0x80;
             }
             let lw = (len_words as u32).to_le_bytes();
-            let mut frame_buf = Vec::with_capacity(4 + data.len());
+            frame_buf.clear();
+            frame_buf.reserve(4 + data.len());
             frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
             frame_buf.extend_from_slice(data);
             client_writer
 
@@ -428,11 +441,14 @@ where
                 len_val |= 0x8000_0000;
             }
             let total = 4 + data.len() + padding_len;
-            let mut frame_buf = Vec::with_capacity(total);
+            frame_buf.clear();
+            frame_buf.reserve(total);
             frame_buf.extend_from_slice(&len_val.to_le_bytes());
             frame_buf.extend_from_slice(data);
             if padding_len > 0 {
-                frame_buf.extend_from_slice(&rng.bytes(padding_len));
+                let start = frame_buf.len();
+                frame_buf.resize(start + padding_len, 0);
+                rng.fill(&mut frame_buf[start..]);
             }
             client_writer
                 .write_all(&frame_buf)
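
The hotpath hunks above thread one frame_buf through write_client_payload so every frame reuses a single allocation: clear() keeps capacity, reserve() grows it only when a frame exceeds anything seen before. The pattern reduced to its essentials (encode_frame is a hypothetical name):

    fn encode_frame(frame_buf: &mut Vec<u8>, first: u8, data: &[u8]) {
        frame_buf.clear();                 // drops contents, keeps capacity
        frame_buf.reserve(1 + data.len()); // reallocates only if capacity is short
        frame_buf.push(first);
        frame_buf.extend_from_slice(data);
    }

    fn main() {
        let mut frame_buf = Vec::with_capacity(16 * 1024);
        for payload in [&b"abc"[..], &b"defgh"[..]] {
            encode_frame(&mut frame_buf, 0x01, payload);
            // write frame_buf to the socket; the next iteration reuses the allocation
        }
    }
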
@@ -109,7 +109,22 @@ impl Stats {
 
     pub fn decrement_user_curr_connects(&self, user: &str) {
         if let Some(stats) = self.user_stats.get(user) {
-            stats.curr_connects.fetch_sub(1, Ordering::Relaxed);
+            let counter = &stats.curr_connects;
+            let mut current = counter.load(Ordering::Relaxed);
+            loop {
+                if current == 0 {
+                    break;
+                }
+                match counter.compare_exchange_weak(
+                    current,
+                    current - 1,
+                    Ordering::Relaxed,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => break,
+                    Err(actual) => current = actual,
+                }
+            }
         }
     }
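
The CAS loop above makes the decrement saturate at zero, so a stray extra call can no longer wrap curr_connects around to u64::MAX. AtomicU64::fetch_update expresses the same retry loop more compactly; a sketch, not the committed code:

    use std::sync::atomic::{AtomicU64, Ordering};

    fn saturating_decrement(counter: &AtomicU64) {
        // Same effect as the compare_exchange_weak loop: retry until the value
        // is decremented, but never step below zero.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| v.checked_sub(1));
    }

    fn main() {
        let c = AtomicU64::new(1);
        saturating_decrement(&c); // 1 -> 0
        saturating_decrement(&c); // stays 0 instead of wrapping to u64::MAX
        assert_eq!(c.load(Ordering::Relaxed), 0);
    }
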
@@ -223,7 +223,7 @@ pub(crate) struct RpcWriter {
 impl RpcWriter {
     pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
         let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode);
-        self.seq_no += 1;
+        self.seq_no = self.seq_no.wrapping_add(1);
 
         let pad = (16 - (frame.len() % 16)) % 16;
         let mut buf = frame;
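
self.seq_no += 1 would panic on overflow in a debug build and wrap silently in release; wrapping_add makes the wrap explicit and identical in both profiles. Assuming a u32 counter:

    fn main() {
        let seq_no: u32 = u32::MAX;
        // seq_no + 1 panics in a debug build; wrapping_add is defined in both profiles.
        assert_eq!(seq_no.wrapping_add(1), 0);
    }
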
@@ -415,7 +415,6 @@ impl MePool {
         let degraded = Arc::new(AtomicBool::new(false));
         let draining = Arc::new(AtomicBool::new(false));
         let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
-        let tx_for_keepalive = tx.clone();
         let mut rpc_writer = RpcWriter {
             writer: hs.wr,
             key: hs.write_key,
 
@@ -452,7 +451,7 @@ impl MePool {
         };
         self.writers.write().await.push(writer.clone());
         self.conn_count.fetch_add(1, Ordering::Relaxed);
-        self.writer_available.notify_waiters();
+        self.writer_available.notify_one();
 
         let reg = self.registry.clone();
         let writers_arc = self.writers_arc();
 
@@ -461,7 +460,6 @@ impl MePool {
         let rtt_stats = self.rtt_stats.clone();
         let stats_reader = self.stats.clone();
         let stats_ping = self.stats.clone();
-        let stats_keepalive = self.stats.clone();
         let pool = Arc::downgrade(self);
         let cancel_ping = cancel.clone();
         let tx_ping = tx.clone();
 
@@ -474,7 +472,6 @@ impl MePool {
         let keepalive_jitter = self.me_keepalive_jitter;
         let cancel_reader_token = cancel.clone();
         let cancel_ping_token = cancel_ping.clone();
-        let cancel_keepalive_token = cancel.clone();
 
         tokio::spawn(async move {
             let res = reader_loop(
 
@@ -513,15 +510,40 @@ impl MePool {
         let pool_ping = Arc::downgrade(self);
         tokio::spawn(async move {
             let mut ping_id: i64 = rand::random::<i64>();
-            loop {
+            // Per-writer jittered start to avoid phase sync.
+            let startup_jitter = if keepalive_enabled {
+                let jitter_cap_ms = keepalive_interval.as_millis() / 2;
+                let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
+                Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
+            } else {
                 let jitter = rand::rng()
                     .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
                 let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
+                Duration::from_secs(wait)
+            };
+            tokio::select! {
+                _ = cancel_ping_token.cancelled() => return,
+                _ = tokio::time::sleep(startup_jitter) => {}
+            }
+            loop {
+                let wait = if keepalive_enabled {
+                    let jitter_cap_ms = keepalive_interval.as_millis() / 2;
+                    let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
+                    keepalive_interval
+                        + Duration::from_millis(
+                            rand::rng().random_range(0..=effective_jitter_ms as u64)
+                        )
+                } else {
+                    let jitter = rand::rng()
+                        .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
+                    let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
+                    Duration::from_secs(secs)
+                };
                 tokio::select! {
                     _ = cancel_ping_token.cancelled() => {
                         break;
                     }
-                    _ = tokio::time::sleep(Duration::from_secs(wait)) => {}
+                    _ = tokio::time::sleep(wait) => {}
                 }
                 let sent_id = ping_id;
                 let mut p = Vec::with_capacity(12);
 
@@ -538,8 +560,10 @@ impl MePool {
                     tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
                 }
                 ping_id = ping_id.wrapping_add(1);
+                stats_ping.increment_me_keepalive_sent();
                 if tx_ping.send(WriterCommand::DataAndFlush(p)).await.is_err() {
-                    debug!("Active ME ping failed, removing dead writer");
+                    stats_ping.increment_me_keepalive_failed();
+                    debug!("ME ping failed, removing dead writer");
                     cancel_ping.cancel();
                     if let Some(pool) = pool_ping.upgrade() {
                         if cleanup_for_ping
 
@@ -554,46 +578,6 @@ impl MePool {
                 }
             });
 
-        if keepalive_enabled {
-            let tx_keepalive = tx_for_keepalive;
-            let cancel_keepalive = cancel_keepalive_token;
-            let ping_tracker_keepalive = ping_tracker.clone();
-            tokio::spawn(async move {
-                // Per-writer jittered start to avoid phase sync.
-                let jitter_cap_ms = keepalive_interval.as_millis() / 2;
-                let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
-                let initial_jitter_ms = rand::rng().random_range(0..=effective_jitter_ms as u64);
-                tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
-                let mut ping_id: i64 = rand::random::<i64>();
-                loop {
-                    tokio::select! {
-                        _ = cancel_keepalive.cancelled() => break,
-                        _ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))) => {}
-                    }
-                    let sent_id = ping_id;
-                    ping_id = ping_id.wrapping_add(1);
-                    let mut p = Vec::with_capacity(12);
-                    p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
-                    p.extend_from_slice(&sent_id.to_le_bytes());
-                    {
-                        let mut tracker = ping_tracker_keepalive.lock().await;
-                        let before = tracker.len();
-                        tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
-                        let expired = before.saturating_sub(tracker.len());
-                        if expired > 0 {
-                            stats_keepalive.increment_me_keepalive_timeout_by(expired as u64);
-                        }
-                        tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
-                    }
-                    stats_keepalive.increment_me_keepalive_sent();
-                    if tx_keepalive.send(WriterCommand::DataAndFlush(p)).await.is_err() {
-                        stats_keepalive.increment_me_keepalive_failed();
-                        break;
-                    }
-                }
-            });
-        }
-
         Ok(())
     }
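
Folding the keepalive task into the ping task leaves a single timer per writer; the wait is recomputed on every iteration, and jitter is capped at half the base interval so writers do not phase-lock. A sketch of just the interval computation, under the same capping rule:

    use rand::Rng;
    use std::time::Duration;

    fn next_wait(keepalive_interval: Duration, keepalive_jitter: Duration) -> Duration {
        // Cap jitter at half the interval; keep at least 1 ms so the range stays valid.
        let cap_ms = keepalive_interval.as_millis() / 2;
        let jitter_ms = keepalive_jitter.as_millis().min(cap_ms).max(1);
        keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=jitter_ms as u64))
    }

    fn main() {
        let wait = next_wait(Duration::from_secs(30), Duration::from_secs(5));
        assert!(wait >= Duration::from_secs(30) && wait <= Duration::from_secs(35));
    }
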
@@ -630,15 +614,19 @@ impl MePool {
     }
 
     async fn remove_writer_only(&self, writer_id: u64) -> Vec<BoundConn> {
+        let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
         {
             let mut ws = self.writers.write().await;
             if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
                 let w = ws.remove(pos);
                 w.cancel.cancel();
-                let _ = w.tx.send(WriterCommand::Close).await;
+                close_tx = Some(w.tx.clone());
                 self.conn_count.fetch_sub(1, Ordering::Relaxed);
             }
         }
+        if let Some(tx) = close_tx {
+            let _ = tx.send(WriterCommand::Close).await;
+        }
         self.rtt_stats.lock().await.remove(&writer_id);
         self.registry.writer_lost(writer_id).await
     }
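
remove_writer_only now copies the sender out while holding the writers lock but awaits the Close send only after the lock is released, so a full command channel cannot stall every task that needs writers. The general shape:

    use std::sync::Arc;
    use tokio::sync::{mpsc, RwLock};

    #[tokio::main]
    async fn main() {
        let writers: Arc<RwLock<Vec<mpsc::Sender<&'static str>>>> =
            Arc::new(RwLock::new(Vec::new()));
        let (tx, mut rx) = mpsc::channel(1);
        writers.write().await.push(tx);

        // Take what we need under the lock, then release it before awaiting.
        let close_tx = { writers.write().await.pop() };
        if let Some(tx) = close_tx {
            let _ = tx.send("close").await; // may block, but the lock is already free
        }
        assert_eq!(rx.recv().await, Some("close"));
    }
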
@@ -1,6 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::net::SocketAddr;
 use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::Duration;
 
 use tokio::sync::{mpsc, RwLock};
 use tokio::sync::mpsc::error::TrySendError;
 
@@ -9,6 +10,7 @@ use super::codec::WriterCommand;
 use super::MeResponse;
 
 const ROUTE_CHANNEL_CAPACITY: usize = 4096;
+const ROUTE_BACKPRESSURE_TIMEOUT: Duration = Duration::from_millis(25);
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum RouteResult {
 
@@ -94,15 +96,26 @@ impl ConnRegistry {
     }
 
     pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
-        let inner = self.inner.read().await;
-        if let Some(tx) = inner.map.get(&id) {
-            match tx.try_send(resp) {
-                Ok(()) => RouteResult::Routed,
-                Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
-                Err(TrySendError::Full(_)) => RouteResult::QueueFull,
-            }
-        } else {
-            RouteResult::NoConn
-        }
+        let tx = {
+            let inner = self.inner.read().await;
+            inner.map.get(&id).cloned()
+        };
+
+        let Some(tx) = tx else {
+            return RouteResult::NoConn;
+        };
+
+        match tx.try_send(resp) {
+            Ok(()) => RouteResult::Routed,
+            Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed,
+            Err(TrySendError::Full(resp)) => {
+                // Absorb short bursts without dropping/closing the session immediately.
+                match tokio::time::timeout(ROUTE_BACKPRESSURE_TIMEOUT, tx.send(resp)).await {
+                    Ok(Ok(())) => RouteResult::Routed,
+                    Ok(Err(_)) => RouteResult::ChannelClosed,
+                    Err(_) => RouteResult::QueueFull,
+                }
+            }
+        }
     }
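
route() now degrades in two stages: a non-blocking try_send first, then a bounded send (25 ms here) when the queue is full, so short bursts are absorbed before QueueFull is reported. A reduced sketch of the same two-stage send (send_with_grace is a hypothetical name):

    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::sync::mpsc::error::TrySendError;

    async fn send_with_grace(tx: &mpsc::Sender<u32>, msg: u32) -> &'static str {
        match tx.try_send(msg) {
            Ok(()) => "routed",
            Err(TrySendError::Closed(_)) => "closed",
            Err(TrySendError::Full(msg)) => {
                // Second stage: wait briefly for the consumer instead of dropping.
                match tokio::time::timeout(Duration::from_millis(25), tx.send(msg)).await {
                    Ok(Ok(())) => "routed",
                    Ok(Err(_)) => "closed",
                    Err(_) => "queue full",
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(1);
        tx.send(1).await.unwrap(); // fill the queue
        tokio::spawn(async move { while rx.recv().await.is_some() {} });
        assert_eq!(send_with_grace(&tx, 2).await, "routed");
    }
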
@@ -62,6 +62,8 @@ impl MePool {
         let mut writers_snapshot = {
             let ws = self.writers.read().await;
             if ws.is_empty() {
+                // Create waiter before recovery attempts so notify_one permits are not missed.
+                let waiter = self.writer_available.notified();
                 drop(ws);
                 for family in self.family_order() {
                     let map = match family {
 
@@ -72,13 +74,19 @@ impl MePool {
                 for (ip, port) in addrs {
                     let addr = SocketAddr::new(*ip, *port);
                     if self.connect_one(addr, self.rng.as_ref()).await.is_ok() {
-                        self.writer_available.notify_waiters();
+                        self.writer_available.notify_one();
                         break;
                     }
                 }
             }
         }
-            if tokio::time::timeout(Duration::from_secs(3), self.writer_available.notified()).await.is_err() {
+            if !self.writers.read().await.is_empty() {
+                continue;
+            }
+            if tokio::time::timeout(Duration::from_secs(3), waiter).await.is_err() {
+                if !self.writers.read().await.is_empty() {
+                    continue;
+                }
                 return Err(ProxyError::Proxy("All ME connections dead (waited 3s)".into()));
             }
             continue;
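
The waiter is created before the reconnect attempts, and the writers list is re-checked after the timeout, closing the window in which a notify_one issued during recovery could wake no one. The canonical create-then-await ordering:

    use std::sync::Arc;
    use tokio::sync::Notify;

    #[tokio::main]
    async fn main() {
        let notify = Arc::new(Notify::new());
        let notifier = notify.clone();

        // Create the future before the event can fire...
        let waiter = notify.notified();
        tokio::spawn(async move {
            notifier.notify_one();
        });

        // ...so the wake-up cannot slip through between the check and the wait.
        waiter.await;
    }
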
@@ -394,6 +394,7 @@ impl UpstreamManager {
                 Ok(stream)
             },
             UpstreamType::Socks4 { address, interface, user_id } => {
+                let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
                 // Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
                 let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
                     // IP:port format - use socket with optional interface binding
 
@@ -416,7 +417,15 @@ impl UpstreamManager {
                     let std_stream: std::net::TcpStream = socket.into();
                     let stream = TcpStream::from_std(std_stream)?;
 
-                    stream.writable().await?;
+                    match tokio::time::timeout(connect_timeout, stream.writable()).await {
+                        Ok(Ok(())) => {}
+                        Ok(Err(e)) => return Err(ProxyError::Io(e)),
+                        Err(_) => {
+                            return Err(ProxyError::ConnectionTimeout {
+                                addr: proxy_addr.to_string(),
+                            });
+                        }
+                    }
                     if let Some(e) = stream.take_error()? {
                         return Err(ProxyError::Io(e));
                     }
 
@@ -427,8 +436,15 @@ impl UpstreamManager {
                     if interface.is_some() {
                         warn!("SOCKS4 interface binding is not supported for hostname addresses, ignoring");
                     }
-                    TcpStream::connect(address).await
-                        .map_err(ProxyError::Io)?
+                    match tokio::time::timeout(connect_timeout, TcpStream::connect(address)).await {
+                        Ok(Ok(stream)) => stream,
+                        Ok(Err(e)) => return Err(ProxyError::Io(e)),
+                        Err(_) => {
+                            return Err(ProxyError::ConnectionTimeout {
+                                addr: address.clone(),
+                            });
+                        }
+                    }
                 };
 
                 // replace socks user_id with config.selected_scope, if set
 
@@ -436,10 +452,19 @@ impl UpstreamManager {
                     .filter(|s| !s.is_empty());
                 let _user_id: Option<&str> = scope.or(user_id.as_deref());
 
-                connect_socks4(&mut stream, target, _user_id).await?;
+                match tokio::time::timeout(connect_timeout, connect_socks4(&mut stream, target, _user_id)).await {
+                    Ok(Ok(())) => {}
+                    Ok(Err(e)) => return Err(e),
+                    Err(_) => {
+                        return Err(ProxyError::ConnectionTimeout {
+                            addr: target.to_string(),
+                        });
+                    }
+                }
                 Ok(stream)
             },
             UpstreamType::Socks5 { address, interface, username, password } => {
+                let connect_timeout = Duration::from_secs(DIRECT_CONNECT_TIMEOUT_SECS);
                 // Try to parse as SocketAddr first (IP:port), otherwise treat as hostname:port
                 let mut stream = if let Ok(proxy_addr) = address.parse::<SocketAddr>() {
                     // IP:port format - use socket with optional interface binding
 
@@ -462,7 +487,15 @@ impl UpstreamManager {
                     let std_stream: std::net::TcpStream = socket.into();
                     let stream = TcpStream::from_std(std_stream)?;
 
-                    stream.writable().await?;
+                    match tokio::time::timeout(connect_timeout, stream.writable()).await {
+                        Ok(Ok(())) => {}
+                        Ok(Err(e)) => return Err(ProxyError::Io(e)),
+                        Err(_) => {
+                            return Err(ProxyError::ConnectionTimeout {
+                                addr: proxy_addr.to_string(),
+                            });
+                        }
+                    }
                     if let Some(e) = stream.take_error()? {
                         return Err(ProxyError::Io(e));
                     }
 
@@ -473,8 +506,15 @@ impl UpstreamManager {
                     if interface.is_some() {
                         warn!("SOCKS5 interface binding is not supported for hostname addresses, ignoring");
                     }
-                    TcpStream::connect(address).await
-                        .map_err(ProxyError::Io)?
+                    match tokio::time::timeout(connect_timeout, TcpStream::connect(address)).await {
+                        Ok(Ok(stream)) => stream,
+                        Ok(Err(e)) => return Err(ProxyError::Io(e)),
+                        Err(_) => {
+                            return Err(ProxyError::ConnectionTimeout {
+                                addr: address.clone(),
+                            });
+                        }
+                    }
                 };
 
                 debug!(config = ?config, "Socks5 connection");
 
@@ -484,7 +524,20 @@ impl UpstreamManager {
                 let _username: Option<&str> = scope.or(username.as_deref());
                 let _password: Option<&str> = scope.or(password.as_deref());
 
-                connect_socks5(&mut stream, target, _username, _password).await?;
+                match tokio::time::timeout(
+                    connect_timeout,
+                    connect_socks5(&mut stream, target, _username, _password),
+                )
+                .await
+                {
+                    Ok(Ok(())) => {}
+                    Ok(Err(e)) => return Err(e),
+                    Err(_) => {
+                        return Err(ProxyError::ConnectionTimeout {
+                            addr: target.to_string(),
+                        });
+                    }
+                }
                 Ok(stream)
             },
         }
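
Every SOCKS4/SOCKS5 path now bounds connect, writability, and the handshake with the same tokio::time::timeout wrapper and maps an elapsed timer to ProxyError::ConnectionTimeout. A self-contained sketch of the wrapper with a simplified error type (ConnectError is illustrative):

    use std::time::Duration;
    use tokio::net::TcpStream;

    #[derive(Debug)]
    enum ConnectError {
        Io(std::io::Error),
        Timeout { addr: String },
    }

    async fn connect_with_timeout(addr: &str, limit: Duration) -> Result<TcpStream, ConnectError> {
        match tokio::time::timeout(limit, TcpStream::connect(addr)).await {
            Ok(Ok(stream)) => Ok(stream),                                    // connected in time
            Ok(Err(e)) => Err(ConnectError::Io(e)),                          // I/O failure
            Err(_) => Err(ConnectError::Timeout { addr: addr.to_string() }), // timer elapsed
        }
    }

    #[tokio::main]
    async fn main() {
        match connect_with_timeout("127.0.0.1:1080", Duration::from_secs(5)).await {
            Ok(_) => println!("connected"),
            Err(e) => println!("failed: {e:?}"),
        }
    }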