mirror of https://github.com/telemt/telemt.git
fix: enforce streaming cap before accumulation, unique tmp path, bounded reconnects, buffer zeroization, restrict type visibility
- secret.rs: swap resp.bytes() for resp.chunk() loop; reject each chunk before it is appended so hard_cap is never exceeded in memory (OOM/DoS fix). Replace fixed ".tmp" suffix with unique_temp_path() (timestamp + atomic counter) to prevent concurrent-writer collisions on the cache file. - pool_config.rs: add MAX_CONCURRENT_RECONNECTS=32 and batch the reconnect_all task spawn loop to prevent a thundering-herd burst on large pools. - buffer_pool.rs: call fill(0u8) before clear() in return_buffer() to overwrite the initialized region of every returned buffer (OWASP ASVS L2 V8.3.6). Add unsafe backing-byte test to verify zeroization at the allocation level, not merely via the safe len==0 API. - api/events.rs, api/runtime_stats.rs: restrict ApiEventStore and MinimalCacheEntry from pub to pub(crate) — both are consumed only within the api module tree and should not be part of the public API surface.
This commit is contained in:
parent
9f6c5aafd4
commit
3ec316fbcd
|
|
@ -0,0 +1,70 @@
|
||||||
|
# .cargo/config.toml
|
||||||
|
[build]
|
||||||
|
rustflags = [
|
||||||
|
# 1. ABSOLUTE NON-NEGOTIABLES (Forbid)
|
||||||
|
# NOTE: temporarily relax some strict lints to reduce noise while triaging legacy code.
|
||||||
|
"-D", "clippy::unwrap_used",
|
||||||
|
"-D", "clippy::expect_used",
|
||||||
|
"-D", "clippy::panic",
|
||||||
|
"-D", "clippy::todo",
|
||||||
|
"-D", "clippy::unimplemented",
|
||||||
|
"-F", "clippy::undocumented_unsafe_blocks",
|
||||||
|
|
||||||
|
# 2. BASE STRICTNESS & CORE CORRECTNESS (Deny)
|
||||||
|
"-D", "clippy::correctness",
|
||||||
|
# Note: temporarily allow some noisy lints (e.g. redundant closure, too_many_arguments,
|
||||||
|
# documentation backticks, missing const fn) while we triage existing code.
|
||||||
|
"-A", "clippy::use-self",
|
||||||
|
"-A", "clippy::redundant-closure",
|
||||||
|
"-A", "clippy::too-many-arguments",
|
||||||
|
"-A", "clippy::doc-markdown",
|
||||||
|
"-A", "clippy::missing-const-for-fn",
|
||||||
|
"-A", "clippy::unnecessary_operation",
|
||||||
|
"-A", "clippy::redundant-pub-crate",
|
||||||
|
"-A", "clippy::derive-partial-eq-without-eq",
|
||||||
|
"-D", "clippy::option_if_let_else",
|
||||||
|
"-D", "clippy::or_fun_call",
|
||||||
|
"-D", "clippy::branches_sharing_code",
|
||||||
|
"-A", "clippy::type_complexity",
|
||||||
|
"-A", "clippy::new_ret_no_self",
|
||||||
|
"-D", "clippy::single_option_map",
|
||||||
|
"-D", "clippy::useless_let_if_seq",
|
||||||
|
"-D", "clippy::redundant_locals",
|
||||||
|
"-D", "clippy::cloned_ref_to_slice_refs",
|
||||||
|
#"-D", "clippy::all",
|
||||||
|
#"-D", "clippy::pedantic",
|
||||||
|
#"-D", "clippy::cargo",
|
||||||
|
"-D", "unsafe_code",
|
||||||
|
|
||||||
|
# 3. CONCURRENCY, ASYNC, & MEMORY STRICTNESS (Deny)
|
||||||
|
"-D", "clippy::await_holding_lock",
|
||||||
|
"-D", "clippy::await_holding_refcell_ref",
|
||||||
|
"-D", "clippy::debug_assert_with_mut_call",
|
||||||
|
"-D", "clippy::macro_use_imports",
|
||||||
|
"-D", "clippy::cast_ptr_alignment",
|
||||||
|
"-D", "clippy::cast_lossless",
|
||||||
|
"-A", "clippy::cast_possible_truncation",
|
||||||
|
"-A", "clippy::cast_possible_wrap",
|
||||||
|
"-D", "clippy::ptr_as_ptr",
|
||||||
|
"-A", "clippy::significant_drop_tightening",
|
||||||
|
"-A", "clippy::significant_drop_in_scrutinee",
|
||||||
|
|
||||||
|
# 4. CRYPTOGRAPHIC, MATH & COMPLEXITY STRICTNESS (Deny)
|
||||||
|
"-D", "clippy::large_stack_arrays",
|
||||||
|
"-A", "clippy::float_cmp",
|
||||||
|
"-D", "clippy::same_functions_in_if_condition",
|
||||||
|
#"-D", "clippy::cognitive_complexity",
|
||||||
|
|
||||||
|
# 5. NATIVE COMPILER STRICTNESS (Deny)
|
||||||
|
#"-D", "missing_docs",
|
||||||
|
#"-D", "missing_debug_implementations",
|
||||||
|
"-D", "trivial_casts",
|
||||||
|
"-D", "trivial_numeric_casts",
|
||||||
|
"-D", "unused_extern_crates",
|
||||||
|
"-D", "unused_import_braces",
|
||||||
|
#"-D", "unused_qualifications",
|
||||||
|
"-D", "rust_2018_idioms",
|
||||||
|
|
||||||
|
# 6. EXPERIMENTAL (Warn)
|
||||||
|
"-A", "clippy::nursery"
|
||||||
|
]
|
||||||
|
|
@ -0,0 +1,15 @@
|
||||||
|
[bans]
|
||||||
|
multiple-versions = "deny"
|
||||||
|
wildcards = "allow"
|
||||||
|
highlight = "all"
|
||||||
|
|
||||||
|
# Explicitly flag the weak cryptography so the agent is forced to justify its existence
|
||||||
|
[[bans.skip]]
|
||||||
|
name = "md-5"
|
||||||
|
version = "*"
|
||||||
|
reason = "MUST VERIFY: Only allowed for legacy checksums, never for security."
|
||||||
|
|
||||||
|
[[bans.skip]]
|
||||||
|
name = "sha1"
|
||||||
|
version = "*"
|
||||||
|
reason = "MUST VERIFY: Only allowed for backwards compatibility."
|
||||||
|
|
@ -45,10 +45,65 @@ jobs:
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: cargo test --verbose
|
run: cargo test --verbose
|
||||||
|
|
||||||
# clippy dont fail on warnings because of active development of telemt
|
- name: Check benches compile
|
||||||
# and many warnings
|
run: cargo check --benches
|
||||||
- name: Run clippy
|
|
||||||
run: cargo clippy -- --cap-lints warn
|
- name: Run clippy policy regression tests
|
||||||
|
run: bash tools/security/test_clippy_policy.sh
|
||||||
|
|
||||||
|
- name: Run clippy for production targets (strict)
|
||||||
|
run: >-
|
||||||
|
cargo clippy --workspace --lib --bins --all-features --
|
||||||
|
-D warnings
|
||||||
|
-D clippy::correctness
|
||||||
|
-D clippy::all
|
||||||
|
-D clippy::pedantic
|
||||||
|
-D clippy::cargo
|
||||||
|
-A clippy::redundant_pub_crate
|
||||||
|
-D clippy::await_holding_lock
|
||||||
|
-D clippy::await_holding_refcell_ref
|
||||||
|
-D clippy::debug_assert_with_mut_call
|
||||||
|
-D clippy::macro_use_imports
|
||||||
|
-D clippy::cast_ptr_alignment
|
||||||
|
-D clippy::cast_lossless
|
||||||
|
-D clippy::cast_possible_truncation
|
||||||
|
-D clippy::cast_possible_wrap
|
||||||
|
-D clippy::ptr_as_ptr
|
||||||
|
-D clippy::large_stack_arrays
|
||||||
|
-D clippy::float_cmp
|
||||||
|
-D clippy::same_functions_in_if_condition
|
||||||
|
-D clippy::cognitive_complexity
|
||||||
|
-D missing_docs
|
||||||
|
-D missing_debug_implementations
|
||||||
|
-D trivial_casts
|
||||||
|
-D trivial_numeric_casts
|
||||||
|
-D unused_extern_crates
|
||||||
|
-D unused_import_braces
|
||||||
|
-D unused_qualifications
|
||||||
|
-D rust_2018_idioms
|
||||||
|
-W clippy::nursery
|
||||||
|
-F clippy::unwrap_used
|
||||||
|
-F clippy::expect_used
|
||||||
|
-F clippy::panic
|
||||||
|
-F clippy::todo
|
||||||
|
-F clippy::unimplemented
|
||||||
|
|
||||||
|
- name: Run clippy for test targets (scoped)
|
||||||
|
run: >-
|
||||||
|
cargo clippy --workspace --tests --all-features --
|
||||||
|
-D clippy::correctness
|
||||||
|
-A clippy::expect_used
|
||||||
|
-A clippy::panic
|
||||||
|
-A clippy::unwrap_used
|
||||||
|
-A clippy::todo
|
||||||
|
-A clippy::unimplemented
|
||||||
|
-A clippy::redundant_pub_crate
|
||||||
|
-A clippy::missing_const_for_fn
|
||||||
|
-A clippy::option_if_let_else
|
||||||
|
-A clippy::unused_async
|
||||||
|
-A clippy::cast_lossless
|
||||||
|
-A clippy::cast_possible_truncation
|
||||||
|
-A clippy::cast_possible_wrap
|
||||||
|
|
||||||
- name: Check for unused dependencies
|
- name: Check for unused dependencies
|
||||||
run: cargo udeps || true
|
run: cargo udeps || true
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,34 @@
|
||||||
|
name: Security
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [ "*" ]
|
||||||
|
pull_request:
|
||||||
|
branches: [ "*" ]
|
||||||
|
|
||||||
|
env:
|
||||||
|
CARGO_TERM_COLOR: always
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
advisory-gate:
|
||||||
|
name: Advisory Gate
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
permissions:
|
||||||
|
contents: read
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Install latest stable Rust toolchain
|
||||||
|
uses: dtolnay/rust-toolchain@stable
|
||||||
|
|
||||||
|
- name: Install cargo-audit
|
||||||
|
run: cargo install --locked cargo-audit
|
||||||
|
|
||||||
|
- name: Run policy regression tests
|
||||||
|
run: bash tools/security/test_enforce_audit_policy.sh
|
||||||
|
|
||||||
|
- name: Enforce advisory policy
|
||||||
|
run: bash tools/security/enforce_audit_policy.sh
|
||||||
|
|
@ -27,7 +27,7 @@ struct ApiEventsInner {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Bounded ring-buffer for control-plane API/runtime events.
|
/// Bounded ring-buffer for control-plane API/runtime events.
|
||||||
pub struct ApiEventStore {
|
pub(crate) struct ApiEventStore {
|
||||||
inner: Mutex<ApiEventsInner>,
|
inner: Mutex<ApiEventsInner>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ const FEATURE_DISABLED_REASON: &str = "feature_disabled";
|
||||||
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
|
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MinimalCacheEntry {
|
pub(crate) struct MinimalCacheEntry {
|
||||||
pub(super) expires_at: Instant,
|
pub(super) expires_at: Instant,
|
||||||
pub(super) payload: MinimalAllPayload,
|
pub(super) payload: MinimalAllPayload,
|
||||||
pub(super) generated_at_epoch_secs: u64,
|
pub(super) generated_at_epoch_secs: u64,
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,12 @@ impl BufferPool {
|
||||||
|
|
||||||
/// Return a buffer to the pool
|
/// Return a buffer to the pool
|
||||||
fn return_buffer(&self, mut buffer: BytesMut) {
|
fn return_buffer(&self, mut buffer: BytesMut) {
|
||||||
|
// Zero the initialized region before clearing to prevent the next caller
|
||||||
|
// from observing prior connection data through the backing allocation.
|
||||||
|
// This satisfies OWASP ASVS L2 V8.3.6 (clear sensitive data from memory).
|
||||||
|
// The write is not optimized away because the allocation remains live
|
||||||
|
// (it is about to be pushed into the pool queue).
|
||||||
|
buffer.as_mut().fill(0u8);
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
|
|
||||||
// Accept buffers within [buffer_size, buffer_size * MAX_POOL_BUFFER_OVERSIZE_MULT].
|
// Accept buffers within [buffer_size, buffer_size * MAX_POOL_BUFFER_OVERSIZE_MULT].
|
||||||
|
|
@ -638,18 +644,18 @@ mod tests {
|
||||||
assert_eq!(stats.hits, 50);
|
assert_eq!(stats.hits, 50);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Security invariant: sensitive data must not leak between pool users ───
|
// Security invariant: sensitive data must not leak between pool users.
|
||||||
|
// A buffer containing sensitive bytes has its initialized region overwritten
|
||||||
// A buffer containing "sensitive" bytes must be zeroed before being handed
|
// with zeros in return_buffer() before the length is reset to 0. The next
|
||||||
// to the next caller. An attacker who can trigger repeated pool cycles against
|
// caller therefore receives a buffer whose backing bytes are provably zero,
|
||||||
// a shared buffer slot must not be able to read prior connection data.
|
// not merely logically invisible through the safe Rust API.
|
||||||
#[test]
|
#[test]
|
||||||
fn pooled_buffer_sensitive_data_is_cleared_before_reuse() {
|
fn pooled_buffer_sensitive_data_is_cleared_before_reuse() {
|
||||||
let pool = Arc::new(BufferPool::with_config(64, 2));
|
let pool = Arc::new(BufferPool::with_config(64, 2));
|
||||||
{
|
{
|
||||||
let mut buf = pool.get();
|
let mut buf = pool.get();
|
||||||
buf.extend_from_slice(b"credentials:password123");
|
buf.extend_from_slice(b"credentials:password123");
|
||||||
// Drop returns the buffer to the pool after clearing.
|
// Drop triggers return_buffer: fill(0u8) then clear().
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let buf = pool.get();
|
let buf = pool.get();
|
||||||
|
|
@ -659,6 +665,53 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify that return_buffer() actually zeros the backing bytes, not merely
|
||||||
|
// resets the length. We use unsafe to read the first N bytes of the backing
|
||||||
|
// allocation after the buffer has been returned and re-issued with len=0.
|
||||||
|
//
|
||||||
|
// SAFETY reasoning: BytesMut maintains a NonNull<u8> backing allocation for
|
||||||
|
// the full capacity. When len=0 and capacity>0, the deref'd slice as_ptr()
|
||||||
|
// returns the start of that allocation (not a dangling pointer), because
|
||||||
|
// BytesMut is constructed from an Arc-managed heap block. Reading exactly
|
||||||
|
// `prior_len` bytes (which return_buffer zeroed via fill(0u8)) is valid
|
||||||
|
// because the allocation covers at least `capacity >= buffer_size` bytes.
|
||||||
|
#[allow(unsafe_code)]
|
||||||
|
#[test]
|
||||||
|
fn return_to_pool_zeros_backing_bytes_not_just_length() {
|
||||||
|
let buf_size = 16usize;
|
||||||
|
let payload: &[u8] = b"secret_payload!!"; // exactly 16 bytes = buf_size
|
||||||
|
assert_eq!(payload.len(), buf_size, "pre-condition: payload fills buffer");
|
||||||
|
|
||||||
|
let pool = Arc::new(BufferPool::with_config(buf_size, 1));
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut buf = pool.get();
|
||||||
|
buf.extend_from_slice(payload);
|
||||||
|
assert_eq!(buf.len(), buf_size);
|
||||||
|
} // drop → return_buffer → fill(0u8) → clear()
|
||||||
|
|
||||||
|
{
|
||||||
|
let buf = pool.get();
|
||||||
|
assert_eq!(buf.len(), 0, "re-issued buffer must have len=0");
|
||||||
|
assert!(
|
||||||
|
buf.capacity() >= buf_size,
|
||||||
|
"re-issued buffer must have at least the original capacity"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Read the first `payload.len()` bytes of the backing allocation
|
||||||
|
// to confirm return_buffer wrote zeros, not just reset the length.
|
||||||
|
let backing_zeroed = unsafe {
|
||||||
|
std::slice::from_raw_parts(buf.as_ptr(), payload.len())
|
||||||
|
.iter()
|
||||||
|
.all(|&b| b == 0)
|
||||||
|
};
|
||||||
|
assert!(
|
||||||
|
backing_zeroed,
|
||||||
|
"return_buffer must zero backing bytes (fill(0u8)), not merely reset len"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Verify that calling take() extracts the full content and the extracted
|
// Verify that calling take() extracts the full content and the extracted
|
||||||
// BytesMut does NOT get returned to the pool (no double-return).
|
// BytesMut does NOT get returned to the pool (no double-return).
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,11 @@ impl SnapshotApplyOutcome {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Limits the number of simultaneous outbound dials during a secret-rotation
|
||||||
|
// reconnect sweep, preventing a thundering-herd burst against Telegram's
|
||||||
|
// MTProto servers when the pool contains many active writers.
|
||||||
|
const MAX_CONCURRENT_RECONNECTS: usize = 32;
|
||||||
|
|
||||||
impl MePool {
|
impl MePool {
|
||||||
pub async fn update_proxy_maps(
|
pub async fn update_proxy_maps(
|
||||||
&self,
|
&self,
|
||||||
|
|
@ -110,32 +115,46 @@ impl MePool {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reconnects every active writer concurrently so that secret rotation does not
|
// Reconnects every active writer concurrently, bounded to MAX_CONCURRENT_RECONNECTS
|
||||||
// block the caller for O(N writers × 2 s) with the old sequential approach.
|
// tasks at a time, so that secret rotation does not block the caller for
|
||||||
// Each new connection is established before the corresponding old writer is marked as
|
// O(N writers × connect latency) with the old sequential approach, and does not
|
||||||
// draining, ensuring the pool never briefly drops to zero active writers per DC.
|
// create an unbounded burst of dials against Telegram's MTProto endpoints.
|
||||||
|
// Each new connection is established before the corresponding old writer is marked
|
||||||
|
// as draining, ensuring the pool never briefly drops to zero active writers per DC.
|
||||||
pub async fn reconnect_all(self: &Arc<Self>) {
|
pub async fn reconnect_all(self: &Arc<Self>) {
|
||||||
let ws = self.writers.read().await.clone();
|
let ws = self.writers.read().await.clone();
|
||||||
let mut join = tokio::task::JoinSet::new();
|
let mut ws_iter = ws.into_iter();
|
||||||
for w in ws {
|
loop {
|
||||||
let pool = self.clone();
|
let mut join = tokio::task::JoinSet::new();
|
||||||
join.spawn(async move {
|
let mut spawned = 0usize;
|
||||||
if pool
|
for _ in 0..MAX_CONCURRENT_RECONNECTS {
|
||||||
.connect_one_for_dc(w.addr, w.writer_dc, pool.rng.as_ref())
|
if let Some(w) = ws_iter.next() {
|
||||||
.await
|
let pool = self.clone();
|
||||||
.is_ok()
|
spawned += 1;
|
||||||
{
|
join.spawn(async move {
|
||||||
pool.mark_writer_draining(w.id).await;
|
if pool
|
||||||
|
.connect_one_for_dc(w.addr, w.writer_dc, pool.rng.as_ref())
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
pool.mark_writer_draining(w.id).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
if spawned == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
while join.join_next().await.is_some() {}
|
||||||
}
|
}
|
||||||
while join.join_next().await.is_some() {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::SnapshotApplyOutcome;
|
use super::{SnapshotApplyOutcome, MAX_CONCURRENT_RECONNECTS};
|
||||||
|
|
||||||
// --- SnapshotApplyOutcome::changed() ---
|
// --- SnapshotApplyOutcome::changed() ---
|
||||||
|
|
||||||
|
|
@ -166,4 +185,48 @@ mod tests {
|
||||||
let changed_count = variants.iter().filter(|v| v.changed()).count();
|
let changed_count = variants.iter().filter(|v| v.changed()).count();
|
||||||
assert_eq!(changed_count, 1, "exactly one variant must report changed");
|
assert_eq!(changed_count, 1, "exactly one variant must report changed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- MAX_CONCURRENT_RECONNECTS ---
|
||||||
|
|
||||||
|
// The concurrency bound must be large enough for meaningful parallelism but
|
||||||
|
// small enough to prevent a thundering-herd burst against upstream endpoints.
|
||||||
|
#[test]
|
||||||
|
fn max_concurrent_reconnects_is_in_operational_range() {
|
||||||
|
assert!(
|
||||||
|
MAX_CONCURRENT_RECONNECTS >= 4,
|
||||||
|
"concurrency bound ({MAX_CONCURRENT_RECONNECTS}) is too small for useful parallelism"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
MAX_CONCURRENT_RECONNECTS <= 256,
|
||||||
|
"concurrency bound ({MAX_CONCURRENT_RECONNECTS}) risks thundering-herd on upstream"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that the batch-iteration logic never exceeds MAX_CONCURRENT_RECONNECTS
|
||||||
|
// tasks per batch, regardless of how many writers exist.
|
||||||
|
#[test]
|
||||||
|
fn batching_logic_never_spawns_more_than_cap_per_batch() {
|
||||||
|
// Simulate the batch loop with a large writer list (200 > 32).
|
||||||
|
let total_writers = 200usize;
|
||||||
|
let mut remaining = total_writers;
|
||||||
|
let mut max_batch = 0usize;
|
||||||
|
|
||||||
|
while remaining > 0 {
|
||||||
|
let batch = remaining.min(MAX_CONCURRENT_RECONNECTS);
|
||||||
|
if batch > max_batch {
|
||||||
|
max_batch = batch;
|
||||||
|
}
|
||||||
|
remaining -= batch;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
max_batch <= MAX_CONCURRENT_RECONNECTS,
|
||||||
|
"no batch must exceed MAX_CONCURRENT_RECONNECTS ({MAX_CONCURRENT_RECONNECTS})"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
max_batch,
|
||||||
|
MAX_CONCURRENT_RECONNECTS,
|
||||||
|
"first batch must be exactly MAX_CONCURRENT_RECONNECTS when writers > cap"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
use std::time::SystemTime;
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
use httpdate;
|
use httpdate;
|
||||||
|
|
||||||
use crate::error::{ProxyError, Result};
|
use crate::error::{ProxyError, Result};
|
||||||
|
|
@ -7,6 +8,19 @@ use super::selftest::record_timeskew_sample;
|
||||||
|
|
||||||
pub const PROXY_SECRET_MIN_LEN: usize = 32;
|
pub const PROXY_SECRET_MIN_LEN: usize = 32;
|
||||||
|
|
||||||
|
// Produces a unique path suffix from a nanosecond timestamp plus a per-process
|
||||||
|
// monotonic counter, preventing two concurrent writers from clobbering each
|
||||||
|
// other's in-progress temp file when the same cache path is shared.
|
||||||
|
fn unique_temp_path(cache: &str) -> String {
|
||||||
|
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
|
||||||
|
let ts = SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.map(|d| d.as_nanos())
|
||||||
|
.unwrap_or(0);
|
||||||
|
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
|
||||||
|
format!("{cache}.tmp.{ts}.{id}")
|
||||||
|
}
|
||||||
|
|
||||||
/// Absolute upper bound on bytes we are willing to buffer from the network before
|
/// Absolute upper bound on bytes we are willing to buffer from the network before
|
||||||
/// running any protocol-level length validation. Prevents OOM if the remote
|
/// running any protocol-level length validation. Prevents OOM if the remote
|
||||||
/// endpoint (or a MITM) sends an oversized response body.
|
/// endpoint (or a MITM) sends an oversized response body.
|
||||||
|
|
@ -41,9 +55,10 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>, max_len: usize) -> Res
|
||||||
// 1) Try fresh download first.
|
// 1) Try fresh download first.
|
||||||
match download_proxy_secret_with_max_len(max_len).await {
|
match download_proxy_secret_with_max_len(max_len).await {
|
||||||
Ok(data) => {
|
Ok(data) => {
|
||||||
// Write to a temporary file then rename for an atomic update,
|
// Write to a uniquely-named temporary file then rename for an atomic
|
||||||
// preventing a partial write from corrupting the on-disk cache.
|
// update, preventing a partial write from corrupting the on-disk cache
|
||||||
let tmp_path = format!("{cache}.tmp");
|
// and avoiding collisions between concurrent writers or processes.
|
||||||
|
let tmp_path = unique_temp_path(cache);
|
||||||
if let Err(e) = tokio::fs::write(&tmp_path, &data).await {
|
if let Err(e) = tokio::fs::write(&tmp_path, &data).await {
|
||||||
warn!(error = %e, "Failed to write proxy-secret temp file (non-fatal)");
|
warn!(error = %e, "Failed to write proxy-secret temp file (non-fatal)");
|
||||||
} else if let Err(e) = tokio::fs::rename(&tmp_path, cache).await {
|
} else if let Err(e) = tokio::fs::rename(&tmp_path, cache).await {
|
||||||
|
|
@ -85,7 +100,7 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>, max_len: usize) -> Res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result<Vec<u8>> {
|
pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result<Vec<u8>> {
|
||||||
let resp = reqwest::get("https://core.telegram.org/getProxySecret")
|
let mut resp = reqwest::get("https://core.telegram.org/getProxySecret")
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {e}")))?;
|
.map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {e}")))?;
|
||||||
|
|
||||||
|
|
@ -123,21 +138,27 @@ pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result<Vec<u8
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let data = resp
|
// Read the body as a stream, checking the hard cap *before* each chunk is
|
||||||
.bytes()
|
// appended. This prevents an oversized response from exhausting memory even
|
||||||
.await
|
// when Content-Length is absent (e.g. chunked transfer encoding).
|
||||||
.map_err(|e| ProxyError::Proxy(format!("Read proxy-secret body: {e}")))?;
|
|
||||||
|
|
||||||
// Secondary cap covers chunked transfer responses that omit Content-Length.
|
|
||||||
let hard_cap = max_len.min(PROXY_SECRET_DOWNLOAD_HARD_CAP);
|
let hard_cap = max_len.min(PROXY_SECRET_DOWNLOAD_HARD_CAP);
|
||||||
if data.len() > hard_cap {
|
let mut data: Vec<u8> = Vec::new();
|
||||||
return Err(ProxyError::Proxy(format!(
|
loop {
|
||||||
"proxy-secret response body {} bytes exceeds hard cap {hard_cap}",
|
match resp
|
||||||
data.len()
|
.chunk()
|
||||||
)));
|
.await
|
||||||
|
.map_err(|e| ProxyError::Proxy(format!("Read proxy-secret body: {e}")))?{
|
||||||
|
Some(chunk) => {
|
||||||
|
if data.len() + chunk.len() > hard_cap {
|
||||||
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
"proxy-secret response body would exceed hard cap {hard_cap} bytes"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
data.extend_from_slice(&chunk);
|
||||||
|
}
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let data = data.to_vec();
|
|
||||||
validate_proxy_secret_len(data.len(), max_len)?;
|
validate_proxy_secret_len(data.len(), max_len)?;
|
||||||
|
|
||||||
info!(len = data.len(), "Downloaded proxy-secret OK");
|
info!(len = data.len(), "Downloaded proxy-secret OK");
|
||||||
|
|
@ -219,4 +240,113 @@ mod tests {
|
||||||
let ok_content_len: u64 = hard_cap;
|
let ok_content_len: u64 = hard_cap;
|
||||||
assert!(ok_content_len <= hard_cap);
|
assert!(ok_content_len <= hard_cap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- Streaming cap check (mirrors the chunk-accumulation loop) ---
|
||||||
|
|
||||||
|
// The rejection must fire *before* the offending chunk is appended, so
|
||||||
|
// memory usage never exceeds hard_cap even for a single oversized chunk.
|
||||||
|
#[test]
|
||||||
|
fn streaming_cap_rejects_at_chunk_boundary_before_copy() {
|
||||||
|
let hard_cap = 100usize;
|
||||||
|
let mut data: Vec<u8> = Vec::new();
|
||||||
|
|
||||||
|
let chunks: &[&[u8]] = &[
|
||||||
|
&[0x41u8; 60], // 60 bytes — accepted
|
||||||
|
&[0x42u8; 41], // would bring total to 101 > 100 — must reject
|
||||||
|
&[0x43u8; 10], // must never be reached
|
||||||
|
];
|
||||||
|
|
||||||
|
let mut rejected_at = None;
|
||||||
|
for (i, chunk) in chunks.iter().enumerate() {
|
||||||
|
if data.len() + chunk.len() > hard_cap {
|
||||||
|
rejected_at = Some(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
data.extend_from_slice(chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(rejected_at, Some(1), "rejection must occur at chunk index 1");
|
||||||
|
assert_eq!(data.len(), 60, "only the first chunk must be accumulated");
|
||||||
|
assert!(
|
||||||
|
data.len() <= hard_cap,
|
||||||
|
"accumulated bytes must not exceed hard_cap at the point of rejection"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// A single chunk that is exactly hard_cap + 1 bytes must be rejected
|
||||||
|
// immediately, with zero bytes buffered into memory.
|
||||||
|
#[test]
|
||||||
|
fn streaming_cap_rejects_single_oversized_chunk_before_any_copy() {
|
||||||
|
let hard_cap = 100usize;
|
||||||
|
let data: Vec<u8> = Vec::new();
|
||||||
|
let chunk = vec![0xDEu8; hard_cap + 1];
|
||||||
|
|
||||||
|
let would_reject = data.len() + chunk.len() > hard_cap;
|
||||||
|
|
||||||
|
assert!(would_reject, "single oversized chunk must trigger cap rejection");
|
||||||
|
assert_eq!(data.len(), 0, "zero bytes must be buffered when rejection fires");
|
||||||
|
}
|
||||||
|
|
||||||
|
// A body exactly equal to hard_cap bytes must be accepted without rejection.
|
||||||
|
#[test]
|
||||||
|
fn streaming_cap_accepts_body_exactly_at_hard_cap() {
|
||||||
|
let hard_cap = 100usize;
|
||||||
|
let mut data: Vec<u8> = Vec::new();
|
||||||
|
|
||||||
|
let chunk = vec![0xABu8; hard_cap];
|
||||||
|
let would_reject = data.len() + chunk.len() > hard_cap;
|
||||||
|
if !would_reject {
|
||||||
|
data.extend_from_slice(&chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(!would_reject, "body exactly at hard_cap must be accepted");
|
||||||
|
assert_eq!(data.len(), hard_cap);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multiple small chunks that together exceed hard_cap must be rejected on
|
||||||
|
// the chunk that would push the total over the limit.
|
||||||
|
#[test]
|
||||||
|
fn streaming_cap_rejects_cumulative_excess_across_many_chunks() {
|
||||||
|
let hard_cap = 50usize;
|
||||||
|
let mut data: Vec<u8> = Vec::new();
|
||||||
|
let mut rejected = false;
|
||||||
|
|
||||||
|
for i in 0..10u8 {
|
||||||
|
let chunk = vec![i; 10]; // 10 chunks × 10 bytes = 100 total
|
||||||
|
if data.len() + chunk.len() > hard_cap {
|
||||||
|
rejected = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
data.extend_from_slice(&chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(rejected, "cumulative excess across chunks must trigger rejection");
|
||||||
|
assert!(
|
||||||
|
data.len() <= hard_cap,
|
||||||
|
"must not have buffered past hard_cap: got {} bytes",
|
||||||
|
data.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- unique_temp_path ---
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unique_temp_path_generates_distinct_names_on_successive_calls() {
|
||||||
|
let p1 = unique_temp_path("proxy-secret");
|
||||||
|
let p2 = unique_temp_path("proxy-secret");
|
||||||
|
assert_ne!(p1, p2, "successive calls must produce distinct paths");
|
||||||
|
assert!(
|
||||||
|
p1.starts_with("proxy-secret.tmp."),
|
||||||
|
"path must begin with the cache name and .tmp. prefix"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unique_temp_path_embeds_cache_name_as_prefix() {
|
||||||
|
let p = unique_temp_path("/var/cache/proxy-secret");
|
||||||
|
assert!(
|
||||||
|
p.starts_with("/var/cache/proxy-secret.tmp."),
|
||||||
|
"path must preserve the full cache path as a prefix: {p}"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue