mirror of https://github.com/telemt/telemt.git
ME Buffer reuse + Bytes Len over Full + Seq-no over Wrap-add
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
ecad96374a
commit
eaba926fe5
|
|
@ -49,19 +49,32 @@ impl SecureRandom {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate random bytes
|
/// Fill a caller-provided buffer with random bytes.
|
||||||
pub fn bytes(&self, len: usize) -> Vec<u8> {
|
pub fn fill(&self, out: &mut [u8]) {
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
const CHUNK_SIZE: usize = 512;
|
const CHUNK_SIZE: usize = 512;
|
||||||
|
|
||||||
while inner.buffer.len() < len {
|
let mut written = 0usize;
|
||||||
let mut chunk = vec![0u8; CHUNK_SIZE];
|
while written < out.len() {
|
||||||
inner.rng.fill_bytes(&mut chunk);
|
if inner.buffer.is_empty() {
|
||||||
inner.cipher.apply(&mut chunk);
|
let mut chunk = vec![0u8; CHUNK_SIZE];
|
||||||
inner.buffer.extend_from_slice(&chunk);
|
inner.rng.fill_bytes(&mut chunk);
|
||||||
|
inner.cipher.apply(&mut chunk);
|
||||||
|
inner.buffer.extend_from_slice(&chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
let take = (out.len() - written).min(inner.buffer.len());
|
||||||
|
out[written..written + take].copy_from_slice(&inner.buffer[..take]);
|
||||||
|
inner.buffer.drain(..take);
|
||||||
|
written += take;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
inner.buffer.drain(..len).collect()
|
|
||||||
|
/// Generate random bytes
|
||||||
|
pub fn bytes(&self, len: usize) -> Vec<u8> {
|
||||||
|
let mut out = vec![0u8; len];
|
||||||
|
self.fill(&mut out);
|
||||||
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate random number in range [0, max)
|
/// Generate random number in range [0, max)
|
||||||
|
|
|
||||||
|
|
@ -95,6 +95,7 @@ where
|
||||||
let user_clone = user.clone();
|
let user_clone = user.clone();
|
||||||
let me_writer = tokio::spawn(async move {
|
let me_writer = tokio::spawn(async move {
|
||||||
let mut writer = crypto_writer;
|
let mut writer = crypto_writer;
|
||||||
|
let mut frame_buf = Vec::with_capacity(16 * 1024);
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
msg = me_rx_task.recv() => {
|
msg = me_rx_task.recv() => {
|
||||||
|
|
@ -102,7 +103,15 @@ where
|
||||||
Some(MeResponse::Data { flags, data }) => {
|
Some(MeResponse::Data { flags, data }) => {
|
||||||
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
trace!(conn_id, bytes = data.len(), flags, "ME->C data");
|
||||||
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
stats_clone.add_user_octets_to(&user_clone, data.len() as u64);
|
||||||
write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?;
|
write_client_payload(
|
||||||
|
&mut writer,
|
||||||
|
proto_tag,
|
||||||
|
flags,
|
||||||
|
&data,
|
||||||
|
rng_clone.as_ref(),
|
||||||
|
&mut frame_buf,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// Drain all immediately queued ME responses and flush once.
|
// Drain all immediately queued ME responses and flush once.
|
||||||
while let Ok(next) = me_rx_task.try_recv() {
|
while let Ok(next) = me_rx_task.try_recv() {
|
||||||
|
|
@ -116,6 +125,7 @@ where
|
||||||
flags,
|
flags,
|
||||||
&data,
|
&data,
|
||||||
rng_clone.as_ref(),
|
rng_clone.as_ref(),
|
||||||
|
&mut frame_buf,
|
||||||
).await?;
|
).await?;
|
||||||
}
|
}
|
||||||
MeResponse::Ack(confirm) => {
|
MeResponse::Ack(confirm) => {
|
||||||
|
|
@ -363,6 +373,7 @@ async fn write_client_payload<W>(
|
||||||
flags: u32,
|
flags: u32,
|
||||||
data: &[u8],
|
data: &[u8],
|
||||||
rng: &SecureRandom,
|
rng: &SecureRandom,
|
||||||
|
frame_buf: &mut Vec<u8>,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
W: AsyncWrite + Unpin + Send + 'static,
|
W: AsyncWrite + Unpin + Send + 'static,
|
||||||
|
|
@ -384,7 +395,8 @@ where
|
||||||
if quickack {
|
if quickack {
|
||||||
first |= 0x80;
|
first |= 0x80;
|
||||||
}
|
}
|
||||||
let mut frame_buf = Vec::with_capacity(1 + data.len());
|
frame_buf.clear();
|
||||||
|
frame_buf.reserve(1 + data.len());
|
||||||
frame_buf.push(first);
|
frame_buf.push(first);
|
||||||
frame_buf.extend_from_slice(data);
|
frame_buf.extend_from_slice(data);
|
||||||
client_writer
|
client_writer
|
||||||
|
|
@ -397,7 +409,8 @@ where
|
||||||
first |= 0x80;
|
first |= 0x80;
|
||||||
}
|
}
|
||||||
let lw = (len_words as u32).to_le_bytes();
|
let lw = (len_words as u32).to_le_bytes();
|
||||||
let mut frame_buf = Vec::with_capacity(4 + data.len());
|
frame_buf.clear();
|
||||||
|
frame_buf.reserve(4 + data.len());
|
||||||
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
|
frame_buf.extend_from_slice(&[first, lw[0], lw[1], lw[2]]);
|
||||||
frame_buf.extend_from_slice(data);
|
frame_buf.extend_from_slice(data);
|
||||||
client_writer
|
client_writer
|
||||||
|
|
@ -428,11 +441,14 @@ where
|
||||||
len_val |= 0x8000_0000;
|
len_val |= 0x8000_0000;
|
||||||
}
|
}
|
||||||
let total = 4 + data.len() + padding_len;
|
let total = 4 + data.len() + padding_len;
|
||||||
let mut frame_buf = Vec::with_capacity(total);
|
frame_buf.clear();
|
||||||
|
frame_buf.reserve(total);
|
||||||
frame_buf.extend_from_slice(&len_val.to_le_bytes());
|
frame_buf.extend_from_slice(&len_val.to_le_bytes());
|
||||||
frame_buf.extend_from_slice(data);
|
frame_buf.extend_from_slice(data);
|
||||||
if padding_len > 0 {
|
if padding_len > 0 {
|
||||||
frame_buf.extend_from_slice(&rng.bytes(padding_len));
|
let start = frame_buf.len();
|
||||||
|
frame_buf.resize(start + padding_len, 0);
|
||||||
|
rng.fill(&mut frame_buf[start..]);
|
||||||
}
|
}
|
||||||
client_writer
|
client_writer
|
||||||
.write_all(&frame_buf)
|
.write_all(&frame_buf)
|
||||||
|
|
|
||||||
|
|
@ -223,7 +223,7 @@ pub(crate) struct RpcWriter {
|
||||||
impl RpcWriter {
|
impl RpcWriter {
|
||||||
pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
|
pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
|
||||||
let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode);
|
let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode);
|
||||||
self.seq_no += 1;
|
self.seq_no = self.seq_no.wrapping_add(1);
|
||||||
|
|
||||||
let pad = (16 - (frame.len() % 16)) % 16;
|
let pad = (16 - (frame.len() % 16)) % 16;
|
||||||
let mut buf = frame;
|
let mut buf = frame;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue