1 Commits

Author SHA1 Message Date
Maxim Myalin 8c884fe891
Merge 062464175e into f230f2ce0e 2026-03-19 01:13:48 +03:00
11 changed files with 82 additions and 441 deletions

@ -1,19 +1,17 @@
# Code of Conduct # TELEMT Code of Conduct
## 1. Purpose ## 1. Purpose
Telemt exists to solve technical problems. Telemt exists to solve technical problems.
Telemt is open to contributors who want to learn, improve and build meaningful systems together. It is not a platform for ideology, politics, or personal agendas.
It is a place for building, testing, reasoning, documenting, and improving systems. All interaction here is defined by systems, constraints, and outcomes.
Discussions that advance this work are in scope. Discussions that divert it are not. Technology has consequences.
Responsibility is inherent.
Technology has consequences. Responsibility is inherent.
> **Zweck bestimmt die Form.** > **Zweck bestimmt die Form.**
> Purpose defines form. > Purpose defines form.
--- ---
@ -26,24 +24,20 @@ Technology has consequences. Responsibility is inherent.
* **Clarity over noise** * **Clarity over noise**
Communication is structured, concise, and relevant. Communication is structured, concise, and relevant.
* **Openness with standards** * **Independence**
Participation is open. The work remains disciplined. Telemt does not represent any state, ideology, or organization.
* **Independence of judgment** * **Open participation**
Claims are evaluated on technical merit, not affiliation or posture. Access is open. Standards are not.
* **Responsibility over capability** * **Responsibility over capability**
Capability does not justify careless use. Capability does not justify careless use.
* **Cooperation over friction** * **Cooperation over friction**
Progress depends on coordination, mutual support, and honest review. Progress is achieved through coordination and mutual support.
* **Good intent, rigorous method** > **Fakten sind nicht verhandelbar.**
Assume good intent, but require rigor. > Facts are not negotiable.
> **Aussagen gelten nach ihrer Begründung.**
> Claims are weighed by evidence.
--- ---
@ -59,12 +53,7 @@ Participants are expected to:
* Help others reach correct and reproducible outcomes * Help others reach correct and reproducible outcomes
* Act in a way that improves the system as a whole * Act in a way that improves the system as a whole
Precision is learned.
New contributors are welcome. They are expected to grow into these standards. Existing contributors are expected to make that growth possible.
> **Wer behauptet, belegt.** > **Wer behauptet, belegt.**
> Whoever claims, proves. > Whoever claims, proves.
--- ---
@ -73,25 +62,22 @@ New contributors are welcome. They are expected to grow into these standards. Ex
The following is not allowed: The following is not allowed:
* Personal attacks, insults, harassment, or intimidation * Personal attacks, insults, harassment, intimidation
* Repeatedly derailing discussion away from Telemt's purpose * Political discourse, propaganda, ideological conflict
* Off-topic or disruptive discussion
* Spam, flooding, or repeated low-quality input * Spam, flooding, or repeated low-quality input
* Misinformation presented as fact * Misinformation presented as fact
* Attempts to degrade, destabilize, or exhaust Telemt or its participants * Attempts to degrade or destabilize Telemt
* Use of Telemt or its spaces to enable harm * Use of Telemt or its space to enable harm
Telemt is not a venue for disputes that displace technical work.
Such discussions may be closed, removed, or redirected.
> **Störung ist kein Beitrag.** > **Störung ist kein Beitrag.**
> Disruption is not contribution. > Disruption is not contribution.
--- ---
## 5. Security and Misuse ## 5. Security and Misuse
Telemt is intended for responsible use. Telemt is intended for lawful and responsible use.
* Do not use it to plan, coordinate, or execute harm * Do not use it to plan, coordinate, or execute harm
* Do not publish vulnerabilities without responsible disclosure * Do not publish vulnerabilities without responsible disclosure
@ -100,24 +86,11 @@ Telemt is intended for responsible use.
Security is both technical and behavioral. Security is both technical and behavioral.
> **Verantwortung endet nicht am Code.** > **Verantwortung endet nicht am Code.**
> Responsibility does not end at the code. > Responsibility does not end at the code.
--- ---
## 6. Openness ## 6. Scope
Telemt is open to contributors of different backgrounds, experience levels, and working styles.
Standards are public, legible, and applied to the work itself.
Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
Gatekeeping by obscurity, status signaling, or hostility is not.
---
## 7. Scope
This Code of Conduct applies to all official spaces: This Code of Conduct applies to all official spaces:
@ -127,43 +100,31 @@ This Code of Conduct applies to all official spaces:
--- ---
## 8. Maintainer Stewardship ## 7. Enforcement
Maintainers are responsible for final decisions in matters of conduct, scope, and direction. Maintainers may act to preserve the integrity of Telemt:
This responsibility is stewardship: preserving continuity, protecting signal, maintaining standards, and keeping Telemt workable for others. * Remove content
* Lock discussions
* Reject contributions
* Restrict or ban participants
Judgment should be exercised with restraint, consistency, and institutional responsibility. Actions are taken to maintain function, continuity, and signal quality.
Not every decision requires extended debate.
Not every intervention requires public explanation.
All decisions are expected to serve the durability, clarity, and integrity of Telemt.
> **Ordnung ist Voraussetzung der Funktion.** > **Ordnung ist Voraussetzung der Funktion.**
> Order is the precondition of function. > Order is the precondition of function.
--- ---
## 9. Enforcement ## 8. Maintainer Authority
Maintainers may act to preserve the integrity of Telemt, including by: Maintainers have final authority in interpretation and enforcement.
* Removing content Authority exists to ensure continuity, consistency, and technical direction.
* Locking discussions
* Rejecting contributions
* Restricting or banning participants
Actions are taken to maintain function, continuity, and signal quality.
Where possible, correction is preferred to exclusion.
Where necessary, exclusion is preferred to decay.
--- ---
## 10. Final ## 9. Final
Telemt is built on discipline, structure, and shared intent. Telemt is built on discipline, structure, and shared intent.
@ -171,23 +132,19 @@ Signal over noise.
Facts over opinion. Facts over opinion.
Systems over rhetoric. Systems over rhetoric.
Work is collective. Work here is collective.
Outcomes are shared. Outcomes are shared.
Responsibility is distributed. Responsibility is distributed.
Precision is learned.
Rigor is expected.
Help is part of the work.
> **Ordnung ist Voraussetzung der Freiheit.** > **Ordnung ist Voraussetzung der Freiheit.**
If you contribute — contribute with care. If you contribute — contribute with precision.
If you speak — speak with substance. If you speak — speak with substance.
If you engage — engage constructively. If you engage — engage constructively.
--- ---
## 11. After All ## 10. After All
Systems outlive intentions. Systems outlive intentions.
@ -195,14 +152,12 @@ What is built will be used.
What is released will propagate. What is released will propagate.
What is maintained will define the future state. What is maintained will define the future state.
There is no neutral infrastructure, only infrastructure shaped well or poorly. There is no neutral infrastructure.
> **Jedes System trägt Verantwortung.** > **Jedes System trägt Verantwortung.**
> Every system carries responsibility. > Every system carries responsibility.
Stability requires discipline. Stability requires discipline.
Freedom requires structure. Freedom requires structure.
Trust requires honesty.
In the end, the system reflects its contributors. In the end, the system reflects its contributors.

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.24" version = "3.3.23"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

@ -646,11 +646,6 @@ impl ProxyConfig {
"general.me_route_backpressure_base_timeout_ms must be > 0".to_string(), "general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
)); ));
} }
if config.general.me_route_backpressure_base_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
));
}
if config.general.me_route_backpressure_high_timeout_ms if config.general.me_route_backpressure_high_timeout_ms
< config.general.me_route_backpressure_base_timeout_ms < config.general.me_route_backpressure_base_timeout_ms
@ -659,11 +654,6 @@ impl ProxyConfig {
"general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(), "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
)); ));
} }
if config.general.me_route_backpressure_high_timeout_ms > 5000 {
return Err(ProxyError::Config(
"general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
));
}
if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) { if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
return Err(ProxyError::Config( return Err(ProxyError::Config(
@ -1678,47 +1668,6 @@ mod tests {
let _ = std::fs::remove_file(path_valid); let _ = std::fs::remove_file(path_valid);
} }
#[test]
fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
let toml = r#"
[general]
me_route_backpressure_base_timeout_ms = 100
me_route_backpressure_high_timeout_ms = 5001
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
let _ = std::fs::remove_file(path);
}
#[test] #[test]
fn me_route_no_writer_wait_ms_out_of_range_is_rejected() { fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
let toml = r#" let toml = r#"
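
Review note: the two columns of this file differ only in whether the backpressure timeouts have an upper bound of 5000 ms. The following is a minimal sketch of that validation order, not the project's code. The `BackpressureTimeouts` struct, the `u64` field types, the `String` error type, and the `enforce_upper_bound` switch are assumptions made for illustration; the error messages are copied from the hunks above.

```rust
/// Hypothetical stand-in for the two config fields checked in the hunks above.
#[derive(Debug, Clone, Copy)]
pub struct BackpressureTimeouts {
    pub base_ms: u64,
    pub high_ms: u64,
}

/// Upper bound taken from the "[1, 5000]" error messages in the diff.
pub const MAX_TIMEOUT_MS: u64 = 5000;

/// Runs the checks in the same order as the diff. `enforce_upper_bound`
/// selects between the column with the range check and the one without it.
pub fn validate(t: BackpressureTimeouts, enforce_upper_bound: bool) -> Result<(), String> {
    if t.base_ms == 0 {
        return Err("general.me_route_backpressure_base_timeout_ms must be > 0".to_string());
    }
    if enforce_upper_bound && t.base_ms > MAX_TIMEOUT_MS {
        return Err(
            "general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
        );
    }
    if t.high_ms < t.base_ms {
        return Err(
            "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms"
                .to_string(),
        );
    }
    if enforce_upper_bound && t.high_ms > MAX_TIMEOUT_MS {
        return Err(
            "general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
        );
    }
    Ok(())
}
```

With the bound enforced, `base_ms = 100, high_ms = 5001` is rejected with the `high_timeout_ms` range message; without it, the same pair is accepted.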

@ -1692,57 +1692,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
let _ = writeln!(
out,
"telemt_me_writer_close_signal_drop_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_drop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_close_signal_channel_full_total {}",
if me_allows_normal {
stats.get_me_writer_close_signal_channel_full_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
);
let _ = writeln!(
out,
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
);
let _ = writeln!(
out,
"telemt_me_draining_writers_reap_progress_total {}",
if me_allows_normal {
stats.get_me_draining_writers_reap_progress_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
let _ = writeln!( let _ = writeln!(
@ -2175,13 +2124,6 @@ mod tests {
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"
));
assert!(output.contains(
"# TYPE telemt_me_draining_writers_reap_progress_total counter"
));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
assert!(output.contains( assert!(output.contains(
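
Review note: each counter in this file is emitted as three lines (`# HELP`, `# TYPE`, then the sample), with the value masked to `0` when the telemetry level does not allow it. A minimal sketch of that pattern follows. The helper name `render_counter` and its `allowed` parameter are hypothetical; the real `render_metrics` inlines the `writeln!` calls.

```rust
use std::fmt::Write;

/// Appends one counter in Prometheus text exposition format.
/// `allowed` mirrors the `if me_allows_normal { value } else { 0 }`
/// pattern in the hunk: a disallowed counter is reported as 0.
pub fn render_counter(out: &mut String, name: &str, help: &str, value: u64, allowed: bool) {
    // Writing to a String cannot fail, so the results are discarded,
    // as in the original `let _ = writeln!(...)` calls.
    let _ = writeln!(out, "# HELP {} {}", name, help);
    let _ = writeln!(out, "# TYPE {} counter", name);
    let _ = writeln!(out, "{} {}", name, if allowed { value } else { 0 });
}
```

A helper like this would shrink each counter to a single call, at the cost of hiding the per-metric gating that the inline form makes explicit.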

@ -123,9 +123,6 @@ pub struct Stats {
pool_drain_soft_evict_total: AtomicU64, pool_drain_soft_evict_total: AtomicU64,
pool_drain_soft_evict_writer_total: AtomicU64, pool_drain_soft_evict_writer_total: AtomicU64,
pool_stale_pick_total: AtomicU64, pool_stale_pick_total: AtomicU64,
me_writer_close_signal_drop_total: AtomicU64,
me_writer_close_signal_channel_full_total: AtomicU64,
me_draining_writers_reap_progress_total: AtomicU64,
me_writer_removed_total: AtomicU64, me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64,
me_refill_triggered_total: AtomicU64, me_refill_triggered_total: AtomicU64,
@ -737,24 +734,6 @@ impl Stats {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_writer_close_signal_drop_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_drop_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_close_signal_channel_full_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_close_signal_channel_full_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_draining_writers_reap_progress_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_draining_writers_reap_progress_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_removed_total(&self) { pub fn increment_me_writer_removed_total(&self) {
if self.telemetry_me_allows_debug() { if self.telemetry_me_allows_debug() {
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
@ -1280,17 +1259,6 @@ impl Stats {
pub fn get_pool_stale_pick_total(&self) -> u64 { pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed) self.pool_stale_pick_total.load(Ordering::Relaxed)
} }
pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 {
self.me_writer_close_signal_channel_full_total
.load(Ordering::Relaxed)
}
pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 {
self.me_draining_writers_reap_progress_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_removed_total(&self) -> u64 { pub fn get_me_writer_removed_total(&self) -> u64 {
self.me_writer_removed_total.load(Ordering::Relaxed) self.me_writer_removed_total.load(Ordering::Relaxed)
} }
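
Review note: the counters in this file share one shape: an `AtomicU64` field, an increment method gated on the telemetry level, and a `Relaxed` getter. The sketch below shows that shape for a single counter. The numeric levels and the `with_level` constructor are assumptions; the diff only shows `telemetry_me_allows_normal()` and `telemetry_me_allows_debug()` being called, not how they are implemented.

```rust
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};

// Hypothetical telemetry levels; the real representation is not shown in the diff.
pub const LEVEL_SILENT: u8 = 0;
pub const LEVEL_NORMAL: u8 = 1;

#[derive(Default)]
pub struct Stats {
    level: AtomicU8,
    me_writer_close_signal_drop_total: AtomicU64,
}

impl Stats {
    pub fn with_level(level: u8) -> Self {
        let stats = Stats::default();
        stats.level.store(level, Ordering::Relaxed);
        stats
    }

    fn telemetry_me_allows_normal(&self) -> bool {
        self.level.load(Ordering::Relaxed) >= LEVEL_NORMAL
    }

    /// Counts only when the telemetry level permits it.
    pub fn increment_me_writer_close_signal_drop_total(&self) {
        if self.telemetry_me_allows_normal() {
            self.me_writer_close_signal_drop_total
                .fetch_add(1, Ordering::Relaxed);
        }
    }

    pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
        self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
    }
}
```

`Ordering::Relaxed` is sufficient here because each counter is an independent statistic and nothing else is synchronized through it.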

@ -314,8 +314,6 @@ pub(super) async fn reap_draining_writers(
} }
pool.stats.increment_pool_force_close_total(); pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
} }
for writer_id in empty_writer_ids { for writer_id in empty_writer_ids {
@ -326,8 +324,6 @@ pub(super) async fn reap_draining_writers(
continue; continue;
} }
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
} }

@ -4,7 +4,6 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use bytes::Bytes;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -210,89 +209,6 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
assert_eq!(current_writer_ids(&pool).await, vec![3]); assert_eq!(current_writer_ids(&pool).await, vec![3]);
} }
#[tokio::test]
async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
assert!(
blocked_tx
.try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
.is_ok()
);
let blocked_rx_guard = tokio::spawn(async move {
let _hold_rx = blocked_rx;
tokio::time::sleep(Duration::from_secs(30)).await;
});
let blocked_writer_id = 90u64;
let blocked_writer = MeWriter {
id: blocked_writer_id,
addr: SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
4500 + blocked_writer_id as u16,
),
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
writer_dc: 2,
generation: 1,
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
tx: blocked_tx.clone(),
cancel: CancellationToken::new(),
degraded: Arc::new(AtomicBool::new(false)),
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
draining: Arc::new(AtomicBool::new(true)),
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
now_epoch_secs.saturating_sub(120),
)),
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
};
pool.writers.write().await.push(blocked_writer);
pool.registry
.register_writer(blocked_writer_id, blocked_tx)
.await;
pool.conn_count.fetch_add(1, Ordering::Relaxed);
insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
let reap_res = tokio::time::timeout(
Duration::from_millis(500),
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
)
.await;
blocked_rx_guard.abort();
assert!(reap_res.is_ok(), "reap should not block on close signal");
assert!(current_writer_ids(&pool).await.is_empty());
assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
let activity = pool.registry.writer_activity_snapshot().await;
assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
assert!(!activity.bound_clients_by_writer.contains_key(&91));
let (probe_conn_id, _rx) = pool.registry.register().await;
assert!(
!pool.registry
.bind_writer(
probe_conn_id,
blocked_writer_id,
ConnMeta {
target_dc: 2,
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
proto_flags: 0,
},
)
.await
);
let _ = pool.registry.unregister(probe_conn_id).await;
}
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
let pool = make_pool(2).await; let pool = make_pool(2).await;

@ -8,7 +8,6 @@ use bytes::Bytes;
use bytes::BytesMut; use bytes::BytesMut;
use rand::Rng; use rand::Rng;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
@ -492,9 +491,11 @@ impl MePool {
} }
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) { pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
// Full client cleanup now happens inside `registry.writer_lost` to keep let conns = self.remove_writer_only(writer_id).await;
// writer reap/remove paths strictly non-blocking per connection. for bound in conns {
let _ = self.remove_writer_only(writer_id).await; let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
let _ = self.registry.unregister(bound.conn_id).await;
}
} }
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> { async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
@ -524,11 +525,6 @@ impl MePool {
self.conn_count.fetch_sub(1, Ordering::Relaxed); self.conn_count.fetch_sub(1, Ordering::Relaxed);
} }
} }
// State invariant:
// - writer is removed from `self.writers` (pool visibility),
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await; let conns = self.registry.writer_lost(writer_id).await;
{ {
let mut tracker = self.ping_tracker.lock().await; let mut tracker = self.ping_tracker.lock().await;
@ -536,25 +532,7 @@ impl MePool {
} }
self.rtt_stats.lock().await.remove(&writer_id); self.rtt_stats.lock().await.remove(&writer_id);
if let Some(tx) = close_tx { if let Some(tx) = close_tx {
match tx.try_send(WriterCommand::Close) { let _ = tx.send(WriterCommand::Close).await;
Ok(()) => {}
Err(TrySendError::Full(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats
.increment_me_writer_close_signal_channel_full_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is closed"
);
}
}
} }
if trigger_refill if trigger_refill
&& let Some(addr) = removed_addr && let Some(addr) = removed_addr
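
Review note: the behavioral difference in this hunk is `tx.try_send(WriterCommand::Close)` with per-error counters in the left-hand column versus an awaited `tx.send(...)` in the right-hand column. The awaited form can stall writer removal for as long as the command channel stays full. The sketch below illustrates the non-blocking variant with the standard library's bounded channel as a stand-in for `tokio::sync::mpsc`; note that std names the closed case `TrySendError::Disconnected`, whereas Tokio calls it `Closed`. `CloseStats` and `signal_close_best_effort` are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
pub enum WriterCommand {
    Data(Vec<u8>),
    Close,
}

/// Hypothetical counterpart of the two close-signal counters in the diff.
#[derive(Default)]
pub struct CloseStats {
    drop_total: AtomicU64,
    channel_full_total: AtomicU64,
}

impl CloseStats {
    pub fn drop_total(&self) -> u64 {
        self.drop_total.load(Ordering::Relaxed)
    }
    pub fn channel_full_total(&self) -> u64 {
        self.channel_full_total.load(Ordering::Relaxed)
    }
}

/// Tries to deliver `Close` without ever waiting for channel capacity.
/// Returns true if the command was queued; otherwise records why it was dropped.
pub fn signal_close_best_effort(tx: &SyncSender<WriterCommand>, stats: &CloseStats) -> bool {
    match tx.try_send(WriterCommand::Close) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => {
            // A full channel counts as a drop and as a "channel full" event.
            stats.drop_total.fetch_add(1, Ordering::Relaxed);
            stats.channel_full_total.fetch_add(1, Ordering::Relaxed);
            false
        }
        Err(TrySendError::Disconnected(_)) => {
            // The receiver is gone, so the writer task has already exited.
            stats.drop_total.fetch_add(1, Ordering::Relaxed);
            false
        }
    }
}
```

The trade-off is that a dropped `Close` leaves task shutdown to other mechanisms (the writer struct in this diff also carries a `CancellationToken`), which is why the left-hand comment calls the command a best-effort accelerator.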

@ -8,7 +8,6 @@ use bytes::{Bytes, BytesMut};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::{Mutex, mpsc}; use tokio::sync::{Mutex, mpsc};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
@ -174,12 +173,12 @@ pub(crate) async fn reader_loop(
} else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_EXT from ME"); debug!(cid, "RPC_CLOSE_EXT from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await; reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
} else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 { } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
let cid = u64::from_le_bytes(body[0..8].try_into().unwrap()); let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
debug!(cid, "RPC_CLOSE_CONN from ME"); debug!(cid, "RPC_CLOSE_CONN from ME");
let _ = reg.route_nowait(cid, MeResponse::Close).await; reg.route(cid, MeResponse::Close).await;
reg.unregister(cid).await; reg.unregister(cid).await;
} else if pt == RPC_PING_U32 && body.len() >= 8 { } else if pt == RPC_PING_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@ -187,16 +186,14 @@ pub(crate) async fn reader_loop(
let mut pong = Vec::with_capacity(12); let mut pong = Vec::with_capacity(12);
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes()); pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
pong.extend_from_slice(&ping_id.to_le_bytes()); pong.extend_from_slice(&ping_id.to_le_bytes());
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) { if tx
Ok(()) => {} .send(WriterCommand::DataAndFlush(Bytes::from(pong)))
Err(TrySendError::Full(_)) => { .await
debug!(ping_id, "PONG dropped: writer command channel is full"); .is_err()
} {
Err(TrySendError::Closed(_)) => { warn!("PONG send failed");
warn!("PONG send failed: writer channel closed");
break; break;
} }
}
} else if pt == RPC_PONG_U32 && body.len() >= 8 { } else if pt == RPC_PONG_U32 && body.len() >= 8 {
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
stats.increment_me_keepalive_pong(); stats.increment_me_keepalive_pong();
@ -235,13 +232,6 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {} let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await;
Err(TrySendError::Full(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
}
Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
}
}
} }
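
Review note: the PONG and close-connection frames built in this file are 12 bytes: a 4-byte little-endian tag followed by an 8-byte little-endian id, and the reader extracts ids from the first 8 bytes of a body. A minimal sketch of that layout follows. The function names are hypothetical, and the tag is passed in as a parameter because the values of `RPC_PONG_U32` and `RPC_CLOSE_CONN_U32` do not appear in the diff.

```rust
/// Builds a 12-byte frame: 4-byte little-endian tag, then an 8-byte
/// little-endian ping id, matching the `Vec::with_capacity(12)` code above.
pub fn build_pong(pong_tag: u32, ping_id: i64) -> Vec<u8> {
    let mut frame = Vec::with_capacity(12);
    frame.extend_from_slice(&pong_tag.to_le_bytes());
    frame.extend_from_slice(&ping_id.to_le_bytes());
    frame
}

/// Reads a little-endian u64 from the first 8 bytes of `body`.
/// Returns None for short bodies instead of panicking, which folds the
/// `body.len() >= 8` guard from the reader loop into the parser.
pub fn parse_conn_id(body: &[u8]) -> Option<u64> {
    let mut head = [0u8; 8];
    head.copy_from_slice(body.get(0..8)?);
    Some(u64::from_le_bytes(head))
}
```

Returning `Option` removes the need for the `try_into().unwrap()` pair, although the original is also safe because the length is checked first.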

@ -169,7 +169,6 @@ impl ConnRegistry {
None None
} }
#[allow(dead_code)]
pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult { pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
let tx = { let tx = {
let inner = self.inner.read().await; let inner = self.inner.read().await;
@ -446,9 +445,6 @@ impl ConnRegistry {
} }
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> { pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut close_txs = Vec::<mpsc::Sender<MeResponse>>::new();
let mut out = Vec::new();
{
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.writers.remove(&writer_id); inner.writers.remove(&writer_id);
inner.last_meta_for_writer.remove(&writer_id); inner.last_meta_for_writer.remove(&writer_id);
@ -460,24 +456,19 @@ impl ConnRegistry {
.into_iter() .into_iter()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut out = Vec::new();
for conn_id in conns { for conn_id in conns {
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
continue; continue;
} }
inner.writer_for_conn.remove(&conn_id); inner.writer_for_conn.remove(&conn_id);
if let Some(client_tx) = inner.map.remove(&conn_id) { if let Some(m) = inner.meta.get(&conn_id) {
close_txs.push(client_tx); out.push(BoundConn {
} conn_id,
if let Some(meta) = inner.meta.remove(&conn_id) { meta: m.clone(),
out.push(BoundConn { conn_id, meta }); });
} }
} }
}
for client_tx in close_txs {
let _ = client_tx.try_send(MeResponse::Close);
}
out out
} }
@ -500,7 +491,6 @@ impl ConnRegistry {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;
use super::ConnMeta; use super::ConnMeta;
use super::ConnRegistry; use super::ConnRegistry;
@ -673,39 +663,6 @@ mod tests {
assert!(registry.is_writer_empty(20).await); assert!(registry.is_writer_empty(20).await);
} }
#[tokio::test]
async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() {
let registry = ConnRegistry::new();
let (conn_id, mut rx) = registry.register().await;
let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
let lost = registry.writer_lost(10).await;
assert_eq!(lost.len(), 1);
assert_eq!(lost[0].conn_id, conn_id);
assert!(registry.get_writer(conn_id).await.is_none());
assert!(registry.get_meta(conn_id).await.is_none());
assert_eq!(registry.unregister(conn_id).await, None);
let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
assert!(matches!(close, Ok(Some(MeResponse::Close))));
}
#[tokio::test] #[tokio::test]
async fn bind_writer_rejects_unregistered_writer() { async fn bind_writer_rejects_unregistered_writer() {
let registry = ConnRegistry::new(); let registry = ConnRegistry::new();
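
Review note: the left-hand `writer_lost` collects the client senders while holding the write lock, releases the lock, and only then sends `Close` to each client, so no send happens under the lock. The sketch below reproduces that ordering with standard-library types. It is a simplification under stated assumptions: `Registry`, `Inner`, and the `conns_for_writer` index are hypothetical, `std::sync::RwLock` and `sync_channel` stand in for their Tokio counterparts, and the per-connection `meta` map is omitted.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::RwLock;

#[derive(Debug, PartialEq)]
pub enum MeResponse {
    Close,
}

#[derive(Default)]
struct Inner {
    map: HashMap<u64, SyncSender<MeResponse>>,
    writer_for_conn: HashMap<u64, u64>,
    conns_for_writer: HashMap<u64, HashSet<u64>>,
}

#[derive(Default)]
pub struct Registry {
    inner: RwLock<Inner>,
}

impl Registry {
    pub fn register(&self, conn_id: u64) -> Receiver<MeResponse> {
        let (tx, rx) = sync_channel(1);
        self.inner.write().unwrap().map.insert(conn_id, tx);
        rx
    }

    pub fn bind_writer(&self, conn_id: u64, writer_id: u64) {
        let mut inner = self.inner.write().unwrap();
        inner.writer_for_conn.insert(conn_id, writer_id);
        inner.conns_for_writer.entry(writer_id).or_default().insert(conn_id);
    }

    pub fn is_registered(&self, conn_id: u64) -> bool {
        self.inner.read().unwrap().map.contains_key(&conn_id)
    }

    /// Unbinds every connection of `writer_id` and returns their ids, sorted.
    pub fn writer_lost(&self, writer_id: u64) -> Vec<u64> {
        let mut close_txs = Vec::new();
        let mut out = Vec::new();
        {
            // Phase 1: mutate the maps under the write lock, without sending.
            let mut inner = self.inner.write().unwrap();
            let conns = inner.conns_for_writer.remove(&writer_id).unwrap_or_default();
            for conn_id in conns {
                if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
                    continue;
                }
                inner.writer_for_conn.remove(&conn_id);
                if let Some(client_tx) = inner.map.remove(&conn_id) {
                    close_txs.push(client_tx);
                }
                out.push(conn_id);
            }
        } // The write lock is released here.
        // Phase 2: notify clients outside the lock, never waiting for capacity.
        for client_tx in close_txs {
            let _ = client_tx.try_send(MeResponse::Close);
        }
        out.sort_unstable();
        out
    }
}
```

In the right-hand column, `writer_lost` leaves the `map` and `meta` entries in place and `remove_writer_and_close_clients` routes `Close` and unregisters each returned connection, so the two columns split the same cleanup between different functions.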

@ -643,20 +643,14 @@ impl MePool {
let mut p = Vec::with_capacity(12); let mut p = Vec::with_capacity(12);
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes()); p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { if w.tx
Ok(()) => {} .send(WriterCommand::DataAndFlush(Bytes::from(p)))
Err(TrySendError::Full(_)) => { .await
debug!( .is_err()
conn_id, {
writer_id = w.writer_id,
"ME close skipped: writer command channel is full"
);
}
Err(TrySendError::Closed(_)) => {
debug!("ME close write failed"); debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await; self.remove_writer_and_close_clients(w.writer_id).await;
} }
}
} else { } else {
debug!(conn_id, "ME close skipped (writer missing)"); debug!(conn_id, "ME close skipped (writer missing)");
} }
@ -672,12 +666,8 @@ impl MePool {
p.extend_from_slice(&conn_id.to_le_bytes()); p.extend_from_slice(&conn_id.to_le_bytes());
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) { match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
Ok(()) => {} Ok(()) => {}
Err(TrySendError::Full(_)) => { Err(TrySendError::Full(cmd)) => {
debug!( let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
conn_id,
writer_id = w.writer_id,
"ME close_conn skipped: writer command channel is full"
);
} }
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
debug!(conn_id, "ME close_conn skipped: writer channel closed"); debug!(conn_id, "ME close_conn skipped: writer channel closed");
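
Review note: the right-hand column of the last hunk handles `TrySendError::Full(cmd)` by retrying the send under a 50 ms `tokio::time::timeout`, which bounds the wait instead of skipping the command outright. The sketch below approximates that behavior with the standard library. It is a different technique (a polling loop around `try_send` rather than an awaited send with a timer), and `send_with_budget` is a hypothetical helper, not something in the diff.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::thread;
use std::time::{Duration, Instant};

/// Tries to send `msg`, retrying while the channel is full until `budget`
/// has elapsed. On failure the message is handed back to the caller.
pub fn send_with_budget<T>(tx: &SyncSender<T>, mut msg: T, budget: Duration) -> Result<(), T> {
    let deadline = Instant::now() + budget;
    loop {
        match tx.try_send(msg) {
            Ok(()) => return Ok(()),
            // The receiver is gone; waiting longer cannot help.
            Err(TrySendError::Disconnected(m)) => return Err(m),
            Err(TrySendError::Full(m)) => {
                if Instant::now() >= deadline {
                    return Err(m);
                }
                // Take the message back and poll again shortly.
                msg = m;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}
```

A bounded wait sits between the two extremes seen elsewhere in this diff: it gives a briefly congested writer a chance to accept the command, while keeping the worst-case delay per call fixed.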