Middle-End Drafts

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-15 12:30:40 +03:00
parent 427c7dd375
commit f2455c9cb1
9 changed files with 174 additions and 82 deletions

View File

@@ -5,6 +5,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tracing::{debug, info, trace};
use crate::config::ProxyConfig;
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::*;
use crate::proxy::handshake::HandshakeSuccess;
@@ -21,6 +22,7 @@ pub(crate) async fn handle_via_middle_proxy<R, W>(
_config: Arc<ProxyConfig>,
_buffer_pool: Arc<BufferPool>,
local_addr: SocketAddr,
rng: Arc<SecureRandom>,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@@ -58,16 +60,17 @@ where
tokio::select! {
client_frame = read_client_payload(&mut crypto_reader, proto_tag) => {
match client_frame {
Ok(Some(payload)) => {
Ok(Some((payload, quickack))) => {
trace!(conn_id, bytes = payload.len(), "C->ME frame");
stats.add_user_octets_from(&user, payload.len() as u64);
let flags = if quickack { proto_flags | RPC_FLAG_QUICKACK } else { proto_flags };
me_pool.send_proxy_req(
conn_id,
success.dc_idx,
peer,
translated_local_addr,
&payload,
proto_flags,
flags,
).await?;
}
Ok(None) => {
@@ -83,7 +86,7 @@ where
Some(MeResponse::Data { flags, data }) => {
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
stats.add_user_octets_to(&user, data.len() as u64);
write_client_payload(&mut crypto_writer, proto_tag, flags, &data).await?;
write_client_payload(&mut crypto_writer, proto_tag, flags, &data, rng.as_ref()).await?;
}
Some(MeResponse::Ack(confirm)) => {
trace!(conn_id, confirm, "ME->C quickack");
@@ -111,11 +114,11 @@ where
async fn read_client_payload<R>(
client_reader: &mut CryptoReader<R>,
proto_tag: ProtoTag,
) -> Result<Option<Vec<u8>>>
) -> Result<Option<(Vec<u8>, bool)>>
where
R: AsyncRead + Unpin + Send + 'static,
{
let len = match proto_tag {
let (len, quickack) = match proto_tag {
ProtoTag::Abridged => {
let mut first = [0u8; 1];
match client_reader.read_exact(&mut first).await {
@@ -124,6 +127,7 @@ where
Err(e) => return Err(ProxyError::Io(e)),
}
let quickack = (first[0] & 0x80) != 0;
let len_words = if (first[0] & 0x7f) == 0x7f {
let mut ext = [0u8; 3];
client_reader
@@ -135,9 +139,10 @@ where
(first[0] & 0x7f) as usize
};
len_words
let len = len_words
.checked_mul(4)
.ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))?
.ok_or_else(|| ProxyError::Proxy("Abridged frame length overflow".into()))?;
(len, quickack)
}
ProtoTag::Intermediate | ProtoTag::Secure => {
let mut len_buf = [0u8; 4];
@@ -146,7 +151,8 @@ where
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(ProxyError::Io(e)),
}
(u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize
let quickack = (len_buf[3] & 0x80) != 0;
((u32::from_le_bytes(len_buf) & 0x7fff_ffff) as usize, quickack)
}
};
@@ -159,7 +165,15 @@ where
.read_exact(&mut payload)
.await
.map_err(ProxyError::Io)?;
Ok(Some(payload))
// Secure Intermediate: remove random padding (last len%4 bytes)
if proto_tag == ProtoTag::Secure {
let rem = len % 4;
if rem != 0 && payload.len() >= rem {
payload.truncate(len - rem);
}
}
Ok(Some((payload, quickack)))
}
async fn write_client_payload<W>(
@@ -167,6 +181,7 @@ async fn write_client_payload<W>(
proto_tag: ProtoTag,
flags: u32,
data: &[u8],
rng: &SecureRandom,
) -> Result<()>
where
W: AsyncWrite + Unpin + Send + 'static,
@@ -215,7 +230,12 @@ where
.map_err(ProxyError::Io)?;
}
ProtoTag::Intermediate | ProtoTag::Secure => {
let mut len = data.len() as u32;
let padding_len = if proto_tag == ProtoTag::Secure {
(rng.bytes(1)[0] % 4) as usize
} else {
0
};
let mut len = (data.len() + padding_len) as u32;
if quickack {
len |= 0x8000_0000;
}
@@ -227,6 +247,13 @@ where
.write_all(data)
.await
.map_err(ProxyError::Io)?;
if padding_len > 0 {
let pad = rng.bytes(padding_len);
client_writer
.write_all(&pad)
.await
.map_err(ProxyError::Io)?;
}
}
}