diff --git a/Cargo.lock b/Cargo.lock index 9f14499..c17574d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,9 +90,9 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" [[package]] name = "arc-swap" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a07d1f37ff60921c83bdfc7407723bdefe89b44b98a9b772f225c8f9d67141a6" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" dependencies = [ "rustversion", ] @@ -173,9 +173,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-lc-rs" -version = "1.16.2" +version = "1.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a054912289d18629dc78375ba2c3726a3afe3ff71b4edba9dedfca0e3446d1fc" +checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f" dependencies = [ "aws-lc-sys", "zeroize", @@ -183,9 +183,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.39.1" +version = "0.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83a25cf98105baa966497416dbd42565ce3a8cf8dbfd59803ec9ad46f3126399" +checksum = "f50037ee5e1e41e7b8f9d161680a725bd1626cb6f8c7e901f91f942850852fe7" dependencies = [ "cc", "cmake", @@ -228,9 +228,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" [[package]] name = "blake3" @@ -299,9 +299,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.58" +version = "1.2.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e928d4b69e3077709075a938a05ffbedfa53a84c8f766efbf8220bb1ff60e1" +checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" dependencies = [ "find-msvc-tools", "jobserver", @@ -346,7 +346,7 @@ checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" dependencies = [ "cfg-if", "cpufeatures 0.3.0", - "rand_core 0.10.0", + "rand_core 0.10.1", ] [[package]] @@ -416,9 +416,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.6.0" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" dependencies = [ "clap_builder", ] @@ -805,9 +805,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" [[package]] name = "fiat-crypto" @@ -997,7 +997,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", - "rand_core 0.10.0", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -1068,6 +1068,12 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + [[package]] name = "heck" version = "0.5.0" @@ -1096,7 +1102,7 @@ dependencies = [ "idna", "ipnet", "once_cell", - "rand 0.9.2", + "rand 0.9.4", "ring", "thiserror 2.0.18", "tinyvec", @@ -1118,7 +1124,7 @@ dependencies = [ "moka", "once_cell", "parking_lot", - "rand 0.9.2", + "rand 0.9.4", "resolv-conf", "smallvec", "thiserror 2.0.18", @@ -1213,15 +1219,14 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.7" +version = "0.27.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ "http", "hyper", "hyper-util", "rustls", - "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", @@ -1385,12 +1390,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "serde", "serde_core", ] @@ -1401,7 +1406,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", "inotify-sys", "libc", ] @@ -1534,9 +1539,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.94" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e04e2ef80ce82e13552136fabeef8a5ed1f985a96805761cbb9a2c34e7664d9" +checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" dependencies = [ "cfg-if", "futures-util", @@ -1578,9 +1583,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.184" +version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" [[package]] name = "linux-raw-sys" @@ -1611,9 +1616,9 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "lru" -version = "0.16.3" +version = "0.16.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" +checksum = "7f66e8d5d03f609abc3a39e6f08e4164ebf1447a732906d39eb9b99b7919ef39" dependencies = [ "hashbrown 0.16.1", ] @@ -1705,7 +1710,7 @@ version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", "cfg-if", "cfg_aliases", "libc", @@ -1728,7 +1733,7 @@ version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", "fsevent-sys", "inotify", "kqueue", @@ -1746,7 +1751,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", ] [[package]] @@ -2012,9 +2017,9 @@ checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" dependencies = [ "bit-set", "bit-vec", - "bitflags 2.11.0", + "bitflags 2.11.1", "num-traits", - "rand 0.9.2", + "rand 0.9.4", "rand_chacha", "rand_xorshift", "regex-syntax", @@ -2059,7 +2064,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand 0.9.2", + "rand 0.9.4", "ring", "rustc-hash", "rustls", @@ -2108,9 +2113,9 @@ checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" [[package]] name = "rand" -version = "0.9.2" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ "rand_chacha", "rand_core 0.9.5", @@ -2118,13 +2123,13 @@ dependencies = [ [[package]] name = "rand" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" dependencies = [ "chacha20 0.10.0", "getrandom 0.4.2", - "rand_core 0.10.0", + "rand_core 0.10.1", ] [[package]] @@ -2157,9 +2162,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" [[package]] name = "rand_xorshift" @@ -2172,9 +2177,9 @@ dependencies = [ [[package]] name = "rayon" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +checksum = "fb39b166781f92d482534ef4b4b1b2568f42613b53e5b6c160e24cfbfa30926d" dependencies = [ "either", "rayon-core", @@ -2196,7 +2201,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", ] [[package]] @@ -2326,7 +2331,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", "errno", "libc", "linux-raw-sys", @@ -2335,9 +2340,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.37" +version = "0.23.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" +checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" dependencies = [ "aws-lc-rs", "once_cell", @@ -2399,9 +2404,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" +checksum = "8279bb85272c9f10811ae6a6c547ff594d6a7f3c6c6b02ee9726d1d0dcfcdd06" dependencies = [ "aws-lc-rs", "ring", @@ -2474,7 +2479,7 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", "core-foundation", "core-foundation-sys", "libc", @@ -2493,9 +2498,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" [[package]] name = "sendfd" @@ -2615,7 +2620,7 @@ dependencies = [ "notify", "percent-encoding", "pin-project", - "rand 0.9.2", + "rand 0.9.4", "sealed", "sendfd", "serde", @@ -2646,7 +2651,7 @@ dependencies = [ "chacha20poly1305", "hkdf", "md-5", - "rand 0.9.2", + "rand 0.9.4", "ring-compat", "sha1", ] @@ -2741,6 +2746,12 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" + [[package]] name = "syn" version = "2.0.117" @@ -2780,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "telemt" -version = "3.4.3" +version = "3.4.4" dependencies = [ "aes", "anyhow", @@ -2812,7 +2823,7 @@ dependencies = [ "num-traits", "parking_lot", "proptest", - "rand 0.10.0", + "rand 0.10.1", "regex", "reqwest", "rustls", @@ -2970,9 +2981,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.50.0" +version = "1.52.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" dependencies = [ "bytes", "libc", @@ -2988,9 +2999,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.1" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", @@ -3123,7 +3134,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", "bytes", "futures-util", "http", @@ -3160,11 +3171,12 @@ dependencies = [ [[package]] name = "tracing-appender" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "786d480bce6247ab75f005b14ae1624ad978d3029d9113f0a22fa1ac773faeaf" +checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" dependencies = [ "crossbeam-channel", + "symlink", "thiserror 2.0.18", "time", "tracing-subscriber", @@ -3239,9 +3251,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" [[package]] name = "unarray" @@ -3297,9 +3309,9 @@ checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] name = "uuid" -version = "1.23.0" +version = "1.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" +checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -3354,11 +3366,11 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasip2" -version = "1.0.2+wasi-0.2.9" +version = "1.0.3+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.57.1", ] [[package]] @@ -3367,14 +3379,14 @@ version = "0.4.0+wasi-0.3.0-rc-2026-01-06" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.51.0", ] [[package]] name = "wasm-bindgen" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0551fc1bb415591e3372d0bc4780db7e587d84e2a7e79da121051c5c4b89d0b0" +checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" dependencies = [ "cfg-if", "once_cell", @@ -3385,9 +3397,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.67" +version = "0.4.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03623de6905b7206edd0a75f69f747f134b7f0a2323392d664448bf2d3c5d87e" +checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" dependencies = [ "js-sys", "wasm-bindgen", @@ -3395,9 +3407,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fbdf9a35adf44786aecd5ff89b4563a90325f9da0923236f6104e603c7e86be" +checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3405,9 +3417,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dca9693ef2bab6d4e6707234500350d8dad079eb508dca05530c85dc3a529ff2" +checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" dependencies = [ "bumpalo", "proc-macro2", @@ -3418,9 +3430,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.117" +version = "0.2.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39129a682a6d2d841b6c429d0c51e5cb0ed1a03829d8b3d1e69a011e62cb3d3b" +checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" dependencies = [ "unicode-ident", ] @@ -3453,7 +3465,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags 2.11.0", + "bitflags 2.11.1", "hashbrown 0.15.5", "indexmap", "semver", @@ -3461,9 +3473,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.94" +version = "0.3.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd70027e39b12f0849461e08ffc50b9cd7688d942c1c8e3c7b22273236b4dd0a" +checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" dependencies = [ "js-sys", "wasm-bindgen", @@ -3481,18 +3493,18 @@ dependencies = [ [[package]] name = "webpki-root-certs" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +checksum = "f31141ce3fc3e300ae89b78c0dd67f9708061d1d2eda54b8209346fd6be9a92c" dependencies = [ "rustls-pki-types", ] [[package]] name = "webpki-roots" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" dependencies = [ "rustls-pki-types", ] @@ -3841,6 +3853,12 @@ dependencies = [ "wit-bindgen-rust-macro", ] +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + [[package]] name = "wit-bindgen-core" version = "0.51.0" @@ -3890,7 +3908,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags 2.11.0", + "bitflags 2.11.1", "indexmap", "log", "serde", @@ -3922,9 +3940,9 @@ dependencies = [ [[package]] name = "writeable" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "x25519-dalek" diff --git a/Cargo.toml b/Cargo.toml index be2df6f..c29e3c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.4.3" +version = "3.4.4" edition = "2024" [features] diff --git a/docs/Config_params/CONFIG_PARAMS.en.md b/docs/Config_params/CONFIG_PARAMS.en.md index 26abe44..df9249b 100644 --- a/docs/Config_params/CONFIG_PARAMS.en.md +++ b/docs/Config_params/CONFIG_PARAMS.en.md @@ -255,13 +255,22 @@ This document lists all configuration keys accepted by `config.toml`. ``` ## proxy_secret_path - **Constraints / validation**: `String`. When omitted, the default path is `"proxy-secret"`. Empty values are accepted by TOML/serde but will likely fail at runtime (invalid file path). - - **Description**: Path to Telegram infrastructure `proxy-secret` cache file used by ME handshake/RPC auth. Telemt always tries a fresh download from `https://core.telegram.org/getProxySecret` first, caches it to this path on success, and falls back to reading the cached file (any age) on download failure. + - **Description**: Path to Telegram infrastructure `proxy-secret` cache file used by ME handshake/RPC auth. Telemt always tries a fresh download from `https://core.telegram.org/getProxySecret` first (unless `proxy_secret_url` is set) , caches it to this path on success, and falls back to reading the cached file (any age) on download failure. - **Example**: ```toml [general] proxy_secret_path = "proxy-secret" ``` +## proxy_secret_url + - **Constraints / validation**: `String`. When omitted, the `"https://core.telegram.org/getProxySecret"` is used. + - **Description**: Optional URL to obtain `proxy-secret` file used by ME handshake/RPC auth. Telemt always tries a fresh download from this URL first (with fallback to `https://core.telegram.org/getProxySecret` if absent). + - **Example**: + + ```toml + [general] + proxy_secret_url = "https://core.telegram.org/getProxySecret" + ``` ## proxy_config_v4_cache_path - **Constraints / validation**: `String`. When set, must not be empty/whitespace-only. - **Description**: Optional disk cache path for raw `getProxyConfig` (IPv4) snapshot. At startup Telemt tries to fetch a fresh snapshot first; on fetch failure or empty snapshot it falls back to this cache file when present and non-empty. @@ -271,6 +280,15 @@ This document lists all configuration keys accepted by `config.toml`. [general] proxy_config_v4_cache_path = "cache/proxy-config-v4.txt" ``` +## proxy_config_v4_url +- **Constraints / validation**: `String`. When omitted, the `"https://core.telegram.org/getProxyConfig"` is used. +- **Description**: Optional URL to obtain raw `getProxyConfig` (IPv4). Telemt always tries a fresh download from this URL first (with fallback to `https://core.telegram.org/getProxyConfig` if absent). +- **Example**: + + ```toml + [general] + proxy_config_v4_url = "https://core.telegram.org/getProxyConfig" + ``` ## proxy_config_v6_cache_path - **Constraints / validation**: `String`. When set, must not be empty/whitespace-only. - **Description**: Optional disk cache path for raw `getProxyConfigV6` (IPv6) snapshot. At startup Telemt tries to fetch a fresh snapshot first; on fetch failure or empty snapshot it falls back to this cache file when present and non-empty. @@ -280,6 +298,15 @@ This document lists all configuration keys accepted by `config.toml`. [general] proxy_config_v6_cache_path = "cache/proxy-config-v6.txt" ``` +## proxy_config_v6_url +- **Constraints / validation**: `String`. When omitted, the `"https://core.telegram.org/getProxyConfigV6"` is used. +- **Description**: Optional URL to obtain raw `getProxyConfigV6` (IPv6). Telemt always tries a fresh download from this URL first (with fallback to `https://core.telegram.org/getProxyConfigV6` if absent). +- **Example**: + + ```toml + [general] + proxy_config_v6_url = "https://core.telegram.org/getProxyConfigV6" + ``` ## ad_tag - **Constraints / validation**: `String` (optional). When set, must be exactly 32 hex characters; invalid values are disabled during config load. - **Description**: Global fallback sponsored-channel `ad_tag` (used when user has no override in `access.user_ad_tags`). An all-zero tag is accepted but has no effect (and is warned about) until replaced with a real tag from `@MTProxybot`. @@ -2270,7 +2297,7 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche | --- | ---- | ------- | | [`tls_domain`](#tls_domain) | `String` | `"petrovich.ru"` | | [`tls_domains`](#tls_domains) | `String[]` | `[]` | -| [`unknown_sni_action`](#unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` | +| [`unknown_sni_action`](#unknown_sni_action) | `"drop"`, `"mask"`, `"accept"`, `"reject_handshake"` | `"drop"` | | [`tls_fetch_scope`](#tls_fetch_scope) | `String` | `""` | | [`tls_fetch`](#tls_fetch) | `Table` | built-in defaults | | [`mask`](#mask) | `bool` | `true` | @@ -2321,13 +2348,17 @@ Note: This section also accepts the legacy alias `[server.admin_api]` (same sche tls_domains = ["example.net", "example.org"] ``` ## unknown_sni_action - - **Constraints / validation**: `"drop"`, `"mask"` or `"accept"`. + - **Constraints / validation**: `"drop"`, `"mask"`, `"accept"` or `"reject_handshake"`. - **Description**: Action for TLS ClientHello with unknown / non-configured SNI. + - `drop` — close the connection without any response (silent FIN after `server_hello_delay` is applied). Timing-indistinguishable from the Success branch, but wire-quieter than what a real web server would do. + - `mask` — transparently proxy the connection to `mask_host:mask_port` (TLS fronting). The client receives a real ServerHello from the backend with its real certificate. Maximum camouflage, but opens an outbound connection for every misdirected request. + - `accept` — pretend the SNI is valid and continue on the auth path. Weakens active-probing resistance; only meaningful in narrow scenarios. + - `reject_handshake` — emit a fatal TLS `unrecognized_name` alert (RFC 6066, AlertDescription = 112) and close the connection. Identical on the wire to a modern nginx with `ssl_reject_handshake on;` on its default vhost: looks like an ordinary HTTPS server that simply does not host the requested name. Recommended when the goal is maximal parity with a stock web server rather than TLS fronting. `server_hello_delay` is intentionally **not** applied to this branch, so the alert is emitted "instantly" the way a reference nginx would. - **Example**: ```toml [censorship] - unknown_sni_action = "drop" + unknown_sni_action = "reject_handshake" ``` ## tls_fetch_scope - **Constraints / validation**: `String`. Value is trimmed during load; whitespace-only becomes empty. @@ -3083,5 +3114,3 @@ If your backend or network is very bandwidth-constrained, reduce cap first. If p username = "alice" password = "secret" ``` - - diff --git a/docs/Config_params/CONFIG_PARAMS.ru.md b/docs/Config_params/CONFIG_PARAMS.ru.md index eb00ffc..df0819c 100644 --- a/docs/Config_params/CONFIG_PARAMS.ru.md +++ b/docs/Config_params/CONFIG_PARAMS.ru.md @@ -255,13 +255,22 @@ ``` ## proxy_secret_path - **Ограничения / валидация**: `String`. Если этот параметр не указан, используется путь по умолчанию — «proxy-secret». Пустые значения принимаются TOML/serde, но во время выполнения произойдет ошибка (invalid file path). - - **Описание**: Путь к файлу кэша `proxy-secret` инфраструктуры Telegram, используемому ME-handshake/аутентификацией RPC. Telemt всегда сначала пытается выполнить новую загрузку с https://core.telegram.org/getProxySecret, в случае успеха кэширует ее по этому пути и возвращается к чтению кэшированного файла в случае сбоя загрузки. + - **Описание**: Путь к файлу кэша `proxy-secret` инфраструктуры Telegram, используемому ME-handshake/аутентификацией RPC. Telemt всегда сначала пытается выполнить новую загрузку с https://core.telegram.org/getProxySecret (если не установлен `proxy_secret_url`), в случае успеха кэширует ее по этому пути и возвращается к чтению кэшированного файла в случае сбоя загрузки. - **Пример**: ```toml [general] proxy_secret_path = "proxy-secret" ``` +## proxy_secret_url + - **Ограничения / валидация**: `String`. Если не указан, используется `"https://core.telegram.org/getProxySecret"`. + - **Описание**: Необязательный URL для получения файла `proxy-secret` используемого ME-handshake/аутентификацией RPC. Telemt всегда сначала пытается выполнить новую загрузку с этого URL (если не задан, используется https://core.telegram.org/getProxySecret). + - **Пример**: + + ```toml + [general] + proxy_secret_url = "https://core.telegram.org/getProxySecret" + ``` ## proxy_config_v4_cache_path - **Ограничения / валидация**: `String`. Если используется, значение не должно быть пустым или содержать только пробелы. - **Описание**: Необязательный путь к кэшу для необработанного (raw) снимка getProxyConfig (IPv4). При запуске Telemt сначала пытается получить свежий снимок; в случае сбоя выборки или пустого снимка он возвращается к этому файлу кэша, если он присутствует и не пуст. @@ -271,6 +280,15 @@ [general] proxy_config_v4_cache_path = "cache/proxy-config-v4.txt" ``` +## proxy_config_v4_url + - **Ограничения / валидация**: `String`. Если не указан, используется `"https://core.telegram.org/getProxyConfig"`. + - **Описание**: Необязательный URL для получения `getProxyConfig` (IPv4). Telemt при всегда пытается выполнить новую загрузку с этого URL (и если не задан, использует `https://core.telegram.org/getProxyConfig`). + - **Example**: + + ```toml + [general] + proxy_config_v4_url = "https://core.telegram.org/getProxyConfig" + ``` ## proxy_config_v6_cache_path - **Ограничения / валидация**: `String`. Если используется, значение не должно быть пустым или содержать только пробелы. - **Описание**: Необязательный путь к кэшу для необработанного (raw) снимка getProxyConfigV6 (IPv6). При запуске Telemt сначала пытается получить свежий снимок; в случае сбоя выборки или пустого снимка он возвращается к этому файлу кэша, если он присутствует и не пуст. @@ -280,6 +298,15 @@ [general] proxy_config_v6_cache_path = "cache/proxy-config-v6.txt" ``` +## proxy_config_v6_url +- **Ограничения / валидация**: `String`. Если не указан, используется `"https://core.telegram.org/getProxyConfigV6"`. +- **Описание**: Необязательный URL для получения `getProxyConfigV6` (IPv6). Telemt при всегда пытается выполнить новую загрузку с этого URL (и если не задан, использует `https://core.telegram.org/getProxyConfigV6`). +- **Example**: + + ```toml + [general] + proxy_config_v6_url = "https://core.telegram.org/getProxyConfigV6" + ``` ## ad_tag - **Ограничения / валидация**: `String` (необязательный параметр). Если используется, значение должно быть ровно 32 символа в шестнадцатеричной системе; недопустимые значения отключаются во время загрузки конфигурации. - **Описание**: Глобальный резервный спонсируемый канал `ad_tag` (используется, когда у пользователя нет переопределения в `access.user_ad_tags`). Тег со всеми нулями принимается, но не имеет никакого эффекта, пока не будет заменен реальным тегом от `@MTProxybot`. @@ -2197,7 +2224,7 @@ ``` ## relay_client_idle_soft_secs - **Ограничения / валидация**: Должно быть `> 0`; Должно быть меньше или равно `relay_client_idle_hard_secs`. - - **Описание**: Мягкий порог простоя (в секундах) для неактивности uplink клиента в промежуточном узле. При достижении этого порога сессия помечается как кандидат на простой и может быть удалена в зависимости от политики. + - **Описание**: Мягкий порог простоя (в секундах) для неактивности uplink клиента в промежуточном узле. При достижении этого порога сессия помечается как кандидат на простой и может быть удалена в зависимости от политики. - **Пример**: ```toml @@ -2276,7 +2303,7 @@ | --- | ---- | ------- | | [`tls_domain`](#tls_domain) | `String` | `"petrovich.ru"` | | [`tls_domains`](#tls_domains) | `String[]` | `[]` | -| [`unknown_sni_action`](#unknown_sni_action) | `"drop"`, `"mask"`, `"accept"` | `"drop"` | +| [`unknown_sni_action`](#unknown_sni_action) | `"drop"`, `"mask"`, `"accept"`, `"reject_handshake"` | `"drop"` | | [`tls_fetch_scope`](#tls_fetch_scope) | `String` | `""` | | [`tls_fetch`](#tls_fetch) | `Table` | built-in defaults | | [`mask`](#mask) | `bool` | `true` | @@ -2326,13 +2353,17 @@ tls_domains = ["example.net", "example.org"] ``` ## unknown_sni_action - - **Ограничения / валидация**: `"drop"`, `"mask"` или `"accept"`. + - **Ограничения / валидация**: `"drop"`, `"mask"`, `"accept"` или `"reject_handshake"`. - **Описание**: Действие для TLS ClientHello с неизвестным/ненастроенным SNI. + - `drop` — закрыть соединение без ответа (молчаливый FIN после применения `server_hello_delay`). Поведение, неотличимое по таймингу от Success-ветки, но более «тихое», чем у обычного веб-сервера. + - `mask` — прозрачно проксировать соединение на `mask_host:mask_port` (TLS-fronting). Клиент получает настоящий ServerHello от реального бэкенда с его сертификатом. Максимальный камуфляж, но порождает исходящее соединение на каждый чужой запрос. + - `accept` — притвориться, что SNI валиден, и продолжить auth-путь. Снижает защиту от активного пробинга; осмысленно только в узких сценариях. + - `reject_handshake` — отправить фатальный TLS-alert `unrecognized_name` (RFC 6066, AlertDescription = 112) и закрыть соединение. Поведение, идентичное современному nginx с `ssl_reject_handshake on;` на дефолтном vhost'е: на wire-уровне выглядит как обычный HTTPS-сервер, у которого просто нет такого домена. Рекомендуется, если цель — максимальная похожесть на стоковый веб-сервер, а не tls-fronting. `server_hello_delay` на эту ветку не применяется, чтобы alert улетал «мгновенно», как у эталонного nginx. - **Пример**: ```toml [censorship] - unknown_sni_action = "drop" + unknown_sni_action = "reject_handshake" ``` ## tls_fetch_scope - **Ограничения / валидация**: `String`. Значение обрезается во время загрузки; значение, состоящее только из пробелов, становится пустым. @@ -3090,5 +3121,3 @@ username = "alice" password = "secret" ``` - - diff --git a/docs/FAQ.en.md b/docs/FAQ.en.md index dec93f7..0e55c1f 100644 --- a/docs/FAQ.en.md +++ b/docs/FAQ.en.md @@ -210,6 +210,13 @@ If you need to allow connections with any domains (ignoring SNI mismatches), add unknown_sni_action = "mask" ``` +Alternatively, if you want telemt to behave like a vanilla nginx with `ssl_reject_handshake on;` on unknown SNI (emit a TLS `unrecognized_name` alert and close the connection), use: +```toml +[censorship] +unknown_sni_action = "reject_handshake" +``` +This does not recover stale clients, but it makes port 443 wire-indistinguishable from a stock web server that simply does not host the requested vhost. + ### How to view metrics 1. Open the configuration file: `nano /etc/telemt/telemt.toml`. diff --git a/docs/FAQ.ru.md b/docs/FAQ.ru.md index 73a2ba4..9b1ec52 100644 --- a/docs/FAQ.ru.md +++ b/docs/FAQ.ru.md @@ -227,6 +227,13 @@ curl -s http://127.0.0.1:9091/v1/users | jq unknown_sni_action = "mask" ``` +Альтернатива: если вы хотите, чтобы telemt на неизвестный SNI вёл себя как обычный nginx с `ssl_reject_handshake on;` (отдавал TLS-alert `unrecognized_name` и закрывал соединение), используйте: +```toml +[censorship] +unknown_sni_action = "reject_handshake" +``` +Это не пропускает старых клиентов, но делает поведение на 443-м порту неотличимым от стокового веб-сервера, у которого просто нет такого виртуального хоста. + ## Как посмотреть метрики 1. Откройте файл конфигурации: `nano /etc/telemt/telemt.toml`. diff --git a/src/config/load.rs b/src/config/load.rs index f80982f..55f38ca 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -1977,6 +1977,22 @@ mod tests { cfg_accept.censorship.unknown_sni_action, UnknownSniAction::Accept ); + + let cfg_reject: ProxyConfig = toml::from_str( + r#" + [server] + [general] + [network] + [access] + [censorship] + unknown_sni_action = "reject_handshake" + "#, + ) + .unwrap(); + assert_eq!( + cfg_reject.censorship.unknown_sni_action, + UnknownSniAction::RejectHandshake + ); } #[test] diff --git a/src/config/types.rs b/src/config/types.rs index 82ae9c3..9914b63 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -392,14 +392,26 @@ pub struct GeneralConfig { #[serde(default = "default_proxy_secret_path")] pub proxy_secret_path: Option, + /// Optional custom URL for infrastructure secret (https://core.telegram.org/getProxySecret if absent). + #[serde(default)] + pub proxy_secret_url: Option, + /// Optional path to cache raw getProxyConfig (IPv4) snapshot for startup fallback. #[serde(default = "default_proxy_config_v4_cache_path")] pub proxy_config_v4_cache_path: Option, + /// Optional custom URL for getProxyConfig (https://core.telegram.org/getProxyConfig if absent). + #[serde(default)] + pub proxy_config_v4_url: Option, + /// Optional path to cache raw getProxyConfigV6 snapshot for startup fallback. #[serde(default = "default_proxy_config_v6_cache_path")] pub proxy_config_v6_cache_path: Option, + /// Optional custom URL for getProxyConfigV6 (https://core.telegram.org/getProxyConfigV6 if absent). + #[serde(default)] + pub proxy_config_v6_url: Option, + /// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags. #[serde(default)] pub ad_tag: Option, @@ -960,8 +972,11 @@ impl Default for GeneralConfig { use_middle_proxy: default_true(), ad_tag: None, proxy_secret_path: default_proxy_secret_path(), + proxy_secret_url: None, proxy_config_v4_cache_path: default_proxy_config_v4_cache_path(), + proxy_config_v4_url: None, proxy_config_v6_cache_path: default_proxy_config_v6_cache_path(), + proxy_config_v6_url: None, middle_proxy_nat_ip: None, middle_proxy_nat_probe: default_true(), middle_proxy_nat_stun: default_middle_proxy_nat_stun(), @@ -1556,6 +1571,13 @@ pub enum UnknownSniAction { Drop, Mask, Accept, + /// Reject the TLS handshake by sending a fatal `unrecognized_name` alert + /// (RFC 6066, AlertDescription = 112) before closing the connection. + /// Mimics nginx `ssl_reject_handshake on;` behavior on the default vhost — + /// the wire response indistinguishable from a stock modern web server + /// that simply does not host the requested name. + #[serde(rename = "reject_handshake")] + RejectHandshake, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 4e49e9e..b647915 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -66,6 +66,7 @@ pub(crate) async fn initialize_me_pool( match crate::transport::middle_proxy::fetch_proxy_secret_with_upstream( proxy_secret_path, config.general.proxy_secret_len_max, + config.general.proxy_secret_url.as_deref(), Some(upstream_manager.clone()), ) .await @@ -126,7 +127,11 @@ pub(crate) async fn initialize_me_pool( .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V4) .await; let cfg_v4 = load_startup_proxy_config_snapshot( - "https://core.telegram.org/getProxyConfig", + config + .general + .proxy_config_v4_url + .as_deref() + .unwrap_or("https://core.telegram.org/getProxyConfig"), config.general.proxy_config_v4_cache_path.as_deref(), me2dc_fallback, "getProxyConfig", @@ -158,7 +163,11 @@ pub(crate) async fn initialize_me_pool( .set_me_status(StartupMeStatus::Initializing, COMPONENT_ME_PROXY_CONFIG_V6) .await; let cfg_v6 = load_startup_proxy_config_snapshot( - "https://core.telegram.org/getProxyConfigV6", + config + .general + .proxy_config_v6_url + .as_deref() + .unwrap_or("https://core.telegram.org/getProxyConfigV6"), config.general.proxy_config_v6_cache_path.as_deref(), me2dc_fallback, "getProxyConfigV6", diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 904b8f9..a9ab0ff 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -1132,9 +1132,20 @@ where "TLS handshake accepted by unknown SNI policy" ); } - action @ (UnknownSniAction::Drop | UnknownSniAction::Mask) => { + action @ (UnknownSniAction::Drop + | UnknownSniAction::Mask + | UnknownSniAction::RejectHandshake) => { auth_probe_record_failure_in(shared, peer.ip(), Instant::now()); - maybe_apply_server_hello_delay(config).await; + // For Drop/Mask we apply the synthetic ServerHello delay so + // the fail-closed path is timing-indistinguishable from the + // success path. For RejectHandshake we deliberately skip the + // delay: a stock modern nginx with `ssl_reject_handshake on;` + // responds with the alert essentially immediately, so + // injecting 8-24ms here would itself become a distinguisher + // against the public baseline we are trying to blend into. + if !matches!(action, UnknownSniAction::RejectHandshake) { + maybe_apply_server_hello_delay(config).await; + } let log_now = Instant::now(); if should_emit_unknown_sni_warn_in(shared, log_now) { warn!( @@ -1153,8 +1164,33 @@ where "TLS handshake rejected by unknown SNI policy" ); } + if matches!(action, UnknownSniAction::RejectHandshake) { + // TLS alert record layer: + // 0x15 ContentType.alert + // 0x03 0x03 legacy_record_version = TLS 1.2 + // (matches what modern nginx emits in + // the first server -> client record, + // per RFC 8446 5.1 guidance) + // 0x00 0x02 length = 2 + // Alert payload: + // 0x02 AlertLevel.fatal + // 0x70 AlertDescription.unrecognized_name (112, RFC 6066) + const TLS_ALERT_UNRECOGNIZED_NAME: [u8; 7] = + [0x15, 0x03, 0x03, 0x00, 0x02, 0x02, 0x70]; + if let Err(e) = writer.write_all(&TLS_ALERT_UNRECOGNIZED_NAME).await { + debug!( + peer = %peer, + error = %e, + "Failed to write unrecognized_name TLS alert" + ); + } else { + let _ = writer.flush().await; + } + } return match action { - UnknownSniAction::Drop => HandshakeResult::Error(ProxyError::UnknownTlsSni), + UnknownSniAction::Drop | UnknownSniAction::RejectHandshake => { + HandshakeResult::Error(ProxyError::UnknownTlsSni) + } UnknownSniAction::Mask => HandshakeResult::BadClient { reader, writer }, UnknownSniAction::Accept => unreachable!(), }; diff --git a/src/proxy/tests/handshake_security_tests.rs b/src/proxy/tests/handshake_security_tests.rs index 0f8fe03..df91cac 100644 --- a/src/proxy/tests/handshake_security_tests.rs +++ b/src/proxy/tests/handshake_security_tests.rs @@ -1007,6 +1007,55 @@ async fn tls_unknown_sni_mask_policy_falls_back_to_bad_client() { assert!(matches!(result, HandshakeResult::BadClient { .. })); } +#[tokio::test] +async fn tls_unknown_sni_reject_handshake_policy_emits_unrecognized_name_alert() { + use tokio::io::{AsyncReadExt, duplex}; + + let secret = [0x4Au8; 16]; + let mut config = test_config_with_secret_hex("4a4a4a4a4a4a4a4a4a4a4a4a4a4a4a4a"); + config.censorship.unknown_sni_action = UnknownSniAction::RejectHandshake; + + let replay_checker = ReplayChecker::new(128, Duration::from_secs(60)); + let rng = SecureRandom::new(); + let peer: SocketAddr = "198.51.100.192:44326".parse().unwrap(); + let handshake = + make_valid_tls_client_hello_with_sni_and_alpn(&secret, 0, "unknown.example", &[b"h2"]); + + // Wire up a duplex so we can inspect what the server writes towards the + // client. We own the "peer side" half to read from it. + let (server_side, mut peer_side) = duplex(1024); + let (server_read, server_write) = tokio::io::split(server_side); + + let result = handle_tls_handshake( + &handshake, + server_read, + server_write, + peer, + &config, + &replay_checker, + &rng, + None, + ) + .await; + + assert!(matches!( + result, + HandshakeResult::Error(ProxyError::UnknownTlsSni) + )); + + // Drain what the server wrote. We expect exactly one TLS alert record: + // 0x15 0x03 0x03 0x00 0x02 0x02 0x70 + // (ContentType.alert, TLS 1.2, length=2, fatal, unrecognized_name) + drop(result); // drops the server-side writer so peer_side sees EOF + let mut buf = Vec::new(); + peer_side.read_to_end(&mut buf).await.unwrap(); + assert_eq!( + buf, + [0x15, 0x03, 0x03, 0x00, 0x02, 0x02, 0x70], + "reject_handshake must emit a fatal unrecognized_name TLS alert" + ); +} + #[tokio::test] async fn tls_unknown_sni_accept_policy_continues_auth_path() { let secret = [0x4Bu8; 16]; diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index ebe45fc..b777445 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -321,7 +321,14 @@ async fn run_update_cycle( let mut maps_changed = false; let mut ready_v4: Option<(ProxyConfigData, u64)> = None; - let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig", upstream.clone()).await; + let cfg_v4 = retry_fetch( + cfg.general + .proxy_config_v4_url + .as_deref() + .unwrap_or("https://core.telegram.org/getProxyConfig"), + upstream.clone(), + ) + .await; if let Some(cfg_v4) = cfg_v4 && snapshot_passes_guards(cfg, &cfg_v4, "getProxyConfig") { @@ -346,7 +353,10 @@ async fn run_update_cycle( let mut ready_v6: Option<(ProxyConfigData, u64)> = None; let cfg_v6 = retry_fetch( - "https://core.telegram.org/getProxyConfigV6", + cfg.general + .proxy_config_v6_url + .as_deref() + .unwrap_or("https://core.telegram.org/getProxyConfigV6"), upstream.clone(), ) .await; @@ -430,6 +440,7 @@ async fn run_update_cycle( match download_proxy_secret_with_max_len_via_upstream( cfg.general.proxy_secret_len_max, upstream, + cfg.general.proxy_secret_url.as_deref(), ) .await { diff --git a/src/transport/middle_proxy/fairness/mod.rs b/src/transport/middle_proxy/fairness/mod.rs index 58eb890..dc9898d 100644 --- a/src/transport/middle_proxy/fairness/mod.rs +++ b/src/transport/middle_proxy/fairness/mod.rs @@ -7,7 +7,6 @@ mod model; mod pressure; mod scheduler; -#[cfg(test)] pub(crate) use model::PressureState; pub(crate) use model::{AdmissionDecision, DispatchAction, DispatchFeedback, SchedulerDecision}; pub(crate) use scheduler::{WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState}; diff --git a/src/transport/middle_proxy/fairness/model.rs b/src/transport/middle_proxy/fairness/model.rs index bdf4f9f..99a9330 100644 --- a/src/transport/middle_proxy/fairness/model.rs +++ b/src/transport/middle_proxy/fairness/model.rs @@ -77,11 +77,12 @@ pub(crate) struct FlowFairnessState { pub(crate) standing_state: StandingQueueState, pub(crate) scheduler_state: FlowSchedulerState, pub(crate) bucket_id: usize, + pub(crate) weight_quanta: u8, pub(crate) in_active_ring: bool, } impl FlowFairnessState { - pub(crate) fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self { + pub(crate) fn new(flow_id: u64, worker_id: u16, bucket_id: usize, weight_quanta: u8) -> Self { Self { _flow_id: flow_id, _worker_id: worker_id, @@ -97,6 +98,7 @@ impl FlowFairnessState { standing_state: StandingQueueState::Transient, scheduler_state: FlowSchedulerState::Idle, bucket_id, + weight_quanta: weight_quanta.max(1), in_active_ring: false, } } diff --git a/src/transport/middle_proxy/fairness/pressure.rs b/src/transport/middle_proxy/fairness/pressure.rs index 02a5942..6c84a4c 100644 --- a/src/transport/middle_proxy/fairness/pressure.rs +++ b/src/transport/middle_proxy/fairness/pressure.rs @@ -146,59 +146,57 @@ impl PressureEvaluator { ((signals.standing_flows.saturating_mul(100)) / signals.active_flows).min(100) as u8 }; - let mut pressure_score = 0u8; + let mut pressured = false; + let mut saturated = false; + let queue_saturated_pct = cfg + .queue_ratio_shedding_pct + .min(cfg.queue_ratio_saturated_pct); if queue_ratio_pct >= cfg.queue_ratio_pressured_pct { - pressure_score = pressure_score.max(1); + pressured = true; } - if queue_ratio_pct >= cfg.queue_ratio_shedding_pct { - pressure_score = pressure_score.max(2); - } - if queue_ratio_pct >= cfg.queue_ratio_saturated_pct { - pressure_score = pressure_score.max(3); + if queue_ratio_pct >= queue_saturated_pct { + saturated = true; } + let standing_saturated_pct = cfg + .standing_ratio_shedding_pct + .min(cfg.standing_ratio_saturated_pct); if standing_ratio_pct >= cfg.standing_ratio_pressured_pct { - pressure_score = pressure_score.max(1); + pressured = true; } - if standing_ratio_pct >= cfg.standing_ratio_shedding_pct { - pressure_score = pressure_score.max(2); - } - if standing_ratio_pct >= cfg.standing_ratio_saturated_pct { - pressure_score = pressure_score.max(3); + if standing_ratio_pct >= standing_saturated_pct { + saturated = true; } + let rejects_saturated = cfg.rejects_shedding.min(cfg.rejects_saturated); if self.admission_rejects_window >= cfg.rejects_pressured { - pressure_score = pressure_score.max(1); + pressured = true; } - if self.admission_rejects_window >= cfg.rejects_shedding { - pressure_score = pressure_score.max(2); - } - if self.admission_rejects_window >= cfg.rejects_saturated { - pressure_score = pressure_score.max(3); + if self.admission_rejects_window >= rejects_saturated { + saturated = true; } + let stalls_saturated = cfg.stalls_shedding.min(cfg.stalls_saturated); if self.route_stalls_window >= cfg.stalls_pressured { - pressure_score = pressure_score.max(1); + pressured = true; } - if self.route_stalls_window >= cfg.stalls_shedding { - pressure_score = pressure_score.max(2); - } - if self.route_stalls_window >= cfg.stalls_saturated { - pressure_score = pressure_score.max(3); + if self.route_stalls_window >= stalls_saturated { + saturated = true; } if signals.backpressured_flows > signals.active_flows.saturating_div(2) && signals.active_flows > 0 { - pressure_score = pressure_score.max(2); + pressured = true; } - match pressure_score { - 0 => PressureState::Normal, - 1 => PressureState::Pressured, - 2 => PressureState::Shedding, - _ => PressureState::Saturated, + if saturated { + PressureState::Saturated + } else if pressured { + PressureState::Pressured + } else { + PressureState::Normal } } diff --git a/src/transport/middle_proxy/fairness/scheduler.rs b/src/transport/middle_proxy/fairness/scheduler.rs index 8da3636..b8079ce 100644 --- a/src/transport/middle_proxy/fairness/scheduler.rs +++ b/src/transport/middle_proxy/fairness/scheduler.rs @@ -1,6 +1,7 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::time::{Duration, Instant}; +use crate::protocol::constants::RPC_FLAG_QUICKACK; use bytes::Bytes; use super::model::{ @@ -26,6 +27,8 @@ pub(crate) struct WorkerFairnessConfig { pub(crate) max_consecutive_stalls_before_close: u8, pub(crate) soft_bucket_count: usize, pub(crate) soft_bucket_share_pct: u8, + pub(crate) default_flow_weight: u8, + pub(crate) quickack_flow_weight: u8, pub(crate) pressure: PressureConfig, } @@ -46,6 +49,8 @@ impl Default for WorkerFairnessConfig { max_consecutive_stalls_before_close: 16, soft_bucket_count: 64, soft_bucket_share_pct: 25, + default_flow_weight: 1, + quickack_flow_weight: 4, pressure: PressureConfig::default(), } } @@ -57,9 +62,9 @@ struct FlowEntry { } impl FlowEntry { - fn new(flow_id: u64, worker_id: u16, bucket_id: usize) -> Self { + fn new(flow_id: u64, worker_id: u16, bucket_id: usize, weight_quanta: u8) -> Self { Self { - fairness: FlowFairnessState::new(flow_id, worker_id, bucket_id), + fairness: FlowFairnessState::new(flow_id, worker_id, bucket_id, weight_quanta), queue: VecDeque::new(), } } @@ -86,6 +91,7 @@ pub(crate) struct WorkerFairnessState { pressure: PressureEvaluator, flows: HashMap, active_ring: VecDeque, + active_ring_members: HashSet, total_queued_bytes: u64, bucket_queued_bytes: Vec, bucket_active_flows: Vec, @@ -108,6 +114,7 @@ impl WorkerFairnessState { pressure: PressureEvaluator::new(now), flows: HashMap::new(), active_ring: VecDeque::new(), + active_ring_members: HashSet::new(), total_queued_bytes: 0, bucket_queued_bytes: vec![0; bucket_count], bucket_active_flows: vec![0; bucket_count], @@ -184,6 +191,7 @@ impl WorkerFairnessState { } let bucket_id = self.bucket_for(conn_id); + let frame_weight = Self::weight_for_flags(&self.config, flags); let bucket_cap = self .config .max_total_queued_bytes @@ -205,12 +213,13 @@ impl WorkerFairnessState { self.bucket_active_flows[bucket_id].saturating_add(1); self.flows.insert( conn_id, - FlowEntry::new(conn_id, self.config.worker_id, bucket_id), + FlowEntry::new(conn_id, self.config.worker_id, bucket_id, frame_weight), ); self.flows .get_mut(&conn_id) .expect("flow inserted must be retrievable") }; + entry.fairness.weight_quanta = entry.fairness.weight_quanta.max(frame_weight); if entry.fairness.pending_bytes.saturating_add(frame_bytes) > self.config.max_flow_queued_bytes @@ -242,11 +251,24 @@ impl WorkerFairnessState { self.bucket_queued_bytes[bucket_id] = self.bucket_queued_bytes[bucket_id].saturating_add(frame_bytes); + let mut enqueue_active = false; if !entry.fairness.in_active_ring { entry.fairness.in_active_ring = true; - self.active_ring.push_back(conn_id); + enqueue_active = true; } + let pressure_state = self.pressure.state(); + let (before_membership, after_membership) = { + let before = Self::flow_membership(&entry.fairness); + Self::classify_flow(&self.config, pressure_state, now, &mut entry.fairness); + let after = Self::flow_membership(&entry.fairness); + (before, after) + }; + if enqueue_active { + self.enqueue_active_conn(conn_id); + } + self.apply_flow_membership_delta(before_membership, after_membership); + self.evaluate_pressure(now, true); AdmissionDecision::Admit } @@ -260,62 +282,89 @@ impl WorkerFairnessState { let Some(conn_id) = self.active_ring.pop_front() else { break; }; + if !self.active_ring_members.remove(&conn_id) { + continue; + } let mut candidate = None; let mut requeue_active = false; let mut drained_bytes = 0u64; let mut bucket_id = 0usize; + let mut should_continue = false; + let mut enqueue_active = false; + let mut membership_delta = None; let pressure_state = self.pressure.state(); if let Some(flow) = self.flows.get_mut(&conn_id) { bucket_id = flow.fairness.bucket_id; + flow.fairness.in_active_ring = false; + let before_membership = Self::flow_membership(&flow.fairness); if flow.queue.is_empty() { flow.fairness.in_active_ring = false; flow.fairness.scheduler_state = FlowSchedulerState::Idle; flow.fairness.pending_bytes = 0; + flow.fairness.deficit_bytes = 0; flow.fairness.queue_started_at = None; - continue; - } + should_continue = true; + } else { + Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); - Self::classify_flow(&self.config, pressure_state, now, &mut flow.fairness); - - let quantum = - Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness); - flow.fairness.deficit_bytes = flow - .fairness - .deficit_bytes - .saturating_add(i64::from(quantum)); - self.deficit_grants = self.deficit_grants.saturating_add(1); - - let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes()); - if flow.fairness.deficit_bytes < front_len as i64 { - flow.fairness.consecutive_skips = - flow.fairness.consecutive_skips.saturating_add(1); - self.deficit_skips = self.deficit_skips.saturating_add(1); - requeue_active = true; - } else if let Some(frame) = flow.queue.pop_front() { - drained_bytes = frame.queued_bytes(); - flow.fairness.pending_bytes = - flow.fairness.pending_bytes.saturating_sub(drained_bytes); + let quantum = + Self::effective_quantum_bytes(&self.config, pressure_state, &flow.fairness); flow.fairness.deficit_bytes = flow .fairness .deficit_bytes - .saturating_sub(drained_bytes as i64); - flow.fairness.consecutive_skips = 0; - flow.fairness.queue_started_at = - flow.queue.front().map(|front| front.enqueued_at); - requeue_active = !flow.queue.is_empty(); - if !requeue_active { - flow.fairness.scheduler_state = FlowSchedulerState::Idle; - flow.fairness.in_active_ring = false; + .saturating_add(i64::from(quantum)); + Self::clamp_deficit_bytes(&self.config, &mut flow.fairness); + self.deficit_grants = self.deficit_grants.saturating_add(1); + + let front_len = flow.queue.front().map_or(0, |front| front.queued_bytes()); + if flow.fairness.deficit_bytes < front_len as i64 { + flow.fairness.consecutive_skips = + flow.fairness.consecutive_skips.saturating_add(1); + self.deficit_skips = self.deficit_skips.saturating_add(1); + requeue_active = true; + flow.fairness.in_active_ring = true; + enqueue_active = true; + } else if let Some(frame) = flow.queue.pop_front() { + drained_bytes = frame.queued_bytes(); + flow.fairness.pending_bytes = + flow.fairness.pending_bytes.saturating_sub(drained_bytes); + flow.fairness.deficit_bytes = flow + .fairness + .deficit_bytes + .saturating_sub(drained_bytes as i64); + Self::clamp_deficit_bytes(&self.config, &mut flow.fairness); + flow.fairness.consecutive_skips = 0; + flow.fairness.queue_started_at = + flow.queue.front().map(|front| front.enqueued_at); + requeue_active = !flow.queue.is_empty(); + if !requeue_active { + flow.fairness.scheduler_state = FlowSchedulerState::Idle; + flow.fairness.in_active_ring = false; + flow.fairness.deficit_bytes = 0; + } else { + flow.fairness.in_active_ring = true; + enqueue_active = true; + } + candidate = Some(DispatchCandidate { + pressure_state, + flow_class: flow.fairness.pressure_class, + frame, + }); } - candidate = Some(DispatchCandidate { - pressure_state, - flow_class: flow.fairness.pressure_class, - frame, - }); } + + membership_delta = Some((before_membership, Self::flow_membership(&flow.fairness))); + } + + if let Some((before_membership, after_membership)) = membership_delta { + self.apply_flow_membership_delta(before_membership, after_membership); + } + + if should_continue { + continue; } if drained_bytes > 0 { @@ -324,11 +373,8 @@ impl WorkerFairnessState { self.bucket_queued_bytes[bucket_id].saturating_sub(drained_bytes); } - if requeue_active { - if let Some(flow) = self.flows.get_mut(&conn_id) { - flow.fairness.in_active_ring = true; - } - self.active_ring.push_back(conn_id); + if requeue_active && enqueue_active { + self.enqueue_active_conn(conn_id); } if let Some(candidate) = candidate { @@ -348,7 +394,9 @@ impl WorkerFairnessState { ) -> DispatchAction { match feedback { DispatchFeedback::Routed => { + let mut membership_delta = None; if let Some(flow) = self.flows.get_mut(&conn_id) { + let before_membership = Self::flow_membership(&flow.fairness); flow.fairness.last_drain_at = Some(now); flow.fairness.recent_drain_bytes = flow .fairness @@ -358,6 +406,17 @@ impl WorkerFairnessState { if flow.fairness.scheduler_state != FlowSchedulerState::Idle { flow.fairness.scheduler_state = FlowSchedulerState::Active; } + Self::classify_flow( + &self.config, + self.pressure.state(), + now, + &mut flow.fairness, + ); + membership_delta = + Some((before_membership, Self::flow_membership(&flow.fairness))); + } + if let Some((before_membership, after_membership)) = membership_delta { + self.apply_flow_membership_delta(before_membership, after_membership); } self.evaluate_pressure(now, false); DispatchAction::Continue @@ -365,47 +424,65 @@ impl WorkerFairnessState { DispatchFeedback::QueueFull => { self.pressure.note_route_stall(now, &self.config.pressure); self.downstream_stalls = self.downstream_stalls.saturating_add(1); + let state = self.pressure.state(); let Some(flow) = self.flows.get_mut(&conn_id) else { self.evaluate_pressure(now, true); return DispatchAction::Continue; }; + let (before_membership, after_membership, should_close_flow, enqueue_active) = { + let before_membership = Self::flow_membership(&flow.fairness); + let mut enqueue_active = false; - flow.fairness.consecutive_stalls = - flow.fairness.consecutive_stalls.saturating_add(1); - flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; - flow.fairness.pressure_class = FlowPressureClass::Backpressured; + flow.fairness.consecutive_stalls = + flow.fairness.consecutive_stalls.saturating_add(1); + flow.fairness.scheduler_state = FlowSchedulerState::Backpressured; + flow.fairness.pressure_class = FlowPressureClass::Backpressured; - let state = self.pressure.state(); - let should_shed_frame = matches!(state, PressureState::Saturated) - || (matches!(state, PressureState::Shedding) - && flow.fairness.standing_state == StandingQueueState::Standing - && flow.fairness.consecutive_stalls - >= self.config.max_consecutive_stalls_before_shed); + let should_shed_frame = matches!(state, PressureState::Saturated) + || (matches!(state, PressureState::Shedding) + && flow.fairness.standing_state == StandingQueueState::Standing + && flow.fairness.consecutive_stalls + >= self.config.max_consecutive_stalls_before_shed); - if should_shed_frame { - self.shed_drops = self.shed_drops.saturating_add(1); - self.fairness_penalties = self.fairness_penalties.saturating_add(1); - } else { - let frame_bytes = candidate.frame.queued_bytes(); - flow.queue.push_front(candidate.frame); - flow.fairness.pending_bytes = - flow.fairness.pending_bytes.saturating_add(frame_bytes); - flow.fairness.queue_started_at = - flow.queue.front().map(|front| front.enqueued_at); - self.total_queued_bytes = self.total_queued_bytes.saturating_add(frame_bytes); - self.bucket_queued_bytes[flow.fairness.bucket_id] = self.bucket_queued_bytes - [flow.fairness.bucket_id] - .saturating_add(frame_bytes); - if !flow.fairness.in_active_ring { - flow.fairness.in_active_ring = true; - self.active_ring.push_back(conn_id); + if should_shed_frame { + self.shed_drops = self.shed_drops.saturating_add(1); + self.fairness_penalties = self.fairness_penalties.saturating_add(1); + } else { + let frame_bytes = candidate.frame.queued_bytes(); + flow.queue.push_front(candidate.frame); + flow.fairness.pending_bytes = + flow.fairness.pending_bytes.saturating_add(frame_bytes); + flow.fairness.queue_started_at = + flow.queue.front().map(|front| front.enqueued_at); + self.total_queued_bytes = + self.total_queued_bytes.saturating_add(frame_bytes); + self.bucket_queued_bytes[flow.fairness.bucket_id] = self + .bucket_queued_bytes[flow.fairness.bucket_id] + .saturating_add(frame_bytes); + if !flow.fairness.in_active_ring { + flow.fairness.in_active_ring = true; + enqueue_active = true; + } } - } - if flow.fairness.consecutive_stalls - >= self.config.max_consecutive_stalls_before_close - && self.pressure.state() == PressureState::Saturated - { + Self::classify_flow(&self.config, state, now, &mut flow.fairness); + let after_membership = Self::flow_membership(&flow.fairness); + let should_close_flow = flow.fairness.consecutive_stalls + >= self.config.max_consecutive_stalls_before_close + && self.pressure.state() == PressureState::Saturated; + ( + before_membership, + after_membership, + should_close_flow, + enqueue_active, + ) + }; + if enqueue_active { + self.enqueue_active_conn(conn_id); + } + self.apply_flow_membership_delta(before_membership, after_membership); + + if should_close_flow { self.remove_flow(conn_id); self.evaluate_pressure(now, true); return DispatchAction::CloseFlow; @@ -426,6 +503,16 @@ impl WorkerFairnessState { let Some(entry) = self.flows.remove(&conn_id) else { return; }; + self.active_ring_members.remove(&conn_id); + self.active_ring + .retain(|queued_conn_id| *queued_conn_id != conn_id); + let (was_standing, was_backpressured) = Self::flow_membership(&entry.fairness); + if was_standing { + self.standing_flow_count = self.standing_flow_count.saturating_sub(1); + } + if was_backpressured { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_sub(1); + } self.bucket_active_flows[entry.fairness.bucket_id] = self.bucket_active_flows[entry.fairness.bucket_id].saturating_sub(1); @@ -440,27 +527,6 @@ impl WorkerFairnessState { } fn evaluate_pressure(&mut self, now: Instant, force: bool) { - let mut standing = 0usize; - let mut backpressured = 0usize; - - for flow in self.flows.values_mut() { - Self::classify_flow(&self.config, self.pressure.state(), now, &mut flow.fairness); - if flow.fairness.standing_state == StandingQueueState::Standing { - standing = standing.saturating_add(1); - } - if matches!( - flow.fairness.scheduler_state, - FlowSchedulerState::Backpressured - | FlowSchedulerState::Penalized - | FlowSchedulerState::SheddingCandidate - ) { - backpressured = backpressured.saturating_add(1); - } - } - - self.standing_flow_count = standing; - self.backpressured_flow_count = backpressured; - let _ = self.pressure.maybe_evaluate( now, &self.config.pressure, @@ -468,8 +534,8 @@ impl WorkerFairnessState { PressureSignals { active_flows: self.flows.len(), total_queued_bytes: self.total_queued_bytes, - standing_flows: standing, - backpressured_flows: backpressured, + standing_flows: self.standing_flow_count, + backpressured_flows: self.backpressured_flow_count, }, force, ); @@ -481,12 +547,39 @@ impl WorkerFairnessState { now: Instant, fairness: &mut FlowFairnessState, ) { - if fairness.pending_bytes == 0 { - fairness.pressure_class = FlowPressureClass::Healthy; - fairness.standing_state = StandingQueueState::Transient; - fairness.scheduler_state = FlowSchedulerState::Idle; + let (pressure_class, standing_state, scheduler_state, standing) = + Self::derive_flow_classification(config, pressure_state, now, fairness); + fairness.pressure_class = pressure_class; + fairness.standing_state = standing_state; + fairness.scheduler_state = scheduler_state; + if scheduler_state == FlowSchedulerState::Idle { + fairness.deficit_bytes = 0; + } + if standing { + fairness.penalty_score = fairness.penalty_score.saturating_add(1); + } else { fairness.penalty_score = fairness.penalty_score.saturating_sub(1); - return; + } + } + + fn derive_flow_classification( + config: &WorkerFairnessConfig, + pressure_state: PressureState, + now: Instant, + fairness: &FlowFairnessState, + ) -> ( + FlowPressureClass, + StandingQueueState, + FlowSchedulerState, + bool, + ) { + if fairness.pending_bytes == 0 { + return ( + FlowPressureClass::Healthy, + StandingQueueState::Transient, + FlowSchedulerState::Idle, + false, + ); } let queue_age = fairness @@ -503,29 +596,165 @@ impl WorkerFairnessState { && (fairness.consecutive_stalls >= config.standing_stall_threshold || drain_stalled); if standing { - fairness.standing_state = StandingQueueState::Standing; - fairness.pressure_class = FlowPressureClass::Standing; - fairness.penalty_score = fairness.penalty_score.saturating_add(1); - fairness.scheduler_state = if pressure_state >= PressureState::Shedding { + let scheduler_state = if pressure_state >= PressureState::Shedding { FlowSchedulerState::SheddingCandidate } else { FlowSchedulerState::Penalized }; - return; + return ( + FlowPressureClass::Standing, + StandingQueueState::Standing, + scheduler_state, + true, + ); } - fairness.standing_state = StandingQueueState::Transient; if fairness.consecutive_stalls > 0 { - fairness.pressure_class = FlowPressureClass::Backpressured; - fairness.scheduler_state = FlowSchedulerState::Backpressured; - } else if fairness.pending_bytes >= config.standing_queue_min_backlog_bytes { - fairness.pressure_class = FlowPressureClass::Bursty; - fairness.scheduler_state = FlowSchedulerState::Active; - } else { - fairness.pressure_class = FlowPressureClass::Healthy; - fairness.scheduler_state = FlowSchedulerState::Active; + return ( + FlowPressureClass::Backpressured, + StandingQueueState::Transient, + FlowSchedulerState::Backpressured, + false, + ); } - fairness.penalty_score = fairness.penalty_score.saturating_sub(1); + + if fairness.pending_bytes >= config.standing_queue_min_backlog_bytes { + return ( + FlowPressureClass::Bursty, + StandingQueueState::Transient, + FlowSchedulerState::Active, + false, + ); + } + + ( + FlowPressureClass::Healthy, + StandingQueueState::Transient, + FlowSchedulerState::Active, + false, + ) + } + + #[inline] + fn flow_membership(fairness: &FlowFairnessState) -> (bool, bool) { + ( + fairness.standing_state == StandingQueueState::Standing, + Self::scheduler_state_is_backpressured(fairness.scheduler_state), + ) + } + + #[inline] + fn scheduler_state_is_backpressured(state: FlowSchedulerState) -> bool { + matches!( + state, + FlowSchedulerState::Backpressured + | FlowSchedulerState::Penalized + | FlowSchedulerState::SheddingCandidate + ) + } + + fn apply_flow_membership_delta( + &mut self, + before_membership: (bool, bool), + after_membership: (bool, bool), + ) { + if before_membership.0 != after_membership.0 { + if after_membership.0 { + self.standing_flow_count = self.standing_flow_count.saturating_add(1); + } else { + self.standing_flow_count = self.standing_flow_count.saturating_sub(1); + } + } + if before_membership.1 != after_membership.1 { + if after_membership.1 { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_add(1); + } else { + self.backpressured_flow_count = self.backpressured_flow_count.saturating_sub(1); + } + } + } + + #[inline] + fn clamp_deficit_bytes(config: &WorkerFairnessConfig, fairness: &mut FlowFairnessState) { + let max_deficit = config.max_flow_queued_bytes.min(i64::MAX as u64) as i64; + fairness.deficit_bytes = fairness.deficit_bytes.clamp(0, max_deficit); + } + + #[inline] + fn enqueue_active_conn(&mut self, conn_id: u64) { + if self.active_ring_members.insert(conn_id) { + self.active_ring.push_back(conn_id); + } + } + + #[inline] + fn weight_for_flags(config: &WorkerFairnessConfig, flags: u32) -> u8 { + if (flags & RPC_FLAG_QUICKACK) != 0 { + return config.quickack_flow_weight.max(1); + } + config.default_flow_weight.max(1) + } + + #[cfg(test)] + pub(crate) fn debug_recompute_flow_counters(&self, now: Instant) -> (usize, usize) { + let pressure_state = self.pressure.state(); + let mut standing = 0usize; + let mut backpressured = 0usize; + for flow in self.flows.values() { + let (_, standing_state, scheduler_state, _) = + Self::derive_flow_classification(&self.config, pressure_state, now, &flow.fairness); + if standing_state == StandingQueueState::Standing { + standing = standing.saturating_add(1); + } + if Self::scheduler_state_is_backpressured(scheduler_state) { + backpressured = backpressured.saturating_add(1); + } + } + (standing, backpressured) + } + + #[cfg(test)] + pub(crate) fn debug_check_active_ring_consistency(&self) -> bool { + if self.active_ring.len() != self.active_ring_members.len() { + return false; + } + + let mut seen = HashSet::with_capacity(self.active_ring.len()); + for conn_id in self.active_ring.iter().copied() { + if !seen.insert(conn_id) { + return false; + } + if !self.active_ring_members.contains(&conn_id) { + return false; + } + let Some(flow) = self.flows.get(&conn_id) else { + return false; + }; + if !flow.fairness.in_active_ring || flow.queue.is_empty() { + return false; + } + } + + for (conn_id, flow) in self.flows.iter() { + let in_ring = self.active_ring_members.contains(conn_id); + if flow.fairness.in_active_ring != in_ring { + return false; + } + if in_ring && flow.queue.is_empty() { + return false; + } + } + + true + } + + #[cfg(test)] + pub(crate) fn debug_max_deficit_bytes(&self) -> i64 { + self.flows + .values() + .map(|entry| entry.fairness.deficit_bytes) + .max() + .unwrap_or(0) } fn effective_quantum_bytes( @@ -542,12 +771,14 @@ impl WorkerFairnessState { return config.penalized_quantum_bytes.max(1); } - match pressure_state { + let base_quantum = match pressure_state { PressureState::Normal => config.base_quantum_bytes.max(1), PressureState::Pressured => config.pressured_quantum_bytes.max(1), PressureState::Shedding => config.pressured_quantum_bytes.max(1), PressureState::Saturated => config.penalized_quantum_bytes.max(1), - } + }; + let weighted_quantum = base_quantum.saturating_mul(fairness.weight_quanta.max(1) as u32); + weighted_quantum.max(1) } fn bucket_for(&self, conn_id: u64) -> usize { diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 8041185..e1e919f 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::io::ErrorKind; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; -use std::time::Instant; +use std::time::{Duration, Instant}; use bytes::{Bytes, BytesMut}; use tokio::io::AsyncReadExt; @@ -21,8 +21,8 @@ use crate::stats::Stats; use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc}; use super::fairness::{ - AdmissionDecision, DispatchAction, DispatchFeedback, SchedulerDecision, WorkerFairnessConfig, - WorkerFairnessSnapshot, WorkerFairnessState, + AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision, + WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState, }; use super::registry::RouteResult; use super::{ConnRegistry, MeResponse}; @@ -45,10 +45,22 @@ fn is_data_route_queue_full(result: RouteResult) -> bool { ) } -fn should_close_on_queue_full_streak(streak: u8) -> bool { +fn should_close_on_queue_full_streak(streak: u8, pressure_state: PressureState) -> bool { + if pressure_state < PressureState::Shedding { + return false; + } + streak >= DATA_ROUTE_QUEUE_FULL_STARVATION_THRESHOLD } +fn should_schedule_fairness_retry(snapshot: &WorkerFairnessSnapshot) -> bool { + snapshot.total_queued_bytes > 0 +} + +fn fairness_retry_delay(route_wait_ms: u64) -> Duration { + Duration::from_millis(route_wait_ms.max(1)) +} + async fn route_data_with_retry( reg: &ConnRegistry, conn_id: u64, @@ -157,7 +169,7 @@ async fn drain_fairness_scheduler( break; }; let cid = candidate.frame.conn_id; - let _pressure_state = candidate.pressure_state; + let pressure_state = candidate.pressure_state; let _flow_class = candidate.flow_class; let routed = route_data_with_retry( reg, @@ -176,7 +188,7 @@ async fn drain_fairness_scheduler( if is_data_route_queue_full(routed) { let streak = data_route_queue_full_streak.entry(cid).or_insert(0); *streak = streak.saturating_add(1); - if should_close_on_queue_full_streak(*streak) { + if should_close_on_queue_full_streak(*streak, pressure_state) { fairness.remove_flow(cid); data_route_queue_full_streak.remove(&cid); reg.unregister(cid).await; @@ -231,10 +243,33 @@ pub(crate) async fn reader_loop( let mut fairness_snapshot = fairness.snapshot(); loop { let mut tmp = [0u8; 65_536]; + let backlog_retry_enabled = should_schedule_fairness_retry(&fairness_snapshot); + let backlog_retry_delay = + fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed)); + let mut retry_only = false; let n = tokio::select! { res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?, + _ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => { + retry_only = true; + 0usize + }, _ = cancel.cancelled() => return Ok(()), }; + if retry_only { + let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); + drain_fairness_scheduler( + &mut fairness, + reg.as_ref(), + &tx, + &mut data_route_queue_full_streak, + route_wait_ms, + stats.as_ref(), + ) + .await; + let current_snapshot = fairness.snapshot(); + apply_fairness_metrics_delta(stats.as_ref(), &mut fairness_snapshot, current_snapshot); + continue; + } if n == 0 { stats.increment_me_reader_eof_total(); return Err(ProxyError::Io(std::io::Error::new( @@ -317,12 +352,9 @@ pub(crate) async fn reader_loop( stats.increment_me_route_drop_queue_full_high(); let streak = data_route_queue_full_streak.entry(cid).or_insert(0); *streak = streak.saturating_add(1); - if should_close_on_queue_full_streak(*streak) - || matches!( - admission, - AdmissionDecision::RejectSaturated - | AdmissionDecision::RejectStandingFlow - ) + let pressure_state = fairness.pressure_state(); + if should_close_on_queue_full_streak(*streak, pressure_state) + || matches!(admission, AdmissionDecision::RejectSaturated) { fairness.remove_flow(cid); data_route_queue_full_streak.remove(&cid); @@ -445,14 +477,18 @@ pub(crate) async fn reader_loop( #[cfg(test)] mod tests { + use std::time::Duration; + use bytes::Bytes; + use super::PressureState; use crate::transport::middle_proxy::ConnRegistry; use super::{ - MeResponse, RouteResult, is_data_route_queue_full, route_data_with_retry, - should_close_on_queue_full_streak, should_close_on_route_result_for_ack, - should_close_on_route_result_for_data, + MeResponse, RouteResult, WorkerFairnessSnapshot, fairness_retry_delay, + is_data_route_queue_full, route_data_with_retry, should_close_on_queue_full_streak, + should_close_on_route_result_for_ack, should_close_on_route_result_for_data, + should_schedule_fairness_retry, }; #[test] @@ -475,10 +511,38 @@ mod tests { assert!(is_data_route_queue_full(RouteResult::QueueFullBase)); assert!(is_data_route_queue_full(RouteResult::QueueFullHigh)); assert!(!is_data_route_queue_full(RouteResult::NoConn)); - assert!(!should_close_on_queue_full_streak(1)); - assert!(!should_close_on_queue_full_streak(2)); - assert!(should_close_on_queue_full_streak(3)); - assert!(should_close_on_queue_full_streak(u8::MAX)); + assert!(!should_close_on_queue_full_streak(1, PressureState::Normal)); + assert!(!should_close_on_queue_full_streak( + 2, + PressureState::Pressured + )); + assert!(!should_close_on_queue_full_streak( + 3, + PressureState::Pressured + )); + assert!(should_close_on_queue_full_streak( + 3, + PressureState::Shedding + )); + assert!(should_close_on_queue_full_streak( + u8::MAX, + PressureState::Saturated + )); + } + + #[test] + fn fairness_retry_is_scheduled_only_when_queue_has_pending_bytes() { + let mut snapshot = WorkerFairnessSnapshot::default(); + assert!(!should_schedule_fairness_retry(&snapshot)); + + snapshot.total_queued_bytes = 1; + assert!(should_schedule_fairness_retry(&snapshot)); + } + + #[test] + fn fairness_retry_delay_never_drops_below_one_millisecond() { + assert_eq!(fairness_retry_delay(0), Duration::from_millis(1)); + assert_eq!(fairness_retry_delay(2), Duration::from_millis(2)); } #[test] diff --git a/src/transport/middle_proxy/secret.rs b/src/transport/middle_proxy/secret.rs index a167773..e3495e8 100644 --- a/src/transport/middle_proxy/secret.rs +++ b/src/transport/middle_proxy/secret.rs @@ -37,20 +37,26 @@ pub(super) fn validate_proxy_secret_len(data_len: usize, max_len: usize) -> Resu /// Fetch Telegram proxy-secret binary. #[allow(dead_code)] -pub async fn fetch_proxy_secret(cache_path: Option<&str>, max_len: usize) -> Result> { - fetch_proxy_secret_with_upstream(cache_path, max_len, None).await +pub async fn fetch_proxy_secret( + cache_path: Option<&str>, + max_len: usize, + proxy_secret_url: Option<&str>, +) -> Result> { + fetch_proxy_secret_with_upstream(cache_path, max_len, proxy_secret_url, None).await } /// Fetch Telegram proxy-secret binary, optionally through upstream routing. pub async fn fetch_proxy_secret_with_upstream( cache_path: Option<&str>, max_len: usize, + proxy_secret_url: Option<&str>, upstream: Option>, ) -> Result> { let cache = cache_path.unwrap_or("proxy-secret"); // 1) Try fresh download first. - match download_proxy_secret_with_max_len_via_upstream(max_len, upstream).await { + match download_proxy_secret_with_max_len_via_upstream(max_len, upstream, proxy_secret_url).await + { Ok(data) => { if let Err(e) = tokio::fs::write(cache, &data).await { warn!(error = %e, "Failed to cache proxy-secret (non-fatal)"); @@ -91,14 +97,19 @@ pub async fn fetch_proxy_secret_with_upstream( #[allow(dead_code)] pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result> { - download_proxy_secret_with_max_len_via_upstream(max_len, None).await + download_proxy_secret_with_max_len_via_upstream(max_len, None, None).await } pub async fn download_proxy_secret_with_max_len_via_upstream( max_len: usize, upstream: Option>, + proxy_secret_url: Option<&str>, ) -> Result> { - let resp = https_get("https://core.telegram.org/getProxySecret", upstream).await?; + let resp = https_get( + proxy_secret_url.unwrap_or("https://core.telegram.org/getProxySecret"), + upstream, + ) + .await?; if !(200..=299).contains(&resp.status) { return Err(ProxyError::Proxy(format!( diff --git a/src/transport/middle_proxy/tests/fairness_security_tests.rs b/src/transport/middle_proxy/tests/fairness_security_tests.rs index 41a8d86..b32493b 100644 --- a/src/transport/middle_proxy/tests/fairness_security_tests.rs +++ b/src/transport/middle_proxy/tests/fairness_security_tests.rs @@ -2,6 +2,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; +use crate::protocol::constants::RPC_FLAG_QUICKACK; use crate::transport::middle_proxy::fairness::{ AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision, WorkerFairnessConfig, WorkerFairnessState, @@ -114,6 +115,62 @@ fn fairness_keeps_fast_flow_progress_under_slow_neighbor() { assert!(snapshot.total_queued_bytes <= 64 * 1024); } +#[test] +fn fairness_prioritizes_quickack_flow_when_weights_enabled() { + let mut now = Instant::now(); + let mut fairness = WorkerFairnessState::new( + WorkerFairnessConfig { + max_total_queued_bytes: 256 * 1024, + max_flow_queued_bytes: 128 * 1024, + base_quantum_bytes: 8 * 1024, + pressured_quantum_bytes: 8 * 1024, + penalized_quantum_bytes: 8 * 1024, + default_flow_weight: 1, + quickack_flow_weight: 4, + ..WorkerFairnessConfig::default() + }, + now, + ); + + for _ in 0..8 { + assert_eq!( + fairness.enqueue_data(10, RPC_FLAG_QUICKACK, enqueue_payload(16 * 1024), now), + AdmissionDecision::Admit + ); + assert_eq!( + fairness.enqueue_data(20, 0, enqueue_payload(16 * 1024), now), + AdmissionDecision::Admit + ); + } + + let mut quickack_dispatched = 0u64; + let mut bulk_dispatched = 0u64; + for _ in 0..64 { + now += Duration::from_millis(1); + let SchedulerDecision::Dispatch(candidate) = fairness.next_decision(now) else { + break; + }; + + if candidate.frame.conn_id == 10 { + quickack_dispatched = quickack_dispatched.saturating_add(1); + } else if candidate.frame.conn_id == 20 { + bulk_dispatched = bulk_dispatched.saturating_add(1); + } + + let _ = fairness.apply_dispatch_feedback( + candidate.frame.conn_id, + candidate, + DispatchFeedback::Routed, + now, + ); + } + + assert!( + quickack_dispatched > bulk_dispatched, + "quickack flow must receive higher dispatch rate with larger weight" + ); +} + #[test] fn fairness_pressure_hysteresis_prevents_instant_flapping() { let mut now = Instant::now(); @@ -128,7 +185,7 @@ fn fairness_pressure_hysteresis_prevents_instant_flapping() { let mut fairness = WorkerFairnessState::new(cfg, now); - for _ in 0..4 { + for _ in 0..3 { assert_eq!( fairness.enqueue_data(9, 0, enqueue_payload(900), now), AdmissionDecision::Admit @@ -180,6 +237,12 @@ fn fairness_randomized_sequence_preserves_memory_bounds() { } let snapshot = fairness.snapshot(); + let (standing_recomputed, backpressured_recomputed) = + fairness.debug_recompute_flow_counters(now); assert!(snapshot.total_queued_bytes <= 32 * 1024); + assert_eq!(snapshot.standing_flows, standing_recomputed); + assert_eq!(snapshot.backpressured_flows, backpressured_recomputed); + assert!(fairness.debug_check_active_ring_consistency()); + assert!(fairness.debug_max_deficit_bytes() <= 4 * 1024); } }