From 1c6c73beda08c2b26d687329d332b8252faeb105 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 00:41:40 +0300 Subject: [PATCH 1/5] ME Writers Anti-stuck and Quarantine fixes Co-Authored-By: Nook Scheel --- src/transport/middle_proxy/pool_refill.rs | 14 +++++-- src/transport/middle_proxy/pool_writer.rs | 47 +++++++++++++++-------- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index fc916f4..895bdb5 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -71,11 +71,19 @@ impl MePool { } if let Some((addr, expiry)) = earliest_quarantine { + let remaining = expiry.saturating_duration_since(now); + if remaining.is_zero() { + return vec![addr]; + } + drop(guard); debug!( %addr, - wait_ms = expiry.saturating_duration_since(now).as_millis(), - "All ME endpoints are quarantined for the DC group; retrying earliest one" + wait_ms = remaining.as_millis(), + "All ME endpoints quarantined; waiting for earliest to expire" ); + // After sleeping, the quarantine entry is expired but not removed yet. + // Callers that check is_endpoint_quarantined() will lazily clean it via retain(). 
+ tokio::time::sleep(remaining).await; return vec![addr]; } @@ -311,4 +319,4 @@ impl MePool { dc_guard.remove(&dc_key); }); } -} +} \ No newline at end of file diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 7d78b84..fb1ba10 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -242,21 +242,27 @@ impl MePool { stats_reader_close.increment_me_idle_close_by_peer_total(); info!(writer_id, "ME socket closed by peer on idle writer"); } - if let Some(pool) = pool.upgrade() - && cleanup_for_reader - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() + if cleanup_for_reader + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + if let Some(pool) = pool.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } else { + // Pool is gone (shutdown). Remove writer from Vec directly + // as a last resort — no registry/refill side effects needed + // during shutdown. conn_count is not decremented here because + // the pool (and its counters) are already dropped. 
+ let mut ws = writers_arc.write().await; + ws.retain(|w| w.id != writer_id); + debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown"); + } } if let Err(e) = res { if !idle_close_by_peer { warn!(error = %e, "ME reader ended"); } } - let mut ws = writers_arc.write().await; - ws.retain(|w| w.id != writer_id); - info!(remaining = ws.len(), "Dead ME writer removed from pool"); }); let pool_ping = Arc::downgrade(self); @@ -346,12 +352,13 @@ impl MePool { stats_ping.increment_me_keepalive_failed(); debug!("ME ping failed, removing dead writer"); cancel_ping.cancel(); - if let Some(pool) = pool_ping.upgrade() - && cleanup_for_ping - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() + if cleanup_for_ping + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + if let Some(pool) = pool_ping.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } } break; } @@ -556,13 +563,19 @@ impl MePool { } } } + // Quarantine flapping endpoints regardless of draining state — + // a rapidly dying endpoint is unstable whether it was draining or not. + if let Some(addr) = removed_addr { + if let Some(uptime) = removed_uptime { + self.maybe_quarantine_flapping_endpoint(addr, uptime).await; + } + } + // Only trigger immediate refill for unexpected (non-draining) removals. + // Draining writers are intentionally being retired. 
if trigger_refill && let Some(addr) = removed_addr && let Some(writer_dc) = removed_dc { - if let Some(uptime) = removed_uptime { - self.maybe_quarantine_flapping_endpoint(addr, uptime).await; - } self.trigger_immediate_refill_for_dc(addr, writer_dc); } conns @@ -645,4 +658,4 @@ impl MePool { } } } -} +} \ No newline at end of file From e40361b1717d2567d5f528c503f7af18a98ca258 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 00:45:04 +0300 Subject: [PATCH 2/5] Cargo.toml + Cargo.lock Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- Cargo.lock | 302 ++++++++++++++++++++++++++++------------------------- Cargo.toml | 3 +- 2 files changed, 164 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a704404..79e302f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,15 +45,24 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstyle" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" [[package]] name = "anyhow" -version = "1.0.101" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "arc-swap" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9f3647c145568cec02c42054e07bdf9a5a698e15b466fb2341bfc393cd24aa5" +dependencies = [ + "rustversion", +] [[package]] name = "asn1-rs" @@ -135,9 +144,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.10.0" +version = "2.11.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" [[package]] name = "block-buffer" @@ -159,9 +168,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.1" +version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" [[package]] name = "bytes" @@ -186,9 +195,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.55" +version = "1.2.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" dependencies = [ "find-msvc-tools", "shlex", @@ -214,9 +223,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.43" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", "js-sys", @@ -265,18 +274,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.58" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63be97961acde393029492ce0be7a1af7e323e6bae9511ebfac33751be5e6806" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.58" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f13174bda5dfd69d7e947827e5af4b0f2f94a4a3ee92912fba07a66150f21e2" +checksum = 
"714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ "anstyle", "clap_lex", @@ -284,9 +293,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "core-foundation-sys" @@ -486,7 +495,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -572,9 +581,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" dependencies = [ "futures-channel", "futures-core", @@ -587,9 +596,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", "futures-sink", @@ -597,15 +606,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" [[package]] name = "futures-executor" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +checksum = 
"baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" dependencies = [ "futures-core", "futures-task", @@ -614,38 +623,38 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" [[package]] name = "futures-macro" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] name = "futures-sink" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" [[package]] name = "futures-task" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" [[package]] name = "futures-util" -version = "0.3.31" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-channel", "futures-core", @@ -655,7 +664,6 @@ dependencies = [ "futures-task", "memchr", "pin-project-lite", - "pin-utils", "slab", ] @@ -691,20 +699,20 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.3.0", "wasip2", "wasm-bindgen", ] 
[[package]] name = "getrandom" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 6.0.0", "wasip2", "wasip3", ] @@ -894,7 +902,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.2", + "socket2 0.6.3", "tokio", "tower-service", "tracing", @@ -1076,9 +1084,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.11.0" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "ipnetwork" @@ -1127,9 +1135,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" [[package]] name = "js-sys" -version = "0.3.85" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c942ebf8e95485ca0d52d97da7c5a2c387d0e7f0ba4c35e93bfcaee045955b3" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" dependencies = [ "once_cell", "wasm-bindgen", @@ -1169,26 +1177,27 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.181" +version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" [[package]] name = "libredox" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" +checksum = 
"1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", "libc", - "redox_syscall 0.7.1", + "plain", + "redox_syscall 0.7.3", ] [[package]] name = "linux-raw-sys" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" @@ -1295,7 +1304,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", "cfg-if", "cfg_aliases 0.1.1", "libc", @@ -1318,7 +1327,7 @@ version = "6.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", "crossbeam-channel", "filetime", "fsevent-sys", @@ -1385,9 +1394,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "oorandom" @@ -1426,9 +1435,9 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pin-project-lite" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" [[package]] name = "pin-utils" @@ -1436,6 +1445,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "plain" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" + [[package]] name = "plotters" version = "0.3.7" @@ -1495,7 +1510,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -1515,7 +1530,7 @@ checksum = "37566cb3fdacef14c0737f9546df7cfeadbfbc9fef10991038bf5015d0c80532" dependencies = [ "bit-set", "bit-vec", - "bitflags 2.10.0", + "bitflags 2.11.0", "num-traits", "rand", "rand_chacha", @@ -1545,7 +1560,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.6.2", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -1554,9 +1569,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4", @@ -1582,16 +1597,16 @@ dependencies = [ "cfg_aliases 0.2.1", "libc", "once_cell", - "socket2 0.6.2", + "socket2 0.6.3", "tracing", "windows-sys 0.60.2", ] [[package]] name = "quote" -version = "1.0.44" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] @@ -1602,6 +1617,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] 
+name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "rand" version = "0.9.2" @@ -1666,16 +1687,16 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", ] [[package]] name = "redox_syscall" -version = "0.7.1" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35985aa610addc02e24fc232012c86fd11f14111180f902b67e2d5331f8ebf2b" +checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", ] [[package]] @@ -1703,9 +1724,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] name = "reqwest" @@ -1785,11 +1806,11 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", "errno", "libc", "linux-raw-sys", @@ -1798,9 +1819,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.36" +version = "0.23.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" dependencies = [ "once_cell", "ring", 
@@ -1903,7 +1924,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -2011,12 +2032,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2044,9 +2065,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.114" +version = "2.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" dependencies = [ "proc-macro2", "quote", @@ -2082,15 +2103,16 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] name = "telemt" -version = "3.3.19" +version = "3.3.25" dependencies = [ "aes", "anyhow", + "arc-swap", "base64", "bytes", "cbc", @@ -2143,12 +2165,12 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.25.0" +version = "3.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.4.1", + "getrandom 0.4.2", "once_cell", "rustix", "windows-sys 0.61.2", @@ -2180,7 +2202,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -2191,7 +2213,7 @@ checksum = 
"ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -2256,9 +2278,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" dependencies = [ "tinyvec_macros", ] @@ -2271,9 +2293,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.49.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "libc", @@ -2281,7 +2303,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.2", + "socket2 0.6.3", "tokio-macros", "tracing", "windows-sys 0.61.2", @@ -2289,13 +2311,13 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -2409,7 +2431,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", "bytes", "futures-util", "http", @@ -2452,7 +2474,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 
2.0.117", ] [[package]] @@ -2478,9 +2500,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ "matchers", "nu-ansi-term", @@ -2514,9 +2536,9 @@ checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" [[package]] name = "unicode-ident" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-xid" @@ -2614,9 +2636,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.108" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64024a30ec1e37399cf85a7ffefebdb72205ca1c972291c51512360d90bd8566" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" dependencies = [ "cfg-if", "once_cell", @@ -2627,9 +2649,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.58" +version = "0.4.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f" +checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8" dependencies = [ "cfg-if", "futures-util", @@ -2641,9 +2663,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.108" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "008b239d9c740232e71bd39e8ef6429d27097518b6b30bdf9086833bd5b6d608" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" dependencies = [ "quote", 
"wasm-bindgen-macro-support", @@ -2651,22 +2673,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.108" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5256bae2d58f54820e6490f9839c49780dff84c65aeab9e772f15d5f0e913a55" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.108" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f01b580c9ac74c8d8f0c0e4afb04eeef2acf145458e52c03845ee9cd23e3d12" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" dependencies = [ "unicode-ident", ] @@ -2699,7 +2721,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags 2.10.0", + "bitflags 2.11.0", "hashbrown 0.15.5", "indexmap", "semver", @@ -2707,9 +2729,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.85" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "312e32e551d92129218ea9a2452120f4aabc03529ef03e4d0d82fb2780608598" +checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9" dependencies = [ "js-sys", "wasm-bindgen", @@ -2773,7 +2795,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -2784,7 +2806,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -3035,9 +3057,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] name = 
"winnow" -version = "0.7.14" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" +checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945" dependencies = [ "memchr", ] @@ -3072,7 +3094,7 @@ dependencies = [ "heck", "indexmap", "prettyplease", - "syn 2.0.114", + "syn 2.0.117", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -3088,7 +3110,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -3100,7 +3122,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags 2.10.0", + "bitflags 2.11.0", "indexmap", "log", "serde", @@ -3172,28 +3194,28 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", "synstructure 0.13.2", ] [[package]] name = "zerocopy" -version = "0.8.39" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" +checksum = "efbb2a062be311f2ba113ce66f697a4dc589f85e78a4aea276200804cea0ed87" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.39" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" +checksum = "0e8bc7269b54418e7aeeef514aa68f8690b8c0489a06b0136e5f57c4c5ccab89" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -3213,7 +3235,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", "synstructure 
0.13.2", ] @@ -3234,7 +3256,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] @@ -3267,11 +3289,11 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.114", + "syn 2.0.117", ] [[package]] name = "zmij" -version = "1.0.20" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4de98dfa5d5b7fef4ee834d0073d560c9ca7b6c46a71d058c48db7960f8cfaf7" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index 2a9cbe3..fab12f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.25" +version = "3.3.26" edition = "2024" [dependencies] @@ -40,6 +40,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } parking_lot = "0.12" dashmap = "5.5" +arc-swap = "1.7" lru = "0.16" rand = "0.9" chrono = { version = "0.4", features = ["serde"] } From 4f11aa07722925272cbefa2292598fd5a302f2d6 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 11:25:07 +0300 Subject: [PATCH 3/5] Update README.md --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2102a3a..f58a300 100644 --- a/README.md +++ b/README.md @@ -19,10 +19,12 @@ ### 🇷🇺 RU -#### Релиз 3.3.15 Semistable +#### О релизах [3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) по итогам работы в продакшн признан одним из самых стабильных и рекомендуется к использованию, когда cutting-edge фичи некритичны! 
+[3.3.24](https://github.com/telemt/telemt/releases/tag/3.3.24) даёт баланс стабильности и передового функционала, а также последние исправления по безопасности и багам + Будем рады вашему фидбеку и предложениям по улучшению — особенно в части **API**, **статистики**, **UX** --- @@ -40,10 +42,12 @@ ### 🇬🇧 EN -#### Release 3.3.15 Semistable +#### About releases [3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) is, based on the results of his work in production, recognized as one of the most stable and recommended for use when cutting-edge features are not so necessary! +[3.3.24](https://github.com/telemt/telemt/releases/tag/3.3.24) provides a balance of stability and advanced functionality, as well as the latest security and bug fixes + We are looking forward to your feedback and improvement proposals — especially regarding **API**, **statistics**, **UX** --- From ed4d1167dd6c33f0307e314c9b3c1807708b7215 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:09:23 +0300 Subject: [PATCH 4/5] ME Writers Advanced Cleanup Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 92 ++++++++++++++++++++++- src/transport/middle_proxy/pool_refill.rs | 44 ++++++----- src/transport/middle_proxy/pool_writer.rs | 31 +++++--- 3 files changed, 133 insertions(+), 34 deletions(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 8b62cff..5829de4 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -1327,6 +1327,33 @@ async fn recover_single_endpoint_outage( } let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); + let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine(); + if !bypass_quarantine { + let quarantine_remaining = { + let mut guard = pool.endpoint_quarantine.lock().await; + let quarantine_now = Instant::now(); +
guard.retain(|_, expiry| *expiry > quarantine_now); + guard + .get(&endpoint) + .map(|expiry| expiry.saturating_duration_since(quarantine_now)) + }; + + if let Some(remaining) = quarantine_remaining + && !remaining.is_zero() + { + outage_next_attempt.insert(key, now + remaining); + debug!( + dc = %key.0, + family = ?key.1, + %endpoint, + required, + wait_ms = remaining.as_millis(), + "Single-endpoint outage reconnect deferred by endpoint quarantine" + ); + return; + } + } + if *reconnect_budget == 0 { outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250))); debug!( @@ -1342,7 +1369,6 @@ async fn recover_single_endpoint_outage( pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); - let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine(); let attempt_ok = if bypass_quarantine { pool.stats .increment_me_single_endpoint_quarantine_bypass_total(); @@ -1561,9 +1587,10 @@ mod tests { use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; - use super::reap_draining_writers; + use super::{reap_draining_writers, recover_single_endpoint_outage}; use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; use crate::crypto::SecureRandom; + use crate::network::IpFamily; use crate::network::probe::NetworkDecision; use crate::stats::Stats; use crate::transport::middle_proxy::codec::WriterCommand; @@ -1745,4 +1772,65 @@ mod tests { assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20); assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30); } + + #[tokio::test] + async fn removing_draining_writer_still_quarantines_flapping_endpoint() { + let pool = make_pool(1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let writer_id = 11u64; + let writer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16); + let conn_id = + insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(5)).await; + 
+ assert!(pool + .registry + .evict_bound_conn_if_writer(conn_id, writer_id) + .await); + pool.remove_writer_and_close_clients(writer_id).await; + + assert!(pool.is_endpoint_quarantined(writer_addr).await); + } + + #[tokio::test] + async fn single_endpoint_outage_respects_quarantine_when_bypass_disabled() { + let pool = make_pool(1).await; + pool.me_single_endpoint_outage_disable_quarantine + .store(false, Ordering::Relaxed); + + let key = (2, IpFamily::V4); + let endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7443); + let quarantine_ttl = Duration::from_millis(200); + { + let mut guard = pool.endpoint_quarantine.lock().await; + guard.insert(endpoint, Instant::now() + quarantine_ttl); + } + + let rng = Arc::new(SecureRandom::new()); + let mut outage_backoff = HashMap::new(); + let mut outage_next_attempt = HashMap::new(); + let mut reconnect_budget = 1usize; + let started_at = Instant::now(); + + recover_single_endpoint_outage( + &pool, + &rng, + key, + endpoint, + 1, + &mut outage_backoff, + &mut outage_next_attempt, + &mut reconnect_budget, + ) + .await; + + assert_eq!(reconnect_budget, 1); + assert_eq!( + pool.stats + .get_me_single_endpoint_outage_reconnect_attempt_total(), + 0 + ); + assert_eq!(pool.stats.get_me_single_endpoint_quarantine_bypass_total(), 0); + let next_attempt = outage_next_attempt.get(&key).copied().unwrap(); + assert!(next_attempt >= started_at + Duration::from_millis(120)); + } } diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 895bdb5..43e2e6b 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -49,28 +49,31 @@ impl MePool { return Vec::new(); } - let mut guard = self.endpoint_quarantine.lock().await; - let now = Instant::now(); - guard.retain(|_, expiry| *expiry > now); + loop { + let mut guard = self.endpoint_quarantine.lock().await; + let now = Instant::now(); + guard.retain(|_, expiry| *expiry > now); - let mut 
ready = Vec::::with_capacity(endpoints.len()); - let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None; - for addr in endpoints { - if let Some(expiry) = guard.get(addr).copied() { - match earliest_quarantine { - Some((_, current_expiry)) if current_expiry <= expiry => {} - _ => earliest_quarantine = Some((*addr, expiry)), + let mut ready = Vec::::with_capacity(endpoints.len()); + let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None; + for addr in endpoints { + if let Some(expiry) = guard.get(addr).copied() { + match earliest_quarantine { + Some((_, current_expiry)) if current_expiry <= expiry => {} + _ => earliest_quarantine = Some((*addr, expiry)), + } + } else { + ready.push(*addr); } - } else { - ready.push(*addr); } - } - if !ready.is_empty() { - return ready; - } + if !ready.is_empty() { + return ready; + } - if let Some((addr, expiry)) = earliest_quarantine { + let Some((addr, expiry)) = earliest_quarantine else { + return Vec::new(); + }; let remaining = expiry.saturating_duration_since(now); if remaining.is_zero() { return vec![addr]; @@ -81,13 +84,8 @@ impl MePool { wait_ms = remaining.as_millis(), "All ME endpoints quarantined; waiting for earliest to expire" ); - // After sleeping, the quarantine entry is expired but not removed yet. - // Callers that check is_endpoint_quarantined() will lazily clean it via retain(). 
tokio::time::sleep(remaining).await; - return vec![addr]; } - - Vec::new() } pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool { @@ -319,4 +317,4 @@ impl MePool { dc_guard.remove(&dc_key); }); } -} \ No newline at end of file +} diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index fb1ba10..e368ead 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -142,6 +142,9 @@ impl MePool { seq_no: 0, crc_mode: hs.crc_mode, }; + let cleanup_done = Arc::new(AtomicBool::new(false)); + let cleanup_for_writer = cleanup_done.clone(); + let pool_writer = Arc::downgrade(self); let cancel_wr = cancel.clone(); tokio::spawn(async move { loop { @@ -160,6 +163,17 @@ impl MePool { _ = cancel_wr.cancelled() => break, } } + cancel_wr.cancel(); + if cleanup_for_writer + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + { + if let Some(pool) = pool_writer.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } else { + debug!(writer_id, "ME writer cleanup skipped: pool dropped"); + } + } }); let writer = MeWriter { id: writer_id, @@ -196,7 +210,6 @@ impl MePool { let cancel_ping = cancel.clone(); let tx_ping = tx.clone(); let ping_tracker_ping = ping_tracker.clone(); - let cleanup_done = Arc::new(AtomicBool::new(false)); let cleanup_for_reader = cleanup_done.clone(); let cleanup_for_ping = cleanup_done.clone(); let keepalive_enabled = self.me_keepalive_enabled; @@ -242,6 +255,7 @@ impl MePool { stats_reader_close.increment_me_idle_close_by_peer_total(); info!(writer_id, "ME socket closed by peer on idle writer"); } + cancel_reader_token.cancel(); if cleanup_for_reader .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() @@ -249,13 +263,12 @@ impl MePool { if let Some(pool) = pool.upgrade() { pool.remove_writer_and_close_clients(writer_id).await; } else { - // Pool is gone (shutdown). 
Remove writer from Vec directly - // as a last resort — no registry/refill side effects needed - // during shutdown. conn_count is not decremented here because - // the pool (and its counters) are already dropped. - let mut ws = writers_arc.write().await; - ws.retain(|w| w.id != writer_id); - debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown"); + let remaining = writers_arc.read().await.len(); + debug!( + writer_id, + remaining, + "ME reader cleanup skipped: pool dropped" + ); } } if let Err(e) = res { @@ -658,4 +671,4 @@ impl MePool { } } } -} \ No newline at end of file +} From f61d25ebe0b62395a226e0af70afc5d03016a2f6 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:11:47 +0300 Subject: [PATCH 5/5] Authoritative Teardown + Orphan Watchdog + Force-Close Safety Policy Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/cli.rs | 8 +- src/config/defaults.rs | 12 +- src/config/load.rs | 39 +++ src/config/types.rs | 11 +- src/maestro/me_startup.rs | 165 ++++++++--- src/transport/middle_proxy/health.rs | 256 ++++++++++++------ .../middle_proxy/health_regression_tests.rs | 23 +- src/transport/middle_proxy/mod.rs | 2 +- src/transport/middle_proxy/pool.rs | 29 +- src/transport/middle_proxy/pool_refill.rs | 49 ++-- src/transport/middle_proxy/pool_writer.rs | 94 ++++--- 11 files changed, 477 insertions(+), 211 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 5fbd7d5..b6e2d92 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -199,8 +199,14 @@ update_every = 43200 hardswap = false me_pool_drain_ttl_secs = 90 me_instadrain = false +me_pool_drain_threshold = 32 +me_pool_drain_soft_evict_grace_secs = 10 +me_pool_drain_soft_evict_per_writer = 2 +me_pool_drain_soft_evict_budget_per_core = 16 +me_pool_drain_soft_evict_cooldown_ms = 1000 +me_bind_stale_mode = "never" me_pool_min_fresh_ratio = 0.8 -me_reinit_drain_timeout_secs = 120 +me_reinit_drain_timeout_secs = 90
[network] ipv4 = true diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 6d74c93..fea8305 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -40,10 +40,10 @@ const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000; const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250; const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000; const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true; -const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30; -const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1; -const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8; -const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 10; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 2; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; @@ -606,7 +606,7 @@ pub(crate) fn default_proxy_secret_len_max() -> usize { } pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 { - 120 + 90 } pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 { @@ -618,7 +618,7 @@ pub(crate) fn default_me_instadrain() -> bool { } pub(crate) fn default_me_pool_drain_threshold() -> u64 { - 128 + 32 } pub(crate) fn default_me_pool_drain_soft_evict_enabled() -> bool { diff --git a/src/config/load.rs b/src/config/load.rs index c296697..14799ed 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -2037,6 +2037,45 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn force_close_default_matches_drain_ttl() { + let toml = r#" + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_force_close_default_test.toml"); + 
std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 90); + assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 90); + let _ = std::fs::remove_file(path); + } + + #[test] + fn force_close_zero_uses_runtime_safety_fallback() { + let toml = r#" + [general] + me_reinit_drain_timeout_secs = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_force_close_zero_fallback_test.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 0); + assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 300); + let _ = std::fs::remove_file(path); + } + #[test] fn force_close_bumped_when_below_drain_ttl() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index ecd051d..d018187 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -135,8 +135,8 @@ impl MeSocksKdfPolicy { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] pub enum MeBindStaleMode { - Never, #[default] + Never, Ttl, Always, } @@ -855,7 +855,7 @@ pub struct GeneralConfig { pub me_pool_min_fresh_ratio: f32, /// Drain timeout in seconds for stale ME writers after endpoint map changes. - /// Set to 0 to keep stale writers draining indefinitely (no force-close). + /// Set to 0 to use the runtime safety fallback timeout. #[serde(default = "default_me_reinit_drain_timeout_secs")] pub me_reinit_drain_timeout_secs: u64, @@ -1068,8 +1068,13 @@ impl GeneralConfig { /// Resolve force-close timeout for stale writers. /// `me_reinit_drain_timeout_secs` remains backward-compatible alias. + /// A configured `0` uses the runtime safety fallback (300s). 
pub fn effective_me_pool_force_close_secs(&self) -> u64 { - self.me_reinit_drain_timeout_secs + if self.me_reinit_drain_timeout_secs == 0 { + 300 + } else { + self.me_reinit_drain_timeout_secs + } } } diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 0b1310a..eb45cc4 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -332,25 +332,76 @@ pub(crate) async fn initialize_me_pool( "Middle-End pool initialized successfully" ); - let pool_health = pool_bg.clone(); - let rng_health = rng_bg.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_health, - rng_health, - min_conns, - ) - .await; - }); - let pool_drain_enforcer = pool_bg.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_drain_timeout_enforcer( - pool_drain_enforcer, - ) - .await; - }); - break; + // ── Supervised background tasks ────────────────── + // Each task runs inside a nested tokio::spawn so + // that a panic is caught via JoinHandle and the + // outer loop restarts the task automatically. 
+ let pool_health = pool_bg.clone(); + let rng_health = rng_bg.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + loop { + let p = pool_health.clone(); + let r = rng_health.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + p, r, min_conns, + ) + .await; + }) + .await; + match res { + Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_health_monitor panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + let pool_drain_enforcer = pool_bg.clone(); + tokio::spawn(async move { + loop { + let p = pool_drain_enforcer.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await; + }) + .await; + match res { + Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + let pool_watchdog = pool_bg.clone(); + tokio::spawn(async move { + loop { + let p = pool_watchdog.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await; + }) + .await; + match res { + Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + // CRITICAL: keep the current-thread runtime + // alive. Without this, block_on() returns, + // the Runtime is dropped, and ALL spawned + // background tasks (health monitor, drain + // enforcer, zombie watchdog) are silently + // cancelled — causing the draining-writer + // leak that brought us here. 
+ std::future::pending::<()>().await; + unreachable!(); } Err(e) => { startup_tracker_bg.set_me_last_error(Some(e.to_string())).await; @@ -408,23 +459,65 @@ pub(crate) async fn initialize_me_pool( "Middle-End pool initialized successfully" ); - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - let pool_drain_enforcer = pool.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_drain_timeout_enforcer( - pool_drain_enforcer, - ) - .await; - }); - + // ── Supervised background tasks ────────────────── + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + loop { + let p = pool_clone.clone(); + let r = rng_clone.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + p, r, min_conns, + ) + .await; + }) + .await; + match res { + Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_health_monitor panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + let pool_drain_enforcer = pool.clone(); + tokio::spawn(async move { + loop { + let p = pool_drain_enforcer.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await; + }) + .await; + match res { + Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + let pool_watchdog = pool.clone(); + tokio::spawn(async move { + loop { + let p = pool_watchdog.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await; + }) + .await; + 
match res { + Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + break Some(pool); } Err(e) => { diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 5829de4..9d4cc70 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -1327,33 +1327,6 @@ async fn recover_single_endpoint_outage( } let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); - let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine(); - if !bypass_quarantine { - let quarantine_remaining = { - let mut guard = pool.endpoint_quarantine.lock().await; - let quarantine_now = Instant::now(); - guard.retain(|_, expiry| *expiry > quarantine_now); - guard - .get(&endpoint) - .map(|expiry| expiry.saturating_duration_since(quarantine_now)) - }; - - if let Some(remaining) = quarantine_remaining - && !remaining.is_zero() - { - outage_next_attempt.insert(key, now + remaining); - debug!( - dc = %key.0, - family = ?key.1, - %endpoint, - required, - wait_ms = remaining.as_millis(), - "Single-endpoint outage reconnect deferred by endpoint quarantine" - ); - return; - } - } - if *reconnect_budget == 0 { outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250))); debug!( @@ -1369,6 +1342,7 @@ async fn recover_single_endpoint_outage( pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); + let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine(); let attempt_ok = if bypass_quarantine { pool.stats .increment_me_single_endpoint_quarantine_bypass_total(); @@ -1576,6 +1550,170 @@ async fn maybe_rotate_single_endpoint_shadow( ); } +/// Last-resort safety net for draining writers stuck past their deadline. 
+/// +/// Runs every `TICK_SECS` and force-closes any draining writer whose +/// `drain_deadline_epoch_secs` has been exceeded by more than a threshold. +/// +/// Two thresholds: +/// - `SOFT_THRESHOLD_SECS` (60s): drain deadline overdue by more than this +/// - `HARD_THRESHOLD_SECS` (300s): total drain age above this, once past the deadline +/// +/// Intentionally kept trivial and independent of pool config to minimise +/// the probability of panicking itself. Uses `SystemTime` directly +/// as a fallback clock source and timeouts on every lock acquisition +/// and writer removal so one stuck writer cannot block the rest. +pub async fn me_zombie_writer_watchdog(pool: Arc) { + use std::time::{SystemTime, UNIX_EPOCH}; + + const TICK_SECS: u64 = 30; + const SOFT_THRESHOLD_SECS: u64 = 60; + const HARD_THRESHOLD_SECS: u64 = 300; + const LOCK_TIMEOUT_SECS: u64 = 5; + const REMOVE_TIMEOUT_SECS: u64 = 10; + const HARD_DETACH_TIMEOUT_STREAK: u8 = 3; + + let mut removal_timeout_streak = HashMap::::new(); + + loop { + tokio::time::sleep(Duration::from_secs(TICK_SECS)).await; + + let now = match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(d) => d.as_secs(), + Err(_) => continue, + }; + + // Phase 1: collect zombie IDs under a short read-lock with timeout.
+ let zombie_ids_with_meta: Vec<(u64, bool)> = { + let Ok(ws) = tokio::time::timeout( + Duration::from_secs(LOCK_TIMEOUT_SECS), + pool.writers.read(), + ) + .await + else { + warn!("zombie_watchdog: writers read-lock timeout, skipping tick"); + continue; + }; + ws.iter() + .filter(|w| w.draining.load(std::sync::atomic::Ordering::Relaxed)) + .filter_map(|w| { + let deadline = w + .drain_deadline_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if deadline == 0 { + return None; + } + let overdue = now.saturating_sub(deadline); + if overdue == 0 { + return None; + } + let started = w + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + let drain_age = now.saturating_sub(started); + if drain_age > HARD_THRESHOLD_SECS { + return Some((w.id, true)); + } + if overdue > SOFT_THRESHOLD_SECS { + return Some((w.id, false)); + } + None + }) + .collect() + }; + // read lock released here + + if zombie_ids_with_meta.is_empty() { + removal_timeout_streak.clear(); + continue; + } + + let mut active_zombie_ids = HashSet::::with_capacity(zombie_ids_with_meta.len()); + for (writer_id, _) in &zombie_ids_with_meta { + active_zombie_ids.insert(*writer_id); + } + removal_timeout_streak.retain(|writer_id, _| active_zombie_ids.contains(writer_id)); + + warn!( + zombie_count = zombie_ids_with_meta.len(), + soft_threshold_secs = SOFT_THRESHOLD_SECS, + hard_threshold_secs = HARD_THRESHOLD_SECS, + "Zombie draining writers detected by watchdog, force-closing" + ); + + // Phase 2: remove each writer individually with a timeout. + // One stuck removal cannot block the rest. 
+ for (writer_id, had_clients) in &zombie_ids_with_meta { + let result = tokio::time::timeout( + Duration::from_secs(REMOVE_TIMEOUT_SECS), + pool.remove_writer_and_close_clients(*writer_id), + ) + .await; + match result { + Ok(()) => { + removal_timeout_streak.remove(writer_id); + pool.stats.increment_pool_force_close_total(); + pool.stats + .increment_me_draining_writers_reap_progress_total(); + info!( + writer_id, + had_clients, + "Zombie writer removed by watchdog" + ); + } + Err(_) => { + let streak = removal_timeout_streak + .entry(*writer_id) + .and_modify(|value| *value = value.saturating_add(1)) + .or_insert(1); + warn!( + writer_id, + had_clients, + timeout_streak = *streak, + "Zombie writer removal timed out" + ); + if *streak < HARD_DETACH_TIMEOUT_STREAK { + continue; + } + + let hard_detach = tokio::time::timeout( + Duration::from_secs(REMOVE_TIMEOUT_SECS), + pool.remove_draining_writer_hard_detach(*writer_id), + ) + .await; + match hard_detach { + Ok(true) => { + removal_timeout_streak.remove(writer_id); + pool.stats.increment_pool_force_close_total(); + pool.stats + .increment_me_draining_writers_reap_progress_total(); + info!( + writer_id, + had_clients, + "Zombie writer hard-detached after repeated timeouts" + ); + } + Ok(false) => { + removal_timeout_streak.remove(writer_id); + debug!( + writer_id, + had_clients, + "Zombie hard-detach skipped (writer already gone or no longer draining)" + ); + } + Err(_) => { + warn!( + writer_id, + had_clients, + "Zombie hard-detach timed out, will retry next tick" + ); + } + } + } + } + } + } +} #[cfg(test)] mod tests { use std::collections::HashMap; @@ -1587,10 +1725,9 @@ mod tests { use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; - use super::{reap_draining_writers, recover_single_endpoint_outage}; + use super::reap_draining_writers; use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; use crate::crypto::SecureRandom; - use crate::network::IpFamily; use 
crate::network::probe::NetworkDecision; use crate::stats::Stats; use crate::transport::middle_proxy::codec::WriterCommand; @@ -1772,65 +1909,4 @@ mod tests { assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20); assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30); } - - #[tokio::test] - async fn removing_draining_writer_still_quarantines_flapping_endpoint() { - let pool = make_pool(1).await; - let now_epoch_secs = MePool::now_epoch_secs(); - let writer_id = 11u64; - let writer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16); - let conn_id = - insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(5)).await; - - assert!(pool - .registry - .evict_bound_conn_if_writer(conn_id, writer_id) - .await); - pool.remove_writer_and_close_clients(writer_id).await; - - assert!(pool.is_endpoint_quarantined(writer_addr).await); - } - - #[tokio::test] - async fn single_endpoint_outage_respects_quarantine_when_bypass_disabled() { - let pool = make_pool(1).await; - pool.me_single_endpoint_outage_disable_quarantine - .store(false, Ordering::Relaxed); - - let key = (2, IpFamily::V4); - let endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7443); - let quarantine_ttl = Duration::from_millis(200); - { - let mut guard = pool.endpoint_quarantine.lock().await; - guard.insert(endpoint, Instant::now() + quarantine_ttl); - } - - let rng = Arc::new(SecureRandom::new()); - let mut outage_backoff = HashMap::new(); - let mut outage_next_attempt = HashMap::new(); - let mut reconnect_budget = 1usize; - let started_at = Instant::now(); - - recover_single_endpoint_outage( - &pool, - &rng, - key, - endpoint, - 1, - &mut outage_backoff, - &mut outage_next_attempt, - &mut reconnect_budget, - ) - .await; - - assert_eq!(reconnect_budget, 1); - assert_eq!( - pool.stats - .get_me_single_endpoint_outage_reconnect_attempt_total(), - 0 - ); - 
assert_eq!(pool.stats.get_me_single_endpoint_quarantine_bypass_total(), 0); - let next_attempt = outage_next_attempt.get(&key).copied().unwrap(); - assert!(next_attempt >= started_at + Duration::from_millis(120)); - } } diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index bcdaf2e..230cd64 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -12,7 +12,9 @@ use super::codec::WriterCommand; use super::health::{health_drain_close_budget, reap_draining_writers}; use super::pool::{MePool, MeWriter, WriterContour}; use super::registry::ConnMeta; -use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; +use crate::config::{ + GeneralConfig, MeBindStaleMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode, +}; use crate::crypto::SecureRandom; use crate::network::probe::NetworkDecision; use crate::stats::Stats; @@ -646,10 +648,23 @@ async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediatel #[test] fn general_config_default_drain_threshold_remains_enabled() { - assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); + assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 32); assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled); assert_eq!( - GeneralConfig::default().me_pool_drain_soft_evict_per_writer, - 1 + GeneralConfig::default().me_pool_drain_soft_evict_grace_secs, + 10 ); + assert_eq!( + GeneralConfig::default().me_pool_drain_soft_evict_per_writer, + 2 + ); + assert_eq!( + GeneralConfig::default().me_pool_drain_soft_evict_budget_per_core, + 16 + ); + assert_eq!( + GeneralConfig::default().me_pool_drain_soft_evict_cooldown_ms, + 1000 + ); + assert_eq!(GeneralConfig::default().me_bind_stale_mode, MeBindStaleMode::Never); } diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 
26ded29..8c57717 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -30,7 +30,7 @@ mod health_adversarial_tests; use bytes::Bytes; -pub use health::{me_drain_timeout_enforcer, me_health_monitor}; +pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog}; #[allow(unused_imports)] pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily}; pub use pool::MePool; diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 441d41d..f825058 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -18,6 +18,8 @@ use crate::transport::UpstreamManager; use super::ConnRegistry; use super::codec::WriterCommand; +const ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS: u64 = 300; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(super) struct RefillDcKey { pub dc: i32, @@ -229,6 +231,14 @@ impl MePool { .as_secs() } + fn normalize_force_close_secs(force_close_secs: u64) -> u64 { + if force_close_secs == 0 { + ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS + } else { + force_close_secs + } + } + pub fn new( proxy_tag: Option>, proxy_secret: Vec, @@ -477,7 +487,9 @@ impl MePool { me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new( me_pool_drain_soft_evict_cooldown_ms.max(1), ), - me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), + me_pool_force_close_secs: AtomicU64::new(Self::normalize_force_close_secs( + me_pool_force_close_secs, + )), me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( me_pool_min_fresh_ratio, )), @@ -587,8 +599,10 @@ impl MePool { ); self.me_pool_drain_soft_evict_cooldown_ms .store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed); - self.me_pool_force_close_secs - .store(force_close_secs, Ordering::Relaxed); + self.me_pool_force_close_secs.store( + Self::normalize_force_close_secs(force_close_secs), + Ordering::Relaxed, + ); 
self.me_pool_min_fresh_ratio_permille .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed); self.me_hardswap_warmup_delay_min_ms @@ -733,12 +747,9 @@ impl MePool { } pub(super) fn force_close_timeout(&self) -> Option { - let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed); - if secs == 0 { - None - } else { - Some(Duration::from_secs(secs)) - } + let secs = + Self::normalize_force_close_secs(self.me_pool_force_close_secs.load(Ordering::Relaxed)); + Some(Duration::from_secs(secs)) } pub(super) fn drain_soft_evict_enabled(&self) -> bool { diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 43e2e6b..e4fb95f 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -49,43 +49,36 @@ impl MePool { return Vec::new(); } - loop { - let mut guard = self.endpoint_quarantine.lock().await; - let now = Instant::now(); - guard.retain(|_, expiry| *expiry > now); + let mut guard = self.endpoint_quarantine.lock().await; + let now = Instant::now(); + guard.retain(|_, expiry| *expiry > now); - let mut ready = Vec::::with_capacity(endpoints.len()); - let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None; - for addr in endpoints { - if let Some(expiry) = guard.get(addr).copied() { - match earliest_quarantine { - Some((_, current_expiry)) if current_expiry <= expiry => {} - _ => earliest_quarantine = Some((*addr, expiry)), - } - } else { - ready.push(*addr); + let mut ready = Vec::::with_capacity(endpoints.len()); + let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None; + for addr in endpoints { + if let Some(expiry) = guard.get(addr).copied() { + match earliest_quarantine { + Some((_, current_expiry)) if current_expiry <= expiry => {} + _ => earliest_quarantine = Some((*addr, expiry)), } + } else { + ready.push(*addr); } + } - if !ready.is_empty() { - return ready; - } + if !ready.is_empty() { + return ready; + } - let Some((addr, 
expiry)) = earliest_quarantine else { - return Vec::new(); - }; - let remaining = expiry.saturating_duration_since(now); - if remaining.is_zero() { - return vec![addr]; - } - drop(guard); + if let Some((addr, expiry)) = earliest_quarantine { debug!( %addr, - wait_ms = remaining.as_millis(), - "All ME endpoints quarantined; waiting for earliest to expire" + wait_ms = expiry.saturating_duration_since(now).as_millis(), + "All ME endpoints are quarantined for the DC group; waiting for quarantine expiry" ); - tokio::time::sleep(remaining).await; } + + Vec::new() } pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool { diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index e368ead..e3ea44d 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -20,7 +20,6 @@ use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32}; use super::codec::{RpcWriter, WriterCommand}; use super::pool::{MePool, MeWriter, WriterContour}; use super::reader::reader_loop; -use super::registry::BoundConn; use super::wire::build_proxy_req_payload; const ME_ACTIVE_PING_SECS: u64 = 25; @@ -28,6 +27,12 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5; const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700; +#[derive(Clone, Copy)] +enum WriterTeardownMode { + Any, + DrainingOnly, +} + fn is_me_peer_closed_error(error: &ProxyError) -> bool { matches!(error, ProxyError::Io(ioe) if ioe.kind() == ErrorKind::UnexpectedEof) } @@ -142,10 +147,10 @@ impl MePool { seq_no: 0, crc_mode: hs.crc_mode, }; + let cancel_wr = cancel.clone(); let cleanup_done = Arc::new(AtomicBool::new(false)); let cleanup_for_writer = cleanup_done.clone(); - let pool_writer = Arc::downgrade(self); - let cancel_wr = cancel.clone(); + let pool_writer_task = Arc::downgrade(self); tokio::spawn(async move { loop { tokio::select! 
{ @@ -163,15 +168,14 @@ impl MePool { _ = cancel_wr.cancelled() => break, } } - cancel_wr.cancel(); if cleanup_for_writer .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() { - if let Some(pool) = pool_writer.upgrade() { + if let Some(pool) = pool_writer_task.upgrade() { pool.remove_writer_and_close_clients(writer_id).await; } else { - debug!(writer_id, "ME writer cleanup skipped: pool dropped"); + cancel_wr.cancel(); } } }); @@ -255,7 +259,6 @@ impl MePool { stats_reader_close.increment_me_idle_close_by_peer_total(); info!(writer_id, "ME socket closed by peer on idle writer"); } - cancel_reader_token.cancel(); if cleanup_for_reader .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() @@ -263,12 +266,9 @@ impl MePool { if let Some(pool) = pool.upgrade() { pool.remove_writer_and_close_clients(writer_id).await; } else { - let remaining = writers_arc.read().await.len(); - debug!( - writer_id, - remaining, - "ME reader cleanup skipped: pool dropped" - ); + // Fallback for shutdown races: make writer task exit quickly so stale + // channels are observable by periodic prune. 
+ cancel_reader_token.cancel(); } } if let Err(e) = res { @@ -276,6 +276,8 @@ impl MePool { warn!(error = %e, "ME reader ended"); } } + let remaining = writers_arc.read().await.len(); + debug!(writer_id, remaining, "ME reader task finished"); }); let pool_ping = Arc::downgrade(self); @@ -365,13 +367,12 @@ impl MePool { stats_ping.increment_me_keepalive_failed(); debug!("ME ping failed, removing dead writer"); cancel_ping.cancel(); - if cleanup_for_ping - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() + if let Some(pool) = pool_ping.upgrade() + && cleanup_for_ping + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() { - if let Some(pool) = pool_ping.upgrade() { - pool.remove_writer_and_close_clients(writer_id).await; - } + pool.remove_writer_and_close_clients(writer_id).await; } break; } @@ -514,18 +515,49 @@ impl MePool { pub(crate) async fn remove_writer_and_close_clients(self: &Arc, writer_id: u64) { // Full client cleanup now happens inside `registry.writer_lost` to keep // writer reap/remove paths strictly non-blocking per connection. - let _ = self.remove_writer_only(writer_id).await; + let _ = self + .remove_writer_with_mode(writer_id, WriterTeardownMode::Any) + .await; } - async fn remove_writer_only(self: &Arc, writer_id: u64) -> Vec { + pub(super) async fn remove_draining_writer_hard_detach( + self: &Arc, + writer_id: u64, + ) -> bool { + self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly) + .await + } + + async fn remove_writer_only(self: &Arc, writer_id: u64) -> bool { + self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any) + .await + } + + // Authoritative teardown primitive shared by normal cleanup and watchdog path. + // Lock-order invariant: + // 1) mutate `writers` under pool write lock, + // 2) release pool lock, + // 3) run registry/metrics/refill side effects. + // `registry.writer_lost` must never run while `writers` lock is held. 
+ async fn remove_writer_with_mode( + self: &Arc, + writer_id: u64, + mode: WriterTeardownMode, + ) -> bool { let mut close_tx: Option> = None; let mut removed_addr: Option = None; let mut removed_dc: Option = None; let mut removed_uptime: Option = None; let mut trigger_refill = false; + let mut removed = false; { let mut ws = self.writers.write().await; if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { + if matches!(mode, WriterTeardownMode::DrainingOnly) + && !ws[pos].draining.load(Ordering::Relaxed) + { + return false; + } let w = ws.remove(pos); let was_draining = w.draining.load(Ordering::Relaxed); if was_draining { @@ -542,6 +574,7 @@ impl MePool { } close_tx = Some(w.tx.clone()); self.conn_count.fetch_sub(1, Ordering::Relaxed); + removed = true; } } // State invariant: @@ -549,7 +582,7 @@ impl MePool { // - writer is removed from registry routing/binding maps via `writer_lost`. // The close command below is only a best-effort accelerator for task shutdown. // Cleanup progress must never depend on command-channel availability. - let conns = self.registry.writer_lost(writer_id).await; + let _ = self.registry.writer_lost(writer_id).await; { let mut tracker = self.ping_tracker.lock().await; tracker.retain(|_, (_, wid)| *wid != writer_id); @@ -576,22 +609,17 @@ impl MePool { } } } - // Quarantine flapping endpoints regardless of draining state — - // a rapidly dying endpoint is unstable whether it was draining or not. if let Some(addr) = removed_addr { if let Some(uptime) = removed_uptime { self.maybe_quarantine_flapping_endpoint(addr, uptime).await; } + if trigger_refill + && let Some(writer_dc) = removed_dc + { + self.trigger_immediate_refill_for_dc(addr, writer_dc); + } } - // Only trigger immediate refill for unexpected (non-draining) removals. - // Draining writers are intentionally being retired. 
- if trigger_refill - && let Some(addr) = removed_addr - && let Some(writer_dc) = removed_dc - { - self.trigger_immediate_refill_for_dc(addr, writer_dc); - } - conns + removed } pub(crate) async fn mark_writer_draining_with_timeout(