diff --git a/Cargo.lock b/Cargo.lock index a44206a..71423e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,9 +111,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "asn1-rs" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56624a96882bb8c26d61312ae18cb45868e5a9992ea73c58e45c3101e56a1e60" +checksum = "b7f43a50ac4fdca5df8e885c21b835997f0a1cdee65494a6847694a98652d9d8" dependencies = [ "asn1-rs-derive", "asn1-rs-impl", @@ -121,7 +121,7 @@ dependencies = [ "nom", "num-traits", "rusticata-macros", - "thiserror 2.0.18", + "thiserror", "time", ] @@ -167,15 +167,15 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "autocfg" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" [[package]] name = "aws-lc-rs" -version = "1.16.3" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ec6fb3fe69024a75fa7e1bfb48aa6cf59706a101658ea01bfd33b2b248a038f" +checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" dependencies = [ "aws-lc-sys", "zeroize", @@ -183,9 +183,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.40.0" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f50037ee5e1e41e7b8f9d161680a725bd1626cb6f8c7e901f91f942850852fe7" +checksum = "1a2f9779ce85b93ab6170dd940ad0169b5766ff848247aff13bb788b832fe3f4" dependencies = [ "cc", "cmake", @@ -220,12 +220,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.11.1" @@ -234,9 +228,9 @@ checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" [[package]] name = "blake3" -version = "1.8.4" +version = "1.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d2d5991425dfd0785aed03aedcf0b321d61975c9b5b3689c774a2610ae0b51e" +checksum = "0aa83c34e62843d924f905e0f5c866eb1dd6545fc4d719e803d9ba6030371fce" dependencies = [ "arrayref", "arrayvec", @@ -266,9 +260,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.20.2" +version = "3.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" [[package]] name = "byte_string" @@ -299,9 +293,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.60" +version = "1.2.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" dependencies = [ "find-msvc-tools", "jobserver", @@ -309,12 +303,6 @@ dependencies = [ "shlex", ] -[[package]] -name = "cesu8" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" - [[package]] name = "cfg-if" version = "1.0.4" @@ -660,9 +648,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "6.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" dependencies = [ "cfg-if", "crossbeam-utils", @@ -674,9 +662,9 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" [[package]] name = "der" @@ -724,9 +712,9 @@ dependencies = [ [[package]] name = "displaydoc" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +checksum = "1ac70aa55017e108007fbaf5aa0f54b021c98f92ff8af59d42eda9da96e3dd4f" dependencies = [ "proc-macro2", "quote", @@ -771,9 +759,9 @@ dependencies = [ [[package]] name = "either" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" [[package]] name = "enum-as-inner" @@ -1014,9 +1002,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +checksum = "171fefbc92fe4a4de27e0698d6a5b392d6a0e333506bc49133760b3bcf948733" dependencies = [ "atomic-waker", "bytes", @@ -1070,9 +1058,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" +checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" [[package]] name = "heck" @@ -1104,7 +1092,7 @@ dependencies = [ "once_cell", "rand 0.9.4", "ring", - "thiserror 2.0.18", + "thiserror", "tinyvec", "tokio", "tracing", @@ -1127,7 +1115,7 @@ dependencies = [ "rand 0.9.4", "resolv-conf", "smallvec", - "thiserror 2.0.18", + "thiserror", "tokio", "tracing", ] @@ -1152,9 +1140,9 @@ dependencies = [ [[package]] name = "http" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" +checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" dependencies = [ "bytes", "itoa", @@ -1197,9 +1185,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" +checksum = "eb92f162bf56536459fc83c79b974bb12837acfed43d6bc370a7916d0ae15ecc" dependencies = [ "atomic-waker", "bytes", @@ -1380,9 +1368,9 @@ dependencies = [ [[package]] name = "idna_adapter" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +checksum = "cb68373c0d6620ef8105e855e7745e18b0d00d3bdb07fb532e434244cdb9a714" dependencies = [ "icu_normalizer", "icu_properties", @@ -1395,7 +1383,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.17.0", + "hashbrown 0.17.1", "serde", "serde_core", ] @@ -1406,7 +1394,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" dependencies = [ - "bitflags 2.11.1", + "bitflags", "inotify-sys", "libc", ] @@ -1458,16 +1446,6 @@ dependencies = [ "serde", ] -[[package]] -name = "iri-string" -version = "0.7.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "itertools" version = "0.13.0" @@ -1485,27 +1463,32 @@ checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "jni" -version = "0.21.1" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +checksum = "5efd9a482cf3a427f00d6b35f14332adc7902ce91efb778580e180ff90fa3498" dependencies = [ - "cesu8", "cfg-if", "combine", - "jni-sys 0.3.1", + "jni-macros", + "jni-sys", "log", - "thiserror 1.0.69", + "simd_cesu8", + "thiserror", "walkdir", - "windows-sys 0.45.0", + "windows-link", ] [[package]] -name = "jni-sys" -version = "0.3.1" +name = "jni-macros" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41a652e1f9b6e0275df1f15b32661cf0d4b78d4d87ddec5e0c3c20f097433258" +checksum = "a00109accc170f0bdb141fed3e393c565b6f5e072365c3bd58f5b062591560a3" dependencies = [ - "jni-sys 0.4.1", + "proc-macro2", + "quote", + "rustc_version", + "simd_cesu8", + "syn", ] [[package]] @@ -1539,9 +1522,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.95" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" +checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" dependencies = [ "cfg-if", "futures-util", @@ -1561,11 +1544,11 @@ dependencies = [ [[package]] name = "kqueue-sys" -version = "1.0.4" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +checksum = "07293a4e297ac234359b510362495713f75ea345d5307140414f20c69ffeb087" dependencies = [ - "bitflags 1.3.2", + "bitflags", "libc", ] @@ -1583,9 +1566,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.185" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "linux-raw-sys" @@ -1610,9 +1593,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.29" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +checksum = "616ec5685824bcc94416c6d4a7a446eea774a31efd7062c8480ba6fd06d7a6e5" [[package]] name = "lru" @@ -1656,9 +1639,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.8.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +checksum = "6b947ae49db0d222b1dbc6b113ce7248a3fc3a6ca21b696717bfc000ba4484d8" [[package]] name = "memoffset" @@ -1677,9 +1660,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "mio" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" +checksum = "02bd0af71c67b473010cbbc60715ee815645a4dc942899111f494b4b737d6fda" dependencies = [ "libc", "log", @@ -1706,11 +1689,11 @@ dependencies = [ [[package]] name = "nix" -version = "0.31.2" +version = "0.31.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" +checksum = "cf20d2fde8ff38632c426f1165ed7436270b44f199fc55284c38276f9db47c3d" dependencies = [ - "bitflags 2.11.1", + "bitflags", "cfg-if", "cfg_aliases", "libc", @@ -1733,7 +1716,7 @@ version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" dependencies = [ - "bitflags 2.11.1", + "bitflags", "fsevent-sys", "inotify", "kqueue", @@ -1751,7 +1734,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" dependencies = [ - "bitflags 2.11.1", + "bitflags", ] [[package]] @@ -1775,9 +1758,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" +checksum = "521739c6d2bac4aa25192232afe6841231376b2b26d4d9fae5ecf8ca5772e441" [[package]] name = "num-integer" @@ -1875,18 +1858,18 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pin-project" -version = "1.1.11" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +checksum = "2466b2336ed02bcdca6b294417127b90ec92038d1d5c4fbeac971a922e0e0924" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.11" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +checksum = "c96395f0a926bc13b1c17622aaddda1ecb55d49c8f1bf9777e4d877800a43f8b" dependencies = [ "proc-macro2", "quote", @@ -2017,7 +2000,7 @@ checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" dependencies = [ "bit-set", "bit-vec", - "bitflags 2.11.1", + "bitflags", "num-traits", "rand 0.9.4", "rand_chacha", @@ -2048,7 +2031,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror 2.0.18", + "thiserror", "tokio", "tracing", "web-time", @@ -2070,7 +2053,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.18", + "thiserror", "tinyvec", "tracing", "web-time", @@ -2201,7 +2184,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.11.1", + "bitflags", ] [[package]] @@ -2235,9 +2218,9 @@ checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] name = "reqwest" -version = "0.13.2" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" dependencies = [ "base64", "bytes", @@ -2331,7 +2314,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags 2.11.1", + "bitflags", "errno", "libc", "linux-raw-sys", @@ -2340,9 +2323,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.38" +version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ "aws-lc-rs", "once_cell", @@ -2367,9 +2350,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.14.0" +version = "1.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" +checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" dependencies = [ "web-time", "zeroize", @@ -2377,9 +2360,9 @@ dependencies = [ [[package]] name = "rustls-platform-verifier" -version = "0.6.2" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +checksum = "26d1e2536ce4f35f4846aa13bff16bd0ff40157cdb14cc056c7b14ba41233ba0" dependencies = [ "core-foundation", "core-foundation-sys", @@ -2479,7 +2462,7 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ - "bitflags 2.11.1", + "bitflags", "core-foundation", "core-foundation-sys", "libc", @@ -2544,9 +2527,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa", "memchr", @@ -2629,7 +2612,7 @@ dependencies = [ "shadowsocks-crypto", "socket2", "spin", - "thiserror 2.0.18", + "thiserror", "tokio", "tokio-tfo", "trait-variant", @@ -2667,9 +2650,9 @@ dependencies = [ [[package]] name = "shlex" -version = "1.3.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" [[package]] name = "signal-hook-registry" @@ -2687,6 +2670,22 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +[[package]] +name = "simd_cesu8" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f90157bb87cddf702797c5dadfa0be7d266cdf49e22da2fcaa32eff75b2c33" +dependencies = [ + "rustc_version", + "simdutf8", +] + +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "slab" version = "0.4.12" @@ -2701,9 +2700,9 @@ checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" [[package]] name = "socket2" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", "windows-sys 0.61.2", @@ -2791,7 +2790,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "telemt" -version = "3.4.12" +version = "3.4.13" dependencies = [ "aes", "anyhow", @@ -2835,7 +2834,7 @@ dependencies = [ "socket2", "static_assertions", "subtle", - "thiserror 2.0.18", + "thiserror", "tokio", "tokio-rustls", "tokio-test", @@ -2864,33 +2863,13 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "thiserror" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" -dependencies = [ - "thiserror-impl 1.0.69", -] - [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.18", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -2981,9 +2960,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.52.1" +version = "1.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b67dee974fe86fd92cc45b7a95fdd2f99a36a6d7b0d431a231178d3d670bbcc6" +checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "bytes", "libc", @@ -3130,20 +3109,20 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.8" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" dependencies = [ - "bitflags 2.11.1", + "bitflags", "bytes", "futures-util", "http", "http-body", - "iri-string", "pin-project-lite", "tower", "tower-layer", "tower-service", + "url", ] [[package]] @@ -3177,7 +3156,7 @@ checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" dependencies = [ "crossbeam-channel", "symlink", - "thiserror 2.0.18", + "thiserror", "time", "tracing-subscriber", ] @@ -3384,9 +3363,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.118" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" +checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" dependencies = [ "cfg-if", "once_cell", @@ -3397,9 +3376,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.68" +version = "0.4.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f371d383f2fb139252e0bfac3b81b265689bf45b6874af544ffa4c975ac1ebf8" +checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f" dependencies = [ "js-sys", "wasm-bindgen", @@ -3407,9 +3386,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.118" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" +checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3417,9 +3396,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.118" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" +checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" dependencies = [ "bumpalo", "proc-macro2", @@ -3430,9 +3409,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.118" +version = "0.2.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" +checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" dependencies = [ "unicode-ident", ] @@ -3465,7 +3444,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags 2.11.1", + "bitflags", "hashbrown 0.15.5", "indexmap", "semver", @@ -3473,9 +3452,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.95" +version = "0.3.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f2dfbb17949fa2088e5d39408c48368947b86f7834484e87b73de55bc14d97d" +checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436" dependencies = [ "js-sys", "wasm-bindgen", @@ -3616,15 +3595,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.2", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -3652,21 +3622,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-targets" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-targets" version = "0.52.6" @@ -3700,12 +3655,6 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -3718,12 +3667,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -3736,12 +3679,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" -[[package]] -name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -3766,12 +3703,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" - [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -3784,12 +3715,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -3802,12 +3727,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -3820,12 +3739,6 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -3840,9 +3753,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] name = "winnow" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09dac053f1cd375980747450bfc7250c264eaae0583872e845c0c7cd578872b5" +checksum = "0592e1c9d151f854e6fd382574c3a0855250e1d9b2f99d9281c6e6391af352f1" [[package]] name = "wit-bindgen" @@ -3908,7 +3821,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags 2.11.1", + "bitflags", "indexmap", "log", "serde", @@ -3969,7 +3882,7 @@ dependencies = [ "nom", "oid-registry", "rusticata-macros", - "thiserror 2.0.18", + "thiserror", "time", ] @@ -3998,18 +3911,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.48" +version = "0.8.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +checksum = "bce33a6288fa3f072a8c2c7d0f2fdbb90e28298f0135c1f99b96c3db2efcc60b" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.48" +version = "0.8.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +checksum = "8fd425244944f4ab65ccff928e7323354c5a018c75838362fdce749dfad2ee1e" dependencies = [ "proc-macro2", "quote", @@ -4018,9 +3931,9 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +checksum = "0ec05a11813ea801ff6d75110ad09cd0824ddba17dfe17128ea0d5f68e6c5272" dependencies = [ "zerofrom-derive", ] diff --git a/Cargo.toml b/Cargo.toml index b69aef7..99570db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.4.12" +version = "3.4.13" edition = "2024" [features] diff --git a/docker-compose.yml b/docker-compose.yml index a6afb51..a90efde 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,12 +10,15 @@ services: - "443:443" - "127.0.0.1:9090:9090" - "127.0.0.1:9091:9091" - # Allow caching 'proxy-secret' in read-only container - working_dir: /etc/telemt + # Working dir uses tmpfs for caching 'proxy-secret' at runtime. + # Config is mounted as a directory (not a single file) so the API can + # atomically update config.toml via write-temp → rename within the same FS. + working_dir: /run/telemt + command: ["/etc/telemt/config.toml"] volumes: - - ./config.toml:/etc/telemt/config.toml:ro + - ./config:/etc/telemt:rw tmpfs: - - /etc/telemt:rw,mode=1777,size=4m + - /run/telemt:rw,mode=1777,size=4m environment: - RUST_LOG=info healthcheck: @@ -24,8 +27,6 @@ services: timeout: 5s retries: 3 start_period: 20s - # Uncomment this line if you want to use host network for IPv6, but bridge is default and usually better - # network_mode: host cap_drop: - ALL cap_add: diff --git a/docs/Quick_start/QUICK_START_GUIDE.ru.md b/docs/Quick_start/QUICK_START_GUIDE.ru.md index d3d06c1..99b0a40 100644 --- a/docs/Quick_start/QUICK_START_GUIDE.ru.md +++ b/docs/Quick_start/QUICK_START_GUIDE.ru.md @@ -235,7 +235,10 @@ curl -s http://127.0.0.1:9091/v1/users | jq -r '.data[] | "[\(.username)]", (.li # Telemt через Docker Compose -**1. Отредактируйте `config.toml` в корневом каталоге репозитория (как минимум: порт, пользовательские секреты, tls_domain)** +**1. Создайте директорию `config/` и поместите в неё отрдеактированный `config.toml` (указав как минимум: порт, пользовательские секреты, tls_domain):** +```bash +mkdir config && mv config.toml config/ +``` **2. Запустите контейнер:** ```bash docker compose up -d --build @@ -249,7 +252,7 @@ docker compose logs -f telemt docker compose down ``` > [!NOTE] -> - В `docker-compose.yml` файл `./config.toml` монтируется в `/app/config.toml` (доступно только для чтения) +> - Директория `./config/` монтируется в `/etc/telemt/` (read-write), что позволяет API атомарно обновлять config.toml > - По умолчанию публикуются порты 443:443, а контейнер запускается со сброшенными привилегиями (добавлена только `NET_BIND_SERVICE`) > - Если вам действительно нужна сеть хоста (обычно это требуется только для некоторых конфигураций IPv6), раскомментируйте `network_mode: host` diff --git a/src/api/http_utils.rs b/src/api/http_utils.rs index 9dfe526..7089d94 100644 --- a/src/api/http_utils.rs +++ b/src/api/http_utils.rs @@ -1,6 +1,7 @@ use http_body_util::{BodyExt, Full}; use hyper::StatusCode; use hyper::body::{Bytes, Incoming}; +use hyper::header::ALLOW; use serde::Serialize; use serde::de::DeserializeOwned; @@ -25,6 +26,8 @@ pub(super) fn success_response( } pub(super) fn error_response(request_id: u64, failure: ApiFailure) -> hyper::Response> { + let status = failure.status; + let allow = failure.allow; let payload = ErrorResponse { ok: false, error: ErrorBody { @@ -40,11 +43,13 @@ pub(super) fn error_response(request_id: u64, failure: ApiFailure) -> hyper::Res ) .into_bytes() }); - hyper::Response::builder() - .status(failure.status) - .header("content-type", "application/json; charset=utf-8") - .body(Full::new(Bytes::from(body))) - .unwrap() + let mut builder = hyper::Response::builder() + .status(status) + .header("content-type", "application/json; charset=utf-8"); + if let Some(allow) = allow { + builder = builder.header(ALLOW, allow); + } + builder.body(Full::new(Bytes::from(body))).unwrap() } pub(super) async fn read_json( diff --git a/src/api/mod.rs b/src/api/mod.rs index c61c59b..2e2ef6f 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -41,7 +41,9 @@ mod runtime_watch; mod runtime_zero; mod users; -use config_store::{current_revision, load_config_from_disk, parse_if_match}; +use config_store::{ + current_revision, ensure_expected_revision, load_config_from_disk, parse_if_match, +}; use events::ApiEventStore; use http_utils::{error_response, read_json, read_optional_json, success_response}; use model::{ @@ -75,6 +77,10 @@ use users::{ const API_MAX_CONTROL_CONNECTIONS: usize = 1024; const API_HTTP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); const ROUTE_USERNAME_ERROR: &str = "username must match [A-Za-z0-9_.-] and be 1..64 chars"; +const ALLOW_GET: &str = "GET"; +const ALLOW_POST: &str = "POST"; +const ALLOW_GET_POST: &str = "GET, POST"; +const ALLOW_GET_PATCH_DELETE: &str = "GET, PATCH, DELETE"; pub(super) struct ApiRuntimeState { pub(super) process_started_at_epoch_secs: u64, @@ -125,6 +131,57 @@ fn parse_route_username(user: &str) -> Result<&str, ApiFailure> { } } +fn user_action_route_matches(path: &str, suffix: &str) -> bool { + path.strip_prefix("/v1/users/") + .and_then(|path| path.strip_suffix(suffix)) + .map(|user| !user.is_empty() && !user.contains('/')) + .unwrap_or(false) +} + +fn allowed_methods_for_path(path: &str) -> Option<&'static str> { + match path { + "/v1/health" + | "/v1/health/ready" + | "/v1/system/info" + | "/v1/runtime/gates" + | "/v1/runtime/initialization" + | "/v1/limits/effective" + | "/v1/security/posture" + | "/v1/security/whitelist" + | "/v1/stats/summary" + | "/v1/stats/zero/all" + | "/v1/stats/upstreams" + | "/v1/stats/minimal/all" + | "/v1/stats/me-writers" + | "/v1/stats/dcs" + | "/v1/runtime/me-pool-state" + | "/v1/runtime/me_pool_state" + | "/v1/runtime/me-quality" + | "/v1/runtime/me_quality" + | "/v1/runtime/upstream-quality" + | "/v1/runtime/upstream_quality" + | "/v1/runtime/nat-stun" + | "/v1/runtime/nat_stun" + | "/v1/runtime/me-selftest" + | "/v1/runtime/connections/summary" + | "/v1/runtime/events/recent" + | "/v1/stats/users/active-ips" + | "/v1/stats/users/quota" + | "/v1/stats/users" => Some(ALLOW_GET), + "/v1/users" => Some(ALLOW_GET_POST), + _ if user_action_route_matches(path, "/reset-quota") => Some(ALLOW_POST), + _ if user_action_route_matches(path, "/rotate-secret") => Some(ALLOW_POST), + _ if path + .strip_prefix("/v1/users/") + .map(|user| !user.is_empty() && !user.contains('/')) + .unwrap_or(false) => + { + Some(ALLOW_GET_PATCH_DELETE) + } + _ => None, + } +} + pub async fn serve( listen: SocketAddr, stats: Arc, @@ -435,22 +492,22 @@ async fn handle( let data = build_dcs_data(shared.as_ref(), api_cfg).await; Ok(success_response(StatusCode::OK, data, revision)) } - ("GET", "/v1/runtime/me_pool_state") => { + ("GET", "/v1/runtime/me-pool-state") | ("GET", "/v1/runtime/me_pool_state") => { let revision = current_revision(&shared.config_path).await?; let data = build_runtime_me_pool_state_data(shared.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) } - ("GET", "/v1/runtime/me_quality") => { + ("GET", "/v1/runtime/me-quality") | ("GET", "/v1/runtime/me_quality") => { let revision = current_revision(&shared.config_path).await?; let data = build_runtime_me_quality_data(shared.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) } - ("GET", "/v1/runtime/upstream_quality") => { + ("GET", "/v1/runtime/upstream-quality") | ("GET", "/v1/runtime/upstream_quality") => { let revision = current_revision(&shared.config_path).await?; let data = build_runtime_upstream_quality_data(shared.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) } - ("GET", "/v1/runtime/nat_stun") => { + ("GET", "/v1/runtime/nat-stun") | ("GET", "/v1/runtime/nat_stun") => { let revision = current_revision(&shared.config_path).await?; let data = build_runtime_nat_stun_data(shared.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) @@ -506,7 +563,7 @@ async fn handle( .await; Ok(success_response(StatusCode::OK, users, revision)) } - ("GET", "/v1/users/quota") => { + ("GET", "/v1/stats/users/quota") => { let revision = current_revision(&shared.config_path).await?; let disk_cfg = load_config_from_disk(&shared.config_path).await?; let data = build_user_quota_list(&disk_cfg, shared.stats.as_ref()); @@ -567,6 +624,16 @@ async fn handle( ), )); } + let expected_revision = parse_if_match(req.headers()); + let disk_cfg = load_config_from_disk(&shared.config_path).await?; + ensure_expected_revision(&shared.config_path, expected_revision.as_deref()) + .await?; + if !disk_cfg.access.users.contains_key(user) { + return Ok(error_response( + request_id, + ApiFailure::new(StatusCode::NOT_FOUND, "not_found", "User not found"), + )); + } let snapshot = match crate::quota_state::reset_user_quota( &shared.quota_state_path, shared.stats.as_ref(), @@ -761,16 +828,18 @@ async fn handle( if method == Method::POST { return Ok(error_response( request_id, - ApiFailure::new(StatusCode::NOT_FOUND, "not_found", "Route not found"), + ApiFailure::method_not_allowed(ALLOW_GET_PATCH_DELETE), )); } return Ok(error_response( request_id, - ApiFailure::new( - StatusCode::METHOD_NOT_ALLOWED, - "method_not_allowed", - "Unsupported HTTP method for this route", - ), + ApiFailure::method_not_allowed(ALLOW_GET_PATCH_DELETE), + )); + } + if let Some(allow) = allowed_methods_for_path(normalized_path) { + return Ok(error_response( + request_id, + ApiFailure::method_not_allowed(allow), )); } debug!( diff --git a/src/api/model.rs b/src/api/model.rs index 76758d9..56e8fea 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -15,6 +15,7 @@ pub(super) struct ApiFailure { pub(super) status: StatusCode, pub(super) code: &'static str, pub(super) message: String, + pub(super) allow: Option<&'static str>, } impl ApiFailure { @@ -23,6 +24,7 @@ impl ApiFailure { status, code, message: message.into(), + allow: None, } } @@ -33,6 +35,15 @@ impl ApiFailure { pub(super) fn bad_request(message: impl Into) -> Self { Self::new(StatusCode::BAD_REQUEST, "bad_request", message) } + + pub(super) fn method_not_allowed(allow: &'static str) -> Self { + Self { + status: StatusCode::METHOD_NOT_ALLOWED, + code: "method_not_allowed", + message: "Unsupported HTTP method for this route".to_string(), + allow: Some(allow), + } + } } #[derive(Serialize)] diff --git a/src/maestro/helpers.rs b/src/maestro/helpers.rs index 1c9b337..fa23e8f 100644 --- a/src/maestro/helpers.rs +++ b/src/maestro/helpers.rs @@ -1,6 +1,7 @@ #![allow(clippy::items_after_test_module)] use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::watch; @@ -18,8 +19,27 @@ use crate::transport::middle_proxy::{ const MAESTRO_COLOR: &str = "\x1b[92m"; const COLOR_RESET: &str = "\x1b[0m"; +static MAESTRO_COLORS_ENABLED: AtomicBool = AtomicBool::new(true); + +/// Enables or disables ANSI color in direct MAESTRO status lines. +pub(crate) fn set_maestro_colors_enabled(enabled: bool) { + MAESTRO_COLORS_ENABLED.store(enabled, Ordering::Relaxed); +} + +fn format_maestro_line(message: impl AsRef, colors_enabled: bool) -> String { + if colors_enabled { + format!("{MAESTRO_COLOR}MAESTRO{COLOR_RESET}: {}", message.as_ref()) + } else { + format!("MAESTRO: {}", message.as_ref()) + } +} + +/// Prints a direct MAESTRO status line outside the tracing subscriber. pub(crate) fn print_maestro_line(message: impl AsRef) { - eprintln!("{MAESTRO_COLOR}MAESTRO{COLOR_RESET}: {}", message.as_ref()); + eprintln!( + "{}", + format_maestro_line(message, MAESTRO_COLORS_ENABLED.load(Ordering::Relaxed)) + ); } pub(crate) fn resolve_runtime_config_path( @@ -274,11 +294,24 @@ mod tests { use std::path::{Path, PathBuf}; use super::{ - expected_handshake_close_description, is_expected_handshake_eof, peer_close_description, - resolve_runtime_base_dir, resolve_runtime_config_path, + expected_handshake_close_description, format_maestro_line, is_expected_handshake_eof, + peer_close_description, resolve_runtime_base_dir, resolve_runtime_config_path, }; use crate::error::{ProxyError, StreamError}; + #[test] + fn maestro_line_formatter_respects_disabled_colors() { + let plain = format_maestro_line("boot", false); + assert_eq!(plain, "MAESTRO: boot"); + assert!(!plain.contains('\x1b')); + } + + #[test] + fn maestro_line_formatter_keeps_color_when_enabled() { + let colored = format_maestro_line("boot", true); + assert!(colored.contains("\x1b[92mMAESTRO\x1b[0m")); + } + #[test] fn resolve_runtime_config_path_anchors_relative_to_startup_cwd() { let nonce = std::time::SystemTime::now() diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 834704f..e711ab4 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -49,6 +49,7 @@ use crate::transport::UpstreamManager; use crate::transport::middle_proxy::MePool; use helpers::{ parse_cli, print_maestro_line, resolve_runtime_base_dir, resolve_runtime_config_path, + set_maestro_colors_enabled, }; #[cfg(unix)] @@ -314,6 +315,7 @@ async fn run_telemt_core( eprintln!("[telemt] Invalid network.dns_overrides: {}", e); std::process::exit(1); } + set_maestro_colors_enabled(!config.general.disable_colors); startup_tracker .complete_component(COMPONENT_CONFIG_LOAD, Some("config is ready".to_string())) .await; diff --git a/src/protocol/tests/tls_security_tests.rs b/src/protocol/tests/tls_security_tests.rs index 1edece4..f8b1b2b 100644 --- a/src/protocol/tests/tls_security_tests.rs +++ b/src/protocol/tests/tls_security_tests.rs @@ -1385,6 +1385,7 @@ fn emulated_server_hello_never_places_alpn_in_server_hello_extensions() { false, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, Some(b"h2".to_vec()), 0, @@ -1509,12 +1510,22 @@ fn test_validate_tls_handshake_format() { } fn build_client_hello_with_exts(exts: Vec<(u16, Vec)>, host: &str) -> Vec { + build_client_hello_with_ciphers_and_exts(&[[0x13, 0x01]], exts, host) +} + +fn build_client_hello_with_ciphers_and_exts( + cipher_suites: &[[u8; 2]], + exts: Vec<(u16, Vec)>, + host: &str, +) -> Vec { let mut body = Vec::new(); body.extend_from_slice(&TLS_VERSION); body.extend_from_slice(&[0u8; 32]); body.push(0); - body.extend_from_slice(&2u16.to_be_bytes()); - body.extend_from_slice(&[0x13, 0x01]); + body.extend_from_slice(&((cipher_suites.len() * 2) as u16).to_be_bytes()); + for suite in cipher_suites { + body.extend_from_slice(suite); + } body.push(1); body.push(0); @@ -1654,6 +1665,52 @@ fn detect_client_hello_tls_version_rejects_malformed_supported_versions() { assert!(detect_client_hello_tls_version(&ch).is_none()); } +#[test] +fn select_server_hello_cipher_suite_keeps_profile_cipher_when_offered() { + let ch = build_client_hello_with_ciphers_and_exts( + &[[0x13, 0x01], [0x13, 0x03]], + Vec::new(), + "example.com", + ); + assert_eq!( + select_server_hello_cipher_suite(&ch, [0x13, 0x03]), + [0x13, 0x03] + ); +} + +#[test] +fn select_server_hello_cipher_suite_ignores_profile_tls12_cipher() { + let ch = build_client_hello_with_ciphers_and_exts( + &[[0xc0, 0x2f], [0x13, 0x03]], + Vec::new(), + "example.com", + ); + assert_eq!( + select_server_hello_cipher_suite(&ch, [0xc0, 0x2f]), + [0x13, 0x03] + ); +} + +#[test] +fn select_server_hello_cipher_suite_falls_back_to_offered_tls13_suite() { + let ch = build_client_hello_with_ciphers_and_exts(&[[0x13, 0x03]], Vec::new(), "example.com"); + assert_eq!( + select_server_hello_cipher_suite(&ch, [0x13, 0x01]), + [0x13, 0x03] + ); +} + +#[test] +fn select_server_hello_cipher_suite_keeps_preferred_for_malformed_clienthello() { + let mut ch = + build_client_hello_with_ciphers_and_exts(&[[0x13, 0x03]], Vec::new(), "example.com"); + ch.truncate(12); + assert_eq!( + select_server_hello_cipher_suite(&ch, [0x13, 0x01]), + [0x13, 0x01] + ); +} + #[test] fn extract_sni_rejects_zero_length_host_name() { let mut sni_ext = Vec::new(); @@ -2179,7 +2236,7 @@ fn light_fuzz_boot_time_timestamp_matrix_with_short_replay_window_obeys_boot_cap } #[test] -fn server_hello_application_data_contains_alpn_marker_when_selected() { +fn server_hello_application_data_omits_alpn_marker_when_selected() { let secret = b"alpn_marker_test"; let client_digest = [0x55u8; TLS_DIGEST_LEN]; let session_id = vec![0xAB; 32]; @@ -2206,8 +2263,8 @@ fn server_hello_application_data_contains_alpn_marker_when_selected() { assert!( app_payload .windows(expected.len()) - .any(|window| window == expected), - "first application payload must carry ALPN marker for selected protocol" + .all(|window| window != expected), + "first application payload must not expose plaintext ALPN marker bytes" ); } @@ -2303,14 +2360,14 @@ fn server_hello_ignores_oversized_alpn_when_marker_would_not_fit() { } #[test] -fn server_hello_embeds_full_alpn_marker_when_it_exactly_fits_fake_cert_len() { +fn server_hello_omits_alpn_marker_even_when_it_would_fit_fake_cert_len() { let secret = b"alpn_exact_fit_test"; let client_digest = [0x58u8; TLS_DIGEST_LEN]; let session_id = vec![0xA5; 32]; let rng = crate::crypto::SecureRandom::new(); let proto = vec![b'z'; 57]; - // marker_len = 4 + (2 + (1 + proto_len)) = 7 + proto_len = 64 + // marker_len = 4 + (2 + (1 + proto_len)) = 7 + proto_len = 64. let response = build_server_hello( secret, &client_digest, @@ -2336,7 +2393,7 @@ fn server_hello_embeds_full_alpn_marker_when_it_exactly_fits_fake_cert_len() { expected_marker.extend_from_slice(&proto); assert_eq!(app_payload.len(), expected_marker.len()); - assert_eq!(app_payload, expected_marker.as_slice()); + assert_ne!(app_payload, expected_marker.as_slice()); } #[test] diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index 5c18135..19cb3aa 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -105,6 +105,8 @@ mod extension_type { /// TLS Cipher Suites mod cipher_suite { pub const TLS_AES_128_GCM_SHA256: [u8; 2] = [0x13, 0x01]; + pub const TLS_AES_256_GCM_SHA384: [u8; 2] = [0x13, 0x02]; + pub const TLS_CHACHA20_POLY1305_SHA256: [u8; 2] = [0x13, 0x03]; } /// TLS Named Curves @@ -241,6 +243,13 @@ impl ServerHelloBuilder { self } + fn with_cipher_suite(mut self, cipher_suite: [u8; 2]) -> Self { + if cipher_suite != [0, 0] { + self.cipher_suite = cipher_suite; + } + self + } + /// Build ServerHello message (without record header) fn build_message(&self) -> Vec { let Ok(session_id_len) = u8::try_from(self.session_id.len()) else { @@ -520,6 +529,33 @@ pub fn build_server_hello( rng: &SecureRandom, alpn: Option>, new_session_tickets: u8, +) -> Vec { + build_server_hello_with_cipher( + secret, + client_digest, + session_id, + fake_cert_len, + rng, + cipher_suite::TLS_AES_128_GCM_SHA256, + alpn, + new_session_tickets, + ) +} + +/// Build TLS ServerHello response with a caller-selected cipher suite. +/// +/// The caller is responsible for selecting a suite that is compatible with the +/// already-authenticated ClientHello. Keeping the selection outside this +/// builder avoids extra ClientHello parsing in the response construction path. +pub(crate) fn build_server_hello_with_cipher( + secret: &[u8], + client_digest: &[u8; TLS_DIGEST_LEN], + session_id: &[u8], + fake_cert_len: usize, + rng: &SecureRandom, + selected_cipher_suite: [u8; 2], + alpn: Option>, + new_session_tickets: u8, ) -> Vec { const MIN_APP_DATA: usize = 64; const MAX_APP_DATA: usize = MAX_TLS_CIPHERTEXT_SIZE; @@ -528,6 +564,7 @@ pub fn build_server_hello( // Build ServerHello let server_hello = ServerHelloBuilder::new(session_id.to_vec()) + .with_cipher_suite(selected_cipher_suite) .with_x25519_key(&x25519_key) .with_tls13_version() .build_record(); @@ -538,28 +575,14 @@ pub fn build_server_hello( TLS_VERSION[0], TLS_VERSION[1], 0x00, - 0x01, // length = 1 - 0x01, // CCS byte + 0x01, + 0x01, ]; // Build first encrypted flight mimic as opaque ApplicationData bytes. - // Embed a compact EncryptedExtensions-like ALPN block when selected. + // ALPN belongs inside encrypted EncryptedExtensions in real TLS 1.3. let mut fake_cert = Vec::with_capacity(fake_cert_len); - if let Some(proto) = alpn - .as_ref() - .filter(|p| !p.is_empty() && p.len() <= u8::MAX as usize) - { - let proto_list_len = 1usize + proto.len(); - let ext_data_len = 2usize + proto_list_len; - let marker_len = 4usize + ext_data_len; - if marker_len <= fake_cert_len { - fake_cert.extend_from_slice(&0x0010u16.to_be_bytes()); - fake_cert.extend_from_slice(&(ext_data_len as u16).to_be_bytes()); - fake_cert.extend_from_slice(&(proto_list_len as u16).to_be_bytes()); - fake_cert.push(proto.len() as u8); - fake_cert.extend_from_slice(proto); - } - } + let _ = alpn; if fake_cert.len() < fake_cert_len { fake_cert.extend_from_slice(&rng.bytes(fake_cert_len - fake_cert.len())); } else if fake_cert.len() > fake_cert_len { @@ -580,7 +603,7 @@ pub fn build_server_hello( let ticket_count = new_session_tickets.min(4); if ticket_count > 0 { for _ in 0..ticket_count { - let ticket_len: usize = rng.range(48) + 48; // 48-95 bytes + let ticket_len: usize = rng.range(48) + 48; let mut record = Vec::with_capacity(5 + ticket_len); record.push(TLS_RECORD_APPLICATION); record.extend_from_slice(&TLS_VERSION); @@ -927,6 +950,112 @@ pub fn detect_client_hello_tls_version(handshake: &[u8]) -> Option Option<(usize, usize)> { + if handshake.len() < 5 || handshake[0] != TLS_RECORD_HANDSHAKE { + return None; + } + + let record_len = u16::from_be_bytes([handshake[3], handshake[4]]) as usize; + let record_end = 5usize.checked_add(record_len)?; + if record_end > handshake.len() { + return None; + } + + let mut pos = 5; + if handshake.get(pos) != Some(&0x01) { + return None; + } + pos += 1; + + if pos + 3 > record_end { + return None; + } + let handshake_len = ((handshake[pos] as usize) << 16) + | ((handshake[pos + 1] as usize) << 8) + | handshake[pos + 2] as usize; + pos += 3; + let handshake_end = pos.checked_add(handshake_len)?; + if handshake_end > record_end { + return None; + } + + if pos + 2 + 32 > handshake_end { + return None; + } + pos += 2 + 32; + + let session_id_len = *handshake.get(pos)? as usize; + pos = pos.checked_add(1)?.checked_add(session_id_len)?; + if pos + 2 > handshake_end { + return None; + } + + let cipher_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize; + if cipher_len == 0 || cipher_len % 2 != 0 { + return None; + } + pos += 2; + let cipher_end = pos.checked_add(cipher_len)?; + if cipher_end > handshake_end { + return None; + } + + Some((pos, cipher_end)) +} + +fn client_hello_offers_cipher_suite( + handshake: &[u8], + range: (usize, usize), + suite: [u8; 2], +) -> bool { + let mut pos = range.0; + while pos + 1 < range.1 { + if handshake[pos] == suite[0] && handshake[pos + 1] == suite[1] { + return true; + } + pos += 2; + } + false +} + +fn is_tls13_cipher_suite(suite: [u8; 2]) -> bool { + suite == cipher_suite::TLS_AES_128_GCM_SHA256 + || suite == cipher_suite::TLS_AES_256_GCM_SHA384 + || suite == cipher_suite::TLS_CHACHA20_POLY1305_SHA256 +} + +/// Select the ServerHello cipher suite from the already-received ClientHello. +/// +/// This is intentionally a borrowed, zero-allocation scan. It runs only for an +/// authenticated success response and keeps malformed or unexpected ClientHello +/// shapes on the previous fallback behavior. +pub(crate) fn select_server_hello_cipher_suite(handshake: &[u8], preferred: [u8; 2]) -> [u8; 2] { + let preferred = if is_tls13_cipher_suite(preferred) { + preferred + } else { + cipher_suite::TLS_AES_128_GCM_SHA256 + }; + let Some(range) = client_hello_cipher_suites_range(handshake) else { + return preferred; + }; + + if client_hello_offers_cipher_suite(handshake, range, preferred) { + return preferred; + } + + for fallback in [ + cipher_suite::TLS_AES_128_GCM_SHA256, + cipher_suite::TLS_CHACHA20_POLY1305_SHA256, + cipher_suite::TLS_AES_256_GCM_SHA384, + ] { + if client_hello_offers_cipher_suite(handshake, range, fallback) { + return fallback; + } + } + + preferred +} + /// Check if bytes look like a TLS ClientHello pub fn is_tls_handshake(first_bytes: &[u8]) -> bool { if first_bytes.len() < 3 { diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 15b04de..a765575 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -1504,6 +1504,13 @@ where let validation_session_id_slice = &validation_session_id[..validation_session_id_len]; let response = if let Some((cached_entry, use_full_cert_payload)) = cached { + let preferred_cipher_suite = if cached_entry.server_hello_template.cipher_suite == [0, 0] { + [0x13, 0x01] + } else { + cached_entry.server_hello_template.cipher_suite + }; + let selected_cipher_suite = + tls::select_server_hello_cipher_suite(handshake, preferred_cipher_suite); emulator::build_emulated_server_hello( &validated_secret, &validation_digest, @@ -1512,17 +1519,20 @@ where use_full_cert_payload, config.censorship.serverhello_compact, client_tls_version, + selected_cipher_suite, rng, selected_alpn.clone(), config.censorship.tls_new_session_tickets, ) } else { - tls::build_server_hello( + let selected_cipher_suite = tls::select_server_hello_cipher_suite(handshake, [0x13, 0x01]); + tls::build_server_hello_with_cipher( &validated_secret, &validation_digest, validation_session_id_slice, config.censorship.fake_cert_len, rng, + selected_cipher_suite, selected_alpn.clone(), config.censorship.tls_new_session_tickets, ) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 3f7c31c..2c61c86 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -1,6 +1,6 @@ +use std::collections::BTreeSet; #[cfg(test)] use std::collections::hash_map::DefaultHasher; -use std::collections::{BTreeSet, HashMap}; #[cfg(test)] use std::future::Future; #[cfg(test)] diff --git a/src/proxy/middle_relay/idle.rs b/src/proxy/middle_relay/idle.rs index 3a33869..0e8ff12 100644 --- a/src/proxy/middle_relay/idle.rs +++ b/src/proxy/middle_relay/idle.rs @@ -1,4 +1,5 @@ use super::*; +use dashmap::DashMap; mod read; @@ -10,10 +11,10 @@ pub(crate) use self::read::{ #[derive(Default)] pub(crate) struct RelayIdleCandidateRegistry { - pub(in crate::proxy::middle_relay) by_conn_id: HashMap, - pub(in crate::proxy::middle_relay) ordered: BTreeSet<(u64, u64)>, - pressure_event_seq: u64, - pressure_consumed_seq: u64, + pub(in crate::proxy::middle_relay) by_conn_id: DashMap, + pub(in crate::proxy::middle_relay) ordered: parking_lot::Mutex>, + pressure_event_seq: AtomicU64, + pressure_consumed_seq: AtomicU64, } /// Queue metadata used to preserve FIFO ordering for idle relay eviction. @@ -23,25 +24,10 @@ pub(in crate::proxy::middle_relay) struct RelayIdleCandidateMeta { pub(in crate::proxy::middle_relay) mark_pressure_seq: u64, } -pub(super) fn relay_idle_candidate_registry_lock_in( - shared: &ProxySharedState, -) -> std::sync::MutexGuard<'_, RelayIdleCandidateRegistry> { - let registry = &shared.middle_relay.relay_idle_registry; - match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - } -} - pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) -> bool { - let mut guard = relay_idle_candidate_registry_lock_in(shared); + let registry = &shared.middle_relay.relay_idle_registry; - if guard.by_conn_id.contains_key(&conn_id) { + if registry.by_conn_id.contains_key(&conn_id) { return false; } @@ -52,24 +38,38 @@ pub(super) fn mark_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u .saturating_add(1); let meta = RelayIdleCandidateMeta { mark_order_seq, - mark_pressure_seq: guard.pressure_event_seq, + mark_pressure_seq: registry.pressure_event_seq.load(Ordering::Relaxed), }; - guard.by_conn_id.insert(conn_id, meta); - guard.ordered.insert((meta.mark_order_seq, conn_id)); - true + match registry.by_conn_id.entry(conn_id) { + dashmap::mapref::entry::Entry::Occupied(_) => false, + dashmap::mapref::entry::Entry::Vacant(entry) => { + entry.insert(meta); + registry + .ordered + .lock() + .insert((meta.mark_order_seq, conn_id)); + true + } + } } pub(super) fn clear_relay_idle_candidate_in(shared: &ProxySharedState, conn_id: u64) { - let mut guard = relay_idle_candidate_registry_lock_in(shared); + let registry = &shared.middle_relay.relay_idle_registry; - if let Some(meta) = guard.by_conn_id.remove(&conn_id) { - guard.ordered.remove(&(meta.mark_order_seq, conn_id)); + if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) { + registry + .ordered + .lock() + .remove(&(meta.mark_order_seq, conn_id)); } } pub(super) fn note_relay_pressure_event_in(shared: &ProxySharedState) { - let mut guard = relay_idle_candidate_registry_lock_in(shared); - guard.pressure_event_seq = guard.pressure_event_seq.wrapping_add(1); + shared + .middle_relay + .relay_idle_registry + .pressure_event_seq + .fetch_add(1, Ordering::Relaxed); } pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) { @@ -77,8 +77,11 @@ pub(crate) fn note_global_relay_pressure(shared: &ProxySharedState) { } pub(super) fn relay_pressure_event_seq_in(shared: &ProxySharedState) -> u64 { - let guard = relay_idle_candidate_registry_lock_in(shared); - guard.pressure_event_seq + shared + .middle_relay + .relay_idle_registry + .pressure_event_seq + .load(Ordering::Relaxed) } pub(super) fn maybe_evict_idle_candidate_on_pressure_in( @@ -87,33 +90,52 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in( seen_pressure_seq: &mut u64, stats: &Stats, ) -> bool { - let mut guard = relay_idle_candidate_registry_lock_in(shared); + let registry = &shared.middle_relay.relay_idle_registry; - let latest_pressure_seq = guard.pressure_event_seq; + let latest_pressure_seq = registry.pressure_event_seq.load(Ordering::Relaxed); if latest_pressure_seq == *seen_pressure_seq { return false; } *seen_pressure_seq = latest_pressure_seq; - if latest_pressure_seq == guard.pressure_consumed_seq { + let consumed_pressure_seq = registry.pressure_consumed_seq.load(Ordering::Relaxed); + if latest_pressure_seq == consumed_pressure_seq { return false; } - if guard.ordered.is_empty() { - guard.pressure_consumed_seq = latest_pressure_seq; - return false; - } - - let oldest = guard - .ordered - .iter() - .next() - .map(|(_, candidate_conn_id)| *candidate_conn_id); + let oldest = { + let mut ordered = registry.ordered.lock(); + loop { + let Some((mark_order_seq, candidate_conn_id)) = ordered.iter().next().copied() else { + // Empty queues consume the event so later candidates cannot replay stale pressure. + let _ = registry.pressure_consumed_seq.compare_exchange( + consumed_pressure_seq, + latest_pressure_seq, + Ordering::Relaxed, + Ordering::Relaxed, + ); + return false; + }; + let Some(candidate_meta) = registry.by_conn_id.get(&candidate_conn_id) else { + ordered.remove(&(mark_order_seq, candidate_conn_id)); + continue; + }; + if candidate_meta.mark_order_seq != mark_order_seq { + ordered.remove(&(mark_order_seq, candidate_conn_id)); + continue; + } + break Some(candidate_conn_id); + } + }; if oldest != Some(conn_id) { return false; } - let Some(candidate_meta) = guard.by_conn_id.get(&conn_id).copied() else { + let Some(candidate_meta) = registry + .by_conn_id + .get(&conn_id) + .map(|entry| *entry.value()) + else { return false; }; @@ -121,10 +143,27 @@ pub(super) fn maybe_evict_idle_candidate_on_pressure_in( return false; } - if let Some(meta) = guard.by_conn_id.remove(&conn_id) { - guard.ordered.remove(&(meta.mark_order_seq, conn_id)); + // Claim the global pressure budget before removal; otherwise racing sessions + // can observe the next FIFO item and spend the same event more than once. + if registry + .pressure_consumed_seq + .compare_exchange( + consumed_pressure_seq, + latest_pressure_seq, + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_err() + { + return false; + } + + if let Some((_, meta)) = registry.by_conn_id.remove(&conn_id) { + registry + .ordered + .lock() + .remove(&(meta.mark_order_seq, conn_id)); } - guard.pressure_consumed_seq = latest_pressure_seq; stats.increment_relay_pressure_evict_total(); true } @@ -220,72 +259,32 @@ pub(crate) fn mark_relay_idle_candidate_for_testing( shared: &ProxySharedState, conn_id: u64, ) -> bool { - let registry = &shared.middle_relay.relay_idle_registry; - let mut guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - - if guard.by_conn_id.contains_key(&conn_id) { - return false; - } - - let mark_order_seq = shared - .middle_relay - .relay_idle_mark_seq - .fetch_add(1, Ordering::Relaxed); - let mark_pressure_seq = guard.pressure_event_seq; - let meta = RelayIdleCandidateMeta { - mark_order_seq, - mark_pressure_seq, - }; - guard.by_conn_id.insert(conn_id, meta); - guard.ordered.insert((mark_order_seq, conn_id)); - true + mark_relay_idle_candidate_in(shared, conn_id) } #[cfg(test)] pub(crate) fn oldest_relay_idle_candidate_for_testing(shared: &ProxySharedState) -> Option { let registry = &shared.middle_relay.relay_idle_registry; - let guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - guard.ordered.iter().next().map(|(_, conn_id)| *conn_id) + registry + .ordered + .lock() + .iter() + .next() + .map(|(_, conn_id)| *conn_id) } #[cfg(test)] pub(crate) fn clear_relay_idle_candidate_for_testing(shared: &ProxySharedState, conn_id: u64) { - let registry = &shared.middle_relay.relay_idle_registry; - let mut guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - if let Some(meta) = guard.by_conn_id.remove(&conn_id) { - guard.ordered.remove(&(meta.mark_order_seq, conn_id)); - } + clear_relay_idle_candidate_in(shared, conn_id); } #[cfg(test)] pub(crate) fn clear_relay_idle_pressure_state_for_testing_in_shared(shared: &ProxySharedState) { - if let Ok(mut guard) = shared.middle_relay.relay_idle_registry.lock() { - *guard = RelayIdleCandidateRegistry::default(); - } + let registry = &shared.middle_relay.relay_idle_registry; + registry.by_conn_id.clear(); + registry.ordered.lock().clear(); + registry.pressure_event_seq.store(0, Ordering::Relaxed); + registry.pressure_consumed_seq.store(0, Ordering::Relaxed); shared .middle_relay .relay_idle_mark_seq @@ -327,15 +326,10 @@ pub(crate) fn set_relay_pressure_state_for_testing( pressure_consumed_seq: u64, ) { let registry = &shared.middle_relay.relay_idle_registry; - let mut guard = match registry.lock() { - Ok(guard) => guard, - Err(poisoned) => { - let mut guard = poisoned.into_inner(); - *guard = RelayIdleCandidateRegistry::default(); - registry.clear_poison(); - guard - } - }; - guard.pressure_event_seq = pressure_event_seq; - guard.pressure_consumed_seq = pressure_consumed_seq; + registry + .pressure_event_seq + .store(pressure_event_seq, Ordering::Relaxed); + registry + .pressure_consumed_seq + .store(pressure_consumed_seq, Ordering::Relaxed); } diff --git a/src/proxy/middle_relay/quota.rs b/src/proxy/middle_relay/quota.rs index 618056c..3a04c00 100644 --- a/src/proxy/middle_relay/quota.rs +++ b/src/proxy/middle_relay/quota.rs @@ -41,11 +41,12 @@ pub(super) async fn reserve_user_quota_with_yield( return Err(MiddleQuotaReserveError::DeadlineExceeded); } tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} + biased; _ = cancel.cancelled() => { stats.increment_quota_acquire_cancelled_total(); return Err(MiddleQuotaReserveError::Cancelled); } + _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} } backoff_rounds = backoff_rounds.saturating_add(1); if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS { @@ -128,11 +129,12 @@ pub(super) async fn wait_for_traffic_budget_or_cancel( return Err(ProxyError::TrafficBudgetWaitDeadlineExceeded); } tokio::select! { - _ = tokio::time::sleep(next_refill_delay()) => {} + biased; _ = cancel.cancelled() => { stats.increment_flow_wait_middle_rate_limit_cancelled_total(); return Err(ProxyError::TrafficBudgetWaitCancelled); } + _ = tokio::time::sleep(next_refill_delay()) => {} } let wait_ms = wait_started_at .elapsed() diff --git a/src/proxy/shared_state.rs b/src/proxy/shared_state.rs index e204890..11e390e 100644 --- a/src/proxy/shared_state.rs +++ b/src/proxy/shared_state.rs @@ -59,7 +59,7 @@ pub(crate) struct MiddleRelaySharedState { pub(crate) desync_hasher: RandomState, pub(crate) desync_full_cache_last_emit_at: Mutex>, pub(crate) desync_dedup_rotation_state: Mutex, - pub(crate) relay_idle_registry: Mutex, + pub(crate) relay_idle_registry: RelayIdleCandidateRegistry, pub(crate) relay_idle_mark_seq: AtomicU64, } @@ -97,7 +97,7 @@ impl ProxySharedState { desync_hasher: RandomState::new(), desync_full_cache_last_emit_at: Mutex::new(None), desync_dedup_rotation_state: Mutex::new(DesyncDedupRotationState::default()), - relay_idle_registry: Mutex::new(RelayIdleCandidateRegistry::default()), + relay_idle_registry: RelayIdleCandidateRegistry::default(), relay_idle_mark_seq: AtomicU64::new(0), }, traffic_limiter: TrafficLimiter::new(), diff --git a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs index 4f57f56..107bb04 100644 --- a/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs +++ b/src/proxy/tests/middle_relay_idle_registry_poison_security_tests.rs @@ -1,33 +1,21 @@ use super::*; -use std::panic::{AssertUnwindSafe, catch_unwind}; #[test] -fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_accounting() { +fn blackhat_registry_stale_order_entry_is_skipped_and_pressure_accounting_continues() { let shared = ProxySharedState::new(); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); - let _ = catch_unwind(AssertUnwindSafe(|| { - let mut guard = shared - .middle_relay - .relay_idle_registry - .lock() - .expect("registry lock must be acquired before poison"); - guard.by_conn_id.insert( - 999, - RelayIdleCandidateMeta { - mark_order_seq: 1, - mark_pressure_seq: 0, - }, - ); - guard.ordered.insert((1, 999)); - panic!("intentional poison for idle-registry recovery"); - })); + shared + .middle_relay + .relay_idle_registry + .ordered + .lock() + .insert((0, 999)); - // Helper lock must recover from poison, reset stale state, and continue. assert!(mark_relay_idle_candidate_for_testing(shared.as_ref(), 42)); assert_eq!( oldest_relay_idle_candidate_for_testing(shared.as_ref()), - Some(42) + Some(999) ); let before = relay_pressure_event_seq_for_testing(shared.as_ref()); @@ -35,25 +23,43 @@ fn blackhat_registry_poison_recovers_with_fail_closed_reset_and_pressure_account let after = relay_pressure_event_seq_for_testing(shared.as_ref()); assert!( after > before, - "pressure accounting must still advance after poison" + "pressure accounting must still advance with stale ordered entries" + ); + + let mut seen_pressure_seq = before; + assert!(maybe_evict_idle_candidate_on_pressure_for_testing( + shared.as_ref(), + 42, + &mut seen_pressure_seq, + &Stats::new() + )); + assert_eq!( + oldest_relay_idle_candidate_for_testing(shared.as_ref()), + None ); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); } #[test] -fn clear_state_helper_must_reset_poisoned_registry_for_deterministic_fifo_tests() { +fn clear_state_helper_must_reset_split_registry_for_deterministic_fifo_tests() { let shared = ProxySharedState::new(); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); - let _ = catch_unwind(AssertUnwindSafe(|| { - let _guard = shared - .middle_relay - .relay_idle_registry - .lock() - .expect("registry lock must be acquired before poison"); - panic!("intentional poison while lock held"); - })); + shared.middle_relay.relay_idle_registry.by_conn_id.insert( + 999, + RelayIdleCandidateMeta { + mark_order_seq: 1, + mark_pressure_seq: 0, + }, + ); + shared + .middle_relay + .relay_idle_registry + .ordered + .lock() + .insert((1, 999)); + set_relay_pressure_state_for_testing(shared.as_ref(), 7, 6); clear_relay_idle_pressure_state_for_testing_in_shared(shared.as_ref()); diff --git a/src/tls_front/emulator.rs b/src/tls_front/emulator.rs index 609aeaf..5bf307c 100644 --- a/src/tls_front/emulator.rs +++ b/src/tls_front/emulator.rs @@ -8,12 +8,17 @@ use crate::protocol::constants::{ use crate::protocol::tls::{ ClientHelloTlsVersion, TLS_DIGEST_LEN, TLS_DIGEST_POS, gen_fake_x25519_key, }; -use crate::tls_front::types::{CachedTlsData, ParsedCertificateInfo, TlsProfileSource}; +use crate::tls_front::types::{ + CachedTlsData, ParsedCertificateInfo, TlsExtension, TlsProfileSource, +}; use crc32fast::Hasher; const MIN_APP_DATA: usize = 64; const MAX_APP_DATA: usize = MAX_TLS_CIPHERTEXT_SIZE; const MAX_TICKET_RECORDS: usize = 4; +const EXT_SUPPORTED_VERSIONS: u16 = 0x002b; +const EXT_KEY_SHARE: u16 = 0x0033; +const EXT_ALPN: u16 = 0x0010; fn jitter_and_clamp_sizes(sizes: &[usize], rng: &SecureRandom) -> Vec { sizes @@ -185,6 +190,74 @@ fn hash_compact_cert_info_payload(cert_payload: Vec) -> Option> { Some(hashed) } +fn push_supported_versions_extension(extensions: &mut Vec) { + extensions.extend_from_slice(&EXT_SUPPORTED_VERSIONS.to_be_bytes()); + extensions.extend_from_slice(&(2u16).to_be_bytes()); + extensions.extend_from_slice(&0x0304u16.to_be_bytes()); +} + +fn push_key_share_extension(extensions: &mut Vec, rng: &SecureRandom) { + let key = gen_fake_x25519_key(rng); + extensions.extend_from_slice(&EXT_KEY_SHARE.to_be_bytes()); + extensions.extend_from_slice(&(2 + 2 + 32u16).to_be_bytes()); + extensions.extend_from_slice(&0x001du16.to_be_bytes()); + extensions.extend_from_slice(&(32u16).to_be_bytes()); + extensions.extend_from_slice(&key); +} + +fn replay_profiled_server_hello_extension( + ext: &TlsExtension, + extensions: &mut Vec, + rng: &SecureRandom, + saw_supported_versions: &mut bool, + saw_key_share: &mut bool, +) { + match ext.ext_type { + EXT_SUPPORTED_VERSIONS if !*saw_supported_versions => { + push_supported_versions_extension(extensions); + *saw_supported_versions = true; + } + EXT_KEY_SHARE if !*saw_key_share => { + push_key_share_extension(extensions, rng); + *saw_key_share = true; + } + EXT_ALPN => {} + _ => {} + } +} + +fn build_profiled_server_hello_extensions(cached: &CachedTlsData, rng: &SecureRandom) -> Vec { + let capacity = cached + .server_hello_template + .extensions + .iter() + .map(|ext| 4 + ext.data.len()) + .sum::() + .max(44); + let mut extensions = Vec::with_capacity(capacity); + let mut saw_supported_versions = false; + let mut saw_key_share = false; + + for ext in &cached.server_hello_template.extensions { + replay_profiled_server_hello_extension( + ext, + &mut extensions, + rng, + &mut saw_supported_versions, + &mut saw_key_share, + ); + } + + if !saw_key_share { + push_key_share_extension(&mut extensions, rng); + } + if !saw_supported_versions { + push_supported_versions_extension(&mut extensions); + } + + extensions +} + /// Build a ServerHello + CCS + ApplicationData sequence using cached TLS metadata. pub fn build_emulated_server_hello( secret: &[u8], @@ -194,39 +267,28 @@ pub fn build_emulated_server_hello( use_full_cert_payload: bool, serverhello_compact: bool, client_tls_version: ClientHelloTlsVersion, + selected_cipher_suite: [u8; 2], rng: &SecureRandom, alpn: Option>, new_session_tickets: u8, ) -> Vec { // --- ServerHello --- - let mut extensions = Vec::new(); - let key = gen_fake_x25519_key(rng); - extensions.extend_from_slice(&0x0033u16.to_be_bytes()); - extensions.extend_from_slice(&(2 + 2 + 32u16).to_be_bytes()); - extensions.extend_from_slice(&0x001du16.to_be_bytes()); - extensions.extend_from_slice(&(32u16).to_be_bytes()); - extensions.extend_from_slice(&key); - extensions.extend_from_slice(&0x002bu16.to_be_bytes()); - extensions.extend_from_slice(&(2u16).to_be_bytes()); - extensions.extend_from_slice(&0x0304u16.to_be_bytes()); + let extensions = build_profiled_server_hello_extensions(cached, rng); let extensions_len = extensions.len() as u16; - let body_len = 2 + // version - 32 + // random - 1 + session_id.len() + // session id - 2 + // cipher - 1 + // compression - 2 + extensions.len(); // extensions + let body_len = 2 + 32 + 1 + session_id.len() + 2 + 1 + 2 + extensions.len(); let mut message = Vec::with_capacity(4 + body_len); - message.push(0x02); // ServerHello + message.push(0x02); let len_bytes = (body_len as u32).to_be_bytes(); message.extend_from_slice(&len_bytes[1..4]); - message.extend_from_slice(&cached.server_hello_template.version); // 0x0303 - message.extend_from_slice(&[0u8; 32]); // random placeholder + message.extend_from_slice(&cached.server_hello_template.version); + message.extend_from_slice(&[0u8; 32]); message.push(session_id.len() as u8); message.extend_from_slice(session_id); - let cipher = if cached.server_hello_template.cipher_suite == [0, 0] { + let cipher = if selected_cipher_suite != [0, 0] { + selected_cipher_suite + } else if cached.server_hello_template.cipher_suite == [0, 0] { [0x13, 0x01] } else { cached.server_hello_template.cipher_suite @@ -303,21 +365,10 @@ pub fn build_emulated_server_hello( } let mut app_data = Vec::new(); - let alpn_marker = alpn - .as_ref() - .filter(|p| !p.is_empty() && p.len() <= u8::MAX as usize) - .map(|proto| { - let proto_list_len = 1usize + proto.len(); - let ext_data_len = 2usize + proto_list_len; - let mut marker = Vec::with_capacity(4 + ext_data_len); - marker.extend_from_slice(&0x0010u16.to_be_bytes()); - marker.extend_from_slice(&(ext_data_len as u16).to_be_bytes()); - marker.extend_from_slice(&(proto_list_len as u16).to_be_bytes()); - marker.push(proto.len() as u8); - marker.extend_from_slice(proto); - marker - }); - for (idx, size) in sizes.into_iter().enumerate() { + // ALPN selection is encrypted inside EncryptedExtensions in real TLS 1.3. + // Keeping the FakeTLS record body opaque avoids a stable plaintext marker. + let _ = alpn; + for size in sizes { let mut rec = Vec::with_capacity(5 + size); rec.push(TLS_RECORD_APPLICATION); rec.extend_from_slice(&TLS_VERSION); @@ -334,31 +385,18 @@ pub fn build_emulated_server_hello( if body_len > copy_len { rec.extend_from_slice(&rng.bytes(body_len - copy_len)); } - rec.push(0x16); // inner content type marker (handshake) - rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag + rec.push(0x16); + rec.extend_from_slice(&rng.bytes(16)); } else { rec.extend_from_slice(&rng.bytes(size)); } } else if size > 17 { let body_len = size - 17; let mut body = Vec::with_capacity(body_len); - if idx == 0 - && let Some(marker) = &alpn_marker - { - if marker.len() <= body_len { - body.extend_from_slice(marker); - if body_len > marker.len() { - body.extend_from_slice(&rng.bytes(body_len - marker.len())); - } - } else { - body.extend_from_slice(&rng.bytes(body_len)); - } - } else { - body.extend_from_slice(&rng.bytes(body_len)); - } + body.extend_from_slice(&rng.bytes(body_len)); rec.extend_from_slice(&body); - rec.push(0x16); // inner content type marker (handshake) - rec.extend_from_slice(&rng.bytes(16)); // AEAD-like tag + rec.push(0x16); + rec.extend_from_slice(&rng.bytes(16)); } else { rec.extend_from_slice(&rng.bytes(size)); } @@ -408,7 +446,8 @@ mod tests { use std::time::SystemTime; use crate::tls_front::types::{ - CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsCertPayload, TlsProfileSource, + CachedTlsData, ParsedServerHello, TlsBehaviorProfile, TlsCertPayload, TlsExtension, + TlsProfileSource, }; use super::{ @@ -432,6 +471,38 @@ mod tests { &response[app_start + 5..app_start + 5 + app_len] } + fn server_hello_cipher_suite(response: &[u8]) -> [u8; 2] { + let mut pos = 5 + 4 + 2 + 32; + let session_id_len = response[pos] as usize; + pos += 1 + session_id_len; + [response[pos], response[pos + 1]] + } + + fn server_hello_extension_types(response: &[u8]) -> Vec { + let record_len = u16::from_be_bytes([response[3], response[4]]) as usize; + let handshake_end = 5 + record_len; + let mut pos = 5 + 4 + 2 + 32; + let session_id_len = response[pos] as usize; + pos += 1 + session_id_len + 2 + 1; + let extensions_len = u16::from_be_bytes([response[pos], response[pos + 1]]) as usize; + pos += 2; + let extensions_end = (pos + extensions_len).min(handshake_end); + let mut out = Vec::new(); + + while pos + 4 <= extensions_end { + let ext_type = u16::from_be_bytes([response[pos], response[pos + 1]]); + let ext_len = u16::from_be_bytes([response[pos + 2], response[pos + 3]]) as usize; + pos += 4; + if pos + ext_len > extensions_end { + break; + } + out.push(ext_type); + pos += ext_len; + } + + out + } + fn make_cached(cert_payload: Option) -> CachedTlsData { CachedTlsData { server_hello_template: ParsedServerHello { @@ -468,6 +539,7 @@ mod tests { true, true, ClientHelloTlsVersion::Tls12, + [0x13, 0x01], &rng, None, 0, @@ -484,6 +556,65 @@ mod tests { assert!(payload.starts_with(&cert_msg)); } + #[test] + fn test_build_emulated_server_hello_uses_selected_cipher_suite() { + let cached = make_cached(None); + let rng = SecureRandom::new(); + let response = build_emulated_server_hello( + b"secret", + &[0x10; 32], + &[0x20; 16], + &cached, + false, + true, + ClientHelloTlsVersion::Tls13, + [0x13, 0x03], + &rng, + None, + 0, + ); + + assert_eq!(server_hello_cipher_suite(&response), [0x13, 0x03]); + } + + #[test] + fn test_build_emulated_server_hello_replays_profiled_safe_extension_order() { + let mut cached = make_cached(None); + cached.server_hello_template.extensions = vec![ + TlsExtension { + ext_type: 0x002b, + data: vec![0x03, 0x04], + }, + TlsExtension { + ext_type: 0x0010, + data: vec![0x00, 0x03, 0x02, b'h', b'2'], + }, + TlsExtension { + ext_type: 0x0033, + data: vec![0; 36], + }, + ]; + let rng = SecureRandom::new(); + let response = build_emulated_server_hello( + b"secret", + &[0x21; 32], + &[0x22; 16], + &cached, + false, + true, + ClientHelloTlsVersion::Tls13, + [0x13, 0x01], + &rng, + Some(b"h2".to_vec()), + 0, + ); + + assert_eq!( + server_hello_extension_types(&response), + vec![0x002b, 0x0033] + ); + } + #[test] fn test_build_emulated_server_hello_random_fallback_when_no_cert_payload() { let cached = make_cached(None); @@ -496,6 +627,7 @@ mod tests { true, true, ClientHelloTlsVersion::Tls12, + [0x13, 0x01], &rng, None, 0, @@ -530,6 +662,7 @@ mod tests { false, true, ClientHelloTlsVersion::Tls12, + [0x13, 0x01], &rng, None, 0, @@ -570,6 +703,7 @@ mod tests { true, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, None, 0, @@ -583,7 +717,7 @@ mod tests { } #[test] - fn test_build_emulated_server_hello_compact_disabled_skips_compact_payload() { + fn test_build_emulated_server_hello_keeps_alpn_marker_out_of_random_payload() { let mut cached = make_cached(None); cached.cert_info = Some(crate::tls_front::types::ParsedCertificateInfo { not_after_unix: Some(1_900_000_000), @@ -602,6 +736,7 @@ mod tests { false, false, ClientHelloTlsVersion::Tls12, + [0x13, 0x01], &rng, Some(b"h2".to_vec()), 0, @@ -610,8 +745,8 @@ mod tests { let payload = first_app_data_payload(&response); let expected_alpn_marker = [0x00u8, 0x10, 0x00, 0x05, 0x00, 0x03, 0x02, b'h', b'2']; assert!( - payload.starts_with(&expected_alpn_marker), - "when compact mode is disabled and no full cert payload exists, the random/alpn path must be used" + !payload.starts_with(&expected_alpn_marker), + "random fallback payload must not expose plaintext ALPN marker bytes" ); } @@ -633,6 +768,7 @@ mod tests { false, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, None, 0, diff --git a/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs b/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs index ba0e137..3fbba07 100644 --- a/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs +++ b/src/tls_front/tests/emulator_profile_fidelity_security_tests.rs @@ -65,6 +65,7 @@ fn emulated_server_hello_keeps_single_change_cipher_spec_for_client_compatibilit false, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, None, 0, @@ -89,6 +90,7 @@ fn emulated_server_hello_does_not_emit_profile_ticket_tail_when_disabled() { false, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, None, 0, @@ -111,6 +113,7 @@ fn emulated_server_hello_uses_profile_ticket_lengths_when_enabled() { false, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, None, 2, diff --git a/src/tls_front/tests/emulator_security_tests.rs b/src/tls_front/tests/emulator_security_tests.rs index ce493bb..c3ef96d 100644 --- a/src/tls_front/tests/emulator_security_tests.rs +++ b/src/tls_front/tests/emulator_security_tests.rs @@ -58,6 +58,7 @@ fn emulated_server_hello_ignores_oversized_alpn_when_marker_would_not_fit() { true, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, Some(oversized_alpn), 0, @@ -84,7 +85,7 @@ fn emulated_server_hello_ignores_oversized_alpn_when_marker_would_not_fit() { } #[test] -fn emulated_server_hello_embeds_full_alpn_marker_when_body_can_fit() { +fn emulated_server_hello_keeps_alpn_marker_out_of_appdata() { let cached = make_cached(None); let rng = SecureRandom::new(); @@ -96,6 +97,7 @@ fn emulated_server_hello_embeds_full_alpn_marker_when_body_can_fit() { true, true, ClientHelloTlsVersion::Tls13, + [0x13, 0x01], &rng, Some(b"h2".to_vec()), 0, @@ -104,8 +106,8 @@ fn emulated_server_hello_embeds_full_alpn_marker_when_body_can_fit() { let payload = first_app_data_payload(&response); let expected = [0x00u8, 0x10, 0x00, 0x05, 0x00, 0x03, 0x02, b'h', b'2']; assert!( - payload.starts_with(&expected), - "when body has enough capacity, emulated first application record must include full ALPN marker" + !payload.starts_with(&expected), + "emulated ApplicationData must not expose plaintext ALPN marker bytes" ); } @@ -126,6 +128,7 @@ fn emulated_server_hello_prefers_cert_payload_over_alpn_marker() { true, true, ClientHelloTlsVersion::Tls12, + [0x13, 0x01], &rng, Some(b"h2".to_vec()), 0, diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 35021aa..4c10a3b 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -8,6 +8,7 @@ use std::time::{Duration, Instant}; use bytes::BytesMut; use rand::RngExt; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; @@ -26,6 +27,7 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5; const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700; const ME_PING_TRACKER_CLEANUP_EVERY: u32 = 32; +const ME_SERVICE_SIGNAL_SEND_TIMEOUT_MS: u64 = 50; #[derive(Clone, Copy)] enum WriterTeardownMode { @@ -45,6 +47,11 @@ enum WriterLifecycleExit { Cancelled, } +enum ServiceWriterCommandSendError { + Closed, + TimedOut, +} + async fn writer_command_loop( mut rx: mpsc::Receiver, mut rpc_writer: RpcWriter, @@ -52,6 +59,8 @@ async fn writer_command_loop( ) -> Result<()> { loop { tokio::select! { + biased; + _ = cancel.cancelled() => return Ok(()), cmd = rx.recv() => { match cmd { Some(WriterCommand::Data(payload)) => { @@ -69,7 +78,27 @@ async fn writer_command_loop( Some(WriterCommand::Close) | None => return Ok(()), } } - _ = cancel.cancelled() => return Ok(()), + } + } +} + +async fn send_service_writer_command( + tx: &mpsc::Sender, + cmd: WriterCommand, +) -> std::result::Result<(), ServiceWriterCommandSendError> { + match tx.try_send(cmd) { + Ok(()) => Ok(()), + Err(TrySendError::Closed(_)) => Err(ServiceWriterCommandSendError::Closed), + Err(TrySendError::Full(cmd)) => { + let wait = Duration::from_millis(ME_SERVICE_SIGNAL_SEND_TIMEOUT_MS); + match tokio::time::timeout(wait, tx.reserve()).await { + Ok(Ok(permit)) => { + permit.send(cmd); + Ok(()) + } + Ok(Err(_)) => Err(ServiceWriterCommandSendError::Closed), + Err(_) => Err(ServiceWriterCommandSendError::TimedOut), + } } } } @@ -108,6 +137,7 @@ async fn ping_loop( Duration::from_secs(wait) }; tokio::select! { + biased; _ = cancel_ping_token.cancelled() => return, _ = tokio::time::sleep(startup_jitter) => {} } @@ -131,6 +161,7 @@ async fn ping_loop( Duration::from_secs(secs) }; tokio::select! { + biased; _ = cancel_ping_token.cancelled() => return, _ = tokio::time::sleep(wait) => {} } @@ -151,14 +182,24 @@ async fn ping_loop( } ping_id = ping_id.wrapping_add(1); stats_ping.increment_me_keepalive_sent(); - if tx_ping - .send(WriterCommand::ControlAndFlush(payload)) - .await - .is_err() + if let Err(error) = + send_service_writer_command(&tx_ping, WriterCommand::ControlAndFlush(payload)).await { + { + let mut tracker = ping_tracker_ping.lock().await; + tracker.remove(&sent_id); + } stats_ping.increment_me_keepalive_failed(); - debug!("ME ping failed, removing dead writer"); - return; + match error { + ServiceWriterCommandSendError::Closed => { + debug!("ME ping failed, removing dead writer"); + return; + } + ServiceWriterCommandSendError::TimedOut => { + debug!("ME ping skipped: writer command channel is full"); + continue; + } + } } } } @@ -191,6 +232,7 @@ async fn rpc_proxy_req_signal_loop( }; tokio::select! { + biased; _ = cancel_signal.cancelled() => return, _ = tokio::time::sleep(Duration::from_millis(startup_jitter_ms)) => {} } @@ -207,6 +249,7 @@ async fn rpc_proxy_req_signal_loop( }; tokio::select! { + biased; _ = cancel_signal.cancelled() => return, _ = tokio::time::sleep(wait) => {} } @@ -233,14 +276,15 @@ async fn rpc_proxy_req_signal_loop( meta.proto_flags, ); - if tx_signal - .send(WriterCommand::DataAndFlush(payload)) - .await - .is_err() + if let Err(error) = + send_service_writer_command(&tx_signal, WriterCommand::DataAndFlush(payload)).await { stats_signal.increment_me_rpc_proxy_req_signal_failed_total(); let _ = pool.registry.unregister(conn_id).await; - return; + match error { + ServiceWriterCommandSendError::Closed => return, + ServiceWriterCommandSendError::TimedOut => continue, + } } stats_signal.increment_me_rpc_proxy_req_signal_sent_total(); @@ -258,14 +302,16 @@ async fn rpc_proxy_req_signal_loop( let close_payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id); - if tx_signal - .send(WriterCommand::ControlAndFlush(close_payload)) - .await - .is_err() + if let Err(error) = + send_service_writer_command(&tx_signal, WriterCommand::ControlAndFlush(close_payload)) + .await { stats_signal.increment_me_rpc_proxy_req_signal_failed_total(); let _ = pool.registry.unregister(conn_id).await; - return; + match error { + ServiceWriterCommandSendError::Closed => return, + ServiceWriterCommandSendError::TimedOut => continue, + } } stats_signal.increment_me_rpc_proxy_req_signal_close_sent_total(); diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 66bf83d..15f6c71 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -242,6 +242,7 @@ pub(crate) async fn reader_loop( let mut raw = enc_leftover; let mut expected_seq: i32 = 0; let mut data_route_queue_full_streak = HashMap::::new(); + let mut tmp = [0u8; 65_536]; let mut fairness = WorkerFairnessState::new( WorkerFairnessConfig { worker_id: (writer_id as u16).saturating_add(1), @@ -263,18 +264,18 @@ pub(crate) async fn reader_loop( let fairshare_enabled = route_fairshare_enabled.load(Ordering::Relaxed); fairness.set_backpressure_enabled(backpressure_enabled); let fairness_has_backlog = should_schedule_fairness_retry(&fairness_snapshot); - let mut tmp = [0u8; 65_536]; let backlog_retry_enabled = fairness_has_backlog; let backlog_retry_delay = fairness_retry_delay(reader_route_data_wait_ms.load(Ordering::Relaxed)); let mut retry_only = false; let n = tokio::select! { + biased; + _ = cancel.cancelled() => return Ok(()), res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?, _ = tokio::time::sleep(backlog_retry_delay), if backlog_retry_enabled => { retry_only = true; 0usize }, - _ = cancel.cancelled() => return Ok(()), }; if retry_only { let route_wait_ms = reader_route_data_wait_ms.load(Ordering::Relaxed); diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 0f3925e..71f75f8 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -77,26 +77,24 @@ struct HotBindingTable { struct BindingState { inner: Mutex, + writer_idle_since_epoch_secs: DashMap, + bound_clients_by_writer: DashMap, + active_sessions_by_target_dc: DashMap, + last_meta_for_writer: DashMap, } struct BindingInner { - writers: HashMap>, writer_for_conn: HashMap, conns_for_writer: HashMap>, meta: HashMap, - last_meta_for_writer: HashMap, - writer_idle_since_epoch_secs: HashMap, } impl BindingInner { fn new() -> Self { Self { - writers: HashMap::new(), writer_for_conn: HashMap::new(), conns_for_writer: HashMap::new(), meta: HashMap::new(), - last_meta_for_writer: HashMap::new(), - writer_idle_since_epoch_secs: HashMap::new(), } } } @@ -149,6 +147,10 @@ impl ConnRegistry { }, binding: BindingState { inner: Mutex::new(BindingInner::new()), + writer_idle_since_epoch_secs: DashMap::new(), + bound_clients_by_writer: DashMap::new(), + active_sessions_by_target_dc: DashMap::new(), + last_meta_for_writer: DashMap::new(), }, next_id: AtomicU64::new(start), route_channel_capacity, diff --git a/src/transport/middle_proxy/registry/writer.rs b/src/transport/middle_proxy/registry/writer.rs index c2817f0..4d6ea0d 100644 --- a/src/transport/middle_proxy/registry/writer.rs +++ b/src/transport/middle_proxy/registry/writer.rs @@ -13,13 +13,63 @@ use super::{ }; impl ConnRegistry { + fn set_writer_bound_count(&self, writer_id: u64, count: usize) { + self.binding + .bound_clients_by_writer + .insert(writer_id, count); + if count == 0 { + self.binding + .writer_idle_since_epoch_secs + .entry(writer_id) + .or_insert_with(Self::now_epoch_secs); + } else { + self.binding.writer_idle_since_epoch_secs.remove(&writer_id); + } + } + + fn adjust_active_target_dc(&self, target_dc: i16, delta: isize) { + if target_dc == 0 || delta == 0 { + return; + } + if delta > 0 { + self.binding + .active_sessions_by_target_dc + .entry(target_dc) + .and_modify(|count| *count = count.saturating_add(delta as usize)) + .or_insert(delta as usize); + return; + } + + let remove = if let Some(mut count) = self + .binding + .active_sessions_by_target_dc + .get_mut(&target_dc) + { + let decrement = delta.unsigned_abs(); + *count = count.saturating_sub(decrement); + *count == 0 + } else { + false + }; + if remove { + self.binding.active_sessions_by_target_dc.remove(&target_dc); + } + } + pub async fn register_writer(&self, writer_id: u64, tx: mpsc::Sender) { let mut binding = self.binding.inner.lock().await; - binding.writers.insert(writer_id, tx.clone()); binding .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); + self.binding + .bound_clients_by_writer + .entry(writer_id) + .or_insert(0); + self.binding + .writer_idle_since_epoch_secs + .entry(writer_id) + .or_insert_with(Self::now_epoch_secs); self.writers.map.insert(writer_id, tx); } @@ -29,19 +79,18 @@ impl ConnRegistry { self.routing.byte_budget.remove(&id); self.hot_binding.map.remove(&id); let mut binding = self.binding.inner.lock().await; - binding.meta.remove(&id); + let previous_meta = binding.meta.remove(&id); + if let Some(meta) = previous_meta.as_ref() { + self.adjust_active_target_dc(meta.target_dc, -1); + } if let Some(writer_id) = binding.writer_for_conn.remove(&id) { - let became_empty = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) { + let next_count = if let Some(set) = binding.conns_for_writer.get_mut(&writer_id) { set.remove(&id); - set.is_empty() + set.len() } else { - false + 0 }; - if became_empty { - binding - .writer_idle_since_epoch_secs - .insert(writer_id, Self::now_epoch_secs()); - } + self.set_writer_bound_count(writer_id, next_count); return Some(writer_id); } None @@ -248,7 +297,7 @@ impl ConnRegistry { if !self.routing.map.contains_key(&conn_id) { return false; } - if !binding.writers.contains_key(&writer_id) { + if !self.writers.map.contains_key(&writer_id) { return false; } @@ -256,28 +305,32 @@ impl ConnRegistry { if let Some(previous_writer_id) = previous_writer_id && previous_writer_id != writer_id { - let became_empty = + let next_count = if let Some(set) = binding.conns_for_writer.get_mut(&previous_writer_id) { set.remove(&conn_id); - set.is_empty() + set.len() } else { - false + 0 }; - if became_empty { - binding - .writer_idle_since_epoch_secs - .insert(previous_writer_id, Self::now_epoch_secs()); - } + self.set_writer_bound_count(previous_writer_id, next_count); } - binding.meta.insert(conn_id, meta.clone()); - binding.last_meta_for_writer.insert(writer_id, meta.clone()); - binding.writer_idle_since_epoch_secs.remove(&writer_id); - binding - .conns_for_writer - .entry(writer_id) - .or_insert_with(HashSet::new) - .insert(conn_id); + if let Some(previous_meta) = binding.meta.insert(conn_id, meta.clone()) { + self.adjust_active_target_dc(previous_meta.target_dc, -1); + } + self.adjust_active_target_dc(meta.target_dc, 1); + self.binding + .last_meta_for_writer + .insert(writer_id, meta.clone()); + let next_count = { + let set = binding + .conns_for_writer + .entry(writer_id) + .or_insert_with(HashSet::new); + set.insert(conn_id); + set.len() + }; + self.set_writer_bound_count(writer_id, next_count); self.hot_binding .map .insert(conn_id, HotConnBinding { writer_id, meta }); @@ -290,27 +343,38 @@ impl ConnRegistry { .conns_for_writer .entry(writer_id) .or_insert_with(HashSet::new); - binding - .writer_idle_since_epoch_secs - .entry(writer_id) - .or_insert(Self::now_epoch_secs()); + let count = binding + .conns_for_writer + .get(&writer_id) + .map(|set| set.len()) + .unwrap_or(0); + self.set_writer_bound_count(writer_id, count); } pub async fn get_last_writer_meta(&self, writer_id: u64) -> Option { - let binding = self.binding.inner.lock().await; - binding.last_meta_for_writer.get(&writer_id).cloned() + self.binding + .last_meta_for_writer + .get(&writer_id) + .map(|entry| entry.value().clone()) } pub async fn writer_idle_since_snapshot(&self) -> HashMap { - let binding = self.binding.inner.lock().await; - binding.writer_idle_since_epoch_secs.clone() + self.binding + .writer_idle_since_epoch_secs + .iter() + .map(|entry| (*entry.key(), *entry.value())) + .collect() } pub async fn writer_idle_since_for_writer_ids(&self, writer_ids: &[u64]) -> HashMap { - let binding = self.binding.inner.lock().await; let mut out = HashMap::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(idle_since) = binding.writer_idle_since_epoch_secs.get(writer_id).copied() { + if let Some(idle_since) = self + .binding + .writer_idle_since_epoch_secs + .get(writer_id) + .map(|entry| *entry.value()) + { out.insert(*writer_id, idle_since); } } @@ -320,25 +384,19 @@ impl ConnRegistry { pub(in crate::transport::middle_proxy) async fn writer_activity_snapshot( &self, ) -> WriterActivitySnapshot { - let binding = self.binding.inner.lock().await; - let mut bound_clients_by_writer = HashMap::::new(); - let mut active_sessions_by_target_dc = HashMap::::new(); - - for (writer_id, conn_ids) in &binding.conns_for_writer { - bound_clients_by_writer.insert(*writer_id, conn_ids.len()); - } - for conn_meta in binding.meta.values() { - if conn_meta.target_dc == 0 { - continue; - } - *active_sessions_by_target_dc - .entry(conn_meta.target_dc) - .or_insert(0) += 1; - } - WriterActivitySnapshot { - bound_clients_by_writer, - active_sessions_by_target_dc, + bound_clients_by_writer: self + .binding + .bound_clients_by_writer + .iter() + .map(|entry| (*entry.key(), *entry.value())) + .collect(), + active_sessions_by_target_dc: self + .binding + .active_sessions_by_target_dc + .iter() + .map(|entry| (*entry.key(), *entry.value())) + .collect(), } } @@ -393,10 +451,10 @@ impl ConnRegistry { pub async fn writer_lost(&self, writer_id: u64) -> Vec { let mut binding = self.binding.inner.lock().await; - binding.writers.remove(&writer_id); self.writers.map.remove(&writer_id); - binding.last_meta_for_writer.remove(&writer_id); - binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.last_meta_for_writer.remove(&writer_id); + self.binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.bound_clients_by_writer.remove(&writer_id); let conns = binding .conns_for_writer .remove(&writer_id) @@ -410,6 +468,10 @@ impl ConnRegistry { continue; } binding.writer_for_conn.remove(&conn_id); + let meta = binding.meta.remove(&conn_id); + if let Some(meta) = meta.as_ref() { + self.adjust_active_target_dc(meta.target_dc, -1); + } let remove_hot = self .hot_binding .map @@ -419,11 +481,8 @@ impl ConnRegistry { if remove_hot { self.hot_binding.map.remove(&conn_id); } - if let Some(m) = binding.meta.get(&conn_id) { - out.push(BoundConn { - conn_id, - meta: m.clone(), - }); + if let Some(m) = meta { + out.push(BoundConn { conn_id, meta: m }); } } out @@ -438,11 +497,10 @@ impl ConnRegistry { } pub async fn is_writer_empty(&self, writer_id: u64) -> bool { - let binding = self.binding.inner.lock().await; - binding - .conns_for_writer + self.binding + .bound_clients_by_writer .get(&writer_id) - .map(|s| s.is_empty()) + .map(|count| *count.value() == 0) .unwrap_or(true) } @@ -457,21 +515,20 @@ impl ConnRegistry { return false; } - binding.writers.remove(&writer_id); self.writers.map.remove(&writer_id); - binding.last_meta_for_writer.remove(&writer_id); - binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.last_meta_for_writer.remove(&writer_id); + self.binding.writer_idle_since_epoch_secs.remove(&writer_id); + self.binding.bound_clients_by_writer.remove(&writer_id); binding.conns_for_writer.remove(&writer_id); true } #[allow(dead_code)] pub(super) async fn non_empty_writer_ids(&self, writer_ids: &[u64]) -> HashSet { - let binding = self.binding.inner.lock().await; let mut out = HashSet::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(conns) = binding.conns_for_writer.get(writer_id) - && !conns.is_empty() + if let Some(count) = self.binding.bound_clients_by_writer.get(writer_id) + && *count.value() > 0 { out.insert(*writer_id); } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 847d60e..3c1963b 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; +use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, warn}; @@ -15,7 +16,6 @@ use super::registry::ConnMeta; use super::wire::build_proxy_req_payload; use crate::config::{MeRouteNoWriterMode, MeWriterPickMode}; use crate::error::{ProxyError, Result}; -use crate::network::IpFamily; use crate::stream::PooledBuffer; use rand::seq::SliceRandom; @@ -34,6 +34,11 @@ mod close; mod recovery; mod selection; +enum WriterCommandReserveError { + Closed, + TimedOut, +} + fn proxy_tag_array(tag: Option<&[u8]>) -> Option<[u8; 16]> { tag.and_then(|tag| <[u8; 16]>::try_from(tag).ok()) } @@ -45,6 +50,21 @@ fn proxy_req_payload_from_command(cmd: WriterCommand) -> Option { } } +async fn reserve_writer_command_slot( + tx: &mpsc::Sender, + wait: Option, +) -> std::result::Result, WriterCommandReserveError> { + let reserve = tx.clone().reserve_owned(); + match wait { + Some(wait) => match tokio::time::timeout(wait, reserve).await { + Ok(Ok(permit)) => Ok(permit), + Ok(Err(_)) => Err(WriterCommandReserveError::Closed), + Err(_) => Err(WriterCommandReserveError::TimedOut), + }, + None => reserve.await.map_err(|_| WriterCommandReserveError::Closed), + } +} + impl MePool { /// Send RPC_PROXY_REQ. `tag_override`: per-user ad_tag (from access.user_ad_tags); if None, uses pool default. pub async fn send_proxy_req( @@ -105,9 +125,25 @@ impl MePool { return Ok(()); } Err(TrySendError::Full(cmd)) => { - if current.tx.send(cmd).await.is_ok() { - self.note_hybrid_route_success(); - return Ok(()); + match reserve_writer_command_slot( + ¤t.tx, + self.route_runtime.me_route_blocking_send_timeout, + ) + .await + { + Ok(permit) => { + permit.send(cmd); + self.note_hybrid_route_success(); + return Ok(()); + } + Err(WriterCommandReserveError::TimedOut) => { + self.stats + .increment_me_writer_pick_full_total(self.writer_pick_mode()); + return Err(ProxyError::Proxy( + "ME writer channel full within blocking send timeout".into(), + )); + } + Err(WriterCommandReserveError::Closed) => {} } warn!(writer_id = current.writer_id, "ME writer channel closed"); self.remove_writer_and_close_clients(current.writer_id) @@ -124,9 +160,8 @@ impl MePool { } let mut writers_snapshot = { - let ws = self.writers.read().await; + let ws = self.writers.snapshot(); if ws.is_empty() { - drop(ws); match no_writer_mode { MeRouteNoWriterMode::AsyncRecoveryFailfast => { let deadline = *no_writer_deadline.get_or_insert_with(|| { @@ -154,38 +189,28 @@ impl MePool { for _ in 0..self.route_runtime.me_route_inline_recovery_attempts.max(1) { - for family in self.family_order() { - let map = match family { - IpFamily::V4 => self.proxy_map_v4.read().await.clone(), - IpFamily::V6 => self.proxy_map_v6.read().await.clone(), - }; - for (dc, addrs) in &map { - for (ip, port) in addrs { - let addr = SocketAddr::new(*ip, *port); - let _ = self - .connect_one_for_dc( - addr, - *dc, - self.rng.as_ref(), - ) - .await; - } + let preferred = self.preferred_endpoints_by_dc.load_full(); + for (dc, addrs) in preferred.iter() { + for addr in addrs { + let _ = self + .connect_one_for_dc(*addr, *dc, self.rng.as_ref()) + .await; } } - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { break; } } } - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { continue; } let deadline = *no_writer_deadline.get_or_insert_with(|| { Instant::now() + self.route_runtime.me_route_inline_recovery_wait }); if !self.wait_for_writer_until(deadline).await { - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { continue; } self.stats.increment_me_no_writer_failfast_total(); @@ -222,7 +247,7 @@ impl MePool { } } } - ws.clone() + ws }; let mut candidate_indices = self @@ -285,7 +310,12 @@ impl MePool { )); } emergency_attempts += 1; - let mut endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await; + let mut endpoints = self + .preferred_endpoints_by_dc + .load() + .get(&routed_dc) + .cloned() + .unwrap_or_default(); endpoints.shuffle(&mut rand::rng()); for addr in endpoints { if self @@ -298,9 +328,7 @@ impl MePool { } tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)) .await; - let ws2 = self.writers.read().await; - writers_snapshot = ws2.clone(); - drop(ws2); + writers_snapshot = self.writers.snapshot(); candidate_indices = self .candidate_indices_for_dc(&writers_snapshot, routed_dc, false) .await; @@ -563,33 +591,48 @@ impl MePool { self.note_hybrid_route_success(); return Ok(()); } - Err(TrySendError::Full(cmd)) => match current.tx.send(cmd).await { - Ok(()) => { - self.note_hybrid_route_success(); - return Ok(()); - } - Err(send_err) => { - let Some(payload) = proxy_req_payload_from_command(send_err.0) else { + Err(TrySendError::Full(cmd)) => { + match reserve_writer_command_slot( + ¤t.tx, + self.route_runtime.me_route_blocking_send_timeout, + ) + .await + { + Ok(permit) => { + permit.send(cmd); + self.note_hybrid_route_success(); + return Ok(()); + } + Err(WriterCommandReserveError::TimedOut) => { + self.stats + .increment_me_writer_pick_full_total(self.writer_pick_mode()); return Err(ProxyError::Proxy( - "ME writer rejected unexpected command type".into(), + "ME writer channel full within blocking send timeout".into(), )); - }; - warn!(writer_id = current.writer_id, "ME writer channel closed"); - self.remove_writer_and_close_clients(current.writer_id) - .await; - return self - .send_proxy_req( - conn_id, - target_dc, - client_addr, - our_addr, - payload.as_ref(), - proto_flags, - tag.as_ref().map(|tag| tag.as_slice()), - ) - .await; + } + Err(WriterCommandReserveError::Closed) => { + let Some(payload) = proxy_req_payload_from_command(cmd) else { + return Err(ProxyError::Proxy( + "ME writer rejected unexpected command type".into(), + )); + }; + warn!(writer_id = current.writer_id, "ME writer channel closed"); + self.remove_writer_and_close_clients(current.writer_id) + .await; + return self + .send_proxy_req( + conn_id, + target_dc, + client_addr, + our_addr, + payload.as_ref(), + proto_flags, + tag.as_ref().map(|tag| tag.as_slice()), + ) + .await; + } } - }, + } Err(TrySendError::Closed(cmd)) => { let Some(payload) = proxy_req_payload_from_command(cmd) else { return Err(ProxyError::Proxy( diff --git a/src/transport/middle_proxy/send/close.rs b/src/transport/middle_proxy/send/close.rs index ddd25c9..9a8055f 100644 --- a/src/transport/middle_proxy/send/close.rs +++ b/src/transport/middle_proxy/send/close.rs @@ -10,18 +10,43 @@ use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32}; use super::super::MePool; use super::super::codec::{WriterCommand, build_control_payload}; +use super::{WriterCommandReserveError, reserve_writer_command_slot}; + +const ME_CLOSE_SIGNAL_SEND_TIMEOUT: Duration = Duration::from_millis(50); impl MePool { + /// Sends an extended close signal for a client-bound ME connection. pub async fn send_close(self: &Arc, conn_id: u64) -> Result<()> { if let Some(w) = self.registry.get_writer(conn_id).await { let payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id); - if w.tx - .send(WriterCommand::ControlAndFlush(payload)) - .await - .is_err() - { - debug!("ME close write failed"); - self.remove_writer_and_close_clients(w.writer_id).await; + match w.tx.try_send(WriterCommand::ControlAndFlush(payload)) { + Ok(()) => {} + Err(TrySendError::Full(cmd)) => { + match reserve_writer_command_slot(&w.tx, Some(ME_CLOSE_SIGNAL_SEND_TIMEOUT)) + .await + { + Ok(permit) => { + permit.send(cmd); + } + Err(WriterCommandReserveError::TimedOut) => { + debug!(conn_id, "ME close skipped: writer command channel is full"); + } + Err(WriterCommandReserveError::Closed) => { + debug!( + conn_id, + "ME close skipped: writer command channel is closed" + ); + self.remove_writer_and_close_clients(w.writer_id).await; + } + } + } + Err(TrySendError::Closed(_)) => { + debug!( + conn_id, + "ME close skipped: writer command channel is closed" + ); + self.remove_writer_and_close_clients(w.writer_id).await; + } } } else { debug!(conn_id, "ME close skipped (writer missing)"); @@ -31,13 +56,16 @@ impl MePool { Ok(()) } + /// Sends the compact close signal used by ME-side forced connection teardown. pub async fn send_close_conn(self: &Arc, conn_id: u64) -> Result<()> { if let Some(w) = self.registry.get_writer(conn_id).await { let payload = build_control_payload(RPC_CLOSE_CONN_U32, conn_id); match w.tx.try_send(WriterCommand::ControlAndFlush(payload)) { Ok(()) => {} Err(TrySendError::Full(cmd)) => { - let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await; + let _ = reserve_writer_command_slot(&w.tx, Some(ME_CLOSE_SIGNAL_SEND_TIMEOUT)) + .await + .map(|permit| permit.send(cmd)); } Err(TrySendError::Closed(_)) => { debug!(conn_id, "ME close_conn skipped: writer channel closed"); @@ -51,6 +79,7 @@ impl MePool { Ok(()) } + /// Sends close signals for all currently registered ME-bound connections during shutdown. pub async fn shutdown_send_close_conn_all(self: &Arc) -> usize { let conn_ids = self.registry.active_conn_ids().await; let total = conn_ids.len(); @@ -60,6 +89,7 @@ impl MePool { total } + /// Returns the current number of active ME writers tracked by the pool. pub fn connection_count(&self) -> usize { self.conn_count.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/send/recovery.rs b/src/transport/middle_proxy/send/recovery.rs index 85772da..ab38b5e 100644 --- a/src/transport/middle_proxy/send/recovery.rs +++ b/src/transport/middle_proxy/send/recovery.rs @@ -1,13 +1,9 @@ -use std::collections::HashSet; -use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; use tracing::warn; -use crate::network::IpFamily; - use super::super::MePool; use super::{ HYBRID_GLOBAL_BURST_PERIOD_ROUNDS, HYBRID_RECENT_SUCCESS_WINDOW_MS, @@ -17,18 +13,18 @@ use super::{ impl MePool { pub(super) async fn wait_for_writer_until(&self, deadline: Instant) -> bool { let mut rx = self.writer_epoch.subscribe(); - if !self.writers.read().await.is_empty() { + if !self.writers.snapshot().is_empty() { return true; } let now = Instant::now(); if now >= deadline { - return !self.writers.read().await.is_empty(); + return !self.writers.snapshot().is_empty(); } let timeout = deadline.saturating_duration_since(now); if tokio::time::timeout(timeout, rx.changed()).await.is_ok() { - return !self.writers.read().await.is_empty(); + return !self.writers.snapshot().is_empty(); } - !self.writers.read().await.is_empty() + !self.writers.snapshot().is_empty() } pub(super) async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool { @@ -58,11 +54,11 @@ impl MePool { pub(super) async fn has_candidate_for_target_dc(&self, routed_dc: i32) -> bool { let writers_snapshot = { - let ws = self.writers.read().await; + let ws = self.writers.snapshot(); if ws.is_empty() { return false; } - ws.clone() + ws }; let mut candidate_indices = self .candidate_indices_for_dc(&writers_snapshot, routed_dc, false) @@ -79,7 +75,7 @@ impl MePool { self: &Arc, routed_dc: i32, ) -> bool { - let endpoints = self.endpoint_candidates_for_target_dc(routed_dc).await; + let endpoints = self.preferred_endpoints_for_dc(routed_dc).await; if endpoints.is_empty() { return false; } @@ -92,33 +88,19 @@ impl MePool { pub(super) async fn trigger_async_recovery_global(self: &Arc) { self.stats.increment_me_async_recovery_trigger_total(); - let mut seen = HashSet::<(i32, SocketAddr)>::new(); - for family in self.family_order() { - let map_guard = match family { - IpFamily::V4 => self.proxy_map_v4.read().await, - IpFamily::V6 => self.proxy_map_v6.read().await, - }; - for (dc, addrs) in map_guard.iter() { - for (ip, port) in addrs { - let addr = SocketAddr::new(*ip, *port); - if seen.insert((*dc, addr)) { - self.trigger_immediate_refill_for_dc(addr, *dc); - } - if seen.len() >= 8 { - return; - } + let preferred = self.preferred_endpoints_by_dc.load(); + let mut triggered = 0usize; + for (dc, addrs) in preferred.iter() { + for addr in addrs { + self.trigger_immediate_refill_for_dc(*addr, *dc); + triggered = triggered.saturating_add(1); + if triggered >= 8 { + return; } } } } - pub(super) async fn endpoint_candidates_for_target_dc( - &self, - routed_dc: i32, - ) -> Vec { - self.preferred_endpoints_for_dc(routed_dc).await - } - pub(super) async fn maybe_trigger_hybrid_recovery( self: &Arc, routed_dc: i32, diff --git a/src/transport/middle_proxy/send/selection.rs b/src/transport/middle_proxy/send/selection.rs index ac05fa1..834e0c0 100644 --- a/src/transport/middle_proxy/send/selection.rs +++ b/src/transport/middle_proxy/send/selection.rs @@ -15,7 +15,10 @@ impl MePool { routed_dc: i32, include_warm: bool, ) -> Vec { - let preferred = self.preferred_endpoints_for_dc(routed_dc).await; + let preferred_snapshot = self.preferred_endpoints_by_dc.load(); + let Some(preferred) = preferred_snapshot.get(&routed_dc) else { + return Vec::new(); + }; if preferred.is_empty() { return Vec::new(); } @@ -25,7 +28,7 @@ impl MePool { if !self.writer_eligible_for_selection(w, include_warm) { continue; } - if w.writer_dc == routed_dc && preferred.contains(&w.addr) { + if w.writer_dc == routed_dc && preferred.binary_search(&w.addr).is_ok() { out.push(idx); } }