mirror of https://github.com/telemt/telemt.git
Drafting fixes for Apple/XNU Darwin Connectivity issues
Co-Authored-By: Aleksandr Kalashnikov <33665156+sleep3r@users.noreply.github.com>
This commit is contained in:
parent
07d774a82a
commit
65da1f91ec
|
|
@ -50,6 +50,7 @@ pub(super) struct RuntimeGatesData {
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
pub(super) struct EffectiveTimeoutLimits {
|
pub(super) struct EffectiveTimeoutLimits {
|
||||||
|
pub(super) client_first_byte_idle_secs: u64,
|
||||||
pub(super) client_handshake_secs: u64,
|
pub(super) client_handshake_secs: u64,
|
||||||
pub(super) tg_connect_secs: u64,
|
pub(super) tg_connect_secs: u64,
|
||||||
pub(super) client_keepalive_secs: u64,
|
pub(super) client_keepalive_secs: u64,
|
||||||
|
|
@ -227,6 +228,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
|
||||||
me_reinit_every_secs: cfg.general.effective_me_reinit_every_secs(),
|
me_reinit_every_secs: cfg.general.effective_me_reinit_every_secs(),
|
||||||
me_pool_force_close_secs: cfg.general.effective_me_pool_force_close_secs(),
|
me_pool_force_close_secs: cfg.general.effective_me_pool_force_close_secs(),
|
||||||
timeouts: EffectiveTimeoutLimits {
|
timeouts: EffectiveTimeoutLimits {
|
||||||
|
client_first_byte_idle_secs: cfg.timeouts.client_first_byte_idle_secs,
|
||||||
client_handshake_secs: cfg.timeouts.client_handshake,
|
client_handshake_secs: cfg.timeouts.client_handshake,
|
||||||
tg_connect_secs: cfg.general.tg_connect,
|
tg_connect_secs: cfg.general.tg_connect,
|
||||||
client_keepalive_secs: cfg.timeouts.client_keepalive,
|
client_keepalive_secs: cfg.timeouts.client_keepalive,
|
||||||
|
|
|
||||||
|
|
@ -610,7 +610,8 @@ ip = "0.0.0.0"
|
||||||
ip = "::"
|
ip = "::"
|
||||||
|
|
||||||
[timeouts]
|
[timeouts]
|
||||||
client_handshake = 15
|
client_first_byte_idle_secs = 300
|
||||||
|
client_handshake = 60
|
||||||
client_keepalive = 60
|
client_keepalive = 60
|
||||||
client_ack = 300
|
client_ack = 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -110,7 +110,11 @@ pub(crate) fn default_replay_window_secs() -> u64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn default_handshake_timeout() -> u64 {
|
pub(crate) fn default_handshake_timeout() -> u64 {
|
||||||
30
|
60
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_client_first_byte_idle_secs() -> u64 {
|
||||||
|
300
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn default_relay_idle_policy_v2_enabled() -> bool {
|
pub(crate) fn default_relay_idle_policy_v2_enabled() -> bool {
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,28 @@ fn remove_temp_config(path: &PathBuf) {
|
||||||
let _ = fs::remove_file(path);
|
let _ = fs::remove_file(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn default_timeouts_enable_apple_compatible_handshake_profile() {
|
||||||
|
let cfg = ProxyConfig::default();
|
||||||
|
assert_eq!(cfg.timeouts.client_first_byte_idle_secs, 300);
|
||||||
|
assert_eq!(cfg.timeouts.client_handshake, 60);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn load_accepts_zero_first_byte_idle_timeout_as_legacy_opt_out() {
|
||||||
|
let path = write_temp_config(
|
||||||
|
r#"
|
||||||
|
[timeouts]
|
||||||
|
client_first_byte_idle_secs = 0
|
||||||
|
"#,
|
||||||
|
);
|
||||||
|
|
||||||
|
let cfg = ProxyConfig::load(&path).expect("config with zero first-byte idle timeout must load");
|
||||||
|
assert_eq!(cfg.timeouts.client_first_byte_idle_secs, 0);
|
||||||
|
|
||||||
|
remove_temp_config(&path);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn load_rejects_relay_hard_idle_smaller_than_soft_idle_with_clear_error() {
|
fn load_rejects_relay_hard_idle_smaller_than_soft_idle_with_clear_error() {
|
||||||
let path = write_temp_config(
|
let path = write_temp_config(
|
||||||
|
|
|
||||||
|
|
@ -1319,6 +1319,12 @@ impl Default for ServerConfig {
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct TimeoutsConfig {
|
pub struct TimeoutsConfig {
|
||||||
|
/// Maximum idle wait in seconds for the first client byte before handshake parsing starts.
|
||||||
|
/// `0` disables the separate idle phase and keeps legacy timeout behavior.
|
||||||
|
#[serde(default = "default_client_first_byte_idle_secs")]
|
||||||
|
pub client_first_byte_idle_secs: u64,
|
||||||
|
|
||||||
|
/// Maximum active handshake duration in seconds after the first client byte is received.
|
||||||
#[serde(default = "default_handshake_timeout")]
|
#[serde(default = "default_handshake_timeout")]
|
||||||
pub client_handshake: u64,
|
pub client_handshake: u64,
|
||||||
|
|
||||||
|
|
@ -1358,6 +1364,7 @@ pub struct TimeoutsConfig {
|
||||||
impl Default for TimeoutsConfig {
|
impl Default for TimeoutsConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
client_first_byte_idle_secs: default_client_first_byte_idle_secs(),
|
||||||
client_handshake: default_handshake_timeout(),
|
client_handshake: default_handshake_timeout(),
|
||||||
relay_idle_policy_v2_enabled: default_relay_idle_policy_v2_enabled(),
|
relay_idle_policy_v2_enabled: default_relay_idle_policy_v2_enabled(),
|
||||||
relay_client_idle_soft_secs: default_relay_client_idle_soft_secs(),
|
relay_client_idle_soft_secs: default_relay_client_idle_soft_secs(),
|
||||||
|
|
|
||||||
|
|
@ -416,16 +416,68 @@ where
|
||||||
|
|
||||||
debug!(peer = %real_peer, "New connection (generic stream)");
|
debug!(peer = %real_peer, "New connection (generic stream)");
|
||||||
|
|
||||||
|
let first_byte = if config.timeouts.client_first_byte_idle_secs == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
let idle_timeout = Duration::from_secs(config.timeouts.client_first_byte_idle_secs);
|
||||||
|
let mut first_byte = [0u8; 1];
|
||||||
|
match timeout(idle_timeout, stream.read(&mut first_byte)).await {
|
||||||
|
Ok(Ok(0)) => {
|
||||||
|
debug!(peer = %real_peer, "Connection closed before first client byte");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Ok(Ok(_)) => Some(first_byte[0]),
|
||||||
|
Ok(Err(e))
|
||||||
|
if matches!(
|
||||||
|
e.kind(),
|
||||||
|
std::io::ErrorKind::UnexpectedEof
|
||||||
|
| std::io::ErrorKind::ConnectionReset
|
||||||
|
| std::io::ErrorKind::ConnectionAborted
|
||||||
|
| std::io::ErrorKind::BrokenPipe
|
||||||
|
| std::io::ErrorKind::NotConnected
|
||||||
|
) =>
|
||||||
|
{
|
||||||
|
debug!(
|
||||||
|
peer = %real_peer,
|
||||||
|
error = %e,
|
||||||
|
"Connection closed before first client byte"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
debug!(
|
||||||
|
peer = %real_peer,
|
||||||
|
error = %e,
|
||||||
|
"Failed while waiting for first client byte"
|
||||||
|
);
|
||||||
|
return Err(ProxyError::Io(e));
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
peer = %real_peer,
|
||||||
|
idle_secs = config.timeouts.client_first_byte_idle_secs,
|
||||||
|
"Closing idle pooled connection before first client byte"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let handshake_timeout = handshake_timeout_with_mask_grace(&config);
|
let handshake_timeout = handshake_timeout_with_mask_grace(&config);
|
||||||
let stats_for_timeout = stats.clone();
|
let stats_for_timeout = stats.clone();
|
||||||
let config_for_timeout = config.clone();
|
let config_for_timeout = config.clone();
|
||||||
let beobachten_for_timeout = beobachten.clone();
|
let beobachten_for_timeout = beobachten.clone();
|
||||||
let peer_for_timeout = real_peer.ip();
|
let peer_for_timeout = real_peer.ip();
|
||||||
|
|
||||||
// Phase 1: handshake (with timeout)
|
// Phase 2: active handshake (with timeout after the first client byte)
|
||||||
let outcome = match timeout(handshake_timeout, async {
|
let outcome = match timeout(handshake_timeout, async {
|
||||||
let mut first_bytes = [0u8; 5];
|
let mut first_bytes = [0u8; 5];
|
||||||
stream.read_exact(&mut first_bytes).await?;
|
if let Some(first_byte) = first_byte {
|
||||||
|
first_bytes[0] = first_byte;
|
||||||
|
stream.read_exact(&mut first_bytes[1..]).await?;
|
||||||
|
} else {
|
||||||
|
stream.read_exact(&mut first_bytes).await?;
|
||||||
|
}
|
||||||
|
|
||||||
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
||||||
debug!(peer = %real_peer, is_tls = is_tls, "Handshake type detected");
|
debug!(peer = %real_peer, is_tls = is_tls, "Handshake type detected");
|
||||||
|
|
@ -736,36 +788,9 @@ impl RunningClientHandler {
|
||||||
debug!(peer = %peer, error = %e, "Failed to configure client socket");
|
debug!(peer = %peer, error = %e, "Failed to configure client socket");
|
||||||
}
|
}
|
||||||
|
|
||||||
let handshake_timeout = handshake_timeout_with_mask_grace(&self.config);
|
let outcome = match self.do_handshake().await? {
|
||||||
let stats = self.stats.clone();
|
Some(outcome) => outcome,
|
||||||
let config_for_timeout = self.config.clone();
|
None => return Ok(()),
|
||||||
let beobachten_for_timeout = self.beobachten.clone();
|
|
||||||
let peer_for_timeout = peer.ip();
|
|
||||||
|
|
||||||
// Phase 1: handshake (with timeout)
|
|
||||||
let outcome = match timeout(handshake_timeout, self.do_handshake()).await {
|
|
||||||
Ok(Ok(outcome)) => outcome,
|
|
||||||
Ok(Err(e)) => {
|
|
||||||
debug!(peer = %peer, error = %e, "Handshake failed");
|
|
||||||
record_handshake_failure_class(
|
|
||||||
&beobachten_for_timeout,
|
|
||||||
&config_for_timeout,
|
|
||||||
peer_for_timeout,
|
|
||||||
&e,
|
|
||||||
);
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
stats.increment_handshake_timeouts();
|
|
||||||
debug!(peer = %peer, "Handshake timeout");
|
|
||||||
record_beobachten_class(
|
|
||||||
&beobachten_for_timeout,
|
|
||||||
&config_for_timeout,
|
|
||||||
peer_for_timeout,
|
|
||||||
"other",
|
|
||||||
);
|
|
||||||
return Err(ProxyError::TgHandshakeTimeout);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts)
|
// Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts)
|
||||||
|
|
@ -774,7 +799,7 @@ impl RunningClientHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_handshake(mut self) -> Result<HandshakeOutcome> {
|
async fn do_handshake(mut self) -> Result<Option<HandshakeOutcome>> {
|
||||||
let mut local_addr = self.stream.local_addr().map_err(ProxyError::Io)?;
|
let mut local_addr = self.stream.local_addr().map_err(ProxyError::Io)?;
|
||||||
|
|
||||||
if self.proxy_protocol_enabled {
|
if self.proxy_protocol_enabled {
|
||||||
|
|
@ -849,19 +874,107 @@ impl RunningClientHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut first_bytes = [0u8; 5];
|
let first_byte = if self.config.timeouts.client_first_byte_idle_secs == 0 {
|
||||||
self.stream.read_exact(&mut first_bytes).await?;
|
None
|
||||||
|
|
||||||
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
|
||||||
let peer = self.peer;
|
|
||||||
|
|
||||||
debug!(peer = %peer, is_tls = is_tls, "Handshake type detected");
|
|
||||||
|
|
||||||
if is_tls {
|
|
||||||
self.handle_tls_client(first_bytes, local_addr).await
|
|
||||||
} else {
|
} else {
|
||||||
self.handle_direct_client(first_bytes, local_addr).await
|
let idle_timeout = Duration::from_secs(self.config.timeouts.client_first_byte_idle_secs);
|
||||||
}
|
let mut first_byte = [0u8; 1];
|
||||||
|
match timeout(idle_timeout, self.stream.read(&mut first_byte)).await {
|
||||||
|
Ok(Ok(0)) => {
|
||||||
|
debug!(peer = %self.peer, "Connection closed before first client byte");
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
Ok(Ok(_)) => Some(first_byte[0]),
|
||||||
|
Ok(Err(e))
|
||||||
|
if matches!(
|
||||||
|
e.kind(),
|
||||||
|
std::io::ErrorKind::UnexpectedEof
|
||||||
|
| std::io::ErrorKind::ConnectionReset
|
||||||
|
| std::io::ErrorKind::ConnectionAborted
|
||||||
|
| std::io::ErrorKind::BrokenPipe
|
||||||
|
| std::io::ErrorKind::NotConnected
|
||||||
|
) =>
|
||||||
|
{
|
||||||
|
debug!(
|
||||||
|
peer = %self.peer,
|
||||||
|
error = %e,
|
||||||
|
"Connection closed before first client byte"
|
||||||
|
);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
debug!(
|
||||||
|
peer = %self.peer,
|
||||||
|
error = %e,
|
||||||
|
"Failed while waiting for first client byte"
|
||||||
|
);
|
||||||
|
return Err(ProxyError::Io(e));
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
peer = %self.peer,
|
||||||
|
idle_secs = self.config.timeouts.client_first_byte_idle_secs,
|
||||||
|
"Closing idle pooled connection before first client byte"
|
||||||
|
);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let handshake_timeout = handshake_timeout_with_mask_grace(&self.config);
|
||||||
|
let stats = self.stats.clone();
|
||||||
|
let config_for_timeout = self.config.clone();
|
||||||
|
let beobachten_for_timeout = self.beobachten.clone();
|
||||||
|
let peer_for_timeout = self.peer.ip();
|
||||||
|
let peer_for_log = self.peer;
|
||||||
|
|
||||||
|
let outcome = match timeout(handshake_timeout, async {
|
||||||
|
let mut first_bytes = [0u8; 5];
|
||||||
|
if let Some(first_byte) = first_byte {
|
||||||
|
first_bytes[0] = first_byte;
|
||||||
|
self.stream.read_exact(&mut first_bytes[1..]).await?;
|
||||||
|
} else {
|
||||||
|
self.stream.read_exact(&mut first_bytes).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
|
||||||
|
let peer = self.peer;
|
||||||
|
|
||||||
|
debug!(peer = %peer, is_tls = is_tls, "Handshake type detected");
|
||||||
|
|
||||||
|
if is_tls {
|
||||||
|
self.handle_tls_client(first_bytes, local_addr).await
|
||||||
|
} else {
|
||||||
|
self.handle_direct_client(first_bytes, local_addr).await
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Ok(outcome)) => outcome,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
debug!(peer = %peer_for_log, error = %e, "Handshake failed");
|
||||||
|
record_handshake_failure_class(
|
||||||
|
&beobachten_for_timeout,
|
||||||
|
&config_for_timeout,
|
||||||
|
peer_for_timeout,
|
||||||
|
&e,
|
||||||
|
);
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
stats.increment_handshake_timeouts();
|
||||||
|
debug!(peer = %peer_for_log, "Handshake timeout");
|
||||||
|
record_beobachten_class(
|
||||||
|
&beobachten_for_timeout,
|
||||||
|
&config_for_timeout,
|
||||||
|
peer_for_timeout,
|
||||||
|
"other",
|
||||||
|
);
|
||||||
|
return Err(ProxyError::TgHandshakeTimeout);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Some(outcome))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_tls_client(
|
async fn handle_tls_client(
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,10 @@
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::config::{UpstreamConfig, UpstreamType};
|
use crate::config::{UpstreamConfig, UpstreamType};
|
||||||
use crate::crypto::AesCtr;
|
use crate::crypto::{AesCtr, sha256, sha256_hmac};
|
||||||
use crate::crypto::sha256_hmac;
|
use crate::protocol::constants::{
|
||||||
use crate::protocol::constants::ProtoTag;
|
DC_IDX_POS, HANDSHAKE_LEN, IV_LEN, PREKEY_LEN, PROTO_TAG_POS, ProtoTag, SKIP_LEN,
|
||||||
|
TLS_RECORD_CHANGE_CIPHER,
|
||||||
|
};
|
||||||
use crate::protocol::tls;
|
use crate::protocol::tls;
|
||||||
use crate::proxy::handshake::HandshakeSuccess;
|
use crate::proxy::handshake::HandshakeSuccess;
|
||||||
use crate::stream::{CryptoReader, CryptoWriter};
|
use crate::stream::{CryptoReader, CryptoWriter};
|
||||||
|
|
@ -1319,6 +1321,163 @@ async fn running_client_handler_increments_connects_all_exactly_once() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(start_paused = true)]
|
||||||
|
async fn idle_pooled_connection_closes_cleanly_in_generic_stream_path() {
|
||||||
|
let mut cfg = ProxyConfig::default();
|
||||||
|
cfg.general.beobachten = false;
|
||||||
|
cfg.timeouts.client_first_byte_idle_secs = 1;
|
||||||
|
|
||||||
|
let config = Arc::new(cfg);
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let upstream_manager = Arc::new(UpstreamManager::new(
|
||||||
|
vec![UpstreamConfig {
|
||||||
|
upstream_type: UpstreamType::Direct {
|
||||||
|
interface: None,
|
||||||
|
bind_addresses: None,
|
||||||
|
},
|
||||||
|
weight: 1,
|
||||||
|
enabled: true,
|
||||||
|
scopes: String::new(),
|
||||||
|
selected_scope: String::new(),
|
||||||
|
}],
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
10,
|
||||||
|
1,
|
||||||
|
false,
|
||||||
|
stats.clone(),
|
||||||
|
));
|
||||||
|
let replay_checker = Arc::new(ReplayChecker::new(128, Duration::from_secs(60)));
|
||||||
|
let buffer_pool = Arc::new(BufferPool::new());
|
||||||
|
let rng = Arc::new(SecureRandom::new());
|
||||||
|
let route_runtime = Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct));
|
||||||
|
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||||
|
let beobachten = Arc::new(BeobachtenStore::new());
|
||||||
|
|
||||||
|
let (server_side, _client_side) = duplex(4096);
|
||||||
|
let peer: SocketAddr = "198.51.100.169:55200".parse().unwrap();
|
||||||
|
|
||||||
|
let handler = tokio::spawn(handle_client_stream(
|
||||||
|
server_side,
|
||||||
|
peer,
|
||||||
|
config,
|
||||||
|
stats.clone(),
|
||||||
|
upstream_manager,
|
||||||
|
replay_checker,
|
||||||
|
buffer_pool,
|
||||||
|
rng,
|
||||||
|
None,
|
||||||
|
route_runtime,
|
||||||
|
None,
|
||||||
|
ip_tracker,
|
||||||
|
beobachten,
|
||||||
|
false,
|
||||||
|
));
|
||||||
|
|
||||||
|
// Let the spawned handler arm the idle-phase timeout before advancing paused time.
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
tokio::time::advance(Duration::from_secs(2)).await;
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
let result = tokio::time::timeout(Duration::from_secs(1), handler)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert_eq!(stats.get_handshake_timeouts(), 0);
|
||||||
|
assert_eq!(stats.get_connects_bad(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(start_paused = true)]
|
||||||
|
async fn idle_pooled_connection_closes_cleanly_in_client_handler_path() {
|
||||||
|
let front_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let front_addr = front_listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let mut cfg = ProxyConfig::default();
|
||||||
|
cfg.general.beobachten = false;
|
||||||
|
cfg.timeouts.client_first_byte_idle_secs = 1;
|
||||||
|
|
||||||
|
let config = Arc::new(cfg);
|
||||||
|
let stats = Arc::new(Stats::new());
|
||||||
|
let upstream_manager = Arc::new(UpstreamManager::new(
|
||||||
|
vec![UpstreamConfig {
|
||||||
|
upstream_type: UpstreamType::Direct {
|
||||||
|
interface: None,
|
||||||
|
bind_addresses: None,
|
||||||
|
},
|
||||||
|
weight: 1,
|
||||||
|
enabled: true,
|
||||||
|
scopes: String::new(),
|
||||||
|
selected_scope: String::new(),
|
||||||
|
}],
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
10,
|
||||||
|
1,
|
||||||
|
false,
|
||||||
|
stats.clone(),
|
||||||
|
));
|
||||||
|
let replay_checker = Arc::new(ReplayChecker::new(128, Duration::from_secs(60)));
|
||||||
|
let buffer_pool = Arc::new(BufferPool::new());
|
||||||
|
let rng = Arc::new(SecureRandom::new());
|
||||||
|
let route_runtime = Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct));
|
||||||
|
let ip_tracker = Arc::new(UserIpTracker::new());
|
||||||
|
let beobachten = Arc::new(BeobachtenStore::new());
|
||||||
|
|
||||||
|
let server_task = {
|
||||||
|
let config = config.clone();
|
||||||
|
let stats = stats.clone();
|
||||||
|
let upstream_manager = upstream_manager.clone();
|
||||||
|
let replay_checker = replay_checker.clone();
|
||||||
|
let buffer_pool = buffer_pool.clone();
|
||||||
|
let rng = rng.clone();
|
||||||
|
let route_runtime = route_runtime.clone();
|
||||||
|
let ip_tracker = ip_tracker.clone();
|
||||||
|
let beobachten = beobachten.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (stream, peer) = front_listener.accept().await.unwrap();
|
||||||
|
let real_peer_report = Arc::new(std::sync::Mutex::new(None));
|
||||||
|
ClientHandler::new(
|
||||||
|
stream,
|
||||||
|
peer,
|
||||||
|
config,
|
||||||
|
stats,
|
||||||
|
upstream_manager,
|
||||||
|
replay_checker,
|
||||||
|
buffer_pool,
|
||||||
|
rng,
|
||||||
|
None,
|
||||||
|
route_runtime,
|
||||||
|
None,
|
||||||
|
ip_tracker,
|
||||||
|
beobachten,
|
||||||
|
false,
|
||||||
|
real_peer_report,
|
||||||
|
)
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let _client = TcpStream::connect(front_addr).await.unwrap();
|
||||||
|
|
||||||
|
// Let the accepted connection reach the idle wait before advancing paused time.
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
tokio::time::advance(Duration::from_secs(2)).await;
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
let result = tokio::time::timeout(Duration::from_secs(1), server_task)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert_eq!(stats.get_handshake_timeouts(), 0);
|
||||||
|
assert_eq!(stats.get_connects_bad(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn partial_tls_header_stall_triggers_handshake_timeout() {
|
async fn partial_tls_header_stall_triggers_handshake_timeout() {
|
||||||
let mut cfg = ProxyConfig::default();
|
let mut cfg = ProxyConfig::default();
|
||||||
|
|
@ -1487,6 +1646,147 @@ fn wrap_tls_application_data(payload: &[u8]) -> Vec<u8> {
|
||||||
record
|
record
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn wrap_tls_ccs_record() -> Vec<u8> {
|
||||||
|
let mut record = Vec::with_capacity(6);
|
||||||
|
record.push(TLS_RECORD_CHANGE_CIPHER);
|
||||||
|
record.extend_from_slice(&[0x03, 0x03]);
|
||||||
|
record.extend_from_slice(&1u16.to_be_bytes());
|
||||||
|
record.push(0x01);
|
||||||
|
record
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_valid_mtproto_handshake(
|
||||||
|
secret_hex: &str,
|
||||||
|
proto_tag: ProtoTag,
|
||||||
|
dc_idx: i16,
|
||||||
|
) -> [u8; HANDSHAKE_LEN] {
|
||||||
|
let secret = hex::decode(secret_hex).expect("secret hex must decode for mtproto test helper");
|
||||||
|
|
||||||
|
let mut handshake = [0x5Au8; HANDSHAKE_LEN];
|
||||||
|
for (idx, b) in handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN]
|
||||||
|
.iter_mut()
|
||||||
|
.enumerate()
|
||||||
|
{
|
||||||
|
*b = (idx as u8).wrapping_add(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let dec_prekey = &handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN];
|
||||||
|
let dec_iv_bytes = &handshake[SKIP_LEN + PREKEY_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN];
|
||||||
|
|
||||||
|
let mut dec_key_input = Vec::with_capacity(PREKEY_LEN + secret.len());
|
||||||
|
dec_key_input.extend_from_slice(dec_prekey);
|
||||||
|
dec_key_input.extend_from_slice(&secret);
|
||||||
|
let dec_key = sha256(&dec_key_input);
|
||||||
|
|
||||||
|
let mut dec_iv_arr = [0u8; IV_LEN];
|
||||||
|
dec_iv_arr.copy_from_slice(dec_iv_bytes);
|
||||||
|
let dec_iv = u128::from_be_bytes(dec_iv_arr);
|
||||||
|
|
||||||
|
let mut stream = AesCtr::new(&dec_key, dec_iv);
|
||||||
|
let keystream = stream.encrypt(&[0u8; HANDSHAKE_LEN]);
|
||||||
|
|
||||||
|
let mut target_plain = [0u8; HANDSHAKE_LEN];
|
||||||
|
target_plain[PROTO_TAG_POS..PROTO_TAG_POS + 4].copy_from_slice(&proto_tag.to_bytes());
|
||||||
|
target_plain[DC_IDX_POS..DC_IDX_POS + 2].copy_from_slice(&dc_idx.to_le_bytes());
|
||||||
|
|
||||||
|
for idx in PROTO_TAG_POS..HANDSHAKE_LEN {
|
||||||
|
handshake[idx] = target_plain[idx] ^ keystream[idx];
|
||||||
|
}
|
||||||
|
|
||||||
|
handshake
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fragmented_tls_mtproto_with_interleaved_ccs_is_accepted() {
|
||||||
|
let secret_hex = "55555555555555555555555555555555";
|
||||||
|
let secret = [0x55u8; 16];
|
||||||
|
let client_hello = make_valid_tls_client_hello(&secret, 0);
|
||||||
|
let mtproto_handshake = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2);
|
||||||
|
|
||||||
|
let mut cfg = ProxyConfig::default();
|
||||||
|
cfg.general.beobachten = false;
|
||||||
|
cfg.access.ignore_time_skew = true;
|
||||||
|
cfg.access
|
||||||
|
.users
|
||||||
|
.insert("user".to_string(), secret_hex.to_string());
|
||||||
|
|
||||||
|
let config = Arc::new(cfg);
|
||||||
|
let replay_checker = Arc::new(ReplayChecker::new(128, Duration::from_secs(60)));
|
||||||
|
let rng = SecureRandom::new();
|
||||||
|
|
||||||
|
let (server_side, mut client_side) = duplex(131072);
|
||||||
|
let peer: SocketAddr = "198.51.100.85:55007".parse().unwrap();
|
||||||
|
let (read_half, write_half) = tokio::io::split(server_side);
|
||||||
|
|
||||||
|
let (mut tls_reader, tls_writer, tls_user) = match handle_tls_handshake(
|
||||||
|
&client_hello,
|
||||||
|
read_half,
|
||||||
|
write_half,
|
||||||
|
peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
&rng,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
HandshakeResult::Success(result) => result,
|
||||||
|
_ => panic!("expected successful TLS handshake"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut tls_response_head = [0u8; 5];
|
||||||
|
client_side
|
||||||
|
.read_exact(&mut tls_response_head)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(tls_response_head[0], 0x16);
|
||||||
|
let tls_response_len = u16::from_be_bytes([tls_response_head[3], tls_response_head[4]]) as usize;
|
||||||
|
let mut tls_response_body = vec![0u8; tls_response_len];
|
||||||
|
client_side
|
||||||
|
.read_exact(&mut tls_response_body)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
client_side
|
||||||
|
.write_all(&wrap_tls_application_data(&mtproto_handshake[..13]))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
client_side.write_all(&wrap_tls_ccs_record()).await.unwrap();
|
||||||
|
client_side
|
||||||
|
.write_all(&wrap_tls_application_data(&mtproto_handshake[13..37]))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
client_side.write_all(&wrap_tls_ccs_record()).await.unwrap();
|
||||||
|
client_side
|
||||||
|
.write_all(&wrap_tls_application_data(&mtproto_handshake[37..]))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await.unwrap();
|
||||||
|
assert_eq!(&mtproto_data[..], &mtproto_handshake);
|
||||||
|
|
||||||
|
let mtproto_handshake: [u8; HANDSHAKE_LEN] = mtproto_data[..].try_into().unwrap();
|
||||||
|
let (_, _, success) = match handle_mtproto_handshake(
|
||||||
|
&mtproto_handshake,
|
||||||
|
tls_reader,
|
||||||
|
tls_writer,
|
||||||
|
peer,
|
||||||
|
&config,
|
||||||
|
&replay_checker,
|
||||||
|
true,
|
||||||
|
Some(tls_user.as_str()),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
HandshakeResult::Success(result) => result,
|
||||||
|
_ => panic!("expected successful MTProto handshake"),
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(success.user, "user");
|
||||||
|
assert_eq!(success.proto_tag, ProtoTag::Secure);
|
||||||
|
assert_eq!(success.dc_idx, 2);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn valid_tls_path_does_not_fall_back_to_mask_backend() {
|
async fn valid_tls_path_does_not_fall_back_to_mask_backend() {
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue