Compare commits

..

21 Commits

Author SHA1 Message Date
Alexey
342b0119dd Merge pull request #509 from telemt/bump
Update Cargo.toml
2026-03-20 16:27:39 +03:00
Alexey
2605929b93 Update Cargo.toml 2026-03-20 16:26:57 +03:00
Alexey
36814b6355 ME Draining on Dual-Stack + TLS Fetcher Upstream Selection: merge pull request #508 from telemt/flow
ME Draining on Dual-Stack + TLS Fetcher Upstream Selection
2026-03-20 16:24:17 +03:00
Alexey
269ba537ad ME Draining on Dual-Stack
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 16:07:12 +03:00
Alexey
5c0eb6dbe8 TLS Fetcher Upstream Selection
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 16:05:24 +03:00
Alexey
dd07fa9453 Merge pull request #505 from telemt/flow-me
Teardown Monitoring in API and Metrics
2026-03-20 12:59:39 +03:00
Alexey
bb1a372ac4 Merge branch 'main' into flow-me 2026-03-20 12:59:32 +03:00
Alexey
6661401a34 Merge pull request #506 from telemt/about-releases
Update README.md
2026-03-20 12:59:09 +03:00
Alexey
cd65fb432b Update README.md 2026-03-20 12:58:55 +03:00
Alexey
caf0717789 Merge branch 'main' into flow-me 2026-03-20 12:57:27 +03:00
Alexey
4a610d83a3 Update Cargo.toml
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 12:56:13 +03:00
Alexey
aba4205dcc Teardown Monitoring in Metrics
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 12:46:35 +03:00
Alexey
ef9b7b1492 Teardown Monitoring in API
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 12:45:53 +03:00
Alexey
d112f15b90 ME Writers Anti-stuck + Quarantine fixes + ME Writers Advanced Cleanup + Authoritative Teardown + Orphan Watchdog + Force-Close Safety Policy: merge pull request #504 from telemt/flow-me
ME Writers Anti-stuck + Quarantine fixes + ME Writers Advanced Cleanup + Authoritative Teardown + Orphan Watchdog + Force-Close Safety Policy
2026-03-20 12:41:45 +03:00
Alexey
b55b264345 Merge branch 'main' into flow-me 2026-03-20 12:20:51 +03:00
Alexey
f61d25ebe0 Authoritative Teardown + Orphan Watchdog + Force-Close Safety Policy
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 12:11:47 +03:00
Alexey
ed4d1167dd ME Writers Advanced Cleanup
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 12:09:23 +03:00
Alexey
dc6948cf39 Merge pull request #502 from telemt/about-releases
Update README.md
2026-03-20 11:25:19 +03:00
Alexey
4f11aa0772 Update README.md 2026-03-20 11:25:07 +03:00
Alexey
e40361b171 Cargo.toml + Cargo.lock
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
2026-03-20 00:45:04 +03:00
Alexey
1c6c73beda ME Writers Anti-stuck and Quarantine fixes
Co-Authored-By: Nook Scheel <nook@live.ru>
2026-03-20 00:41:40 +03:00
26 changed files with 2223 additions and 286 deletions

302
Cargo.lock generated
View File

@@ -45,15 +45,24 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anstyle"
version = "1.0.13"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000"
[[package]]
name = "anyhow"
version = "1.0.101"
version = "1.0.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea"
checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
[[package]]
name = "arc-swap"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9f3647c145568cec02c42054e07bdf9a5a698e15b466fb2341bfc393cd24aa5"
dependencies = [
"rustversion",
]
[[package]]
name = "asn1-rs"
@@ -135,9 +144,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.10.0"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af"
[[package]]
name = "block-buffer"
@@ -159,9 +168,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.19.1"
version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "bytes"
@@ -186,9 +195,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.2.55"
version = "1.2.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
dependencies = [
"find-msvc-tools",
"shlex",
@@ -214,9 +223,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chrono"
version = "0.4.43"
version = "0.4.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118"
checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0"
dependencies = [
"iana-time-zone",
"js-sys",
@@ -265,18 +274,18 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.58"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63be97961acde393029492ce0be7a1af7e323e6bae9511ebfac33751be5e6806"
checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351"
dependencies = [
"clap_builder",
]
[[package]]
name = "clap_builder"
version = "4.5.58"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f13174bda5dfd69d7e947827e5af4b0f2f94a4a3ee92912fba07a66150f21e2"
checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f"
dependencies = [
"anstyle",
"clap_lex",
@@ -284,9 +293,9 @@ dependencies = [
[[package]]
name = "clap_lex"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831"
checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
[[package]]
name = "core-foundation-sys"
@@ -486,7 +495,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -572,9 +581,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d"
dependencies = [
"futures-channel",
"futures-core",
@@ -587,9 +596,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d"
dependencies = [
"futures-core",
"futures-sink",
@@ -597,15 +606,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-executor"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d"
dependencies = [
"futures-core",
"futures-task",
@@ -614,38 +623,38 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718"
[[package]]
name = "futures-macro"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
[[package]]
name = "futures-task"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393"
[[package]]
name = "futures-util"
version = "0.3.31"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-channel",
"futures-core",
@@ -655,7 +664,6 @@ dependencies = [
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
@@ -691,20 +699,20 @@ dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi",
"r-efi 5.3.0",
"wasip2",
"wasm-bindgen",
]
[[package]]
name = "getrandom"
version = "0.4.1"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec"
checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"r-efi 6.0.0",
"wasip2",
"wasip3",
]
@@ -894,7 +902,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2 0.6.2",
"socket2 0.6.3",
"tokio",
"tower-service",
"tracing",
@@ -1076,9 +1084,9 @@ dependencies = [
[[package]]
name = "ipnet"
version = "2.11.0"
version = "2.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2"
[[package]]
name = "ipnetwork"
@@ -1127,9 +1135,9 @@ checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]]
name = "js-sys"
version = "0.3.85"
version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c942ebf8e95485ca0d52d97da7c5a2c387d0e7f0ba4c35e93bfcaee045955b3"
checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c"
dependencies = [
"once_cell",
"wasm-bindgen",
@@ -1169,26 +1177,27 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "libc"
version = "0.2.181"
version = "0.2.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5"
checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d"
[[package]]
name = "libredox"
version = "0.1.12"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616"
checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
"libc",
"redox_syscall 0.7.1",
"plain",
"redox_syscall 0.7.3",
]
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"
checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]]
name = "litemap"
@@ -1295,7 +1304,7 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
"cfg-if",
"cfg_aliases 0.1.1",
"libc",
@@ -1318,7 +1327,7 @@ version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
"crossbeam-channel",
"filetime",
"fsevent-sys",
@@ -1385,9 +1394,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.21.3"
version = "1.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50"
[[package]]
name = "oorandom"
@@ -1426,9 +1435,9 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "pin-project-lite"
version = "0.2.16"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pin-utils"
@@ -1436,6 +1445,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "plain"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "plotters"
version = "0.3.7"
@@ -1495,7 +1510,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -1515,7 +1530,7 @@ checksum = "37566cb3fdacef14c0737f9546df7cfeadbfbc9fef10991038bf5015d0c80532"
dependencies = [
"bit-set",
"bit-vec",
"bitflags 2.10.0",
"bitflags 2.11.0",
"num-traits",
"rand",
"rand_chacha",
@@ -1545,7 +1560,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.6.2",
"socket2 0.6.3",
"thiserror 2.0.18",
"tokio",
"tracing",
@@ -1554,9 +1569,9 @@ dependencies = [
[[package]]
name = "quinn-proto"
version = "0.11.13"
version = "0.11.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31"
checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
dependencies = [
"bytes",
"getrandom 0.3.4",
@@ -1582,16 +1597,16 @@ dependencies = [
"cfg_aliases 0.2.1",
"libc",
"once_cell",
"socket2 0.6.2",
"socket2 0.6.3",
"tracing",
"windows-sys 0.60.2",
]
[[package]]
name = "quote"
version = "1.0.44"
version = "1.0.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4"
checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924"
dependencies = [
"proc-macro2",
]
@@ -1602,6 +1617,12 @@ version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "r-efi"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
[[package]]
name = "rand"
version = "0.9.2"
@@ -1666,16 +1687,16 @@ version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
]
[[package]]
name = "redox_syscall"
version = "0.7.1"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35985aa610addc02e24fc232012c86fd11f14111180f902b67e2d5331f8ebf2b"
checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
]
[[package]]
@@ -1703,9 +1724,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.8.9"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "reqwest"
@@ -1785,11 +1806,11 @@ dependencies = [
[[package]]
name = "rustix"
version = "1.1.3"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34"
checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
"errno",
"libc",
"linux-raw-sys",
@@ -1798,9 +1819,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.36"
version = "0.23.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b"
checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4"
dependencies = [
"once_cell",
"ring",
@@ -1903,7 +1924,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -2011,12 +2032,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.6.2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0"
checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e"
dependencies = [
"libc",
"windows-sys 0.60.2",
"windows-sys 0.61.2",
]
[[package]]
@@ -2044,9 +2065,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.114"
version = "2.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a"
checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99"
dependencies = [
"proc-macro2",
"quote",
@@ -2082,15 +2103,16 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
name = "telemt"
version = "3.3.19"
version = "3.3.25"
dependencies = [
"aes",
"anyhow",
"arc-swap",
"base64",
"bytes",
"cbc",
@@ -2143,12 +2165,12 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.25.0"
version = "3.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1"
checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd"
dependencies = [
"fastrand",
"getrandom 0.4.1",
"getrandom 0.4.2",
"once_cell",
"rustix",
"windows-sys 0.61.2",
@@ -2180,7 +2202,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -2191,7 +2213,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -2256,9 +2278,9 @@ dependencies = [
[[package]]
name = "tinyvec"
version = "1.10.0"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa"
checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3"
dependencies = [
"tinyvec_macros",
]
@@ -2271,9 +2293,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.49.0"
version = "1.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86"
checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d"
dependencies = [
"bytes",
"libc",
@@ -2281,7 +2303,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.2",
"socket2 0.6.3",
"tokio-macros",
"tracing",
"windows-sys 0.61.2",
@@ -2289,13 +2311,13 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.6.0"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -2409,7 +2431,7 @@ version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
"bytes",
"futures-util",
"http",
@@ -2452,7 +2474,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -2478,9 +2500,9 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.22"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [
"matchers",
"nu-ansi-term",
@@ -2514,9 +2536,9 @@ checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicode-ident"
version = "1.0.23"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
[[package]]
name = "unicode-xid"
@@ -2614,9 +2636,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen"
version = "0.2.108"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64024a30ec1e37399cf85a7ffefebdb72205ca1c972291c51512360d90bd8566"
checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e"
dependencies = [
"cfg-if",
"once_cell",
@@ -2627,9 +2649,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.58"
version = "0.4.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f"
checksum = "e9c5522b3a28661442748e09d40924dfb9ca614b21c00d3fd135720e48b67db8"
dependencies = [
"cfg-if",
"futures-util",
@@ -2641,9 +2663,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.108"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "008b239d9c740232e71bd39e8ef6429d27097518b6b30bdf9086833bd5b6d608"
checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -2651,22 +2673,22 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.108"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5256bae2d58f54820e6490f9839c49780dff84c65aeab9e772f15d5f0e913a55"
checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3"
dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.108"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f01b580c9ac74c8d8f0c0e4afb04eeef2acf145458e52c03845ee9cd23e3d12"
checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16"
dependencies = [
"unicode-ident",
]
@@ -2699,7 +2721,7 @@ version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags 2.10.0",
"bitflags 2.11.0",
"hashbrown 0.15.5",
"indexmap",
"semver",
@@ -2707,9 +2729,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.85"
version = "0.3.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "312e32e551d92129218ea9a2452120f4aabc03529ef03e4d0d82fb2780608598"
checksum = "854ba17bb104abfb26ba36da9729addc7ce7f06f5c0f90f3c391f8461cca21f9"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -2773,7 +2795,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -2784,7 +2806,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -3035,9 +3057,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "winnow"
version = "0.7.14"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829"
checksum = "df79d97927682d2fd8adb29682d1140b343be4ac0f08fd68b7765d9c059d3945"
dependencies = [
"memchr",
]
@@ -3072,7 +3094,7 @@ dependencies = [
"heck",
"indexmap",
"prettyplease",
"syn 2.0.114",
"syn 2.0.117",
"wasm-metadata",
"wit-bindgen-core",
"wit-component",
@@ -3088,7 +3110,7 @@ dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
"wit-bindgen-core",
"wit-bindgen-rust",
]
@@ -3100,7 +3122,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags 2.10.0",
"bitflags 2.11.0",
"indexmap",
"log",
"serde",
@@ -3172,28 +3194,28 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
"synstructure 0.13.2",
]
[[package]]
name = "zerocopy"
version = "0.8.39"
version = "0.8.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a"
checksum = "efbb2a062be311f2ba113ce66f697a4dc589f85e78a4aea276200804cea0ed87"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.39"
version = "0.8.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517"
checksum = "0e8bc7269b54418e7aeeef514aa68f8690b8c0489a06b0136e5f57c4c5ccab89"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -3213,7 +3235,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
"synstructure 0.13.2",
]
@@ -3234,7 +3256,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
@@ -3267,11 +3289,11 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
"syn 2.0.117",
]
[[package]]
name = "zmij"
version = "1.0.20"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4de98dfa5d5b7fef4ee834d0073d560c9ca7b6c46a71d058c48db7960f8cfaf7"
checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa"

View File

@@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.3.25"
version = "3.3.28"
edition = "2024"
[dependencies]
@@ -40,6 +40,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
parking_lot = "0.12"
dashmap = "5.5"
arc-swap = "1.7"
lru = "0.16"
rand = "0.9"
chrono = { version = "0.4", features = ["serde"] }

View File

@@ -19,9 +19,9 @@
### 🇷🇺 RU
#### Релиз 3.3.15 Semistable
#### О релизах
[3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) по итогам работы в продакшн признан одним из самых стабильных и рекомендуется к использованию, когда cutting-edge фичи некритичны!
[3.3.27](https://github.com/telemt/telemt/releases/tag/3.3.27) даёт баланс стабильности и передового функционала, а также последние исправления по безопасности и багам
Будем рады вашему фидбеку и предложениям по улучшению — особенно в части **API**, **статистики**, **UX**
@@ -40,9 +40,9 @@
### 🇬🇧 EN
#### Release 3.3.15 Semistable
#### About releases
[3.3.15](https://github.com/telemt/telemt/releases/tag/3.3.15) is, based on the results of its work in production, recognized as one of the most stable releases and is recommended for use when cutting-edge features are not strictly necessary!
[3.3.27](https://github.com/telemt/telemt/releases/tag/3.3.27) provides a balance of stability and advanced functionality, as well as the latest security and bug fixes
We are looking forward to your feedback and improvement proposals — especially regarding **API**, **statistics**, **UX**

View File

@@ -205,6 +205,16 @@ pub(super) struct ZeroPoolData {
pub(super) refill_failed_total: u64,
pub(super) writer_restored_same_endpoint_total: u64,
pub(super) writer_restored_fallback_total: u64,
pub(super) teardown_attempt_total_normal: u64,
pub(super) teardown_attempt_total_hard_detach: u64,
pub(super) teardown_success_total_normal: u64,
pub(super) teardown_success_total_hard_detach: u64,
pub(super) teardown_timeout_total: u64,
pub(super) teardown_escalation_total: u64,
pub(super) teardown_noop_total: u64,
pub(super) teardown_cleanup_side_effect_failures_total: u64,
pub(super) teardown_duration_count_total: u64,
pub(super) teardown_duration_sum_seconds_total: f64,
}
#[derive(Serialize, Clone)]

View File

@@ -4,6 +4,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
use serde::Serialize;
use crate::config::ProxyConfig;
use crate::stats::{
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats,
};
use super::ApiShared;
@@ -98,6 +101,50 @@ pub(super) struct RuntimeMeQualityCountersData {
pub(super) reconnect_success_total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownAttemptData {
pub(super) reason: &'static str,
pub(super) mode: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownSuccessData {
pub(super) mode: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownSideEffectData {
pub(super) step: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownDurationBucketData {
pub(super) le_seconds: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownDurationData {
pub(super) mode: &'static str,
pub(super) count: u64,
pub(super) sum_seconds: f64,
pub(super) buckets: Vec<RuntimeMeQualityTeardownDurationBucketData>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownData {
pub(super) attempts: Vec<RuntimeMeQualityTeardownAttemptData>,
pub(super) success: Vec<RuntimeMeQualityTeardownSuccessData>,
pub(super) timeout_total: u64,
pub(super) escalation_total: u64,
pub(super) noop_total: u64,
pub(super) cleanup_side_effect_failures: Vec<RuntimeMeQualityTeardownSideEffectData>,
pub(super) duration: Vec<RuntimeMeQualityTeardownDurationData>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityRouteDropData {
pub(super) no_conn_total: u64,
@@ -107,6 +154,25 @@ pub(super) struct RuntimeMeQualityRouteDropData {
pub(super) queue_full_high_total: u64,
}
/// Per-address-family (e.g. v4/v6) draining/health state snapshot row.
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityFamilyStateData {
    /// Address family label.
    pub(super) family: &'static str,
    /// Current state label.
    pub(super) state: &'static str,
    /// When the current state was entered (Unix seconds).
    pub(super) state_since_epoch_secs: u64,
    // Omitted from JSON entirely when the family is not suppressed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) suppressed_until_epoch_secs: Option<u64>,
    /// Consecutive failure streak.
    pub(super) fail_streak: u32,
    /// Consecutive success streak counted toward recovery.
    pub(super) recover_success_streak: u32,
}
/// Snapshot of the drain gate: whether draining is currently allowed and,
/// if not, why it is blocked.
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityDrainGateData {
    /// True when enough routes are available to permit draining.
    pub(super) route_quorum_ok: bool,
    /// True when redundancy requirements are satisfied.
    pub(super) redundancy_ok: bool,
    /// Reason label when the gate blocks draining.
    pub(super) block_reason: &'static str,
    /// When this snapshot was last refreshed (Unix seconds).
    pub(super) updated_at_epoch_secs: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityDcRttData {
pub(super) dc: i16,
@@ -120,7 +186,10 @@ pub(super) struct RuntimeMeQualityDcRttData {
/// Top-level JSON payload of the runtime ME-quality API endpoint.
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityPayload {
    /// Core reconnect/refill counters.
    pub(super) counters: RuntimeMeQualityCountersData,
    /// Teardown telemetry (attempts, successes, histograms).
    pub(super) teardown: RuntimeMeQualityTeardownData,
    /// Route drop counters by cause.
    pub(super) route_drops: RuntimeMeQualityRouteDropData,
    /// Per-address-family state rows.
    pub(super) family_states: Vec<RuntimeMeQualityFamilyStateData>,
    /// Drain gate snapshot.
    pub(super) drain_gate: RuntimeMeQualityDrainGateData,
    /// Per-DC RTT rows.
    pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>,
}
@@ -361,6 +430,19 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
};
let status = pool.api_status_snapshot().await;
let family_states = pool
.api_family_state_snapshot()
.into_iter()
.map(|entry| RuntimeMeQualityFamilyStateData {
family: entry.family,
state: entry.state,
state_since_epoch_secs: entry.state_since_epoch_secs,
suppressed_until_epoch_secs: entry.suppressed_until_epoch_secs,
fail_streak: entry.fail_streak,
recover_success_streak: entry.recover_success_streak,
})
.collect();
let drain_gate_snapshot = pool.api_drain_gate_snapshot();
RuntimeMeQualityData {
enabled: true,
reason: None,
@@ -374,6 +456,7 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
reconnect_attempt_total: shared.stats.get_me_reconnect_attempts(),
reconnect_success_total: shared.stats.get_me_reconnect_success(),
},
teardown: build_runtime_me_teardown_data(shared),
route_drops: RuntimeMeQualityRouteDropData {
no_conn_total: shared.stats.get_me_route_drop_no_conn(),
channel_closed_total: shared.stats.get_me_route_drop_channel_closed(),
@@ -381,6 +464,13 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
queue_full_base_total: shared.stats.get_me_route_drop_queue_full_base(),
queue_full_high_total: shared.stats.get_me_route_drop_queue_full_high(),
},
family_states,
drain_gate: RuntimeMeQualityDrainGateData {
route_quorum_ok: drain_gate_snapshot.route_quorum_ok,
redundancy_ok: drain_gate_snapshot.redundancy_ok,
block_reason: drain_gate_snapshot.block_reason,
updated_at_epoch_secs: drain_gate_snapshot.updated_at_epoch_secs,
},
dc_rtt: status
.dcs
.into_iter()
@@ -397,6 +487,81 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
}
}
/// Assemble the teardown section of the runtime ME-quality payload from
/// the live `Stats` counters.
///
/// All tables are emitted in the fixed `ALL` enumeration order of the
/// corresponding enums, so the JSON layout is stable across calls.
fn build_runtime_me_teardown_data(shared: &ApiShared) -> RuntimeMeQualityTeardownData {
    // (reason, mode) attempt matrix, reason-major order.
    let mut attempts = Vec::with_capacity(
        MeWriterTeardownReason::ALL.len() * MeWriterTeardownMode::ALL.len(),
    );
    for reason in MeWriterTeardownReason::ALL {
        for mode in MeWriterTeardownMode::ALL {
            attempts.push(RuntimeMeQualityTeardownAttemptData {
                reason: reason.as_str(),
                mode: mode.as_str(),
                total: shared.stats.get_me_writer_teardown_attempt_total(reason, mode),
            });
        }
    }

    let mut success = Vec::with_capacity(MeWriterTeardownMode::ALL.len());
    for mode in MeWriterTeardownMode::ALL {
        success.push(RuntimeMeQualityTeardownSuccessData {
            mode: mode.as_str(),
            total: shared.stats.get_me_writer_teardown_success_total(mode),
        });
    }

    let mut cleanup_side_effect_failures =
        Vec::with_capacity(MeWriterCleanupSideEffectStep::ALL.len());
    for step in MeWriterCleanupSideEffectStep::ALL {
        cleanup_side_effect_failures.push(RuntimeMeQualityTeardownSideEffectData {
            step: step.as_str(),
            total: shared
                .stats
                .get_me_writer_cleanup_side_effect_failures_total(step),
        });
    }

    // One histogram per teardown mode.
    let mut duration = Vec::with_capacity(MeWriterTeardownMode::ALL.len());
    for mode in MeWriterTeardownMode::ALL {
        let count = shared.stats.get_me_writer_teardown_duration_count(mode);
        let labels = Stats::me_writer_teardown_duration_bucket_labels();
        let mut buckets = Vec::with_capacity(labels.len() + 1);
        for (bucket_idx, label) in labels.iter().enumerate() {
            buckets.push(RuntimeMeQualityTeardownDurationBucketData {
                le_seconds: label,
                total: shared
                    .stats
                    .get_me_writer_teardown_duration_bucket_total(mode, bucket_idx),
            });
        }
        // Terminal "+Inf" entry carries the overall observation count.
        // NOTE(review): this matches Prometheus conventions only if the
        // finite bucket totals above are cumulative — confirm against the
        // Stats getter implementation.
        buckets.push(RuntimeMeQualityTeardownDurationBucketData {
            le_seconds: "+Inf",
            total: count,
        });
        duration.push(RuntimeMeQualityTeardownDurationData {
            mode: mode.as_str(),
            count,
            sum_seconds: shared.stats.get_me_writer_teardown_duration_sum_seconds(mode),
            buckets,
        });
    }

    RuntimeMeQualityTeardownData {
        attempts,
        success,
        timeout_total: shared.stats.get_me_writer_teardown_timeout_total(),
        escalation_total: shared.stats.get_me_writer_teardown_escalation_total(),
        noop_total: shared.stats.get_me_writer_teardown_noop_total(),
        cleanup_side_effect_failures,
        duration,
    }
}
pub(super) async fn build_runtime_upstream_quality_data(
shared: &ApiShared,
) -> RuntimeUpstreamQualityData {

View File

@@ -1,7 +1,7 @@
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::config::ApiConfig;
use crate::stats::Stats;
use crate::stats::{MeWriterTeardownMode, Stats};
use crate::transport::upstream::IpPreference;
use crate::transport::UpstreamRouteKind;
@@ -106,6 +106,29 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
refill_failed_total: stats.get_me_refill_failed_total(),
writer_restored_same_endpoint_total: stats.get_me_writer_restored_same_endpoint_total(),
writer_restored_fallback_total: stats.get_me_writer_restored_fallback_total(),
teardown_attempt_total_normal: stats
.get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::Normal),
teardown_attempt_total_hard_detach: stats
.get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::HardDetach),
teardown_success_total_normal: stats
.get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal),
teardown_success_total_hard_detach: stats
.get_me_writer_teardown_success_total(MeWriterTeardownMode::HardDetach),
teardown_timeout_total: stats.get_me_writer_teardown_timeout_total(),
teardown_escalation_total: stats.get_me_writer_teardown_escalation_total(),
teardown_noop_total: stats.get_me_writer_teardown_noop_total(),
teardown_cleanup_side_effect_failures_total: stats
.get_me_writer_cleanup_side_effect_failures_total_all(),
teardown_duration_count_total: stats
.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal)
.saturating_add(
stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::HardDetach),
),
teardown_duration_sum_seconds_total: stats
.get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal)
+ stats.get_me_writer_teardown_duration_sum_seconds(
MeWriterTeardownMode::HardDetach,
),
},
desync: ZeroDesyncData {
secure_padding_invalid_total: stats.get_secure_padding_invalid(),

View File

@@ -199,8 +199,14 @@ update_every = 43200
hardswap = false
me_pool_drain_ttl_secs = 90
me_instadrain = false
me_pool_drain_threshold = 32
me_pool_drain_soft_evict_grace_secs = 10
me_pool_drain_soft_evict_per_writer = 2
me_pool_drain_soft_evict_budget_per_core = 16
me_pool_drain_soft_evict_cooldown_ms = 1000
me_bind_stale_mode = "never"
me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120
me_reinit_drain_timeout_secs = 90
[network]
ipv4 = true

View File

@@ -40,10 +40,10 @@ const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000;
const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250;
const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 10;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 2;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
@@ -65,6 +65,10 @@ pub(crate) fn default_tls_domain() -> String {
"petrovich.ru".to_string()
}
/// Default TLS fetch scope: the empty string, which disables scoped
/// upstream routing for TLS front metadata fetches (see config
/// normalization, where whitespace-only values are also reduced to empty).
pub(crate) fn default_tls_fetch_scope() -> String {
    String::default()
}
/// Default masking port: 443, so masked traffic uses the standard HTTPS
/// port.
pub(crate) fn default_mask_port() -> u16 {
    443
}
@@ -606,7 +610,7 @@ pub(crate) fn default_proxy_secret_len_max() -> usize {
}
pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {
120
90
}
pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
@@ -618,7 +622,7 @@ pub(crate) fn default_me_instadrain() -> bool {
}
pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128
32
}
pub(crate) fn default_me_pool_drain_soft_evict_enabled() -> bool {

View File

@@ -623,6 +623,7 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
}
if old.censorship.tls_domain != new.censorship.tls_domain
|| old.censorship.tls_domains != new.censorship.tls_domains
|| old.censorship.tls_fetch_scope != new.censorship.tls_fetch_scope
|| old.censorship.mask != new.censorship.mask
|| old.censorship.mask_host != new.censorship.mask_host
|| old.censorship.mask_port != new.censorship.mask_port

View File

@@ -779,6 +779,9 @@ impl ProxyConfig {
config.censorship.mask_host = Some(config.censorship.tls_domain.clone());
}
// Normalize optional TLS fetch scope: whitespace-only values disable scoped routing.
config.censorship.tls_fetch_scope = config.censorship.tls_fetch_scope.trim().to_string();
// Merge primary + extra TLS domains, deduplicate (primary always first).
if !config.censorship.tls_domains.is_empty() {
let mut all = Vec::with_capacity(1 + config.censorship.tls_domains.len());
@@ -2037,6 +2040,45 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
// With no [general] section, `me_reinit_drain_timeout_secs` must default
// to 90 and the effective force-close timeout must equal that default.
fn force_close_default_matches_drain_ttl() {
    let toml = r#"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
    let dir = std::env::temp_dir();
    let path = dir.join("telemt_force_close_default_test.toml");
    std::fs::write(&path, toml).unwrap();
    let cfg = ProxyConfig::load(&path).unwrap();
    assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 90);
    assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 90);
    // Best-effort cleanup; failure must not affect the test verdict.
    let _ = std::fs::remove_file(path);
}
#[test]
// An explicit 0 is preserved in the raw config field but the effective
// force-close timeout falls back to the runtime safety value (300s).
fn force_close_zero_uses_runtime_safety_fallback() {
    let toml = r#"
[general]
me_reinit_drain_timeout_secs = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
    let dir = std::env::temp_dir();
    let path = dir.join("telemt_force_close_zero_fallback_test.toml");
    std::fs::write(&path, toml).unwrap();
    let cfg = ProxyConfig::load(&path).unwrap();
    assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 0);
    assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 300);
    // Best-effort cleanup; failure must not affect the test verdict.
    let _ = std::fs::remove_file(path);
}
#[test]
fn force_close_bumped_when_below_drain_ttl() {
let toml = r#"
@@ -2058,6 +2100,59 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
// When `tls_fetch_scope` is absent, it must default to the empty string
// (scoped TLS-fetch routing disabled).
fn tls_fetch_scope_default_is_empty() {
    let toml = r#"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
    let dir = std::env::temp_dir();
    let path = dir.join("telemt_tls_fetch_scope_default_test.toml");
    std::fs::write(&path, toml).unwrap();
    let cfg = ProxyConfig::load(&path).unwrap();
    assert!(cfg.censorship.tls_fetch_scope.is_empty());
    // Best-effort cleanup; failure must not affect the test verdict.
    let _ = std::fs::remove_file(path);
}
#[test]
// Config normalization must trim surrounding whitespace from
// `tls_fetch_scope` during load.
fn tls_fetch_scope_is_trimmed_during_load() {
    let toml = r#"
[censorship]
tls_domain = "example.com"
tls_fetch_scope = " me "
[access.users]
user = "00000000000000000000000000000000"
"#;
    let dir = std::env::temp_dir();
    let path = dir.join("telemt_tls_fetch_scope_trim_test.toml");
    std::fs::write(&path, toml).unwrap();
    let cfg = ProxyConfig::load(&path).unwrap();
    assert_eq!(cfg.censorship.tls_fetch_scope, "me");
    // Best-effort cleanup; failure must not affect the test verdict.
    let _ = std::fs::remove_file(path);
}
#[test]
// A whitespace-only `tls_fetch_scope` must normalize to empty, i.e.
// disable scoped routing rather than carry a blank scope.
fn tls_fetch_scope_whitespace_becomes_empty() {
    let toml = r#"
[censorship]
tls_domain = "example.com"
tls_fetch_scope = "   "
[access.users]
user = "00000000000000000000000000000000"
"#;
    let dir = std::env::temp_dir();
    let path = dir.join("telemt_tls_fetch_scope_blank_test.toml");
    std::fs::write(&path, toml).unwrap();
    let cfg = ProxyConfig::load(&path).unwrap();
    assert!(cfg.censorship.tls_fetch_scope.is_empty());
    // Best-effort cleanup; failure must not affect the test verdict.
    let _ = std::fs::remove_file(path);
}
#[test]
fn invalid_ad_tag_is_disabled_during_load() {
let toml = r#"

View File

@@ -135,8 +135,8 @@ impl MeSocksKdfPolicy {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum MeBindStaleMode {
Never,
#[default]
Never,
Ttl,
Always,
}
@@ -855,7 +855,7 @@ pub struct GeneralConfig {
pub me_pool_min_fresh_ratio: f32,
/// Drain timeout in seconds for stale ME writers after endpoint map changes.
/// Set to 0 to keep stale writers draining indefinitely (no force-close).
/// Set to 0 to use the runtime safety fallback timeout.
#[serde(default = "default_me_reinit_drain_timeout_secs")]
pub me_reinit_drain_timeout_secs: u64,
@@ -1068,8 +1068,13 @@ impl GeneralConfig {
/// Resolve force-close timeout for stale writers.
/// `me_reinit_drain_timeout_secs` remains backward-compatible alias.
/// A configured `0` uses the runtime safety fallback (300s).
pub fn effective_me_pool_force_close_secs(&self) -> u64 {
self.me_reinit_drain_timeout_secs
if self.me_reinit_drain_timeout_secs == 0 {
300
} else {
self.me_reinit_drain_timeout_secs
}
}
}
@@ -1303,6 +1308,11 @@ pub struct AntiCensorshipConfig {
#[serde(default)]
pub tls_domains: Vec<String>,
/// Upstream scope used for TLS front metadata fetches.
/// Empty value keeps default upstream routing behavior.
#[serde(default = "default_tls_fetch_scope")]
pub tls_fetch_scope: String,
#[serde(default = "default_true")]
pub mask: bool,
@@ -1360,6 +1370,7 @@ impl Default for AntiCensorshipConfig {
Self {
tls_domain: default_tls_domain(),
tls_domains: Vec::new(),
tls_fetch_scope: default_tls_fetch_scope(),
mask: default_true(),
mask_host: None,
mask_port: default_mask_port(),

View File

@@ -332,25 +332,76 @@ pub(crate) async fn initialize_me_pool(
"Middle-End pool initialized successfully"
);
let pool_health = pool_bg.clone();
let rng_health = rng_bg.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_health,
rng_health,
min_conns,
)
.await;
});
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break;
// ── Supervised background tasks ──────────────────
// Each task runs inside a nested tokio::spawn so
// that a panic is caught via JoinHandle and the
// outer loop restarts the task automatically.
let pool_health = pool_bg.clone();
let rng_health = rng_bg.clone();
let min_conns = pool_size;
tokio::spawn(async move {
loop {
let p = pool_health.clone();
let r = rng_health.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
p, r, min_conns,
)
.await;
})
.await;
match res {
Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_health_monitor panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
loop {
let p = pool_drain_enforcer.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
})
.await;
match res {
Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_watchdog = pool_bg.clone();
tokio::spawn(async move {
loop {
let p = pool_watchdog.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
})
.await;
match res {
Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
// CRITICAL: keep the current-thread runtime
// alive. Without this, block_on() returns,
// the Runtime is dropped, and ALL spawned
// background tasks (health monitor, drain
// enforcer, zombie watchdog) are silently
// cancelled — causing the draining-writer
// leak that brought us here.
std::future::pending::<()>().await;
unreachable!();
}
Err(e) => {
startup_tracker_bg.set_me_last_error(Some(e.to_string())).await;
@@ -408,23 +459,65 @@ pub(crate) async fn initialize_me_pool(
"Middle-End pool initialized successfully"
);
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, min_conns,
)
.await;
});
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
// ── Supervised background tasks ──────────────────
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
loop {
let p = pool_clone.clone();
let r = rng_clone.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
p, r, min_conns,
)
.await;
})
.await;
match res {
Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_health_monitor panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
loop {
let p = pool_drain_enforcer.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
})
.await;
match res {
Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_watchdog = pool.clone();
tokio::spawn(async move {
loop {
let p = pool_watchdog.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
})
.await;
match res {
Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
break Some(pool);
}
Err(e) => {

View File

@@ -38,12 +38,15 @@ pub(crate) async fn bootstrap_tls_front(
.clone()
.unwrap_or_else(|| config.censorship.tls_domain.clone());
let mask_unix_sock = config.censorship.mask_unix_sock.clone();
let tls_fetch_scope = (!config.censorship.tls_fetch_scope.is_empty())
.then(|| config.censorship.tls_fetch_scope.clone());
let fetch_timeout = Duration::from_secs(5);
let cache_initial = cache.clone();
let domains_initial = tls_domains.to_vec();
let host_initial = mask_host.clone();
let unix_sock_initial = mask_unix_sock.clone();
let scope_initial = tls_fetch_scope.clone();
let upstream_initial = upstream_manager.clone();
tokio::spawn(async move {
let mut join = tokio::task::JoinSet::new();
@@ -51,6 +54,7 @@ pub(crate) async fn bootstrap_tls_front(
let cache_domain = cache_initial.clone();
let host_domain = host_initial.clone();
let unix_sock_domain = unix_sock_initial.clone();
let scope_domain = scope_initial.clone();
let upstream_domain = upstream_initial.clone();
join.spawn(async move {
match crate::tls_front::fetcher::fetch_real_tls(
@@ -59,6 +63,7 @@ pub(crate) async fn bootstrap_tls_front(
&domain,
fetch_timeout,
Some(upstream_domain),
scope_domain.as_deref(),
proxy_protocol,
unix_sock_domain.as_deref(),
)
@@ -100,6 +105,7 @@ pub(crate) async fn bootstrap_tls_front(
let domains_refresh = tls_domains.to_vec();
let host_refresh = mask_host.clone();
let unix_sock_refresh = mask_unix_sock.clone();
let scope_refresh = tls_fetch_scope.clone();
let upstream_refresh = upstream_manager.clone();
tokio::spawn(async move {
loop {
@@ -112,6 +118,7 @@ pub(crate) async fn bootstrap_tls_front(
let cache_domain = cache_refresh.clone();
let host_domain = host_refresh.clone();
let unix_sock_domain = unix_sock_refresh.clone();
let scope_domain = scope_refresh.clone();
let upstream_domain = upstream_refresh.clone();
join.spawn(async move {
match crate::tls_front::fetcher::fetch_real_tls(
@@ -120,6 +127,7 @@ pub(crate) async fn bootstrap_tls_front(
&domain,
fetch_timeout,
Some(upstream_domain),
scope_domain.as_deref(),
proxy_protocol,
unix_sock_domain.as_deref(),
)

View File

@@ -16,7 +16,9 @@ use tracing::{info, warn, debug};
use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker;
use crate::stats::beobachten::BeobachtenStore;
use crate::stats::Stats;
use crate::stats::{
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats,
};
use crate::transport::{ListenOptions, create_listener};
pub async fn serve(
@@ -1770,6 +1772,169 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_attempt_total ME writer teardown attempts by reason and mode"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_attempt_total counter");
for reason in MeWriterTeardownReason::ALL {
for mode in MeWriterTeardownMode::ALL {
let _ = writeln!(
out,
"telemt_me_writer_teardown_attempt_total{{reason=\"{}\",mode=\"{}\"}} {}",
reason.as_str(),
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_attempt_total(reason, mode)
} else {
0
}
);
}
}
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_success_total ME writer teardown successes by mode"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_success_total counter");
for mode in MeWriterTeardownMode::ALL {
let _ = writeln!(
out,
"telemt_me_writer_teardown_success_total{{mode=\"{}\"}} {}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_success_total(mode)
} else {
0
}
);
}
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_timeout_total Teardown operations that timed out"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_timeout_total counter");
let _ = writeln!(
out,
"telemt_me_writer_teardown_timeout_total {}",
if me_allows_normal {
stats.get_me_writer_teardown_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_escalation_total Watchdog teardown escalations to hard detach"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_teardown_escalation_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_teardown_escalation_total {}",
if me_allows_normal {
stats.get_me_writer_teardown_escalation_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_noop_total Teardown operations that became no-op"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_noop_total counter");
let _ = writeln!(
out,
"telemt_me_writer_teardown_noop_total {}",
if me_allows_normal {
stats.get_me_writer_teardown_noop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_duration_seconds ME writer teardown latency histogram by mode"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_teardown_duration_seconds histogram"
);
let bucket_labels = Stats::me_writer_teardown_duration_bucket_labels();
for mode in MeWriterTeardownMode::ALL {
for (bucket_idx, label) in bucket_labels.iter().enumerate() {
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"{}\"}} {}",
mode.as_str(),
label,
if me_allows_normal {
stats.get_me_writer_teardown_duration_bucket_total(mode, bucket_idx)
} else {
0
}
);
}
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"+Inf\"}} {}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_duration_count(mode)
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_sum{{mode=\"{}\"}} {:.6}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_duration_sum_seconds(mode)
} else {
0.0
}
);
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_count{{mode=\"{}\"}} {}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_duration_count(mode)
} else {
0
}
);
}
let _ = writeln!(
out,
"# HELP telemt_me_writer_cleanup_side_effect_failures_total Failed cleanup side effects by step"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter"
);
for step in MeWriterCleanupSideEffectStep::ALL {
let _ = writeln!(
out,
"telemt_me_writer_cleanup_side_effect_failures_total{{step=\"{}\"}} {}",
step.as_str(),
if me_allows_normal {
stats.get_me_writer_cleanup_side_effect_failures_total(step)
} else {
0
}
);
}
let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter");
let _ = writeln!(
@@ -2175,6 +2340,17 @@ mod tests {
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_attempt_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_success_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_timeout_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_escalation_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_noop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_teardown_duration_seconds histogram"
));
assert!(output.contains(
"# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter"
));
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_close_signal_channel_full_total counter"

View File

@@ -19,6 +19,137 @@ use tracing::debug;
use crate::config::{MeTelemetryLevel, MeWriterPickMode};
use self::telemetry::TelemetryPolicy;
// Variant counts used to size the fixed counter arrays in `Stats`;
// each must match the corresponding enum's `ALL` length.
const ME_WRITER_TEARDOWN_MODE_COUNT: usize = 2;
const ME_WRITER_TEARDOWN_REASON_COUNT: usize = 11;
const ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT: usize = 2;
// Number of finite histogram buckets; `Stats` allocates one extra slot
// (index == BUCKET_COUNT) as the overflow bucket.
const ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT: usize = 12;
// Finite bucket upper bounds in MICROseconds, ascending. Must stay
// element-for-element in sync with the seconds labels below
// (1_000 us == "0.001" s, ..., 10_000_000 us == "10" s).
const ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS: [u64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [
    1_000,
    5_000,
    10_000,
    25_000,
    50_000,
    100_000,
    250_000,
    500_000,
    1_000_000,
    2_500_000,
    5_000_000,
    10_000_000,
];
// Human-readable `le` labels in seconds for metrics/API output,
// parallel to the bounds array above.
const ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS: [&str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [
    "0.001",
    "0.005",
    "0.01",
    "0.025",
    "0.05",
    "0.1",
    "0.25",
    "0.5",
    "1",
    "2.5",
    "5",
    "10",
];
/// How an ME writer is torn down.
///
/// The `repr(u8)` discriminants double as indices into the fixed-size
/// counter arrays in `Stats` (via `idx()`), so variant order must stay in
/// sync with `ME_WRITER_TEARDOWN_MODE_COUNT` and `ALL`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum MeWriterTeardownMode {
    /// Regular teardown path.
    Normal = 0,
    /// Hard detach — the escalation target of the watchdog (see the
    /// teardown escalation counters).
    HardDetach = 1,
}

impl MeWriterTeardownMode {
    /// All modes in discriminant order; used to iterate counter tables.
    pub const ALL: [Self; ME_WRITER_TEARDOWN_MODE_COUNT] =
        [Self::Normal, Self::HardDetach];

    /// Stable label used in metrics and API output.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Normal => "normal",
            Self::HardDetach => "hard_detach",
        }
    }

    /// Array index backing the per-mode counters.
    const fn idx(self) -> usize {
        self as usize
    }
}
/// Why an ME writer teardown was initiated.
///
/// The `repr(u8)` discriminants double as indices into the per-reason
/// counter arrays in `Stats` (via `idx()`), so variant order must stay in
/// sync with `ME_WRITER_TEARDOWN_REASON_COUNT` and `ALL`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum MeWriterTeardownReason {
    ReaderExit = 0,
    WriterTaskExit = 1,
    PingSendFail = 2,
    SignalSendFail = 3,
    RouteChannelClosed = 4,
    CloseRpcChannelClosed = 5,
    PruneClosedWriter = 6,
    ReapTimeoutExpired = 7,
    ReapThresholdForce = 8,
    ReapEmpty = 9,
    WatchdogStuckDraining = 10,
}

impl MeWriterTeardownReason {
    /// All reasons in discriminant order; used to iterate counter tables.
    pub const ALL: [Self; ME_WRITER_TEARDOWN_REASON_COUNT] = [
        Self::ReaderExit,
        Self::WriterTaskExit,
        Self::PingSendFail,
        Self::SignalSendFail,
        Self::RouteChannelClosed,
        Self::CloseRpcChannelClosed,
        Self::PruneClosedWriter,
        Self::ReapTimeoutExpired,
        Self::ReapThresholdForce,
        Self::ReapEmpty,
        Self::WatchdogStuckDraining,
    ];

    /// Stable label used in metrics and API output.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::ReaderExit => "reader_exit",
            Self::WriterTaskExit => "writer_task_exit",
            Self::PingSendFail => "ping_send_fail",
            Self::SignalSendFail => "signal_send_fail",
            Self::RouteChannelClosed => "route_channel_closed",
            Self::CloseRpcChannelClosed => "close_rpc_channel_closed",
            Self::PruneClosedWriter => "prune_closed_writer",
            Self::ReapTimeoutExpired => "reap_timeout_expired",
            Self::ReapThresholdForce => "reap_threshold_force",
            Self::ReapEmpty => "reap_empty",
            Self::WatchdogStuckDraining => "watchdog_stuck_draining",
        }
    }

    /// Array index backing the per-reason counters.
    const fn idx(self) -> usize {
        self as usize
    }
}
/// Cleanup side-effect steps whose failures are counted separately
/// during ME writer teardown.
///
/// The `repr(u8)` discriminants double as indices into the per-step
/// counter array in `Stats` (via `idx()`); keep variant order in sync
/// with `ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT` and `ALL`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum MeWriterCleanupSideEffectStep {
    /// Close-signal channel was full when notifying.
    CloseSignalChannelFull = 0,
    /// Close-signal channel was already closed when notifying.
    CloseSignalChannelClosed = 1,
}

impl MeWriterCleanupSideEffectStep {
    /// All steps in discriminant order; used to iterate counter tables.
    pub const ALL: [Self; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT] =
        [Self::CloseSignalChannelFull, Self::CloseSignalChannelClosed];

    /// Stable label used in metrics and API output.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::CloseSignalChannelFull => "close_signal_channel_full",
            Self::CloseSignalChannelClosed => "close_signal_channel_closed",
        }
    }

    /// Array index backing the per-step counters.
    const fn idx(self) -> usize {
        self as usize
    }
}
// ============= Stats =============
#[derive(Default)]
@@ -128,6 +259,18 @@ pub struct Stats {
me_draining_writers_reap_progress_total: AtomicU64,
me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64,
me_writer_teardown_attempt_total:
[[AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT]; ME_WRITER_TEARDOWN_REASON_COUNT],
me_writer_teardown_success_total: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
me_writer_teardown_timeout_total: AtomicU64,
me_writer_teardown_escalation_total: AtomicU64,
me_writer_teardown_noop_total: AtomicU64,
me_writer_cleanup_side_effect_failures_total:
[AtomicU64; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT],
me_writer_teardown_duration_bucket_hits:
[[AtomicU64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT + 1]; ME_WRITER_TEARDOWN_MODE_COUNT],
me_writer_teardown_duration_sum_micros: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
me_writer_teardown_duration_count: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
me_refill_triggered_total: AtomicU64,
me_refill_skipped_inflight_total: AtomicU64,
me_refill_failed_total: AtomicU64,
@@ -765,6 +908,74 @@ impl Stats {
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
}
}
/// Count one teardown attempt in the (reason, mode) matrix.
/// No-op when ME telemetry is below "normal" verbosity.
pub fn increment_me_writer_teardown_attempt_total(
    &self,
    reason: MeWriterTeardownReason,
    mode: MeWriterTeardownMode,
) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    let cell = &self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()];
    cell.fetch_add(1, Ordering::Relaxed);
}
/// Count one successful teardown for `mode`.
/// No-op when ME telemetry is below "normal" verbosity.
pub fn increment_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    let counter = &self.me_writer_teardown_success_total[mode.idx()];
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Count one teardown that timed out.
/// No-op when ME telemetry is below "normal" verbosity.
pub fn increment_me_writer_teardown_timeout_total(&self) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_writer_teardown_timeout_total.fetch_add(1, Ordering::Relaxed);
}
/// Count one teardown escalation (watchdog escalation to hard detach).
/// No-op when ME telemetry is below "normal" verbosity.
pub fn increment_me_writer_teardown_escalation_total(&self) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_writer_teardown_escalation_total.fetch_add(1, Ordering::Relaxed);
}
/// Count one teardown that turned out to be a no-op.
/// No-op itself when ME telemetry is below "normal" verbosity.
pub fn increment_me_writer_teardown_noop_total(&self) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    self.me_writer_teardown_noop_total.fetch_add(1, Ordering::Relaxed);
}
/// Count one failed cleanup side effect for `step`.
/// No-op when ME telemetry is below "normal" verbosity.
pub fn increment_me_writer_cleanup_side_effect_failures_total(
    &self,
    step: MeWriterCleanupSideEffectStep,
) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    let counter = &self.me_writer_cleanup_side_effect_failures_total[step.idx()];
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Record one teardown latency observation into the per-mode histogram
/// (per-bucket hit counters plus running sum and count, all in micros).
///
/// The extra slot at index `ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT` is
/// the overflow bucket for observations above the largest finite bound.
/// No-op when ME telemetry is below "normal" verbosity.
pub fn observe_me_writer_teardown_duration(
    &self,
    mode: MeWriterTeardownMode,
    duration: Duration,
) {
    if !self.telemetry_me_allows_normal() {
        return;
    }
    // Saturate instead of truncating: `as_micros()` returns u128.
    let duration_micros = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);
    // Bounds are sorted ascending, so the first bucket whose upper bound
    // is >= the observation is exactly the partition point; when every
    // bound is smaller this yields BUCKET_COUNT, i.e. the overflow slot.
    let bucket_idx = ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS
        .partition_point(|&upper_bound_micros| upper_bound_micros < duration_micros);
    self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx]
        .fetch_add(1, Ordering::Relaxed);
    self.me_writer_teardown_duration_sum_micros[mode.idx()]
        .fetch_add(duration_micros, Ordering::Relaxed);
    self.me_writer_teardown_duration_count[mode.idx()].fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_triggered_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
@@ -1297,6 +1508,79 @@ impl Stats {
/// Relaxed snapshot of the unexpected-writer-removal counter.
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
    let counter = &self.me_writer_removed_unexpected_total;
    counter.load(Ordering::Relaxed)
}
/// Relaxed snapshot of teardown attempts for one (reason, mode) cell.
pub fn get_me_writer_teardown_attempt_total(
    &self,
    reason: MeWriterTeardownReason,
    mode: MeWriterTeardownMode,
) -> u64 {
    let cell = &self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()];
    cell.load(Ordering::Relaxed)
}
/// Teardown attempts for `mode`, summed over every teardown reason.
pub fn get_me_writer_teardown_attempt_total_by_mode(&self, mode: MeWriterTeardownMode) -> u64 {
    let mut total = 0u64;
    for reason in MeWriterTeardownReason::ALL.iter().copied() {
        total += self.get_me_writer_teardown_attempt_total(reason, mode);
    }
    total
}
/// Relaxed snapshot of successful teardowns for `mode`.
pub fn get_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) -> u64 {
    let cell = &self.me_writer_teardown_success_total[mode.idx()];
    cell.load(Ordering::Relaxed)
}
/// Relaxed snapshot of the teardown-timeout counter.
pub fn get_me_writer_teardown_timeout_total(&self) -> u64 {
    let counter = &self.me_writer_teardown_timeout_total;
    counter.load(Ordering::Relaxed)
}
/// Relaxed snapshot of the teardown-escalation counter.
pub fn get_me_writer_teardown_escalation_total(&self) -> u64 {
    let counter = &self.me_writer_teardown_escalation_total;
    counter.load(Ordering::Relaxed)
}
/// Relaxed snapshot of the teardown-noop counter.
pub fn get_me_writer_teardown_noop_total(&self) -> u64 {
    let counter = &self.me_writer_teardown_noop_total;
    counter.load(Ordering::Relaxed)
}
/// Relaxed snapshot of cleanup side-effect failures for one step.
pub fn get_me_writer_cleanup_side_effect_failures_total(
    &self,
    step: MeWriterCleanupSideEffectStep,
) -> u64 {
    let cell = &self.me_writer_cleanup_side_effect_failures_total[step.idx()];
    cell.load(Ordering::Relaxed)
}
/// Cleanup side-effect failures summed over every step.
pub fn get_me_writer_cleanup_side_effect_failures_total_all(&self) -> u64 {
    let mut total = 0u64;
    for step in MeWriterCleanupSideEffectStep::ALL.iter().copied() {
        total += self.get_me_writer_cleanup_side_effect_failures_total(step);
    }
    total
}
/// Static labels for the finite teardown-duration buckets
/// (length `ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT`; the trailing
/// overflow bucket is not represented here).
pub fn me_writer_teardown_duration_bucket_labels(
) -> &'static [&'static str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] {
    &ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS
}
pub fn get_me_writer_teardown_duration_bucket_hits(
&self,
mode: MeWriterTeardownMode,
bucket_idx: usize,
) -> u64 {
self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx]
.load(Ordering::Relaxed)
}
/// Cumulative hit count up to and including `bucket_idx` (Prometheus-style
/// "le" semantics). The index is clamped to the overflow bucket and the sum
/// saturates on overflow.
pub fn get_me_writer_teardown_duration_bucket_total(
    &self,
    mode: MeWriterTeardownMode,
    bucket_idx: usize,
) -> u64 {
    let capped_idx = bucket_idx.min(ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT);
    (0..=capped_idx).fold(0u64, |total, idx| {
        total.saturating_add(self.get_me_writer_teardown_duration_bucket_hits(mode, idx))
    })
}
/// Relaxed snapshot of the duration-observation count for `mode`.
pub fn get_me_writer_teardown_duration_count(&self, mode: MeWriterTeardownMode) -> u64 {
    let cell = &self.me_writer_teardown_duration_count[mode.idx()];
    cell.load(Ordering::Relaxed)
}
/// Accumulated teardown duration for `mode`, converted from micros to seconds.
pub fn get_me_writer_teardown_duration_sum_seconds(&self, mode: MeWriterTeardownMode) -> f64 {
    let micros = self.me_writer_teardown_duration_sum_micros[mode.idx()].load(Ordering::Relaxed);
    micros as f64 / 1_000_000.0
}
/// Relaxed snapshot of the refill-triggered counter.
pub fn get_me_refill_triggered_total(&self) -> u64 {
    let counter = &self.me_refill_triggered_total;
    counter.load(Ordering::Relaxed)
}
@@ -1800,6 +2084,79 @@ mod tests {
assert_eq!(stats.get_me_keepalive_sent(), 0);
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
}
#[test]
fn test_teardown_counters_and_duration() {
    // A fresh Stats permits "normal" ME telemetry, so every increment and
    // observation below must be visible through the matching getter.
    let stats = Stats::new();
    stats.increment_me_writer_teardown_attempt_total(
        MeWriterTeardownReason::ReaderExit,
        MeWriterTeardownMode::Normal,
    );
    stats.increment_me_writer_teardown_success_total(MeWriterTeardownMode::Normal);
    stats.observe_me_writer_teardown_duration(
        MeWriterTeardownMode::Normal,
        Duration::from_millis(3),
    );
    stats.increment_me_writer_cleanup_side_effect_failures_total(
        MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
    );
    // Exactly one attempt recorded in the (ReaderExit, Normal) cell.
    assert_eq!(
        stats.get_me_writer_teardown_attempt_total(
            MeWriterTeardownReason::ReaderExit,
            MeWriterTeardownMode::Normal
        ),
        1
    );
    assert_eq!(
        stats.get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal),
        1
    );
    // One histogram sample, and a positive duration sum (3 ms > 0).
    assert_eq!(
        stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal),
        1
    );
    assert!(
        stats.get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal) > 0.0
    );
    assert_eq!(
        stats.get_me_writer_cleanup_side_effect_failures_total(
            MeWriterCleanupSideEffectStep::CloseSignalChannelFull
        ),
        1
    );
}
#[test]
fn test_teardown_counters_respect_me_silent() {
    // With the ME telemetry level forced to Silent, teardown counters and
    // the duration histogram must all stay at zero despite the calls below.
    let stats = Stats::new();
    stats.apply_telemetry_policy(TelemetryPolicy {
        core_enabled: true,
        user_enabled: true,
        me_level: MeTelemetryLevel::Silent,
    });
    stats.increment_me_writer_teardown_attempt_total(
        MeWriterTeardownReason::ReaderExit,
        MeWriterTeardownMode::Normal,
    );
    stats.increment_me_writer_teardown_timeout_total();
    stats.observe_me_writer_teardown_duration(
        MeWriterTeardownMode::Normal,
        Duration::from_millis(1),
    );
    // Every write above was suppressed by the Silent level.
    assert_eq!(
        stats.get_me_writer_teardown_attempt_total(
            MeWriterTeardownReason::ReaderExit,
            MeWriterTeardownMode::Normal
        ),
        0
    );
    assert_eq!(stats.get_me_writer_teardown_timeout_total(), 0);
    assert_eq!(
        stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal),
        0
    );
}
#[test]
fn test_replay_checker_basic() {

View File

@@ -394,15 +394,17 @@ async fn connect_tcp_with_upstream(
port: u16,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
) -> Result<TcpStream> {
if let Some(manager) = upstream {
if let Some(addr) = resolve_socket_addr(host, port) {
match manager.connect(addr, None, None).await {
match manager.connect(addr, None, scope).await {
Ok(stream) => return Ok(stream),
Err(e) => {
warn!(
host = %host,
port = port,
scope = ?scope,
error = %e,
"Upstream connect failed, using direct connect"
);
@@ -410,12 +412,13 @@ async fn connect_tcp_with_upstream(
}
} else if let Ok(mut addrs) = tokio::net::lookup_host((host, port)).await {
if let Some(addr) = addrs.find(|a| a.is_ipv4()) {
match manager.connect(addr, None, None).await {
match manager.connect(addr, None, scope).await {
Ok(stream) => return Ok(stream),
Err(e) => {
warn!(
host = %host,
port = port,
scope = ?scope,
error = %e,
"Upstream connect failed, using direct connect"
);
@@ -537,6 +540,7 @@ async fn fetch_via_raw_tls(
sni: &str,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
proxy_protocol: u8,
unix_sock: Option<&str>,
) -> Result<TlsFetchResult> {
@@ -572,7 +576,7 @@ async fn fetch_via_raw_tls(
#[cfg(not(unix))]
let _ = unix_sock;
let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream).await?;
let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream, scope).await?;
fetch_via_raw_tls_stream(stream, sni, connect_timeout, proxy_protocol).await
}
@@ -675,6 +679,7 @@ async fn fetch_via_rustls(
sni: &str,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
proxy_protocol: u8,
unix_sock: Option<&str>,
) -> Result<TlsFetchResult> {
@@ -710,7 +715,7 @@ async fn fetch_via_rustls(
#[cfg(not(unix))]
let _ = unix_sock;
let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream).await?;
let stream = connect_tcp_with_upstream(host, port, connect_timeout, upstream, scope).await?;
fetch_via_rustls_stream(stream, host, sni, proxy_protocol).await
}
@@ -726,6 +731,7 @@ pub async fn fetch_real_tls(
sni: &str,
connect_timeout: Duration,
upstream: Option<std::sync::Arc<crate::transport::UpstreamManager>>,
scope: Option<&str>,
proxy_protocol: u8,
unix_sock: Option<&str>,
) -> Result<TlsFetchResult> {
@@ -735,6 +741,7 @@ pub async fn fetch_real_tls(
sni,
connect_timeout,
upstream.clone(),
scope,
proxy_protocol,
unix_sock,
)
@@ -753,6 +760,7 @@ pub async fn fetch_real_tls(
sni,
connect_timeout,
upstream,
scope,
proxy_protocol,
unix_sock,
)

View File

@@ -10,9 +10,10 @@ use tracing::{debug, info, warn};
use crate::config::MeFloorMode;
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use crate::stats::MeWriterTeardownReason;
use super::MePool;
use super::pool::MeWriter;
use super::pool::{MeFamilyRuntimeState, MeWriter};
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)]
@@ -33,6 +34,33 @@ const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1;
const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1;
const FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD: u32 = 6;
const FAMILY_SUPPRESS_WINDOW_SECS: u64 = 120;
const FAMILY_RECOVER_PROBE_INTERVAL_SECS: u64 = 5;
const FAMILY_RECOVER_SUCCESS_STREAK_REQUIRED: u32 = 3;
/// Per-IP-family circuit-breaker bookkeeping kept by the health monitor and
/// mirrored into `MePool` via `set_family_runtime_state`.
#[derive(Debug, Clone)]
struct FamilyCircuitState {
    /// Current runtime state of this family's circuit.
    state: MeFamilyRuntimeState,
    /// Monotonic instant at which the current state was entered.
    state_since_at: Instant,
    /// End of the suppression window, when set.
    suppressed_until: Option<Instant>,
    /// Earliest instant at which a probe check may run while suppressed.
    next_probe_at: Instant,
    /// Consecutive degraded observations.
    fail_streak: u32,
    /// Consecutive healthy probes accumulated while recovering.
    recover_success_streak: u32,
}
impl FamilyCircuitState {
fn new(now: Instant) -> Self {
Self {
state: MeFamilyRuntimeState::Healthy,
state_since_at: now,
suppressed_until: None,
next_probe_at: now,
fail_streak: 0,
recover_success_streak: 0,
}
}
}
#[derive(Debug, Clone)]
struct DcFloorPlanEntry {
@@ -72,6 +100,25 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut drain_soft_evict_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut family_v4_circuit = FamilyCircuitState::new(Instant::now());
let mut family_v6_circuit = FamilyCircuitState::new(Instant::now());
let init_epoch_secs = MePool::now_epoch_secs();
pool.set_family_runtime_state(
IpFamily::V4,
family_v4_circuit.state,
init_epoch_secs,
0,
family_v4_circuit.fail_streak,
family_v4_circuit.recover_success_streak,
);
pool.set_family_runtime_state(
IpFamily::V6,
family_v6_circuit.state,
init_epoch_secs,
0,
family_v6_circuit.fail_streak,
family_v6_circuit.recover_success_streak,
);
let mut degraded_interval = true;
loop {
let interval = if degraded_interval {
@@ -87,7 +134,9 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut drain_soft_evict_next_allowed,
)
.await;
let v4_degraded = check_family(
let now = Instant::now();
let now_epoch_secs = MePool::now_epoch_secs();
let v4_degraded_raw = check_family(
IpFamily::V4,
&pool,
&rng,
@@ -106,25 +155,53 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut drain_soft_evict_next_allowed,
)
.await;
let v6_degraded = check_family(
IpFamily::V6,
let v4_degraded = apply_family_circuit_result(
&pool,
&rng,
&mut backoff,
&mut next_attempt,
&mut inflight,
&mut outage_backoff,
&mut outage_next_attempt,
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await;
IpFamily::V4,
&mut family_v4_circuit,
Some(v4_degraded_raw),
false,
now,
now_epoch_secs,
);
let v6_check_ran = should_run_family_check(&mut family_v6_circuit, now);
let v6_degraded_raw = if v6_check_ran {
check_family(
IpFamily::V6,
&pool,
&rng,
&mut backoff,
&mut next_attempt,
&mut inflight,
&mut outage_backoff,
&mut outage_next_attempt,
&mut single_endpoint_outage,
&mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await
} else {
false
};
let v6_degraded = apply_family_circuit_result(
&pool,
IpFamily::V6,
&mut family_v6_circuit,
if v6_check_ran {
Some(v6_degraded_raw)
} else {
None
},
true,
now,
now_epoch_secs,
);
degraded_interval = v4_degraded || v6_degraded;
}
}
@@ -146,6 +223,148 @@ pub async fn me_drain_timeout_enforcer(pool: Arc<MePool>) {
}
}
/// Decide whether this tick should actually probe the family.
///
/// Non-suppressed families are always checked. A suppressed family is only
/// probed once `next_probe_at` has passed; when a probe is granted, the
/// deadline is re-armed `FAMILY_RECOVER_PROBE_INTERVAL_SECS` in the future.
fn should_run_family_check(circuit: &mut FamilyCircuitState, now: Instant) -> bool {
    if !matches!(circuit.state, MeFamilyRuntimeState::Suppressed) {
        return true;
    }
    if now < circuit.next_probe_at {
        return false;
    }
    circuit.next_probe_at = now + Duration::from_secs(FAMILY_RECOVER_PROBE_INTERVAL_SECS);
    true
}
/// Fold one health-check outcome into the family's circuit breaker and
/// publish the resulting state into the pool's atomics.
///
/// `degraded == None` means the probe tick was intentionally skipped (the
/// family is suppressed and not yet due); the circuit is left untouched and
/// `false` is returned. `allow_suppress` gates whether this family may ever
/// enter the Suppressed state.
///
/// Returns whether the family should count as "degraded" for monitor
/// interval selection; a family that ends the tick Suppressed never does.
fn apply_family_circuit_result(
    pool: &Arc<MePool>,
    family: IpFamily,
    circuit: &mut FamilyCircuitState,
    degraded: Option<bool>,
    allow_suppress: bool,
    now: Instant,
    now_epoch_secs: u64,
) -> bool {
    let Some(degraded) = degraded else {
        // Preserve suppression state when probe tick is intentionally skipped.
        return false;
    };
    let previous_state = circuit.state;
    match circuit.state {
        MeFamilyRuntimeState::Suppressed => {
            if degraded {
                // Probe failed: extend the suppression window from now.
                circuit.fail_streak = circuit.fail_streak.saturating_add(1);
                circuit.recover_success_streak = 0;
                let until = now + Duration::from_secs(FAMILY_SUPPRESS_WINDOW_SECS);
                circuit.suppressed_until = Some(until);
                circuit.state_since_at = now;
                warn!(
                    ?family,
                    fail_streak = circuit.fail_streak,
                    suppress_secs = FAMILY_SUPPRESS_WINDOW_SECS,
                    "ME family remains suppressed due to ongoing failures"
                );
            } else {
                // First healthy probe: start the recovery phase.
                circuit.fail_streak = 0;
                circuit.recover_success_streak = 1;
                circuit.state = MeFamilyRuntimeState::Recovering;
            }
        }
        MeFamilyRuntimeState::Recovering => {
            if degraded {
                // Recovery failed: go back to Suppressed (if allowed) or Degraded.
                circuit.fail_streak = circuit.fail_streak.saturating_add(1);
                if allow_suppress {
                    circuit.state = MeFamilyRuntimeState::Suppressed;
                    let until = now + Duration::from_secs(FAMILY_SUPPRESS_WINDOW_SECS);
                    circuit.suppressed_until = Some(until);
                    circuit.next_probe_at =
                        now + Duration::from_secs(FAMILY_RECOVER_PROBE_INTERVAL_SECS);
                    warn!(
                        ?family,
                        fail_streak = circuit.fail_streak,
                        suppress_secs = FAMILY_SUPPRESS_WINDOW_SECS,
                        "ME family temporarily suppressed after repeated degradation"
                    );
                } else {
                    circuit.state = MeFamilyRuntimeState::Degraded;
                }
            } else {
                // Healthy probe: lift suppression only after a stable streak.
                circuit.recover_success_streak = circuit.recover_success_streak.saturating_add(1);
                if circuit.recover_success_streak >= FAMILY_RECOVER_SUCCESS_STREAK_REQUIRED {
                    circuit.fail_streak = 0;
                    circuit.recover_success_streak = 0;
                    circuit.suppressed_until = None;
                    circuit.state = MeFamilyRuntimeState::Healthy;
                    info!(
                        ?family,
                        "ME family suppression lifted after stable recovery probes"
                    );
                }
            }
        }
        _ => {
            // Healthy or Degraded.
            if degraded {
                circuit.fail_streak = circuit.fail_streak.saturating_add(1);
                circuit.recover_success_streak = 0;
                circuit.state = MeFamilyRuntimeState::Degraded;
                if allow_suppress && circuit.fail_streak >= FAMILY_SUPPRESS_FAIL_STREAK_THRESHOLD {
                    circuit.state = MeFamilyRuntimeState::Suppressed;
                    let until = now + Duration::from_secs(FAMILY_SUPPRESS_WINDOW_SECS);
                    circuit.suppressed_until = Some(until);
                    circuit.next_probe_at =
                        now + Duration::from_secs(FAMILY_RECOVER_PROBE_INTERVAL_SECS);
                    warn!(
                        ?family,
                        fail_streak = circuit.fail_streak,
                        suppress_secs = FAMILY_SUPPRESS_WINDOW_SECS,
                        "ME family temporarily suppressed after repeated degradation"
                    );
                }
            } else {
                circuit.fail_streak = 0;
                circuit.recover_success_streak = 0;
                circuit.suppressed_until = None;
                circuit.state = MeFamilyRuntimeState::Healthy;
            }
        }
    }
    if previous_state != circuit.state {
        circuit.state_since_at = now;
    }
    // Translate the monotonic suppression deadline into epoch seconds for
    // export; 0 means "not suppressed" (or already expired).
    let suppressed_until_epoch_secs = circuit
        .suppressed_until
        .and_then(|until| {
            if until > now {
                Some(
                    now_epoch_secs
                        .saturating_add(until.saturating_duration_since(now).as_secs()),
                )
            } else {
                None
            }
        })
        .unwrap_or(0);
    // Keep the previously published "since" timestamp when the state did not
    // change this tick; otherwise stamp the transition with now.
    let state_since_epoch_secs = if previous_state == circuit.state {
        pool.family_runtime_state_since_epoch_secs(family)
    } else {
        now_epoch_secs
    };
    pool.set_family_runtime_state(
        family,
        circuit.state,
        state_since_epoch_secs,
        suppressed_until_epoch_secs,
        circuit.fail_streak,
        circuit.recover_success_streak,
    );
    !matches!(circuit.state, MeFamilyRuntimeState::Suppressed) && degraded
}
fn draining_writer_timeout_expired(
pool: &MePool,
writer: &MeWriter,
@@ -358,7 +577,8 @@ pub(super) async fn reap_draining_writers(
continue;
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapTimeoutExpired)
.await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
}
@@ -376,7 +596,8 @@ pub(super) async fn reap_draining_writers(
continue;
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapThresholdForce)
.await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1);
@@ -388,7 +609,8 @@ pub(super) async fn reap_draining_writers(
if !closed_writer_ids.insert(writer_id) {
continue;
}
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapEmpty)
.await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1);
@@ -1550,6 +1772,187 @@ async fn maybe_rotate_single_endpoint_shadow(
);
}
/// Last-resort safety net for draining writers stuck past their deadline.
///
/// Runs every `TICK_SECS` and force-closes any draining writer whose
/// `drain_deadline_epoch_secs` has been exceeded by more than a threshold.
///
/// Two thresholds, as implemented below:
/// - `SOFT_THRESHOLD_SECS` (60s): the drain deadline is overdue by more
///   than 60 seconds
/// - `HARD_THRESHOLD_SECS` (300s): the total drain age (since
///   `draining_started_at_epoch_secs`) exceeds 300 seconds, unconditionally
///
/// Intentionally kept trivial and independent of pool config to minimise
/// the probability of panicking itself. Uses `SystemTime` directly
/// as a fallback clock source and timeouts on every lock acquisition
/// and writer removal so one stuck writer cannot block the rest.
pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
    use std::time::{SystemTime, UNIX_EPOCH};
    const TICK_SECS: u64 = 30;
    const SOFT_THRESHOLD_SECS: u64 = 60;
    const HARD_THRESHOLD_SECS: u64 = 300;
    const LOCK_TIMEOUT_SECS: u64 = 5;
    const REMOVE_TIMEOUT_SECS: u64 = 10;
    const HARD_DETACH_TIMEOUT_STREAK: u8 = 3;
    // writer_id -> consecutive removal-timeout count; drives hard-detach escalation.
    let mut removal_timeout_streak = HashMap::<u64, u8>::new();
    loop {
        tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
        // Skip the tick entirely if the system clock is before the epoch.
        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(d) => d.as_secs(),
            Err(_) => continue,
        };
        // Phase 1: collect zombie IDs under a short read-lock with timeout.
        let zombie_ids_with_meta: Vec<(u64, bool)> = {
            let Ok(ws) = tokio::time::timeout(
                Duration::from_secs(LOCK_TIMEOUT_SECS),
                pool.writers.read(),
            )
            .await
            else {
                warn!("zombie_watchdog: writers read-lock timeout, skipping tick");
                continue;
            };
            ws.iter()
                .filter(|w| w.draining.load(std::sync::atomic::Ordering::Relaxed))
                .filter_map(|w| {
                    let deadline = w
                        .drain_deadline_epoch_secs
                        .load(std::sync::atomic::Ordering::Relaxed);
                    // Deadline 0 means no deadline recorded yet: not a zombie.
                    if deadline == 0 {
                        return None;
                    }
                    let overdue = now.saturating_sub(deadline);
                    if overdue == 0 {
                        return None;
                    }
                    let started = w
                        .draining_started_at_epoch_secs
                        .load(std::sync::atomic::Ordering::Relaxed);
                    let drain_age = now.saturating_sub(started);
                    // NOTE(review): the bool marks the hard-threshold path,
                    // but phase 2 binds it as `had_clients` — confirm the
                    // intended meaning of this flag.
                    if drain_age > HARD_THRESHOLD_SECS {
                        return Some((w.id, true));
                    }
                    if overdue > SOFT_THRESHOLD_SECS {
                        return Some((w.id, false));
                    }
                    None
                })
                .collect()
        };
        // read lock released here
        if zombie_ids_with_meta.is_empty() {
            // No zombies this tick: forget all escalation streaks.
            removal_timeout_streak.clear();
            continue;
        }
        // Drop streak entries for writers that are no longer zombies.
        let mut active_zombie_ids = HashSet::<u64>::with_capacity(zombie_ids_with_meta.len());
        for (writer_id, _) in &zombie_ids_with_meta {
            active_zombie_ids.insert(*writer_id);
        }
        removal_timeout_streak.retain(|writer_id, _| active_zombie_ids.contains(writer_id));
        warn!(
            zombie_count = zombie_ids_with_meta.len(),
            soft_threshold_secs = SOFT_THRESHOLD_SECS,
            hard_threshold_secs = HARD_THRESHOLD_SECS,
            "Zombie draining writers detected by watchdog, force-closing"
        );
        // Phase 2: remove each writer individually with a timeout.
        // One stuck removal cannot block the rest.
        for (writer_id, had_clients) in &zombie_ids_with_meta {
            let result = tokio::time::timeout(
                Duration::from_secs(REMOVE_TIMEOUT_SECS),
                pool.remove_writer_and_close_clients(
                    *writer_id,
                    MeWriterTeardownReason::WatchdogStuckDraining,
                ),
            )
            .await;
            match result {
                // Removal completed and actually removed the writer.
                Ok(true) => {
                    removal_timeout_streak.remove(writer_id);
                    pool.stats.increment_pool_force_close_total();
                    pool.stats
                        .increment_me_draining_writers_reap_progress_total();
                    info!(
                        writer_id,
                        had_clients,
                        "Zombie writer removed by watchdog"
                    );
                }
                // Writer was already gone by the time we tried.
                Ok(false) => {
                    removal_timeout_streak.remove(writer_id);
                    debug!(
                        writer_id,
                        had_clients,
                        "Zombie writer watchdog removal became no-op"
                    );
                }
                // Removal timed out: count the streak and, after
                // HARD_DETACH_TIMEOUT_STREAK consecutive timeouts,
                // escalate to a hard detach.
                Err(_) => {
                    pool.stats.increment_me_writer_teardown_timeout_total();
                    let streak = removal_timeout_streak
                        .entry(*writer_id)
                        .and_modify(|value| *value = value.saturating_add(1))
                        .or_insert(1);
                    warn!(
                        writer_id,
                        had_clients,
                        timeout_streak = *streak,
                        "Zombie writer removal timed out"
                    );
                    if *streak < HARD_DETACH_TIMEOUT_STREAK {
                        continue;
                    }
                    pool.stats.increment_me_writer_teardown_escalation_total();
                    let hard_detach = tokio::time::timeout(
                        Duration::from_secs(REMOVE_TIMEOUT_SECS),
                        pool.remove_draining_writer_hard_detach(
                            *writer_id,
                            MeWriterTeardownReason::WatchdogStuckDraining,
                        ),
                    )
                    .await;
                    match hard_detach {
                        Ok(true) => {
                            removal_timeout_streak.remove(writer_id);
                            pool.stats.increment_pool_force_close_total();
                            pool.stats
                                .increment_me_draining_writers_reap_progress_total();
                            info!(
                                writer_id,
                                had_clients,
                                "Zombie writer hard-detached after repeated timeouts"
                            );
                        }
                        Ok(false) => {
                            removal_timeout_streak.remove(writer_id);
                            debug!(
                                writer_id,
                                had_clients,
                                "Zombie hard-detach skipped (writer already gone or no longer draining)"
                            );
                        }
                        Err(_) => {
                            pool.stats.increment_me_writer_teardown_timeout_total();
                            warn!(
                                writer_id,
                                had_clients,
                                "Zombie hard-detach timed out, will retry next tick"
                            );
                        }
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -1561,13 +1964,19 @@ mod tests {
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::reap_draining_writers;
use super::{
FamilyCircuitState, apply_family_circuit_result, reap_draining_writers,
should_run_family_check,
};
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use crate::network::probe::NetworkDecision;
use crate::stats::Stats;
use crate::transport::middle_proxy::codec::WriterCommand;
use crate::transport::middle_proxy::pool::{MePool, MeWriter, WriterContour};
use crate::transport::middle_proxy::pool::{
MeFamilyRuntimeState, MePool, MeWriter, WriterContour,
};
use crate::transport::middle_proxy::registry::ConnMeta;
async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
@@ -1745,4 +2154,47 @@ mod tests {
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
}
#[tokio::test]
async fn suppressed_family_probe_skip_preserves_suppressed_state() {
    // A suppressed family whose probe is not yet due must stay exactly as
    // published: same state, same streaks, and the skipped tick must not
    // count as "degraded".
    let pool = make_pool(0).await;
    let now = Instant::now();
    let now_epoch_secs = MePool::now_epoch_secs();
    let suppressed_until_epoch_secs = now_epoch_secs.saturating_add(60);
    pool.set_family_runtime_state(
        IpFamily::V6,
        MeFamilyRuntimeState::Suppressed,
        now_epoch_secs,
        suppressed_until_epoch_secs,
        7,
        0,
    );
    // Circuit mirrors the published state; the next probe is 5s away.
    let mut circuit = FamilyCircuitState {
        state: MeFamilyRuntimeState::Suppressed,
        state_since_at: now,
        suppressed_until: Some(now + Duration::from_secs(60)),
        next_probe_at: now + Duration::from_secs(5),
        fail_streak: 7,
        recover_success_streak: 0,
    };
    // Probe not due yet -> the check must be skipped this tick.
    assert!(!should_run_family_check(&mut circuit, now));
    // Skipped tick (degraded == None) must not report degradation.
    assert!(!apply_family_circuit_result(
        &pool,
        IpFamily::V6,
        &mut circuit,
        None,
        true,
        now,
        now_epoch_secs,
    ));
    assert_eq!(circuit.state, MeFamilyRuntimeState::Suppressed);
    assert_eq!(circuit.fail_streak, 7);
    assert_eq!(circuit.recover_success_streak, 0);
    assert_eq!(
        pool.family_runtime_state(IpFamily::V6),
        MeFamilyRuntimeState::Suppressed,
    );
}
}

View File

@@ -316,7 +316,12 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
let ids = sorted_writer_ids(&pool).await;
for writer_id in ids.into_iter().take(3) {
let _ = pool.remove_writer_and_close_clients(writer_id).await;
let _ = pool
.remove_writer_and_close_clients(
writer_id,
crate::stats::MeWriterTeardownReason::ReapEmpty,
)
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;

View File

@@ -12,7 +12,9 @@ use super::codec::WriterCommand;
use super::health::{health_drain_close_budget, reap_draining_writers};
use super::pool::{MePool, MeWriter, WriterContour};
use super::registry::ConnMeta;
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::config::{
GeneralConfig, MeBindStaleMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode,
};
use crate::crypto::SecureRandom;
use crate::network::probe::NetworkDecision;
use crate::stats::Stats;
@@ -195,7 +197,9 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.contains_key(&7));
let _ = pool.remove_writer_and_close_clients(7).await;
let _ = pool
.remove_writer_and_close_clients(7, crate::stats::MeWriterTeardownReason::ReapEmpty)
.await;
assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
@@ -525,7 +529,12 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
let existing_writer_ids = current_writer_ids(&pool).await;
for writer_id in existing_writer_ids.into_iter().take(4) {
let _ = pool.remove_writer_and_close_clients(writer_id).await;
let _ = pool
.remove_writer_and_close_clients(
writer_id,
crate::stats::MeWriterTeardownReason::ReapEmpty,
)
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
@@ -646,10 +655,23 @@ async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediatel
#[test]
fn general_config_default_drain_threshold_remains_enabled() {
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 32);
assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
1
GeneralConfig::default().me_pool_drain_soft_evict_grace_secs,
10
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
2
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_budget_per_core,
16
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_cooldown_ms,
1000
);
assert_eq!(GeneralConfig::default().me_bind_stale_mode, MeBindStaleMode::Never);
}

View File

@@ -30,7 +30,7 @@ mod health_adversarial_tests;
use bytes::Bytes;
pub use health::{me_drain_timeout_enforcer, me_health_monitor};
pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
#[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool;

View File

@@ -18,6 +18,8 @@ use crate::transport::UpstreamManager;
use super::ConnRegistry;
use super::codec::WriterCommand;
/// Fallback used by `normalize_force_close_secs` when `me_pool_force_close_secs`
/// is configured as 0, so the force-close safety net can never be fully disabled.
const ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS: u64 = 300;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct RefillDcKey {
pub dc: i32,
@@ -72,6 +74,64 @@ impl WriterContour {
}
}
/// Health-monitor circuit state for one IP family, persisted as a compact
/// `u8` inside the pool's atomics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum MeFamilyRuntimeState {
    Healthy = 0,
    Degraded = 1,
    Suppressed = 2,
    Recovering = 3,
}

impl MeFamilyRuntimeState {
    /// Decode a raw atomic value; any unknown discriminant maps to `Healthy`.
    pub(crate) fn from_u8(value: u8) -> Self {
        match value {
            3 => Self::Recovering,
            2 => Self::Suppressed,
            1 => Self::Degraded,
            _ => Self::Healthy,
        }
    }

    /// Stable lowercase label for metrics/API output.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            MeFamilyRuntimeState::Healthy => "healthy",
            MeFamilyRuntimeState::Degraded => "degraded",
            MeFamilyRuntimeState::Suppressed => "suppressed",
            MeFamilyRuntimeState::Recovering => "recovering",
        }
    }
}
/// Why the drain gate last blocked (or allowed) draining, persisted as a
/// compact `u8` inside the pool's atomics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum MeDrainGateReason {
    Open = 0,
    CoverageQuorum = 1,
    Redundancy = 2,
    SuppressionActive = 3,
}

impl MeDrainGateReason {
    /// Decode a raw atomic value; any unknown discriminant maps to `Open`.
    pub(crate) fn from_u8(value: u8) -> Self {
        match value {
            3 => Self::SuppressionActive,
            2 => Self::Redundancy,
            1 => Self::CoverageQuorum,
            _ => Self::Open,
        }
    }

    /// Stable lowercase label for metrics/API output.
    pub(crate) fn as_str(self) -> &'static str {
        match self {
            MeDrainGateReason::Open => "open",
            MeDrainGateReason::CoverageQuorum => "coverage_quorum",
            MeDrainGateReason::Redundancy => "redundancy",
            MeDrainGateReason::SuppressionActive => "suppression_active",
        }
    }
}
#[derive(Debug, Clone)]
pub struct SecretSnapshot {
pub epoch: u64,
@@ -201,6 +261,20 @@ pub struct MePool {
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) me_family_v4_runtime_state: AtomicU8,
pub(super) me_family_v6_runtime_state: AtomicU8,
pub(super) me_family_v4_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v6_state_since_epoch_secs: AtomicU64,
pub(super) me_family_v4_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v6_suppressed_until_epoch_secs: AtomicU64,
pub(super) me_family_v4_fail_streak: AtomicU32,
pub(super) me_family_v6_fail_streak: AtomicU32,
pub(super) me_family_v4_recover_success_streak: AtomicU32,
pub(super) me_family_v6_recover_success_streak: AtomicU32,
pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
pub(super) me_last_drain_gate_block_reason: AtomicU8,
pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64,
pub(super) runtime_ready: AtomicBool,
pool_size: usize,
pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>,
@@ -229,6 +303,14 @@ impl MePool {
.as_secs()
}
/// Map a configured force-close timeout of 0 to the safety fallback so the
/// force-close mechanism can never be disabled entirely.
fn normalize_force_close_secs(force_close_secs: u64) -> u64 {
    match force_close_secs {
        0 => ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS,
        secs => secs,
    }
}
pub fn new(
proxy_tag: Option<Vec<u8>>,
proxy_secret: Vec<u8>,
@@ -477,7 +559,9 @@ impl MePool {
me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new(
me_pool_drain_soft_evict_cooldown_ms.max(1),
),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_force_close_secs: AtomicU64::new(Self::normalize_force_close_secs(
me_pool_force_close_secs,
)),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
me_pool_min_fresh_ratio,
)),
@@ -506,6 +590,20 @@ impl MePool {
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
me_family_v4_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
me_family_v6_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8),
me_family_v4_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
me_family_v6_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
me_family_v4_suppressed_until_epoch_secs: AtomicU64::new(0),
me_family_v6_suppressed_until_epoch_secs: AtomicU64::new(0),
me_family_v4_fail_streak: AtomicU32::new(0),
me_family_v6_fail_streak: AtomicU32::new(0),
me_family_v4_recover_success_streak: AtomicU32::new(0),
me_family_v6_recover_success_streak: AtomicU32::new(0),
me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(Self::now_epoch_secs()),
runtime_ready: AtomicBool::new(false),
preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
})
@@ -523,6 +621,153 @@ impl MePool {
self.runtime_ready.load(Ordering::Relaxed)
}
/// Publish the full per-family runtime-state vector for `family`.
///
/// Each field is stored independently with `Ordering::Relaxed`; readers
/// must not assume the five fields form a consistent snapshot.
pub(super) fn set_family_runtime_state(
    &self,
    family: IpFamily,
    state: MeFamilyRuntimeState,
    state_since_epoch_secs: u64,
    suppressed_until_epoch_secs: u64,
    fail_streak: u32,
    recover_success_streak: u32,
) {
    // Resolve the per-family atomic slots once, then store in a fixed order.
    let (state_slot, since_slot, suppressed_slot, fail_slot, recover_slot) = match family {
        IpFamily::V4 => (
            &self.me_family_v4_runtime_state,
            &self.me_family_v4_state_since_epoch_secs,
            &self.me_family_v4_suppressed_until_epoch_secs,
            &self.me_family_v4_fail_streak,
            &self.me_family_v4_recover_success_streak,
        ),
        IpFamily::V6 => (
            &self.me_family_v6_runtime_state,
            &self.me_family_v6_state_since_epoch_secs,
            &self.me_family_v6_suppressed_until_epoch_secs,
            &self.me_family_v6_fail_streak,
            &self.me_family_v6_recover_success_streak,
        ),
    };
    state_slot.store(state as u8, Ordering::Relaxed);
    since_slot.store(state_since_epoch_secs, Ordering::Relaxed);
    suppressed_slot.store(suppressed_until_epoch_secs, Ordering::Relaxed);
    fail_slot.store(fail_streak, Ordering::Relaxed);
    recover_slot.store(recover_success_streak, Ordering::Relaxed);
}
/// Current runtime health state recorded for the given IP family.
pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState {
    let raw = match family {
        IpFamily::V4 => self.me_family_v4_runtime_state.load(Ordering::Relaxed),
        IpFamily::V6 => self.me_family_v6_runtime_state.load(Ordering::Relaxed),
    };
    MeFamilyRuntimeState::from_u8(raw)
}
/// Epoch-seconds timestamp of the family's last recorded state transition.
pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 {
    let slot = match family {
        IpFamily::V4 => &self.me_family_v4_state_since_epoch_secs,
        IpFamily::V6 => &self.me_family_v6_state_since_epoch_secs,
    };
    slot.load(Ordering::Relaxed)
}
/// Epoch-seconds deadline until which the family is suppressed.
/// Zero is the stored sentinel for "no suppression in effect".
pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 {
    let slot = match family {
        IpFamily::V4 => &self.me_family_v4_suppressed_until_epoch_secs,
        IpFamily::V6 => &self.me_family_v6_suppressed_until_epoch_secs,
    };
    slot.load(Ordering::Relaxed)
}
/// Current consecutive-failure counter recorded for the family.
pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 {
    let slot = match family {
        IpFamily::V4 => &self.me_family_v4_fail_streak,
        IpFamily::V6 => &self.me_family_v6_fail_streak,
    };
    slot.load(Ordering::Relaxed)
}
/// Current consecutive-success counter recorded for the family's recovery.
pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 {
    let slot = match family {
        IpFamily::V4 => &self.me_family_v4_recover_success_streak,
        IpFamily::V6 => &self.me_family_v6_recover_success_streak,
    };
    slot.load(Ordering::Relaxed)
}
/// Whether the family is under temporary suppression at `now_epoch_secs`.
///
/// Suppression holds while the stored deadline lies strictly in the future;
/// the zero sentinel therefore always reads as "not suppressed".
pub(crate) fn is_family_temporarily_suppressed(
    &self,
    family: IpFamily,
    now_epoch_secs: u64,
) -> bool {
    let suppressed_until = self.family_suppressed_until_epoch_secs(family);
    suppressed_until > now_epoch_secs
}
/// Whether the family should count toward drain-coverage computations.
///
/// A family participates only when it is statically configured for ME use
/// AND is not currently under temporary suppression.
pub(super) fn family_enabled_for_drain_coverage(
    &self,
    family: IpFamily,
    now_epoch_secs: u64,
) -> bool {
    let configured = match family {
        IpFamily::V4 => self.decision.ipv4_me,
        IpFamily::V6 => self.decision.ipv6_me,
    };
    if !configured {
        return false;
    }
    !self.is_family_temporarily_suppressed(family, now_epoch_secs)
}
/// Record the outcome of the most recent drain-gate evaluation so it can be
/// inspected later through the `last_drain_gate_*` getters and the API
/// snapshot.
///
/// The four fields are stored independently with `Ordering::Relaxed`;
/// readers must not assume they form a consistent snapshot.
pub(super) fn set_last_drain_gate(
    &self,
    route_quorum_ok: bool,
    redundancy_ok: bool,
    block_reason: MeDrainGateReason,
    updated_at_epoch_secs: u64,
) {
    self.me_last_drain_gate_route_quorum_ok
        .store(route_quorum_ok, Ordering::Relaxed);
    self.me_last_drain_gate_redundancy_ok
        .store(redundancy_ok, Ordering::Relaxed);
    // The reason enum is persisted as its u8 discriminant and decoded via
    // `MeDrainGateReason::from_u8` on read.
    self.me_last_drain_gate_block_reason
        .store(block_reason as u8, Ordering::Relaxed);
    self.me_last_drain_gate_updated_at_epoch_secs
        .store(updated_at_epoch_secs, Ordering::Relaxed);
}
/// Route-quorum flag from the most recent drain-gate evaluation.
pub(crate) fn last_drain_gate_route_quorum_ok(&self) -> bool {
    self.me_last_drain_gate_route_quorum_ok.load(Ordering::Relaxed)
}
/// Redundancy flag from the most recent drain-gate evaluation.
pub(crate) fn last_drain_gate_redundancy_ok(&self) -> bool {
    self.me_last_drain_gate_redundancy_ok.load(Ordering::Relaxed)
}
/// Blocking reason recorded by the most recent drain-gate evaluation,
/// decoded from its stored u8 discriminant.
pub(crate) fn last_drain_gate_block_reason(&self) -> MeDrainGateReason {
    let raw = self.me_last_drain_gate_block_reason.load(Ordering::Relaxed);
    MeDrainGateReason::from_u8(raw)
}
/// Epoch-seconds timestamp of the most recent drain-gate update.
pub(crate) fn last_drain_gate_updated_at_epoch_secs(&self) -> u64 {
    self.me_last_drain_gate_updated_at_epoch_secs.load(Ordering::Relaxed)
}
pub fn update_runtime_reinit_policy(
&self,
hardswap: bool,
@@ -587,8 +832,10 @@ impl MePool {
);
self.me_pool_drain_soft_evict_cooldown_ms
.store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed);
self.me_pool_force_close_secs
.store(force_close_secs, Ordering::Relaxed);
self.me_pool_force_close_secs.store(
Self::normalize_force_close_secs(force_close_secs),
Ordering::Relaxed,
);
self.me_pool_min_fresh_ratio_permille
.store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
self.me_hardswap_warmup_delay_min_ms
@@ -733,12 +980,9 @@ impl MePool {
}
pub(super) fn force_close_timeout(&self) -> Option<Duration> {
let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed);
if secs == 0 {
None
} else {
Some(Duration::from_secs(secs))
}
let secs =
Self::normalize_force_close_secs(self.me_pool_force_close_secs.load(Ordering::Relaxed));
Some(Duration::from_secs(secs))
}
pub(super) fn drain_soft_evict_enabled(&self) -> bool {
@@ -1010,9 +1254,10 @@ impl MePool {
}
pub(super) async fn active_coverage_required_total(&self) -> usize {
let now_epoch_secs = Self::now_epoch_secs();
let mut endpoints_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
if self.decision.ipv4_me {
if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
let map = self.proxy_map_v4.read().await;
for (dc, addrs) in map.iter() {
let entry = endpoints_by_dc.entry(*dc).or_default();
@@ -1022,7 +1267,7 @@ impl MePool {
}
}
if self.decision.ipv6_me {
if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
let map = self.proxy_map_v6.read().await;
for (dc, addrs) in map.iter() {
let entry = endpoints_by_dc.entry(*dc).or_default();

View File

@@ -74,9 +74,8 @@ impl MePool {
debug!(
%addr,
wait_ms = expiry.saturating_duration_since(now).as_millis(),
"All ME endpoints are quarantined for the DC group; retrying earliest one"
"All ME endpoints are quarantined for the DC group; waiting for quarantine expiry"
);
return vec![addr];
}
Vec::new()
@@ -165,9 +164,10 @@ impl MePool {
}
async fn endpoints_for_dc(&self, target_dc: i32) -> Vec<SocketAddr> {
let now_epoch_secs = Self::now_epoch_secs();
let mut endpoints = HashSet::<SocketAddr>::new();
if self.decision.ipv4_me {
if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
let map = self.proxy_map_v4.read().await;
if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs {
@@ -176,7 +176,7 @@ impl MePool {
}
}
if self.decision.ipv6_me {
if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
let map = self.proxy_map_v6.read().await;
if let Some(addrs) = map.get(&target_dc) {
for (ip, port) in addrs {

View File

@@ -11,8 +11,9 @@ use tracing::{debug, info, warn};
use std::collections::hash_map::DefaultHasher;
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use super::pool::{MePool, WriterContour};
use super::pool::{MeDrainGateReason, MePool, WriterContour};
const ME_HARDSWAP_PENDING_TTL_SECS: u64 = 1800;
@@ -120,9 +121,10 @@ impl MePool {
}
async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
let now_epoch_secs = Self::now_epoch_secs();
let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
if self.decision.ipv4_me {
if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch_secs) {
let map_v4 = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map_v4 {
let entry = out.entry(dc).or_default();
@@ -132,7 +134,7 @@ impl MePool {
}
}
if self.decision.ipv6_me {
if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch_secs) {
let map_v6 = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map_v6 {
let entry = out.entry(dc).or_default();
@@ -313,13 +315,23 @@ impl MePool {
pub async fn zero_downtime_reinit_after_map_change(self: &Arc<Self>, rng: &SecureRandom) {
let desired_by_dc = self.desired_dc_endpoints().await;
let now_epoch_secs = Self::now_epoch_secs();
let v4_suppressed = self.is_family_temporarily_suppressed(IpFamily::V4, now_epoch_secs);
let v6_suppressed = self.is_family_temporarily_suppressed(IpFamily::V6, now_epoch_secs);
if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty; skipping stale writer drain");
let reason = if (self.decision.ipv4_me && v4_suppressed)
|| (self.decision.ipv6_me && v6_suppressed)
{
MeDrainGateReason::SuppressionActive
} else {
MeDrainGateReason::CoverageQuorum
};
self.set_last_drain_gate(false, false, reason, now_epoch_secs);
return;
}
let desired_map_hash = Self::desired_map_hash(&desired_by_dc);
let now_epoch_secs = Self::now_epoch_secs();
let previous_generation = self.current_generation();
let hardswap = self.hardswap.load(Ordering::Relaxed);
let generation = if hardswap {
@@ -390,7 +402,17 @@ impl MePool {
.load(Ordering::Relaxed),
);
let (coverage_ratio, missing_dc) = Self::coverage_ratio(&desired_by_dc, &active_writer_addrs);
let mut route_quorum_ok = coverage_ratio >= min_ratio;
let mut redundancy_ok = missing_dc.is_empty();
let mut redundancy_missing_dc = missing_dc.clone();
let mut gate_coverage_ratio = coverage_ratio;
if !hardswap && coverage_ratio < min_ratio {
self.set_last_drain_gate(
false,
redundancy_ok,
MeDrainGateReason::CoverageQuorum,
now_epoch_secs,
);
warn!(
previous_generation,
generation,
@@ -411,7 +433,17 @@ impl MePool {
.collect();
let (fresh_coverage_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
if !fresh_missing_dc.is_empty() {
route_quorum_ok = fresh_coverage_ratio >= min_ratio;
redundancy_ok = fresh_missing_dc.is_empty();
redundancy_missing_dc = fresh_missing_dc.clone();
gate_coverage_ratio = fresh_coverage_ratio;
if fresh_coverage_ratio < min_ratio {
self.set_last_drain_gate(
false,
redundancy_ok,
MeDrainGateReason::CoverageQuorum,
now_epoch_secs,
);
warn!(
previous_generation,
generation,
@@ -421,13 +453,16 @@ impl MePool {
);
return;
}
} else if !missing_dc.is_empty() {
}
self.set_last_drain_gate(route_quorum_ok, redundancy_ok, MeDrainGateReason::Open, now_epoch_secs);
if !redundancy_ok {
warn!(
missing_dc = ?missing_dc,
// Keep stale writers alive when fresh coverage is incomplete.
"ME reinit coverage incomplete; keeping stale writers"
missing_dc = ?redundancy_missing_dc,
coverage_ratio = format_args!("{gate_coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
"ME reinit proceeds with weighted quorum while some DC groups remain uncovered"
);
return;
}
if hardswap {

View File

@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::time::Instant;
use super::pool::{MePool, RefillDcKey};
use super::pool::{MeDrainGateReason, MePool, RefillDcKey};
use crate::network::IpFamily;
#[derive(Clone, Debug)]
@@ -36,6 +36,24 @@ pub(crate) struct MeApiNatStunSnapshot {
pub stun_backoff_remaining_ms: Option<u64>,
}
/// API-facing point-in-time view of one IP family's runtime health state.
#[derive(Clone, Debug)]
pub(crate) struct MeApiFamilyStateSnapshot {
    // Family label: "v4" or "v6".
    pub family: &'static str,
    // Stable string form of the family's runtime state (`as_str`).
    pub state: &'static str,
    // Epoch seconds of the family's last state transition.
    pub state_since_epoch_secs: u64,
    // Suppression deadline in epoch seconds; `None` when the stored value
    // is the zero "not suppressed" sentinel.
    pub suppressed_until_epoch_secs: Option<u64>,
    // Consecutive-failure counter at snapshot time.
    pub fail_streak: u32,
    // Consecutive recovery-success counter at snapshot time.
    pub recover_success_streak: u32,
}
/// API-facing snapshot of the most recent drain-gate evaluation.
#[derive(Clone, Debug)]
pub(crate) struct MeApiDrainGateSnapshot {
    // Whether route coverage met the configured quorum ratio.
    pub route_quorum_ok: bool,
    // Whether every desired DC group was covered (no missing DCs).
    pub redundancy_ok: bool,
    // Stable string form of the blocking reason (`MeDrainGateReason::as_str`).
    pub block_reason: &'static str,
    // Epoch seconds when the gate state was last updated.
    pub updated_at_epoch_secs: u64,
}
impl MePool {
pub(crate) async fn api_refill_snapshot(&self) -> MeApiRefillSnapshot {
let inflight_endpoints_total = self.refill_inflight.lock().await.len();
@@ -125,4 +143,35 @@ impl MePool {
stun_backoff_remaining_ms,
}
}
/// Build the per-family state snapshots exposed by the management API,
/// in fixed v4-then-v6 order.
pub(crate) fn api_family_state_snapshot(&self) -> Vec<MeApiFamilyStateSnapshot> {
    let mut snapshots = Vec::with_capacity(2);
    for family in [IpFamily::V4, IpFamily::V6] {
        let suppressed_until = self.family_suppressed_until_epoch_secs(family);
        snapshots.push(MeApiFamilyStateSnapshot {
            family: match family {
                IpFamily::V4 => "v4",
                IpFamily::V6 => "v6",
            },
            state: self.family_runtime_state(family).as_str(),
            state_since_epoch_secs: self.family_runtime_state_since_epoch_secs(family),
            // Zero is the stored sentinel for "not suppressed"; surface None.
            suppressed_until_epoch_secs: if suppressed_until == 0 {
                None
            } else {
                Some(suppressed_until)
            },
            fail_streak: self.family_fail_streak(family),
            recover_success_streak: self.family_recover_success_streak(family),
        });
    }
    snapshots
}
/// Build the drain-gate snapshot exposed by the management API from the
/// most recently recorded gate state.
pub(crate) fn api_drain_gate_snapshot(&self) -> MeApiDrainGateSnapshot {
    MeApiDrainGateSnapshot {
        route_quorum_ok: self.last_drain_gate_route_quorum_ok(),
        redundancy_ok: self.last_drain_gate_redundancy_ok(),
        block_reason: self.last_drain_gate_block_reason().as_str(),
        updated_at_epoch_secs: self.last_drain_gate_updated_at_epoch_secs(),
    }
}
}

View File

@@ -16,11 +16,13 @@ use crate::config::MeBindStaleMode;
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
use crate::stats::{
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason,
};
use super::codec::{RpcWriter, WriterCommand};
use super::pool::{MePool, MeWriter, WriterContour};
use super::reader::reader_loop;
use super::registry::BoundConn;
use super::wire::build_proxy_req_payload;
const ME_ACTIVE_PING_SECS: u64 = 25;
@@ -28,6 +30,12 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
/// Guard condition applied when removing a writer from the pool.
#[derive(Clone, Copy)]
enum WriterRemoveGuardMode {
    // Remove the writer unconditionally.
    Any,
    // Remove only if the writer is currently flagged as draining; otherwise
    // the removal is a no-op.
    DrainingOnly,
}
/// True when the error represents the ME peer closing the connection,
/// which surfaces as an `UnexpectedEof` I/O error.
fn is_me_peer_closed_error(error: &ProxyError) -> bool {
    match error {
        ProxyError::Io(ioe) => ioe.kind() == ErrorKind::UnexpectedEof,
        _ => false,
    }
}
@@ -44,9 +52,16 @@ impl MePool {
for writer_id in closed_writer_ids {
if self.registry.is_writer_empty(writer_id).await {
let _ = self.remove_writer_only(writer_id).await;
let _ = self
.remove_writer_only(writer_id, MeWriterTeardownReason::PruneClosedWriter)
.await;
} else {
let _ = self.remove_writer_and_close_clients(writer_id).await;
let _ = self
.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::PruneClosedWriter,
)
.await;
}
}
}
@@ -143,6 +158,9 @@ impl MePool {
crc_mode: hs.crc_mode,
};
let cancel_wr = cancel.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_writer = cleanup_done.clone();
let pool_writer_task = Arc::downgrade(self);
tokio::spawn(async move {
loop {
tokio::select! {
@@ -160,6 +178,20 @@ impl MePool {
_ = cancel_wr.cancelled() => break,
}
}
if cleanup_for_writer
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
if let Some(pool) = pool_writer_task.upgrade() {
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::WriterTaskExit,
)
.await;
} else {
cancel_wr.cancel();
}
}
});
let writer = MeWriter {
id: writer_id,
@@ -196,7 +228,6 @@ impl MePool {
let cancel_ping = cancel.clone();
let tx_ping = tx.clone();
let ping_tracker_ping = ping_tracker.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled;
@@ -242,21 +273,29 @@ impl MePool {
stats_reader_close.increment_me_idle_close_by_peer_total();
info!(writer_id, "ME socket closed by peer on idle writer");
}
if let Some(pool) = pool.upgrade()
&& cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
if cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::ReaderExit,
)
.await;
} else {
// Fallback for shutdown races: make writer task exit quickly so stale
// channels are observable by periodic prune.
cancel_reader_token.cancel();
}
}
if let Err(e) = res {
if !idle_close_by_peer {
warn!(error = %e, "ME reader ended");
}
}
let mut ws = writers_arc.write().await;
ws.retain(|w| w.id != writer_id);
info!(remaining = ws.len(), "Dead ME writer removed from pool");
let remaining = writers_arc.read().await.len();
debug!(writer_id, remaining, "ME reader task finished");
});
let pool_ping = Arc::downgrade(self);
@@ -351,7 +390,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::PingSendFail,
)
.await;
}
break;
}
@@ -444,7 +487,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::SignalSendFail,
)
.await;
}
break;
}
@@ -478,7 +525,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::SignalSendFail,
)
.await;
}
break;
}
@@ -491,21 +542,83 @@ impl MePool {
Ok(())
}
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
/// Remove the writer and tear down its bound clients.
///
/// Full client cleanup happens inside `registry.writer_lost`, keeping the
/// writer reap/remove paths strictly non-blocking per connection. Returns
/// `true` when the writer was actually removed from the pool.
pub(crate) async fn remove_writer_and_close_clients(
    self: &Arc<Self>,
    writer_id: u64,
    reason: MeWriterTeardownReason,
) -> bool {
    let mode = MeWriterTeardownMode::Normal;
    let guard = WriterRemoveGuardMode::Any;
    self.remove_writer_with_mode(writer_id, reason, mode, guard).await
}
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
/// Hard-detach teardown used by the watchdog path.
///
/// Guarded to draining writers only: a writer that is no longer flagged as
/// draining is left untouched and the call returns `false`.
pub(super) async fn remove_draining_writer_hard_detach(
    self: &Arc<Self>,
    writer_id: u64,
    reason: MeWriterTeardownReason,
) -> bool {
    let mode = MeWriterTeardownMode::HardDetach;
    let guard = WriterRemoveGuardMode::DrainingOnly;
    self.remove_writer_with_mode(writer_id, reason, mode, guard).await
}
/// Normal-mode, unguarded writer removal; shares the authoritative
/// teardown primitive with `remove_writer_and_close_clients`.
async fn remove_writer_only(
    self: &Arc<Self>,
    writer_id: u64,
    reason: MeWriterTeardownReason,
) -> bool {
    let mode = MeWriterTeardownMode::Normal;
    let guard = WriterRemoveGuardMode::Any;
    self.remove_writer_with_mode(writer_id, reason, mode, guard).await
}
// Authoritative teardown primitive shared by normal cleanup and watchdog path.
// Lock-order invariant:
// 1) mutate `writers` under pool write lock,
// 2) release pool lock,
// 3) run registry/metrics/refill side effects.
// `registry.writer_lost` must never run while `writers` lock is held.
async fn remove_writer_with_mode(
self: &Arc<Self>,
writer_id: u64,
reason: MeWriterTeardownReason,
mode: MeWriterTeardownMode,
guard_mode: WriterRemoveGuardMode,
) -> bool {
let started_at = Instant::now();
self.stats
.increment_me_writer_teardown_attempt_total(reason, mode);
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
let mut removed_addr: Option<SocketAddr> = None;
let mut removed_dc: Option<i32> = None;
let mut removed_uptime: Option<Duration> = None;
let mut trigger_refill = false;
let mut removed = false;
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
if matches!(guard_mode, WriterRemoveGuardMode::DrainingOnly)
&& !ws[pos].draining.load(Ordering::Relaxed)
{
self.stats.increment_me_writer_teardown_noop_total();
self.stats
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
return false;
}
let w = ws.remove(pos);
let was_draining = w.draining.load(Ordering::Relaxed);
if was_draining {
@@ -522,6 +635,7 @@ impl MePool {
}
close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed);
removed = true;
}
}
// State invariant:
@@ -529,7 +643,7 @@ impl MePool {
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await;
let _ = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
tracker.retain(|_, (_, wid)| *wid != writer_id);
@@ -542,6 +656,9 @@ impl MePool {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats
.increment_me_writer_close_signal_channel_full_total();
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
);
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is full"
@@ -549,6 +666,9 @@ impl MePool {
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelClosed,
);
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is closed"
@@ -556,16 +676,24 @@ impl MePool {
}
}
}
if trigger_refill
&& let Some(addr) = removed_addr
&& let Some(writer_dc) = removed_dc
{
if let Some(addr) = removed_addr {
if let Some(uptime) = removed_uptime {
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
self.trigger_immediate_refill_for_dc(addr, writer_dc);
if trigger_refill
&& let Some(writer_dc) = removed_dc
{
self.trigger_immediate_refill_for_dc(addr, writer_dc);
}
}
conns
if removed {
self.stats.increment_me_writer_teardown_success_total(mode);
} else {
self.stats.increment_me_writer_teardown_noop_total();
}
self.stats
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
removed
}
pub(crate) async fn mark_writer_draining_with_timeout(

View File

@@ -14,6 +14,7 @@ use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
use crate::error::{ProxyError, Result};
use crate::network::IpFamily;
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
use crate::stats::MeWriterTeardownReason;
use super::MePool;
use super::codec::WriterCommand;
@@ -134,7 +135,11 @@ impl MePool {
Ok(()) => return Ok(()),
Err(TimedSendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
self.remove_writer_and_close_clients(
current.writer_id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue;
}
Err(TimedSendError::Timeout(_)) => {
@@ -151,7 +156,11 @@ impl MePool {
}
Err(TrySendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
self.remove_writer_and_close_clients(
current.writer_id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue;
}
}
@@ -458,7 +467,11 @@ impl MePool {
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed");
self.remove_writer_and_close_clients(w.id).await;
self.remove_writer_and_close_clients(
w.id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue;
}
}
@@ -503,7 +516,11 @@ impl MePool {
Err(TimedSendError::Closed(_)) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await;
self.remove_writer_and_close_clients(
w.id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
}
Err(TimedSendError::Timeout(_)) => {
self.stats.increment_me_writer_pick_full_total(pick_mode);
@@ -654,7 +671,11 @@ impl MePool {
}
Err(TrySendError::Closed(_)) => {
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
self.remove_writer_and_close_clients(
w.writer_id,
MeWriterTeardownReason::CloseRpcChannelClosed,
)
.await;
}
}
} else {