Compare commits

..

1 Commits

Author SHA1 Message Date
Maxim Myalin 8c884fe891
Merge 062464175e into f230f2ce0e 2026-03-19 01:13:48 +03:00
11 changed files with 82 additions and 441 deletions

View File

@ -1,19 +1,17 @@
# Code of Conduct
# TELEMT Code of Conduct
## 1. Purpose
Telemt exists to solve technical problems.
Telemt is open to contributors who want to learn, improve and build meaningful systems together.
It is not a platform for ideology, politics, or personal agendas.
It is a place for building, testing, reasoning, documenting, and improving systems.
All interaction here is defined by systems, constraints, and outcomes.
Discussions that advance this work are in scope. Discussions that divert it are not.
Technology has consequences. Responsibility is inherent.
Technology has consequences.
Responsibility is inherent.
> **Zweck bestimmt die Form.**
> Purpose defines form.
---
@ -26,24 +24,20 @@ Technology has consequences. Responsibility is inherent.
* **Clarity over noise**
Communication is structured, concise, and relevant.
* **Openness with standards**
Participation is open. The work remains disciplined.
* **Independence**
Telemt does not represent any state, ideology, or organization.
* **Independence of judgment**
Claims are evaluated on technical merit, not affiliation or posture.
* **Open participation**
Access is open. Standards are not.
* **Responsibility over capability**
Capability does not justify careless use.
* **Cooperation over friction**
Progress depends on coordination, mutual support, and honest review.
Progress is achieved through coordination and mutual support.
* **Good intent, rigorous method**
Assume good intent, but require rigor.
> **Aussagen gelten nach ihrer Begründung.**
> Claims are weighed by evidence.
> **Fakten sind nicht verhandelbar.**
> Facts are not negotiable.
---
@ -59,12 +53,7 @@ Participants are expected to:
* Help others reach correct and reproducible outcomes
* Act in a way that improves the system as a whole
Precision is learned.
New contributors are welcome. They are expected to grow into these standards. Existing contributors are expected to make that growth possible.
> **Wer behauptet, belegt.**
> Whoever claims, proves.
---
@ -73,25 +62,22 @@ New contributors are welcome. They are expected to grow into these standards. Ex
The following is not allowed:
* Personal attacks, insults, harassment, or intimidation
* Repeatedly derailing discussion away from Telemts purpose
* Personal attacks, insults, harassment, intimidation
* Political discourse, propaganda, ideological conflict
* Off-topic or disruptive discussion
* Spam, flooding, or repeated low-quality input
* Misinformation presented as fact
* Attempts to degrade, destabilize, or exhaust Telemt or its participants
* Use of Telemt or its spaces to enable harm
Telemt is not a venue for disputes that displace technical work.
Such discussions may be closed, removed, or redirected.
* Attempts to degrade or destabilize Telemt
* Use of Telemt or its space to enable harm
> **Störung ist kein Beitrag.**
> Disruption is not contribution.
---
## 5. Security and Misuse
Telemt is intended for responsible use.
Telemt is intended for lawful and responsible use.
* Do not use it to plan, coordinate, or execute harm
* Do not publish vulnerabilities without responsible disclosure
@ -100,24 +86,11 @@ Telemt is intended for responsible use.
Security is both technical and behavioral.
> **Verantwortung endet nicht am Code.**
> Responsibility does not end at the code.
---
## 6. Openness
Telemt is open to contributors of different backgrounds, experience levels, and working styles.
Standards are public, legible, and applied to the work itself.
Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
Gatekeeping by obscurity, status signaling, or hostility is not.
---
## 7. Scope
## 6. Scope
This Code of Conduct applies to all official spaces:
@ -127,43 +100,31 @@ This Code of Conduct applies to all official spaces:
---
## 8. Maintainer Stewardship
## 7. Enforcement
Maintainers are responsible for final decisions in matters of conduct, scope, and direction.
Maintainers may act to preserve the integrity of Telemt:
This responsibility is stewardship: preserving continuity, protecting signal, maintaining standards, and keeping Telemt workable for others.
* Remove content
* Lock discussions
* Reject contributions
* Restrict or ban participants
Judgment should be exercised with restraint, consistency, and institutional responsibility.
Not every decision requires extended debate.
Not every intervention requires public explanation.
All decisions are expected to serve the durability, clarity, and integrity of Telemt.
Actions are taken to maintain function, continuity, and signal quality.
> **Ordnung ist Voraussetzung der Funktion.**
> Order is the precondition of function.
---
## 9. Enforcement
## 8. Maintainer Authority
Maintainers may act to preserve the integrity of Telemt, including by:
Maintainers have final authority in interpretation and enforcement.
* Removing content
* Locking discussions
* Rejecting contributions
* Restricting or banning participants
Actions are taken to maintain function, continuity, and signal quality.
Where possible, correction is preferred to exclusion.
Where necessary, exclusion is preferred to decay.
Authority exists to ensure continuity, consistency, and technical direction.
---
## 10. Final
## 9. Final
Telemt is built on discipline, structure, and shared intent.
@ -171,23 +132,19 @@ Signal over noise.
Facts over opinion.
Systems over rhetoric.
Work is collective.
Work here is collective.
Outcomes are shared.
Responsibility is distributed.
Precision is learned.
Rigor is expected.
Help is part of the work.
> **Ordnung ist Voraussetzung der Freiheit.**
If you contribute — contribute with care.
If you contribute — contribute with precision.
If you speak — speak with substance.
If you engage — engage constructively.
---
## 11. After All
## 10. After All
Systems outlive intentions.
@ -195,14 +152,12 @@ What is built will be used.
What is released will propagate.
What is maintained will define the future state.
There is no neutral infrastructure, only infrastructure shaped well or poorly.
There is no neutral infrastructure.
> **Jedes System trägt Verantwortung.**
> Every system carries responsibility.
Stability requires discipline.
Freedom requires structure.
Trust requires honesty.
In the end, the system reflects its contributors.

View File

@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.3.24"
version = "3.3.23"
edition = "2024"
[dependencies]

View File

@ -646,11 +646,6 @@ impl ProxyConfig {
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
));
}
if config.general.me_route_backpressure_base_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms
< config.general.me_route_backpressure_base_timeout_ms
@ -659,11 +654,6 @@ impl ProxyConfig {
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
));
}
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
return Err(ProxyError::Config(
@ -1678,47 +1668,6 @@ mod tests {
let _ = std::fs::remove_file(path_valid);
}
#[test]
fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 100
me_route_backpressure_high_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
let toml = r#"

View File

@ -1692,57 +1692,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
let _ = writeln!(
out,
"telemt_me_writer_close_signal_drop_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_close_signal_channel_full_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_channel_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
);
let _ = writeln!(
out,
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
);
let _ = writeln!(
out,
"telemt_me_draining_writers_reap_progress_total {}",
if me_allows_normal {
stats.get_me_draining_writers_reap_progress_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
let _ = writeln!(
@ -2175,13 +2124,6 @@ mod tests {
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
));
assert!(output.contains(
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
assert!(output.contains(

View File

@ -123,9 +123,6 @@ pub struct Stats {
pool_drain_soft_evict_total: AtomicU64,
pool_drain_soft_evict_writer_total: AtomicU64,
pool_stale_pick_total: AtomicU64,
me_writer_close_signal_drop_total: AtomicU64,
me_writer_close_signal_channel_full_total: AtomicU64,
me_draining_writers_reap_progress_total: AtomicU64,
me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64,
me_refill_triggered_total: AtomicU64,
@ -737,24 +734,6 @@ impl Stats {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_close_signal_drop_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_drop_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_close_signal_channel_full_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_channel_full_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_draining_writers_reap_progress_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_draining_writers_reap_progress_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_removed_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
@ -1280,17 +1259,6 @@ impl Stats {
pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 {
self.me_writer_close_signal_channel_full_total
.load(Ordering::Relaxed)
}
pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 {
self.me_draining_writers_reap_progress_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_removed_total(&self) -> u64 {
self.me_writer_removed_total.load(Ordering::Relaxed)
}

View File

@ -314,8 +314,6 @@ pub(super) async fn reap_draining_writers(
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1);
}
for writer_id in empty_writer_ids {
@ -326,8 +324,6 @@ pub(super) async fn reap_draining_writers(
continue;
}
pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1);
}

View File

@ -4,7 +4,6 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};
use bytes::Bytes;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
@ -210,89 +209,6 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
assert_eq!(current_writer_ids(&pool).await, vec![3]);
}
#[tokio::test]
async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
assert!(
blocked_tx
.try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
.is_ok()
);
let blocked_rx_guard = tokio::spawn(async move {
let _hold_rx = blocked_rx;
tokio::time::sleep(Duration::from_secs(30)).await;
});
let blocked_writer_id = 90u64;
let blocked_writer = MeWriter {
id: blocked_writer_id,
addr: SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
4500 + blocked_writer_id as u16,
),
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
writer_dc: 2,
generation: 1,
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
tx: blocked_tx.clone(),
cancel: CancellationToken::new(),
degraded: Arc::new(AtomicBool::new(false)),
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
draining: Arc::new(AtomicBool::new(true)),
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
now_epoch_secs.saturating_sub(120),
)),
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
};
pool.writers.write().await.push(blocked_writer);
pool.registry
.register_writer(blocked_writer_id, blocked_tx)
.await;
pool.conn_count.fetch_add(1, Ordering::Relaxed);
insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
let reap_res = tokio::time::timeout(
Duration::from_millis(500),
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
)
.await;
blocked_rx_guard.abort();
assert!(reap_res.is_ok(), "reap should not block on close signal");
assert!(current_writer_ids(&pool).await.is_empty());
assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
let activity = pool.registry.writer_activity_snapshot().await;
assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
assert!(!activity.bound_clients_by_writer.contains_key(&91));
let (probe_conn_id, _rx) = pool.registry.register().await;
assert!(
!pool.registry
.bind_writer(
probe_conn_id,
blocked_writer_id,
ConnMeta {
target_dc: 2,
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
proto_flags: 0,
},
)
.await
);
let _ = pool.registry.unregister(probe_conn_id).await;
}
#[tokio::test]
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
let pool = make_pool(2).await;

View File

@ -8,7 +8,6 @@ use bytes::Bytes;
use bytes::BytesMut;
use rand::Rng;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
@ -492,9 +491,11 @@ impl MePool {
}
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
// Full client cleanup now happens inside `registry.writer_lost` to keep
// writer reap/remove paths strictly non-blocking per connection.
let _ = self.remove_writer_only(writer_id).await;
let conns = self.remove_writer_only(writer_id).await;
for bound in conns {
let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
}
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
@ -524,11 +525,6 @@ impl MePool {
self.conn_count.fetch_sub(1, Ordering::Relaxed);
}
}
// State invariant:
// - writer is removed from `self.writers` (pool visibility),
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
@ -536,25 +532,7 @@ impl MePool {
}
self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx {
match tx.try_send(WriterCommand::Close) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats
.increment_me_writer_close_signal_channel_full_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is closed"
);
}
}
let _ = tx.send(WriterCommand::Close).await;
}
if trigger_refill
&& let Some(addr) = removed_addr

View File

@ -8,7 +8,6 @@ use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::{Mutex, mpsc};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
@ -174,12 +173,12 @@ pub(crate) async fn reader_loop(
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_EXT from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await;
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_CONN from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await;
reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await;
} else if pt == RPC_PING_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@ -187,15 +186,13 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes());
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(ping_id, "PONG dropped: writer command channel is full");
}
Err(TrySendError::Closed(_)) => {
warn!("PONG send failed: writer channel closed");
break;
}
if tx
.send(WriterCommand::DataAndFlush(Bytes::from(pong)))
.await
.is_err()
{
warn!("PONG send failed");
break;
}
} else if pt == RPC_PONG_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@ -235,13 +232,6 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
}
Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
}
}
let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await;
}

View File

@ -169,7 +169,6 @@ impl ConnRegistry {
None
}
#[allow(dead_code)]
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
let tx = {
let inner = self.inner.read().await;
@ -446,38 +445,30 @@ impl ConnRegistry {
}
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut close_txs = Vec::<mpsc::Sender<MeResponse>>::new();
let mut out = Vec::new();
{
let mut inner = self.inner.write().await;
inner.writers.remove(&writer_id);
inner.last_meta_for_writer.remove(&writer_id);
inner.writer_idle_since_epoch_secs.remove(&writer_id);
let conns = inner
.conns_for_writer
.remove(&writer_id)
.unwrap_or_default()
.into_iter()
.collect::<Vec<_>>();
let mut inner = self.inner.write().await;
inner.writers.remove(&writer_id);
inner.last_meta_for_writer.remove(&writer_id);
inner.writer_idle_since_epoch_secs.remove(&writer_id);
let conns = inner
.conns_for_writer
.remove(&writer_id)
.unwrap_or_default()
.into_iter()
.collect::<Vec<_>>();
for conn_id in conns {
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
continue;
}
inner.writer_for_conn.remove(&conn_id);
if let Some(client_tx) = inner.map.remove(&conn_id) {
close_txs.push(client_tx);
}
if let Some(meta) = inner.meta.remove(&conn_id) {
out.push(BoundConn { conn_id, meta });
}
let mut out = Vec::new();
for conn_id in conns {
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
continue;
}
inner.writer_for_conn.remove(&conn_id);
if let Some(m) = inner.meta.get(&conn_id) {
out.push(BoundConn {
conn_id,
meta: m.clone(),
});
}
}
for client_tx in close_txs {
let _ = client_tx.try_send(MeResponse::Close);
}
out
}
@ -500,7 +491,6 @@ impl ConnRegistry {
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use super::ConnMeta;
use super::ConnRegistry;
@ -673,39 +663,6 @@ mod tests {
assert!(registry.is_writer_empty(20).await);
}
#[tokio::test]
async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() {
let registry = ConnRegistry::new();
let (conn_id, mut rx) = registry.register().await;
let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
let lost = registry.writer_lost(10).await;
assert_eq!(lost.len(), 1);
assert_eq!(lost[0].conn_id, conn_id);
assert!(registry.get_writer(conn_id).await.is_none());
assert!(registry.get_meta(conn_id).await.is_none());
assert_eq!(registry.unregister(conn_id).await, None);
let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
assert!(matches!(close, Ok(Some(MeResponse::Close))));
}
#[tokio::test]
async fn bind_writer_rejects_unregistered_writer() {
let registry = ConnRegistry::new();

View File

@ -643,19 +643,13 @@ impl MePool {
let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes());
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(
conn_id,
writer_id = w.writer_id,
"ME close skipped: writer command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
}
if w.tx
.send(WriterCommand::DataAndFlush(Bytes::from(p)))
.await
.is_err()
{
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
}
} else {
debug!(conn_id, "ME close skipped (writer missing)");
@ -672,12 +666,8 @@ impl MePool {
p.extend_from_slice(&conn_id.to_le_bytes());
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
debug!(
conn_id,
writer_id = w.writer_id,
"ME close_conn skipped: writer command channel is full"
);
Err(TrySendError::Full(cmd)) => {
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
}
Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn skipped: writer channel closed");