mirror of
https://github.com/telemt/telemt.git
synced 2026-05-22 19:51:43 +03:00
Compare commits
3 Commits
422d97a385
...
c02c7fbe43
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c02c7fbe43 | ||
|
|
8379b48f69 | ||
|
|
70d02910b7 |
@@ -21,8 +21,7 @@ pub enum LogLevel {
|
||||
#[default]
|
||||
Normal,
|
||||
/// Minimal output: only warnings and errors (warn + error).
|
||||
/// Startup messages (config, DC connectivity, proxy links) are always shown
|
||||
/// via info! before the filter is applied.
|
||||
/// Proxy links may still be emitted through their dedicated target.
|
||||
Silent,
|
||||
}
|
||||
|
||||
|
||||
@@ -392,12 +392,18 @@ impl UserIpTracker {
|
||||
let now = Instant::now();
|
||||
|
||||
let (mut active_ips, mut recent_ips) = self.active_and_recent_write().await;
|
||||
let user_active = active_ips
|
||||
.entry(username.to_string())
|
||||
.or_insert_with(HashMap::new);
|
||||
let user_recent = recent_ips
|
||||
.entry(username.to_string())
|
||||
.or_insert_with(HashMap::new);
|
||||
if !active_ips.contains_key(username) {
|
||||
active_ips.insert(username.to_string(), HashMap::new());
|
||||
}
|
||||
if !recent_ips.contains_key(username) {
|
||||
recent_ips.insert(username.to_string(), HashMap::new());
|
||||
}
|
||||
let Some(user_active) = active_ips.get_mut(username) else {
|
||||
return Err(format!("IP tracker active entry unavailable for user '{username}'"));
|
||||
};
|
||||
let Some(user_recent) = recent_ips.get_mut(username) else {
|
||||
return Err(format!("IP tracker recent entry unavailable for user '{username}'"));
|
||||
};
|
||||
let pruned_recent_entries = Self::prune_recent(user_recent, now, window);
|
||||
Self::decrement_counter(&self.recent_entry_count, pruned_recent_entries);
|
||||
let recent_contains_ip = user_recent.contains_key(&ip);
|
||||
|
||||
@@ -15,6 +15,13 @@ use crate::transport::middle_proxy::{
|
||||
save_proxy_config_cache,
|
||||
};
|
||||
|
||||
const MAESTRO_COLOR: &str = "\x1b[92m";
|
||||
const COLOR_RESET: &str = "\x1b[0m";
|
||||
|
||||
pub(crate) fn print_maestro_line(message: impl AsRef<str>) {
|
||||
eprintln!("{MAESTRO_COLOR}MAESTRO{COLOR_RESET}: {}", message.as_ref());
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_runtime_config_path(
|
||||
config_path_cli: &str,
|
||||
startup_cwd: &Path,
|
||||
@@ -501,7 +508,7 @@ mod tests {
|
||||
}
|
||||
|
||||
pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
|
||||
info!(target: "telemt::links", "--- Proxy Links ({}) ---", host);
|
||||
print_maestro_line(format!("Proxy links ({host})"));
|
||||
for user_name in config
|
||||
.general
|
||||
.links
|
||||
@@ -509,20 +516,16 @@ pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
|
||||
.resolve_users(&config.access.users)
|
||||
{
|
||||
if let Some(secret) = config.access.users.get(user_name) {
|
||||
info!(target: "telemt::links", "User: {}", user_name);
|
||||
print_maestro_line(format!("User: {user_name}"));
|
||||
if config.general.modes.classic {
|
||||
info!(
|
||||
target: "telemt::links",
|
||||
" Classic: tg://proxy?server={}&port={}&secret={}",
|
||||
host, port, secret
|
||||
);
|
||||
print_maestro_line(format!(
|
||||
"Classic: tg://proxy?server={host}&port={port}&secret={secret}"
|
||||
));
|
||||
}
|
||||
if config.general.modes.secure {
|
||||
info!(
|
||||
target: "telemt::links",
|
||||
" DD: tg://proxy?server={}&port={}&secret=dd{}",
|
||||
host, port, secret
|
||||
);
|
||||
print_maestro_line(format!(
|
||||
"DD: tg://proxy?server={host}&port={port}&secret=dd{secret}"
|
||||
));
|
||||
}
|
||||
if config.general.modes.tls {
|
||||
let mut domains = Vec::with_capacity(1 + config.censorship.tls_domains.len());
|
||||
@@ -535,18 +538,15 @@ pub(crate) fn print_proxy_links(host: &str, port: u16, config: &ProxyConfig) {
|
||||
|
||||
for domain in domains {
|
||||
let domain_hex = hex::encode(&domain);
|
||||
info!(
|
||||
target: "telemt::links",
|
||||
" EE-TLS: tg://proxy?server={}&port={}&secret=ee{}{}",
|
||||
host, port, secret, domain_hex
|
||||
);
|
||||
print_maestro_line(format!(
|
||||
"EE-TLS: tg://proxy?server={host}&port={port}&secret=ee{secret}{domain_hex}"
|
||||
));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!(target: "telemt::links", "User '{}' in show_link not found", user_name);
|
||||
}
|
||||
}
|
||||
info!(target: "telemt::links", "------------------------");
|
||||
}
|
||||
|
||||
pub(crate) async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result<()> {
|
||||
|
||||
@@ -47,7 +47,7 @@ use crate::stats::{ReplayChecker, Stats};
|
||||
use crate::stream::BufferPool;
|
||||
use crate::transport::UpstreamManager;
|
||||
use crate::transport::middle_proxy::MePool;
|
||||
use helpers::{parse_cli, resolve_runtime_base_dir, resolve_runtime_config_path};
|
||||
use helpers::{parse_cli, print_maestro_line, resolve_runtime_base_dir, resolve_runtime_config_path};
|
||||
|
||||
#[cfg(unix)]
|
||||
use crate::daemon::{DaemonOptions, PidFile, drop_privileges};
|
||||
@@ -325,7 +325,9 @@ async fn run_telemt_core(
|
||||
config.general.log_level.clone()
|
||||
};
|
||||
|
||||
let (filter_layer, filter_handle) = reload::Layer::new(EnvFilter::new("info"));
|
||||
let initial_filter_spec = runtime_tasks::log_filter_spec(has_rust_log, &effective_log_level);
|
||||
let (filter_layer, filter_handle) =
|
||||
reload::Layer::new(EnvFilter::new(initial_filter_spec.clone()));
|
||||
startup_tracker
|
||||
.start_component(
|
||||
COMPONENT_TRACING_INIT,
|
||||
@@ -356,7 +358,7 @@ async fn run_telemt_core(
|
||||
destination: log_destination,
|
||||
disable_colors: true,
|
||||
};
|
||||
let (_, guard) = crate::logging::init_logging(&logging_opts, "info");
|
||||
let (_, guard) = crate::logging::init_logging(&logging_opts, &initial_filter_spec);
|
||||
_logging_guard = Some(guard);
|
||||
}
|
||||
crate::logging::LogDestination::File { .. } => {
|
||||
@@ -365,7 +367,7 @@ async fn run_telemt_core(
|
||||
destination: log_destination,
|
||||
disable_colors: true,
|
||||
};
|
||||
let (_, guard) = crate::logging::init_logging(&logging_opts, "info");
|
||||
let (_, guard) = crate::logging::init_logging(&logging_opts, &initial_filter_spec);
|
||||
_logging_guard = Some(guard);
|
||||
}
|
||||
}
|
||||
@@ -377,7 +379,7 @@ async fn run_telemt_core(
|
||||
)
|
||||
.await;
|
||||
|
||||
info!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION"));
|
||||
print_maestro_line(format!("Telemt MTProxy v{}", env!("CARGO_PKG_VERSION")));
|
||||
info!("Log level: {}", effective_log_level);
|
||||
if config.general.disable_colors {
|
||||
info!("Colors: disabled");
|
||||
|
||||
@@ -319,13 +319,7 @@ pub(crate) async fn apply_runtime_log_filter(
|
||||
filter_handle: reload::Handle<EnvFilter, tracing_subscriber::Registry>,
|
||||
mut log_level_rx: watch::Receiver<LogLevel>,
|
||||
) {
|
||||
let runtime_filter = if has_rust_log {
|
||||
EnvFilter::from_default_env()
|
||||
} else if matches!(effective_log_level, LogLevel::Silent) {
|
||||
EnvFilter::new("warn,telemt::links=info")
|
||||
} else {
|
||||
EnvFilter::new(effective_log_level.to_filter_str())
|
||||
};
|
||||
let runtime_filter = EnvFilter::new(log_filter_spec(has_rust_log, effective_log_level));
|
||||
filter_handle
|
||||
.reload(runtime_filter)
|
||||
.expect("Failed to switch log filter");
|
||||
@@ -336,7 +330,7 @@ pub(crate) async fn apply_runtime_log_filter(
|
||||
break;
|
||||
}
|
||||
let level = log_level_rx.borrow_and_update().clone();
|
||||
let new_filter = tracing_subscriber::EnvFilter::new(level.to_filter_str());
|
||||
let new_filter = tracing_subscriber::EnvFilter::new(log_filter_spec(false, &level));
|
||||
if let Err(e) = filter_handle.reload(new_filter) {
|
||||
tracing::error!("config reload: failed to update log filter: {}", e);
|
||||
}
|
||||
@@ -344,6 +338,17 @@ pub(crate) async fn apply_runtime_log_filter(
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn log_filter_spec(has_rust_log: bool, effective_log_level: &LogLevel) -> String {
|
||||
if has_rust_log {
|
||||
std::env::var("RUST_LOG")
|
||||
.unwrap_or_else(|_| effective_log_level.to_filter_str().to_string())
|
||||
} else if matches!(effective_log_level, LogLevel::Silent) {
|
||||
"warn,telemt::links=info".to_string()
|
||||
} else {
|
||||
effective_log_level.to_filter_str().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_metrics_if_configured(
|
||||
config: &Arc<ProxyConfig>,
|
||||
startup_tracker: &Arc<StartupTracker>,
|
||||
|
||||
@@ -515,8 +515,13 @@ fn exclusive_mask_target_for_sni<'a>(
|
||||
config: &'a ProxyConfig,
|
||||
sni: &str,
|
||||
) -> Option<MaskTcpTarget<'a>> {
|
||||
for (domain, target) in &config.censorship.exclusive_mask {
|
||||
if domain.eq_ignore_ascii_case(sni) {
|
||||
if let Some(target) = config.censorship.exclusive_mask.get(sni) {
|
||||
return parse_exclusive_mask_target(target);
|
||||
}
|
||||
|
||||
if sni.bytes().any(|byte| byte.is_ascii_uppercase()) {
|
||||
let normalized_sni = sni.to_ascii_lowercase();
|
||||
if let Some(target) = config.censorship.exclusive_mask.get(&normalized_sni) {
|
||||
return parse_exclusive_mask_target(target);
|
||||
}
|
||||
}
|
||||
@@ -529,17 +534,27 @@ fn mask_host_for_initial_data<'a>(config: &'a ProxyConfig, initial_data: &[u8])
|
||||
mask_tcp_target_for_initial_data(config, initial_data).host
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn mask_tcp_target_for_initial_data<'a>(
|
||||
config: &'a ProxyConfig,
|
||||
initial_data: &[u8],
|
||||
) -> MaskTcpTarget<'a> {
|
||||
if let Some(target) = tls::extract_sni_from_client_hello(initial_data)
|
||||
let sni = tls::extract_sni_from_client_hello(initial_data);
|
||||
if let Some(target) = sni
|
||||
.as_deref()
|
||||
.and_then(|sni| exclusive_mask_target_for_sni(config, sni))
|
||||
{
|
||||
return target;
|
||||
}
|
||||
|
||||
default_mask_tcp_target_for_initial_data(config, initial_data, sni.as_deref())
|
||||
}
|
||||
|
||||
fn default_mask_tcp_target_for_initial_data<'a>(
|
||||
config: &'a ProxyConfig,
|
||||
initial_data: &[u8],
|
||||
sni: Option<&str>,
|
||||
) -> MaskTcpTarget<'a> {
|
||||
let configured_mask_host = config
|
||||
.censorship
|
||||
.mask_host
|
||||
@@ -553,8 +568,13 @@ fn mask_tcp_target_for_initial_data<'a>(
|
||||
};
|
||||
}
|
||||
|
||||
let host = tls::extract_sni_from_client_hello(initial_data)
|
||||
.as_deref()
|
||||
let extracted_sni = if sni.is_none() {
|
||||
tls::extract_sni_from_client_hello(initial_data)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let host = sni
|
||||
.or(extracted_sni.as_deref())
|
||||
.and_then(|sni| matching_tls_domain_for_sni(config, sni))
|
||||
.unwrap_or(configured_mask_host);
|
||||
MaskTcpTarget {
|
||||
@@ -858,7 +878,8 @@ pub async fn handle_bad_client<R, W>(
|
||||
return;
|
||||
}
|
||||
|
||||
let exclusive_tcp_target = tls::extract_sni_from_client_hello(initial_data)
|
||||
let client_sni = tls::extract_sni_from_client_hello(initial_data);
|
||||
let exclusive_tcp_target = client_sni
|
||||
.as_deref()
|
||||
.and_then(|sni| exclusive_mask_target_for_sni(config, sni));
|
||||
|
||||
@@ -943,8 +964,9 @@ pub async fn handle_bad_client<R, W>(
|
||||
return;
|
||||
}
|
||||
|
||||
let mask_target = exclusive_tcp_target
|
||||
.unwrap_or_else(|| mask_tcp_target_for_initial_data(config, initial_data));
|
||||
let mask_target = exclusive_tcp_target.unwrap_or_else(|| {
|
||||
default_mask_tcp_target_for_initial_data(config, initial_data, client_sni.as_deref())
|
||||
});
|
||||
let mask_host = mask_target.host;
|
||||
let mask_port = mask_target.port;
|
||||
|
||||
|
||||
@@ -550,9 +550,7 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
|
||||
this.counters.touch(Instant::now(), this.epoch);
|
||||
|
||||
this.stats
|
||||
.add_user_octets_from_handle(this.user_stats.as_ref(), n_to_charge);
|
||||
this.stats
|
||||
.increment_user_msgs_from_handle(this.user_stats.as_ref());
|
||||
.add_user_traffic_from_handle(this.user_stats.as_ref(), n_to_charge);
|
||||
if this.traffic_lease.is_some() {
|
||||
this.c2s_rate_debt_bytes =
|
||||
this.c2s_rate_debt_bytes.saturating_add(n_to_charge);
|
||||
@@ -718,9 +716,7 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
|
||||
this.counters.touch(Instant::now(), this.epoch);
|
||||
|
||||
this.stats
|
||||
.add_user_octets_to_handle(this.user_stats.as_ref(), n_to_charge);
|
||||
this.stats
|
||||
.increment_user_msgs_to_handle(this.user_stats.as_ref());
|
||||
.add_user_traffic_to_handle(this.user_stats.as_ref(), n_to_charge);
|
||||
|
||||
if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) {
|
||||
if should_immediate_quota_check(remaining, n_to_charge) {
|
||||
|
||||
@@ -474,6 +474,30 @@ impl Stats {
|
||||
.fetch_add(bytes, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn add_user_traffic_from_handle(&self, user_stats: &UserStats, bytes: u64) {
|
||||
if !self.telemetry_user_enabled() {
|
||||
return;
|
||||
}
|
||||
self.touch_user_stats(user_stats);
|
||||
user_stats
|
||||
.octets_from_client
|
||||
.fetch_add(bytes, Ordering::Relaxed);
|
||||
user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn add_user_traffic_to_handle(&self, user_stats: &UserStats, bytes: u64) {
|
||||
if !self.telemetry_user_enabled() {
|
||||
return;
|
||||
}
|
||||
self.touch_user_stats(user_stats);
|
||||
user_stats
|
||||
.octets_to_client
|
||||
.fetch_add(bytes, Ordering::Relaxed);
|
||||
user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn increment_user_msgs_from_handle(&self, user_stats: &UserStats) {
|
||||
if !self.telemetry_user_enabled() {
|
||||
@@ -2678,9 +2702,10 @@ struct ReplayEntry {
|
||||
}
|
||||
|
||||
struct ReplayShard {
|
||||
cache: LruCache<Box<[u8]>, ReplayEntry>,
|
||||
queue: VecDeque<(Instant, Box<[u8]>, u64)>,
|
||||
cache: LruCache<Arc<[u8]>, ReplayEntry>,
|
||||
queue: VecDeque<(Instant, Arc<[u8]>, u64)>,
|
||||
seq_counter: u64,
|
||||
capacity: usize,
|
||||
}
|
||||
|
||||
impl ReplayShard {
|
||||
@@ -2689,6 +2714,7 @@ impl ReplayShard {
|
||||
cache: LruCache::new(cap),
|
||||
queue: VecDeque::with_capacity(cap.get()),
|
||||
seq_counter: 0,
|
||||
capacity: cap.get(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2709,15 +2735,19 @@ impl ReplayShard {
|
||||
if *ts >= cutoff {
|
||||
break;
|
||||
}
|
||||
let (_, key, queue_seq) = self.queue.pop_front().unwrap();
|
||||
self.evict_queue_front();
|
||||
}
|
||||
}
|
||||
|
||||
// Use key.as_ref() to get &[u8] — avoids Borrow<Q> ambiguity
|
||||
// between Borrow<[u8]> and Borrow<Box<[u8]>>
|
||||
if let Some(entry) = self.cache.peek(key.as_ref())
|
||||
&& entry.seq == queue_seq
|
||||
{
|
||||
self.cache.pop(key.as_ref());
|
||||
}
|
||||
fn evict_queue_front(&mut self) {
|
||||
let Some((_, key, queue_seq)) = self.queue.pop_front() else {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(entry) = self.cache.peek(key.as_ref())
|
||||
&& entry.seq == queue_seq
|
||||
{
|
||||
self.cache.pop(key.as_ref());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2738,13 +2768,16 @@ impl ReplayShard {
|
||||
if self.cache.peek(key).is_some() {
|
||||
return;
|
||||
}
|
||||
while self.queue.len() >= self.capacity {
|
||||
self.evict_queue_front();
|
||||
}
|
||||
|
||||
let seq = self.next_seq();
|
||||
let boxed_key: Box<[u8]> = key.into();
|
||||
let shared_key: Arc<[u8]> = Arc::from(key);
|
||||
|
||||
self.cache
|
||||
.put(boxed_key.clone(), ReplayEntry { seen_at: now, seq });
|
||||
self.queue.push_back((now, boxed_key, seq));
|
||||
.put(Arc::clone(&shared_key), ReplayEntry { seen_at: now, seq });
|
||||
self.queue.push_back((now, shared_key, seq));
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
|
||||
@@ -5,10 +5,14 @@ use crate::crypto::{AesCbc, crc32, crc32c};
|
||||
use crate::error::{ProxyError, Result};
|
||||
use crate::protocol::constants::*;
|
||||
|
||||
const RPC_WRITER_FRAME_BUF_SHRINK_THRESHOLD: usize = 256 * 1024;
|
||||
const RPC_WRITER_FRAME_BUF_RETAIN: usize = 64 * 1024;
|
||||
|
||||
/// Commands sent to dedicated writer tasks to avoid mutex contention on TCP writes.
|
||||
pub(crate) enum WriterCommand {
|
||||
Data(Bytes),
|
||||
DataAndFlush(Bytes),
|
||||
ControlAndFlush([u8; 12]),
|
||||
Close,
|
||||
}
|
||||
|
||||
@@ -42,15 +46,35 @@ pub(crate) fn rpc_crc(mode: RpcChecksumMode, data: &[u8]) -> u32 {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a fixed-size control payload without heap allocation.
|
||||
pub(crate) fn build_control_payload(tag: u32, value: u64) -> [u8; 12] {
|
||||
let mut payload = [0u8; 12];
|
||||
payload[..4].copy_from_slice(&tag.to_le_bytes());
|
||||
payload[4..].copy_from_slice(&value.to_le_bytes());
|
||||
payload
|
||||
}
|
||||
|
||||
pub(crate) fn build_rpc_frame(seq_no: i32, payload: &[u8], crc_mode: RpcChecksumMode) -> Vec<u8> {
|
||||
let total_len = (4 + 4 + payload.len() + 4) as u32;
|
||||
let mut frame = Vec::with_capacity(total_len as usize);
|
||||
let mut frame = Vec::new();
|
||||
build_rpc_frame_into(&mut frame, seq_no, payload, crc_mode);
|
||||
frame
|
||||
}
|
||||
|
||||
fn build_rpc_frame_into(
|
||||
frame: &mut Vec<u8>,
|
||||
seq_no: i32,
|
||||
payload: &[u8],
|
||||
crc_mode: RpcChecksumMode,
|
||||
) {
|
||||
let total_len = 4 + 4 + payload.len() + 4;
|
||||
frame.clear();
|
||||
frame.reserve(total_len + 15);
|
||||
let total_len = total_len as u32;
|
||||
frame.extend_from_slice(&total_len.to_le_bytes());
|
||||
frame.extend_from_slice(&seq_no.to_le_bytes());
|
||||
frame.extend_from_slice(payload);
|
||||
let c = rpc_crc(crc_mode, &frame);
|
||||
frame.extend_from_slice(&c.to_le_bytes());
|
||||
frame
|
||||
}
|
||||
|
||||
pub(crate) async fn read_rpc_frame_plaintext(
|
||||
@@ -218,29 +242,35 @@ pub(crate) struct RpcWriter {
|
||||
pub(crate) iv: [u8; 16],
|
||||
pub(crate) seq_no: i32,
|
||||
pub(crate) crc_mode: RpcChecksumMode,
|
||||
pub(crate) frame_buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl RpcWriter {
|
||||
pub(crate) async fn send(&mut self, payload: &[u8]) -> Result<()> {
|
||||
let frame = build_rpc_frame(self.seq_no, payload, self.crc_mode);
|
||||
build_rpc_frame_into(&mut self.frame_buf, self.seq_no, payload, self.crc_mode);
|
||||
self.seq_no = self.seq_no.wrapping_add(1);
|
||||
|
||||
let pad = (16 - (frame.len() % 16)) % 16;
|
||||
let mut buf = frame;
|
||||
let pad = (16 - (self.frame_buf.len() % 16)) % 16;
|
||||
let pad_pattern: [u8; 4] = [0x04, 0x00, 0x00, 0x00];
|
||||
for i in 0..pad {
|
||||
buf.push(pad_pattern[i % 4]);
|
||||
self.frame_buf.push(pad_pattern[i % 4]);
|
||||
}
|
||||
|
||||
let cipher = AesCbc::new(self.key, self.iv);
|
||||
cipher
|
||||
.encrypt_in_place(&mut buf)
|
||||
.encrypt_in_place(&mut self.frame_buf)
|
||||
.map_err(|e| ProxyError::Crypto(format!("{e}")))?;
|
||||
|
||||
if buf.len() >= 16 {
|
||||
self.iv.copy_from_slice(&buf[buf.len() - 16..]);
|
||||
if self.frame_buf.len() >= 16 {
|
||||
self.iv
|
||||
.copy_from_slice(&self.frame_buf[self.frame_buf.len() - 16..]);
|
||||
}
|
||||
self.writer.write_all(&buf).await.map_err(ProxyError::Io)
|
||||
let write_result = self.writer.write_all(&self.frame_buf).await;
|
||||
if self.frame_buf.capacity() > RPC_WRITER_FRAME_BUF_SHRINK_THRESHOLD {
|
||||
self.frame_buf.clear();
|
||||
self.frame_buf.shrink_to(RPC_WRITER_FRAME_BUF_RETAIN);
|
||||
}
|
||||
write_result.map_err(ProxyError::Io)
|
||||
}
|
||||
|
||||
pub(crate) async fn send_and_flush(&mut self, payload: &[u8]) -> Result<()> {
|
||||
|
||||
@@ -5,7 +5,6 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use rand::RngExt;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -17,7 +16,7 @@ use crate::crypto::SecureRandom;
|
||||
use crate::error::{ProxyError, Result};
|
||||
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
|
||||
|
||||
use super::codec::{RpcWriter, WriterCommand};
|
||||
use super::codec::{RpcWriter, WriterCommand, build_control_payload};
|
||||
use super::pool::{MePool, MeWriter, WriterContour};
|
||||
use super::reader::reader_loop;
|
||||
use super::wire::build_proxy_req_payload;
|
||||
@@ -61,6 +60,9 @@ async fn writer_command_loop(
|
||||
Some(WriterCommand::DataAndFlush(payload)) => {
|
||||
rpc_writer.send_and_flush(&payload).await?;
|
||||
}
|
||||
Some(WriterCommand::ControlAndFlush(payload)) => {
|
||||
rpc_writer.send_and_flush(&payload).await?;
|
||||
}
|
||||
Some(WriterCommand::Close) | None => return Ok(()),
|
||||
}
|
||||
}
|
||||
@@ -130,9 +132,7 @@ async fn ping_loop(
|
||||
_ = tokio::time::sleep(wait) => {}
|
||||
}
|
||||
let sent_id = ping_id;
|
||||
let mut p = Vec::with_capacity(12);
|
||||
p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
|
||||
p.extend_from_slice(&sent_id.to_le_bytes());
|
||||
let payload = build_control_payload(RPC_PING_U32, sent_id as u64);
|
||||
{
|
||||
let mut tracker = ping_tracker_ping.lock().await;
|
||||
cleanup_tick = cleanup_tick.wrapping_add(1);
|
||||
@@ -149,7 +149,7 @@ async fn ping_loop(
|
||||
ping_id = ping_id.wrapping_add(1);
|
||||
stats_ping.increment_me_keepalive_sent();
|
||||
if tx_ping
|
||||
.send(WriterCommand::DataAndFlush(Bytes::from(p)))
|
||||
.send(WriterCommand::ControlAndFlush(payload))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
@@ -253,12 +253,10 @@ async fn rpc_proxy_req_signal_loop(
|
||||
stats_signal.increment_me_rpc_proxy_req_signal_response_total();
|
||||
}
|
||||
|
||||
let mut close_payload = Vec::with_capacity(12);
|
||||
close_payload.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
|
||||
close_payload.extend_from_slice(&conn_id.to_le_bytes());
|
||||
let close_payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id);
|
||||
|
||||
if tx_signal
|
||||
.send(WriterCommand::DataAndFlush(Bytes::from(close_payload)))
|
||||
.send(WriterCommand::ControlAndFlush(close_payload))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
@@ -380,6 +378,7 @@ impl MePool {
|
||||
iv: hs.write_iv,
|
||||
seq_no: 0,
|
||||
crc_mode: hs.crc_mode,
|
||||
frame_buf: Vec::new(),
|
||||
};
|
||||
let writer = MeWriter {
|
||||
id: writer_id,
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::error::{ProxyError, Result};
|
||||
use crate::protocol::constants::*;
|
||||
use crate::stats::Stats;
|
||||
|
||||
use super::codec::{RpcChecksumMode, WriterCommand, rpc_crc};
|
||||
use super::codec::{RpcChecksumMode, WriterCommand, build_control_payload, rpc_crc};
|
||||
use super::fairness::{
|
||||
AdmissionDecision, DispatchAction, DispatchFeedback, PressureState, SchedulerDecision,
|
||||
WorkerFairnessConfig, WorkerFairnessSnapshot, WorkerFairnessState,
|
||||
@@ -464,10 +464,8 @@ pub(crate) async fn reader_loop(
|
||||
} else if pt == RPC_PING_U32 && body.len() >= 8 {
|
||||
let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
|
||||
trace!(ping_id, "RPC_PING -> RPC_PONG");
|
||||
let mut pong = Vec::with_capacity(12);
|
||||
pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
|
||||
pong.extend_from_slice(&ping_id.to_le_bytes());
|
||||
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
|
||||
let pong = build_control_payload(RPC_PONG_U32, ping_id as u64);
|
||||
match tx.try_send(WriterCommand::ControlAndFlush(pong)) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(_)) => {
|
||||
debug!(ping_id, "PONG dropped: writer command channel is full");
|
||||
@@ -667,10 +665,8 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
|
||||
let mut p = Vec::with_capacity(12);
|
||||
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
|
||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||
match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
||||
let payload = build_control_payload(RPC_CLOSE_CONN_U32, conn_id);
|
||||
match tx.try_send(WriterCommand::ControlAndFlush(payload)) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(_)) => {
|
||||
debug!(
|
||||
|
||||
@@ -567,6 +567,29 @@ impl ConnRegistry {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the active writer and routing metadata from one hot-binding lookup.
|
||||
pub async fn get_writer_with_meta(&self, conn_id: u64) -> Option<(ConnWriter, ConnMeta)> {
|
||||
if !self.routing.map.contains_key(&conn_id) {
|
||||
return None;
|
||||
}
|
||||
|
||||
let hot = self.hot_binding.map.get(&conn_id)?;
|
||||
let writer_id = hot.writer_id;
|
||||
let meta = hot.meta.clone();
|
||||
let writer = self
|
||||
.writers
|
||||
.map
|
||||
.get(&writer_id)
|
||||
.map(|entry| entry.value().clone())?;
|
||||
Some((
|
||||
ConnWriter {
|
||||
writer_id,
|
||||
tx: writer,
|
||||
},
|
||||
meta,
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn active_conn_ids(&self) -> Vec<u64> {
|
||||
let binding = self.binding.inner.lock().await;
|
||||
binding.writer_for_conn.keys().copied().collect()
|
||||
|
||||
@@ -7,7 +7,6 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
@@ -17,7 +16,7 @@ use crate::network::IpFamily;
|
||||
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
|
||||
|
||||
use super::MePool;
|
||||
use super::codec::WriterCommand;
|
||||
use super::codec::{WriterCommand, build_control_payload};
|
||||
use super::pool::WriterContour;
|
||||
use super::registry::ConnMeta;
|
||||
use super::wire::build_proxy_req_payload;
|
||||
@@ -47,12 +46,6 @@ impl MePool {
|
||||
tag_override: Option<&[u8]>,
|
||||
) -> Result<()> {
|
||||
let tag = tag_override.or(self.proxy_tag.as_deref());
|
||||
let fallback_meta = ConnMeta {
|
||||
target_dc,
|
||||
client_addr,
|
||||
our_addr,
|
||||
proto_flags,
|
||||
};
|
||||
let build_routed_payload = |effective_our_addr: SocketAddr| {
|
||||
(
|
||||
build_proxy_req_payload(
|
||||
@@ -91,16 +84,13 @@ impl MePool {
|
||||
let mut hybrid_wait_current = hybrid_wait_step;
|
||||
|
||||
loop {
|
||||
let current_meta = self
|
||||
.registry
|
||||
.get_meta(conn_id)
|
||||
.await
|
||||
.unwrap_or_else(|| fallback_meta.clone());
|
||||
let (current_payload, _) = build_routed_payload(current_meta.our_addr);
|
||||
if let Some(current) = self.registry.get_writer(conn_id).await {
|
||||
if let Some((current, current_meta)) =
|
||||
self.registry.get_writer_with_meta(conn_id).await
|
||||
{
|
||||
let (current_payload, _) = build_routed_payload(current_meta.our_addr);
|
||||
match current
|
||||
.tx
|
||||
.try_send(WriterCommand::Data(current_payload.clone()))
|
||||
.try_send(WriterCommand::Data(current_payload))
|
||||
{
|
||||
Ok(()) => {
|
||||
self.note_hybrid_route_success();
|
||||
@@ -452,7 +442,7 @@ impl MePool {
|
||||
self.remove_writer_and_close_clients(w.id).await;
|
||||
continue;
|
||||
}
|
||||
permit.send(WriterCommand::Data(payload.clone()));
|
||||
permit.send(WriterCommand::Data(payload));
|
||||
self.stats
|
||||
.increment_me_writer_pick_success_try_total(pick_mode);
|
||||
if w.generation < self.current_generation() {
|
||||
@@ -520,7 +510,7 @@ impl MePool {
|
||||
self.remove_writer_and_close_clients(w.id).await;
|
||||
continue;
|
||||
}
|
||||
permit.send(WriterCommand::Data(payload.clone()));
|
||||
permit.send(WriterCommand::Data(payload));
|
||||
self.stats
|
||||
.increment_me_writer_pick_success_fallback_total(pick_mode);
|
||||
if w.generation < self.current_generation() {
|
||||
@@ -735,11 +725,9 @@ impl MePool {
|
||||
|
||||
pub async fn send_close(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||
let mut p = Vec::with_capacity(12);
|
||||
p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
|
||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||
let payload = build_control_payload(RPC_CLOSE_EXT_U32, conn_id);
|
||||
if w.tx
|
||||
.send(WriterCommand::DataAndFlush(Bytes::from(p)))
|
||||
.send(WriterCommand::ControlAndFlush(payload))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
@@ -756,10 +744,8 @@ impl MePool {
|
||||
|
||||
pub async fn send_close_conn(self: &Arc<Self>, conn_id: u64) -> Result<()> {
|
||||
if let Some(w) = self.registry.get_writer(conn_id).await {
|
||||
let mut p = Vec::with_capacity(12);
|
||||
p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
|
||||
p.extend_from_slice(&conn_id.to_le_bytes());
|
||||
match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
|
||||
let payload = build_control_payload(RPC_CLOSE_CONN_U32, conn_id);
|
||||
match w.tx.try_send(WriterCommand::ControlAndFlush(payload)) {
|
||||
Ok(()) => {}
|
||||
Err(TrySendError::Full(cmd)) => {
|
||||
let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
|
||||
|
||||
@@ -165,6 +165,7 @@ async fn recv_data_count(rx: &mut mpsc::Receiver<WriterCommand>, budget: Duratio
|
||||
match tokio::time::timeout(remaining.min(Duration::from_millis(10)), rx.recv()).await {
|
||||
Ok(Some(WriterCommand::Data(_))) => data_count += 1,
|
||||
Ok(Some(WriterCommand::DataAndFlush(_))) => data_count += 1,
|
||||
Ok(Some(WriterCommand::ControlAndFlush(_))) => data_count += 1,
|
||||
Ok(Some(WriterCommand::Close)) => {}
|
||||
Ok(None) => break,
|
||||
Err(_) => break,
|
||||
|
||||
Reference in New Issue
Block a user