Implement shared MTProto framing and ME address role separation

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-06-15 08:50:08 +03:00
parent d81d7dba62
commit 37d0184a0b
13 changed files with 415 additions and 184 deletions
+3
View File
@@ -293,6 +293,8 @@ impl MePool {
};
record_bnd_status(bnd_addr_status, bnd_port_status, raw_socks_bound_addr);
let socks_bound_kdf_addr = socks_bound_addr.filter(|bound| bound.port() != 0);
// SOCKS BND is the only reflected source that can supply both KDF IP and
// port. Direct STUN reflection is IP-only and keeps the TCP local port.
let reflected = if let Some(bound) = socks_bound_kdf_addr {
Some(bound)
} else if is_socks_route {
@@ -417,6 +419,7 @@ impl MePool {
key_selector = format_args!("0x{ks:08x}"),
crypto_schema = format_args!("0x{schema:08x}"),
skew_secs = skew,
socks_kdf_policy = ?self.socks_kdf_policy(),
"ME key derivation parameters"
);
+2 -4
View File
@@ -464,8 +464,7 @@ impl MePool {
if !self.writer_accepts_new_binding(w) {
continue;
}
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
let (payload, meta) = build_routed_payload(our_addr);
match w.tx.clone().try_reserve_owned() {
Ok(permit) => {
if !self.registry.bind_writer(conn_id, w.id, meta).await {
@@ -520,8 +519,7 @@ impl MePool {
}
self.stats
.increment_me_writer_pick_blocking_fallback_total();
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
let (payload, meta) = build_routed_payload(our_addr);
let reserve_result =
if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout {
match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await {
@@ -177,6 +177,37 @@ async fn recv_data_count(rx: &mut mpsc::Receiver<WriterCommand>, budget: Duratio
data_count
}
async fn recv_first_data_payload(
rx: &mut mpsc::Receiver<WriterCommand>,
budget: Duration,
) -> Option<Vec<u8>> {
let start = Instant::now();
while Instant::now().duration_since(start) < budget {
let remaining = budget.saturating_sub(Instant::now().duration_since(start));
match tokio::time::timeout(remaining.min(Duration::from_millis(10)), rx.recv()).await {
Ok(Some(WriterCommand::Data(payload))) => return Some(payload.to_vec()),
Ok(Some(WriterCommand::DataAndFlush(payload))) => return Some(payload.to_vec()),
Ok(Some(_)) => {}
Ok(None) => break,
Err(_) => break,
}
}
None
}
fn proxy_req_our_addr_from_payload(payload: &[u8]) -> SocketAddr {
const CLIENT_ADDR_WIRE_LEN: usize = 20;
const OUR_ADDR_OFFSET: usize = 4 + 4 + 8 + CLIENT_ADDR_WIRE_LEN;
let our_addr = &payload[OUR_ADDR_OFFSET..OUR_ADDR_OFFSET + CLIENT_ADDR_WIRE_LEN];
let ip = Ipv4Addr::new(our_addr[12], our_addr[13], our_addr[14], our_addr[15]);
let port = u32::from_le_bytes([our_addr[16], our_addr[17], our_addr[18], our_addr[19]]);
SocketAddr::new(
IpAddr::V4(ip),
u16::try_from(port).expect("test port must fit u16"),
)
}
#[tokio::test]
async fn send_proxy_req_does_not_replay_when_first_bind_commit_fails() {
let (pool, _rng) = make_pool().await;
@@ -290,3 +321,47 @@ async fn send_proxy_req_prunes_iterative_stale_bind_failures_without_data_replay
drop(writers);
assert_eq!(writer_ids, vec![23]);
}
#[tokio::test]
async fn send_proxy_req_preserves_client_facing_our_addr_when_writer_source_ip_differs() {
let (pool, _rng) = make_pool().await;
pool.rr.store(0, Ordering::Relaxed);
let (conn_id, _rx) = pool.registry.register().await;
let mut live_rx = insert_writer(
&pool,
31,
2,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 2, 31)), 443),
true,
)
.await;
{
let mut writers = pool.writers.write().await;
let writer = writers
.iter_mut()
.find(|writer| writer.id == 31)
.expect("test writer must exist");
writer.source_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 31));
}
let our_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 7)), 8443);
let result = pool
.send_proxy_req(
conn_id,
2,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)), 30002),
our_addr,
b"route",
0,
None,
)
.await;
assert!(result.is_ok());
let payload = recv_first_data_payload(&mut live_rx, Duration::from_millis(50))
.await
.expect("writer must receive routed payload");
assert_eq!(proxy_req_our_addr_from_payload(&payload), our_addr);
}