Compare commits

...

3 Commits

Author SHA1 Message Date
Alexey 809352fac5 Merge pull request #864 from telemt/flow
Restore ME writer source IP for initial proxy request binding
2026-06-29 12:51:04 +03:00
Alexey 22627b498d Bump -> 3.4.21 2026-06-29 12:44:03 +03:00
Alexey b9c5c71dbc Restore ME writer source IP for initial proxy request binding 2026-06-29 12:37:31 +03:00
4 changed files with 13 additions and 6 deletions
Generated
+1 -1
View File
@@ -2899,7 +2899,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]] [[package]]
name = "telemt" name = "telemt"
version = "3.4.19" version = "3.4.21"
dependencies = [ dependencies = [
"aes", "aes",
"anyhow", "anyhow",
+1 -1
View File
@@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.4.19" version = "3.4.21"
edition = "2024" edition = "2024"
[features] [features]
+6 -2
View File
@@ -464,7 +464,9 @@ impl MePool {
if !self.writer_accepts_new_binding(w) { if !self.writer_accepts_new_binding(w) {
continue; continue;
} }
let (payload, meta) = build_routed_payload(our_addr); // Keep the advertised proxy IP aligned with the selected ME writer source.
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
match w.tx.clone().try_reserve_owned() { match w.tx.clone().try_reserve_owned() {
Ok(permit) => { Ok(permit) => {
if !self.registry.bind_writer(conn_id, w.id, meta).await { if !self.registry.bind_writer(conn_id, w.id, meta).await {
@@ -519,7 +521,9 @@ impl MePool {
} }
self.stats self.stats
.increment_me_writer_pick_blocking_fallback_total(); .increment_me_writer_pick_blocking_fallback_total();
let (payload, meta) = build_routed_payload(our_addr); // Keep the advertised proxy IP aligned with the selected ME writer source.
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
let reserve_result = let reserve_result =
if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout { if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout {
match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await { match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await {
@@ -323,7 +323,7 @@ async fn send_proxy_req_prunes_iterative_stale_bind_failures_without_data_replay
} }
#[tokio::test] #[tokio::test]
async fn send_proxy_req_preserves_client_facing_our_addr_when_writer_source_ip_differs() { async fn send_proxy_req_uses_writer_source_ip_when_advertised_our_addr_differs() {
let (pool, _rng) = make_pool().await; let (pool, _rng) = make_pool().await;
pool.rr.store(0, Ordering::Relaxed); pool.rr.store(0, Ordering::Relaxed);
@@ -363,5 +363,8 @@ async fn send_proxy_req_preserves_client_facing_our_addr_when_writer_source_ip_d
let payload = recv_first_data_payload(&mut live_rx, Duration::from_millis(50)) let payload = recv_first_data_payload(&mut live_rx, Duration::from_millis(50))
.await .await
.expect("writer must receive routed payload"); .expect("writer must receive routed payload");
assert_eq!(proxy_req_our_addr_from_payload(&payload), our_addr); assert_eq!(
proxy_req_our_addr_from_payload(&payload),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 31)), our_addr.port())
);
} }