mirror of https://github.com/telemt/telemt.git

Compare commits: a29cde0205...8c884fe891 (1 commit)

Commit: 8c884fe891
@@ -1,19 +1,17 @@
-# Code of Conduct
+# TELEMT Code of Conduct
 
 ## 1. Purpose
 
 Telemt exists to solve technical problems.
 
-Telemt is open to contributors who want to learn, improve and build meaningful systems together.
-It is a place for building, testing, reasoning, documenting, and improving systems.
-Discussions that advance this work are in scope. Discussions that divert it are not.
-
-Technology has consequences. Responsibility is inherent.
+It is not a platform for ideology, politics, or personal agendas.
+All interaction here is defined by systems, constraints, and outcomes.
+Technology has consequences.
+Responsibility is inherent.
 
 > **Zweck bestimmt die Form.**
 
 > Purpose defines form.
 
 ---
@@ -26,24 +24,20 @@ Technology has consequences. Responsibility is inherent.
 * **Clarity over noise**
 
 Communication is structured, concise, and relevant.
 
-* **Openness with standards**
+* **Independence**
 
-Participation is open. The work remains disciplined.
+Telemt does not represent any state, ideology, or organization.
 
-* **Independence of judgment**
+* **Open participation**
 
-Claims are evaluated on technical merit, not affiliation or posture.
+Access is open. Standards are not.
 
 * **Responsibility over capability**
 
 Capability does not justify careless use.
 
 * **Cooperation over friction**
 
-Progress depends on coordination, mutual support, and honest review.
+Progress is achieved through coordination and mutual support.
 
-* **Good intent, rigorous method**
+> **Fakten sind nicht verhandelbar.**
 
-Assume good intent, but require rigor.
+> Facts are not negotiable.
 
-> **Aussagen gelten nach ihrer Begründung.**
-
-> Claims are weighed by evidence.
 
 ---
@@ -59,12 +53,7 @@ Participants are expected to:
 * Help others reach correct and reproducible outcomes
 * Act in a way that improves the system as a whole
 
-Precision is learned.
-
-New contributors are welcome. They are expected to grow into these standards. Existing contributors are expected to make that growth possible.
 
 > **Wer behauptet, belegt.**
 
 > Whoever claims, proves.
 
 ---
@@ -73,25 +62,22 @@ New contributors are welcome. They are expected to grow into these standards. Ex
 
 The following is not allowed:
 
-* Personal attacks, insults, harassment, or intimidation
-* Repeatedly derailing discussion away from Telemt’s purpose
+* Personal attacks, insults, harassment, intimidation
+* Political discourse, propaganda, ideological conflict
+* Off-topic or disruptive discussion
 * Spam, flooding, or repeated low-quality input
 * Misinformation presented as fact
-* Attempts to degrade, destabilize, or exhaust Telemt or its participants
-* Use of Telemt or its spaces to enable harm
+* Attempts to degrade or destabilize Telemt
+* Use of Telemt or its space to enable harm
 
-Telemt is not a venue for disputes that displace technical work.
-Such discussions may be closed, removed, or redirected.
-
 > **Störung ist kein Beitrag.**
 
 > Disruption is not contribution.
 
 ---
 
 ## 5. Security and Misuse
 
-Telemt is intended for responsible use.
+Telemt is intended for lawful and responsible use.
 
 * Do not use it to plan, coordinate, or execute harm
 * Do not publish vulnerabilities without responsible disclosure
@@ -100,24 +86,11 @@ Telemt is intended for responsible use.
 Security is both technical and behavioral.
 
 > **Verantwortung endet nicht am Code.**
 
 > Responsibility does not end at the code.
 
 ---
 
-## 6. Openness
+## 6. Scope
 
-Telemt is open to contributors of different backgrounds, experience levels, and working styles.
-
-Standards are public, legible, and applied to the work itself.
-
-Questions are welcome. Careful disagreement is welcome. Honest correction is welcome.
-
-Gatekeeping by obscurity, status signaling, or hostility is not.
-
----
-
-## 7. Scope
-
 This Code of Conduct applies to all official spaces:
@@ -127,43 +100,31 @@ This Code of Conduct applies to all official spaces:
 
 ---
 
-## 8. Maintainer Stewardship
+## 7. Enforcement
 
-Maintainers are responsible for final decisions in matters of conduct, scope, and direction.
+Maintainers may act to preserve the integrity of Telemt:
 
-This responsibility is stewardship: preserving continuity, protecting signal, maintaining standards, and keeping Telemt workable for others.
+* Remove content
+* Lock discussions
+* Reject contributions
+* Restrict or ban participants
 
-Judgment should be exercised with restraint, consistency, and institutional responsibility.
+Actions are taken to maintain function, continuity, and signal quality.
 
-Not every decision requires extended debate.
-Not every intervention requires public explanation.
-
-All decisions are expected to serve the durability, clarity, and integrity of Telemt.
-
 > **Ordnung ist Voraussetzung der Funktion.**
 
 > Order is the precondition of function.
 
 ---
 
-## 9. Enforcement
+## 8. Maintainer Authority
 
-Maintainers may act to preserve the integrity of Telemt, including by:
+Maintainers have final authority in interpretation and enforcement.
 
-* Removing content
-* Locking discussions
-* Rejecting contributions
-* Restricting or banning participants
-
-Actions are taken to maintain function, continuity, and signal quality.
-
-Where possible, correction is preferred to exclusion.
-
-Where necessary, exclusion is preferred to decay.
+Authority exists to ensure continuity, consistency, and technical direction.
 
 ---
 
-## 10. Final
+## 9. Final
 
 Telemt is built on discipline, structure, and shared intent.
 
@@ -171,23 +132,19 @@ Signal over noise.
 Facts over opinion.
 Systems over rhetoric.
 
-Work is collective.
+Work here is collective.
 Outcomes are shared.
 Responsibility is distributed.
 
-Precision is learned.
-Rigor is expected.
-Help is part of the work.
-
 > **Ordnung ist Voraussetzung der Freiheit.**
 
-If you contribute — contribute with care.
+If you contribute — contribute with precision.
 If you speak — speak with substance.
 If you engage — engage constructively.
 
 ---
 
-## 11. After All
+## 10. After All
 
 Systems outlive intentions.
 
@@ -195,14 +152,12 @@ What is built will be used.
 What is released will propagate.
 What is maintained will define the future state.
 
-There is no neutral infrastructure, only infrastructure shaped well or poorly.
+There is no neutral infrastructure.
 
 > **Jedes System trägt Verantwortung.**
 
 > Every system carries responsibility.
 
 Stability requires discipline.
 Freedom requires structure.
-Trust requires honesty.
 
 In the end, the system reflects its contributors.
@@ -1,6 +1,6 @@
 [package]
 name = "telemt"
-version = "3.3.24"
+version = "3.3.23"
 edition = "2024"
 
 [dependencies]
@@ -646,11 +646,6 @@ impl ProxyConfig {
                 "general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
             ));
         }
-        if config.general.me_route_backpressure_base_timeout_ms > 5000 {
-            return Err(ProxyError::Config(
-                "general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
-            ));
-        }
 
         if config.general.me_route_backpressure_high_timeout_ms
             < config.general.me_route_backpressure_base_timeout_ms
@@ -659,11 +654,6 @@ impl ProxyConfig {
                 "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
             ));
         }
-        if config.general.me_route_backpressure_high_timeout_ms > 5000 {
-            return Err(ProxyError::Config(
-                "general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
-            ));
-        }
 
         if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
             return Err(ProxyError::Config(
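The validation logic in play above can be sketched as a standalone checker. This is an illustrative sketch, not the crate's actual API: the free function and plain `String` errors stand in for `ProxyConfig`/`ProxyError::Config`, and it shows the pre-commit behavior, including the `[1, 5000]` cap that this commit removes.

```rust
// Hypothetical sketch of the backpressure-timeout validation as it stood
// before this commit. The [1, 5000] upper cap on both values is the part
// that the diff above removes; the > 0 and ordering checks remain.
fn validate_backpressure_timeouts(base_ms: u64, high_ms: u64) -> Result<(), String> {
    if base_ms == 0 {
        return Err("base_timeout_ms must be > 0".to_string());
    }
    if base_ms > 5000 {
        return Err("base_timeout_ms must be within [1, 5000]".to_string());
    }
    if high_ms < base_ms {
        return Err("high_timeout_ms must be >= base_timeout_ms".to_string());
    }
    if high_ms > 5000 {
        return Err("high_timeout_ms must be within [1, 5000]".to_string());
    }
    Ok(())
}

fn main() {
    assert!(validate_backpressure_timeouts(100, 400).is_ok());
    assert!(validate_backpressure_timeouts(0, 400).is_err());    // zero base rejected
    assert!(validate_backpressure_timeouts(100, 5001).is_err()); // the cap this commit drops
}
```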
@@ -1678,47 +1668,6 @@ mod tests {
         let _ = std::fs::remove_file(path_valid);
     }
 
-    #[test]
-    fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
-        let toml = r#"
-[general]
-me_route_backpressure_base_timeout_ms = 5001
-
-[censorship]
-tls_domain = "example.com"
-
-[access.users]
-user = "00000000000000000000000000000000"
-"#;
-        let dir = std::env::temp_dir();
-        let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
-        std::fs::write(&path, toml).unwrap();
-        let err = ProxyConfig::load(&path).unwrap_err().to_string();
-        assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
-        let _ = std::fs::remove_file(path);
-    }
-
-    #[test]
-    fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
-        let toml = r#"
-[general]
-me_route_backpressure_base_timeout_ms = 100
-me_route_backpressure_high_timeout_ms = 5001
-
-[censorship]
-tls_domain = "example.com"
-
-[access.users]
-user = "00000000000000000000000000000000"
-"#;
-        let dir = std::env::temp_dir();
-        let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
-        std::fs::write(&path, toml).unwrap();
-        let err = ProxyConfig::load(&path).unwrap_err().to_string();
-        assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
-        let _ = std::fs::remove_file(path);
-    }
 
     #[test]
     fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
         let toml = r#"
@@ -1692,57 +1692,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
         }
     );
-
-    let _ = writeln!(
-        out,
-        "# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
-    );
-    let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
-    let _ = writeln!(
-        out,
-        "telemt_me_writer_close_signal_drop_total {}",
-        if me_allows_normal {
-            stats.get_me_writer_close_signal_drop_total()
-        } else {
-            0
-        }
-    );
-
-    let _ = writeln!(
-        out,
-        "# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
-    );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_me_writer_close_signal_channel_full_total counter"
-    );
-    let _ = writeln!(
-        out,
-        "telemt_me_writer_close_signal_channel_full_total {}",
-        if me_allows_normal {
-            stats.get_me_writer_close_signal_channel_full_total()
-        } else {
-            0
-        }
-    );
-
-    let _ = writeln!(
-        out,
-        "# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
-    );
-    let _ = writeln!(
-        out,
-        "# TYPE telemt_me_draining_writers_reap_progress_total counter"
-    );
-    let _ = writeln!(
-        out,
-        "telemt_me_draining_writers_reap_progress_total {}",
-        if me_allows_normal {
-            stats.get_me_draining_writers_reap_progress_total()
-        } else {
-            0
-        }
-    );
 
     let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
     let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
     let _ = writeln!(
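The emission pattern that `render_metrics` repeats for each counter can be factored into a small sketch. This is an illustrative helper, not code from the repo: it shows the Prometheus text-exposition triplet (a `# HELP` line, a `# TYPE` line, then the sample) that each removed block above follows.

```rust
use std::fmt::Write;

// Hypothetical helper illustrating the Prometheus text-exposition pattern
// used throughout render_metrics: HELP line, TYPE line, then the sample.
fn write_counter(out: &mut String, name: &str, help: &str, value: u64) {
    let _ = writeln!(out, "# HELP {} {}", name, help);
    let _ = writeln!(out, "# TYPE {} counter", name);
    let _ = writeln!(out, "{} {}", name, value);
}

fn main() {
    let mut out = String::new();
    write_counter(
        &mut out,
        "telemt_me_writer_removed_total",
        "Total ME writer removals",
        7,
    );
    assert!(out.contains("# TYPE telemt_me_writer_removed_total counter"));
    assert!(out.contains("telemt_me_writer_removed_total 7"));
}
```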
@@ -2175,13 +2124,6 @@ mod tests {
         assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
         assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
         assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
-        assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
-        assert!(output.contains(
-            "# TYPE telemt_me_writer_close_signal_channel_full_total counter"
-        ));
-        assert!(output.contains(
-            "# TYPE telemt_me_draining_writers_reap_progress_total counter"
-        ));
         assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
         assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
         assert!(output.contains(
@@ -123,9 +123,6 @@ pub struct Stats {
     pool_drain_soft_evict_total: AtomicU64,
     pool_drain_soft_evict_writer_total: AtomicU64,
     pool_stale_pick_total: AtomicU64,
-    me_writer_close_signal_drop_total: AtomicU64,
-    me_writer_close_signal_channel_full_total: AtomicU64,
-    me_draining_writers_reap_progress_total: AtomicU64,
     me_writer_removed_total: AtomicU64,
     me_writer_removed_unexpected_total: AtomicU64,
     me_refill_triggered_total: AtomicU64,
@@ -737,24 +734,6 @@ impl Stats {
             self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
         }
     }
-    pub fn increment_me_writer_close_signal_drop_total(&self) {
-        if self.telemetry_me_allows_normal() {
-            self.me_writer_close_signal_drop_total
-                .fetch_add(1, Ordering::Relaxed);
-        }
-    }
-    pub fn increment_me_writer_close_signal_channel_full_total(&self) {
-        if self.telemetry_me_allows_normal() {
-            self.me_writer_close_signal_channel_full_total
-                .fetch_add(1, Ordering::Relaxed);
-        }
-    }
-    pub fn increment_me_draining_writers_reap_progress_total(&self) {
-        if self.telemetry_me_allows_normal() {
-            self.me_draining_writers_reap_progress_total
-                .fetch_add(1, Ordering::Relaxed);
-        }
-    }
     pub fn increment_me_writer_removed_total(&self) {
         if self.telemetry_me_allows_debug() {
             self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
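The counter shape visible in these `Stats` hunks can be sketched in isolation. This is a minimal sketch, assuming a single boolean telemetry gate; `GatedCounter` is a made-up name standing in for the many fields of the real `Stats` struct, and the real gate (`telemetry_me_allows_normal`) is a level check, not a plain flag.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

// Hypothetical sketch of the gated-counter pattern seen in Stats: a relaxed
// atomic increment, skipped entirely when the telemetry gate is off, so the
// disabled path costs only one atomic load.
struct GatedCounter {
    enabled: AtomicBool,
    total: AtomicU64,
}

impl GatedCounter {
    fn increment(&self) {
        if self.enabled.load(Ordering::Relaxed) {
            self.total.fetch_add(1, Ordering::Relaxed);
        }
    }
    fn get(&self) -> u64 {
        self.total.load(Ordering::Relaxed)
    }
}

fn main() {
    let c = GatedCounter {
        enabled: AtomicBool::new(true),
        total: AtomicU64::new(0),
    };
    c.increment();
    c.increment();
    assert_eq!(c.get(), 2);
    c.enabled.store(false, Ordering::Relaxed);
    c.increment(); // gated off: no change
    assert_eq!(c.get(), 2);
}
```

Relaxed ordering suffices here because each counter is an independent statistic with no ordering relationship to other memory.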
@@ -1280,17 +1259,6 @@ impl Stats {
     pub fn get_pool_stale_pick_total(&self) -> u64 {
         self.pool_stale_pick_total.load(Ordering::Relaxed)
     }
-    pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
-        self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
-    }
-    pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 {
-        self.me_writer_close_signal_channel_full_total
-            .load(Ordering::Relaxed)
-    }
-    pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 {
-        self.me_draining_writers_reap_progress_total
-            .load(Ordering::Relaxed)
-    }
     pub fn get_me_writer_removed_total(&self) -> u64 {
         self.me_writer_removed_total.load(Ordering::Relaxed)
     }
@@ -314,8 +314,6 @@ pub(super) async fn reap_draining_writers(
         }
         pool.stats.increment_pool_force_close_total();
         pool.remove_writer_and_close_clients(writer_id).await;
-        pool.stats
-            .increment_me_draining_writers_reap_progress_total();
         closed_total = closed_total.saturating_add(1);
     }
     for writer_id in empty_writer_ids {
@@ -326,8 +324,6 @@ pub(super) async fn reap_draining_writers(
             continue;
         }
         pool.remove_writer_and_close_clients(writer_id).await;
-        pool.stats
-            .increment_me_draining_writers_reap_progress_total();
         closed_total = closed_total.saturating_add(1);
     }
@@ -4,7 +4,6 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
 use std::time::{Duration, Instant};
 
-use bytes::Bytes;
 use tokio::sync::mpsc;
 use tokio_util::sync::CancellationToken;
|
||||||
assert_eq!(current_writer_ids(&pool).await, vec![3]);
|
assert_eq!(current_writer_ids(&pool).await, vec![3]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
|
|
||||||
let pool = make_pool(128).await;
|
|
||||||
let now_epoch_secs = MePool::now_epoch_secs();
|
|
||||||
|
|
||||||
let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
|
|
||||||
assert!(
|
|
||||||
blocked_tx
|
|
||||||
.try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
|
|
||||||
.is_ok()
|
|
||||||
);
|
|
||||||
let blocked_rx_guard = tokio::spawn(async move {
|
|
||||||
let _hold_rx = blocked_rx;
|
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
let blocked_writer_id = 90u64;
|
|
||||||
let blocked_writer = MeWriter {
|
|
||||||
id: blocked_writer_id,
|
|
||||||
addr: SocketAddr::new(
|
|
||||||
IpAddr::V4(Ipv4Addr::LOCALHOST),
|
|
||||||
4500 + blocked_writer_id as u16,
|
|
||||||
),
|
|
||||||
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
|
|
||||||
writer_dc: 2,
|
|
||||||
generation: 1,
|
|
||||||
contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
|
|
||||||
created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
|
|
||||||
tx: blocked_tx.clone(),
|
|
||||||
cancel: CancellationToken::new(),
|
|
||||||
degraded: Arc::new(AtomicBool::new(false)),
|
|
||||||
rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
|
|
||||||
draining: Arc::new(AtomicBool::new(true)),
|
|
||||||
draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
|
|
||||||
now_epoch_secs.saturating_sub(120),
|
|
||||||
)),
|
|
||||||
drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
|
|
||||||
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
|
|
||||||
};
|
|
||||||
pool.writers.write().await.push(blocked_writer);
|
|
||||||
pool.registry
|
|
||||||
.register_writer(blocked_writer_id, blocked_tx)
|
|
||||||
.await;
|
|
||||||
pool.conn_count.fetch_add(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
|
|
||||||
|
|
||||||
let mut warn_next_allowed = HashMap::new();
|
|
||||||
let mut soft_evict_next_allowed = HashMap::new();
|
|
||||||
|
|
||||||
let reap_res = tokio::time::timeout(
|
|
||||||
Duration::from_millis(500),
|
|
||||||
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
blocked_rx_guard.abort();
|
|
||||||
|
|
||||||
assert!(reap_res.is_ok(), "reap should not block on close signal");
|
|
||||||
assert!(current_writer_ids(&pool).await.is_empty());
|
|
||||||
assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
|
|
||||||
assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
|
|
||||||
assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
|
|
||||||
let activity = pool.registry.writer_activity_snapshot().await;
|
|
||||||
assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
|
|
||||||
assert!(!activity.bound_clients_by_writer.contains_key(&91));
|
|
||||||
let (probe_conn_id, _rx) = pool.registry.register().await;
|
|
||||||
assert!(
|
|
||||||
!pool.registry
|
|
||||||
.bind_writer(
|
|
||||||
probe_conn_id,
|
|
||||||
blocked_writer_id,
|
|
||||||
ConnMeta {
|
|
||||||
target_dc: 2,
|
|
||||||
client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
|
|
||||||
our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
|
|
||||||
proto_flags: 0,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
);
|
|
||||||
let _ = pool.registry.unregister(probe_conn_id).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
|
async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
|
||||||
let pool = make_pool(2).await;
|
let pool = make_pool(2).await;
|
||||||
|
|
|
||||||
|
|
@@ -8,7 +8,6 @@ use bytes::Bytes;
 use bytes::BytesMut;
 use rand::Rng;
 use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, warn};
@@ -492,9 +491,11 @@ impl MePool {
     }
 
     pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
-        // Full client cleanup now happens inside `registry.writer_lost` to keep
-        // writer reap/remove paths strictly non-blocking per connection.
-        let _ = self.remove_writer_only(writer_id).await;
+        let conns = self.remove_writer_only(writer_id).await;
+        for bound in conns {
+            let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
+            let _ = self.registry.unregister(bound.conn_id).await;
+        }
     }
 
     async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
@@ -524,11 +525,6 @@ impl MePool {
                 self.conn_count.fetch_sub(1, Ordering::Relaxed);
             }
         }
-        // State invariant:
-        // - writer is removed from `self.writers` (pool visibility),
-        // - writer is removed from registry routing/binding maps via `writer_lost`.
-        // The close command below is only a best-effort accelerator for task shutdown.
-        // Cleanup progress must never depend on command-channel availability.
         let conns = self.registry.writer_lost(writer_id).await;
         {
             let mut tracker = self.ping_tracker.lock().await;
@@ -536,25 +532,7 @@ impl MePool {
         }
         self.rtt_stats.lock().await.remove(&writer_id);
         if let Some(tx) = close_tx {
-            match tx.try_send(WriterCommand::Close) {
-                Ok(()) => {}
-                Err(TrySendError::Full(_)) => {
-                    self.stats.increment_me_writer_close_signal_drop_total();
-                    self.stats
-                        .increment_me_writer_close_signal_channel_full_total();
-                    debug!(
-                        writer_id,
-                        "Skipping close signal for removed writer: command channel is full"
-                    );
-                }
-                Err(TrySendError::Closed(_)) => {
-                    self.stats.increment_me_writer_close_signal_drop_total();
-                    debug!(
-                        writer_id,
-                        "Skipping close signal for removed writer: command channel is closed"
-                    );
-                }
-            }
+            let _ = tx.send(WriterCommand::Close).await;
         }
         if trigger_refill
             && let Some(addr) = removed_addr
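The core semantic change in this hunk is `try_send` versus `send` on a bounded channel: the non-blocking variant fails fast with `Full` when the channel has no capacity, while the blocking variant waits for a slot. A minimal sketch of that difference, using std's `sync_channel` for brevity (the proxy itself uses `tokio::sync::mpsc`, whose `try_send`/`send` split behaves analogously):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Demonstrates fail-fast vs blocking sends on a bounded (capacity-1) channel.
fn main() {
    let (tx, rx) = sync_channel::<&str>(1);
    tx.try_send("close").unwrap(); // fills the single slot
    // A second non-blocking send observes the full channel and gives up,
    // which is why the removed code counted "close signal drops".
    assert!(matches!(tx.try_send("close again"), Err(TrySendError::Full(_))));
    // Once the receiver drains the slot, capacity is available again.
    assert_eq!(rx.recv().unwrap(), "close");
    assert!(tx.try_send("close again").is_ok());
    // A blocking tx.send(...) here would instead wait for capacity,
    // which is the behavior this commit reverts to.
}
```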
@@ -8,7 +8,6 @@ use bytes::{Bytes, BytesMut};
 use tokio::io::AsyncReadExt;
 use tokio::net::TcpStream;
 use tokio::sync::{Mutex, mpsc};
-use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
@@ -174,12 +173,12 @@ pub(crate) async fn reader_loop(
         } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
             let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
             debug!(cid, "RPC_CLOSE_EXT from ME");
-            let _ = reg.route_nowait(cid, MeResponse::Close).await;
+            reg.route(cid, MeResponse::Close).await;
             reg.unregister(cid).await;
         } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
             let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
             debug!(cid, "RPC_CLOSE_CONN from ME");
-            let _ = reg.route_nowait(cid, MeResponse::Close).await;
+            reg.route(cid, MeResponse::Close).await;
             reg.unregister(cid).await;
         } else if pt == RPC_PING_U32 && body.len() >= 8 {
             let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -187,15 +186,13 @@ pub(crate) async fn reader_loop(
             let mut pong = Vec::with_capacity(12);
             pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
             pong.extend_from_slice(&ping_id.to_le_bytes());
-            match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
-                Ok(()) => {}
-                Err(TrySendError::Full(_)) => {
-                    debug!(ping_id, "PONG dropped: writer command channel is full");
-                }
-                Err(TrySendError::Closed(_)) => {
-                    warn!("PONG send failed: writer channel closed");
-                    break;
-                }
-            }
+            if tx
+                .send(WriterCommand::DataAndFlush(Bytes::from(pong)))
+                .await
+                .is_err()
+            {
+                warn!("PONG send failed");
+                break;
+            }
         } else if pt == RPC_PONG_U32 && body.len() >= 8 {
             let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
```diff
@@ -235,13 +232,6 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
     let mut p = Vec::with_capacity(12);
     p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
     p.extend_from_slice(&conn_id.to_le_bytes());
-    match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
-        Ok(()) => {}
-        Err(TrySendError::Full(_)) => {
-            debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
-        }
-        Err(TrySendError::Closed(_)) => {
-            debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
-        }
-    }
+    let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await;
 }
```
```diff
@@ -169,7 +169,6 @@ impl ConnRegistry {
         None
     }

-    #[allow(dead_code)]
     pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
         let tx = {
             let inner = self.inner.read().await;
```
```diff
@@ -446,38 +445,30 @@ impl ConnRegistry {
     }

     pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
-        let mut close_txs = Vec::<mpsc::Sender<MeResponse>>::new();
-        let mut out = Vec::new();
-        {
-            let mut inner = self.inner.write().await;
-            inner.writers.remove(&writer_id);
-            inner.last_meta_for_writer.remove(&writer_id);
-            inner.writer_idle_since_epoch_secs.remove(&writer_id);
-            let conns = inner
-                .conns_for_writer
-                .remove(&writer_id)
-                .unwrap_or_default()
-                .into_iter()
-                .collect::<Vec<_>>();
-
-            for conn_id in conns {
-                if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
-                    continue;
-                }
-                inner.writer_for_conn.remove(&conn_id);
-                if let Some(client_tx) = inner.map.remove(&conn_id) {
-                    close_txs.push(client_tx);
-                }
-                if let Some(meta) = inner.meta.remove(&conn_id) {
-                    out.push(BoundConn { conn_id, meta });
-                }
-            }
-        }
-
-        for client_tx in close_txs {
-            let _ = client_tx.try_send(MeResponse::Close);
-        }
-
+        let mut inner = self.inner.write().await;
+        inner.writers.remove(&writer_id);
+        inner.last_meta_for_writer.remove(&writer_id);
+        inner.writer_idle_since_epoch_secs.remove(&writer_id);
+        let conns = inner
+            .conns_for_writer
+            .remove(&writer_id)
+            .unwrap_or_default()
+            .into_iter()
+            .collect::<Vec<_>>();
+
+        let mut out = Vec::new();
+        for conn_id in conns {
+            if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
+                continue;
+            }
+            inner.writer_for_conn.remove(&conn_id);
+            if let Some(m) = inner.meta.get(&conn_id) {
+                out.push(BoundConn {
+                    conn_id,
+                    meta: m.clone(),
+                });
+            }
+        }
+
         out
     }
```
```diff
@@ -500,7 +491,6 @@ impl ConnRegistry {
 #[cfg(test)]
 mod tests {
     use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-    use std::time::Duration;

     use super::ConnMeta;
     use super::ConnRegistry;
```
```diff
@@ -673,39 +663,6 @@ mod tests {
         assert!(registry.is_writer_empty(20).await);
     }

-    #[tokio::test]
-    async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() {
-        let registry = ConnRegistry::new();
-        let (conn_id, mut rx) = registry.register().await;
-        let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
-        registry.register_writer(10, writer_tx).await;
-        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
-
-        assert!(
-            registry
-                .bind_writer(
-                    conn_id,
-                    10,
-                    ConnMeta {
-                        target_dc: 2,
-                        client_addr: addr,
-                        our_addr: addr,
-                        proto_flags: 0,
-                    },
-                )
-                .await
-        );
-
-        let lost = registry.writer_lost(10).await;
-        assert_eq!(lost.len(), 1);
-        assert_eq!(lost[0].conn_id, conn_id);
-        assert!(registry.get_writer(conn_id).await.is_none());
-        assert!(registry.get_meta(conn_id).await.is_none());
-        assert_eq!(registry.unregister(conn_id).await, None);
-        let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
-        assert!(matches!(close, Ok(Some(MeResponse::Close))));
-    }
-
     #[tokio::test]
     async fn bind_writer_rejects_unregistered_writer() {
         let registry = ConnRegistry::new();
```
```diff
@@ -643,19 +643,13 @@ impl MePool {
             let mut p = Vec::with_capacity(12);
             p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
             p.extend_from_slice(&conn_id.to_le_bytes());
-            match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
-                Ok(()) => {}
-                Err(TrySendError::Full(_)) => {
-                    debug!(
-                        conn_id,
-                        writer_id = w.writer_id,
-                        "ME close skipped: writer command channel is full"
-                    );
-                }
-                Err(TrySendError::Closed(_)) => {
-                    debug!("ME close write failed");
-                    self.remove_writer_and_close_clients(w.writer_id).await;
-                }
+            if w.tx
+                .send(WriterCommand::DataAndFlush(Bytes::from(p)))
+                .await
+                .is_err()
+            {
+                debug!("ME close write failed");
+                self.remove_writer_and_close_clients(w.writer_id).await;
             }
         } else {
             debug!(conn_id, "ME close skipped (writer missing)");
```
```diff
@@ -672,12 +666,8 @@ impl MePool {
             p.extend_from_slice(&conn_id.to_le_bytes());
             match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
                 Ok(()) => {}
-                Err(TrySendError::Full(_)) => {
-                    debug!(
-                        conn_id,
-                        writer_id = w.writer_id,
-                        "ME close_conn skipped: writer command channel is full"
-                    );
-                }
+                Err(TrySendError::Full(cmd)) => {
+                    let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
+                }
                 Err(TrySendError::Closed(_)) => {
                     debug!(conn_id, "ME close_conn skipped: writer channel closed");
```