mirror of
https://github.com/telemt/telemt.git
synced 2026-04-25 14:34:10 +03:00
Сlassified Bad Connections and Handshake Failures in API
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2791,7 +2791,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.4.5"
|
version = "3.4.6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aes",
|
"aes",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "telemt"
|
name = "telemt"
|
||||||
version = "3.4.5"
|
version = "3.4.6"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ use config_store::{current_revision, load_config_from_disk, parse_if_match};
|
|||||||
use events::ApiEventStore;
|
use events::ApiEventStore;
|
||||||
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
use http_utils::{error_response, read_json, read_optional_json, success_response};
|
||||||
use model::{
|
use model::{
|
||||||
ApiFailure, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData,
|
ApiFailure, ClassCount, CreateUserRequest, DeleteUserResponse, HealthData, HealthReadyData,
|
||||||
PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps,
|
PatchUserRequest, RotateSecretRequest, SummaryData, UserActiveIps,
|
||||||
};
|
};
|
||||||
use runtime_edge::{
|
use runtime_edge::{
|
||||||
@@ -334,10 +334,24 @@ async fn handle(
|
|||||||
}
|
}
|
||||||
("GET", "/v1/stats/summary") => {
|
("GET", "/v1/stats/summary") => {
|
||||||
let revision = current_revision(&shared.config_path).await?;
|
let revision = current_revision(&shared.config_path).await?;
|
||||||
|
let connections_bad_by_class = shared
|
||||||
|
.stats
|
||||||
|
.get_connects_bad_class_counts()
|
||||||
|
.into_iter()
|
||||||
|
.map(|(class, total)| ClassCount { class, total })
|
||||||
|
.collect();
|
||||||
|
let handshake_failures_by_class = shared
|
||||||
|
.stats
|
||||||
|
.get_handshake_failure_class_counts()
|
||||||
|
.into_iter()
|
||||||
|
.map(|(class, total)| ClassCount { class, total })
|
||||||
|
.collect();
|
||||||
let data = SummaryData {
|
let data = SummaryData {
|
||||||
uptime_seconds: shared.stats.uptime_secs(),
|
uptime_seconds: shared.stats.uptime_secs(),
|
||||||
connections_total: shared.stats.get_connects_all(),
|
connections_total: shared.stats.get_connects_all(),
|
||||||
connections_bad_total: shared.stats.get_connects_bad(),
|
connections_bad_total: shared.stats.get_connects_bad(),
|
||||||
|
connections_bad_by_class,
|
||||||
|
handshake_failures_by_class,
|
||||||
handshake_timeouts_total: shared.stats.get_handshake_timeouts(),
|
handshake_timeouts_total: shared.stats.get_handshake_timeouts(),
|
||||||
configured_users: cfg.access.users.len(),
|
configured_users: cfg.access.users.len(),
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -71,11 +71,19 @@ pub(super) struct HealthReadyData {
|
|||||||
pub(super) total_upstreams: usize,
|
pub(super) total_upstreams: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Clone)]
|
||||||
|
pub(super) struct ClassCount {
|
||||||
|
pub(super) class: String,
|
||||||
|
pub(super) total: u64,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
pub(super) struct SummaryData {
|
pub(super) struct SummaryData {
|
||||||
pub(super) uptime_seconds: f64,
|
pub(super) uptime_seconds: f64,
|
||||||
pub(super) connections_total: u64,
|
pub(super) connections_total: u64,
|
||||||
pub(super) connections_bad_total: u64,
|
pub(super) connections_bad_total: u64,
|
||||||
|
pub(super) connections_bad_by_class: Vec<ClassCount>,
|
||||||
|
pub(super) handshake_failures_by_class: Vec<ClassCount>,
|
||||||
pub(super) handshake_timeouts_total: u64,
|
pub(super) handshake_timeouts_total: u64,
|
||||||
pub(super) configured_users: usize,
|
pub(super) configured_users: usize,
|
||||||
}
|
}
|
||||||
@@ -91,6 +99,8 @@ pub(super) struct ZeroCoreData {
|
|||||||
pub(super) uptime_seconds: f64,
|
pub(super) uptime_seconds: f64,
|
||||||
pub(super) connections_total: u64,
|
pub(super) connections_total: u64,
|
||||||
pub(super) connections_bad_total: u64,
|
pub(super) connections_bad_total: u64,
|
||||||
|
pub(super) connections_bad_by_class: Vec<ClassCount>,
|
||||||
|
pub(super) handshake_failures_by_class: Vec<ClassCount>,
|
||||||
pub(super) handshake_timeouts_total: u64,
|
pub(super) handshake_timeouts_total: u64,
|
||||||
pub(super) accept_permit_timeout_total: u64,
|
pub(super) accept_permit_timeout_total: u64,
|
||||||
pub(super) configured_users: usize,
|
pub(super) configured_users: usize,
|
||||||
|
|||||||
@@ -7,8 +7,8 @@ use crate::transport::upstream::IpPreference;
|
|||||||
|
|
||||||
use super::ApiShared;
|
use super::ApiShared;
|
||||||
use super::model::{
|
use super::model::{
|
||||||
DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData, MeWritersSummary,
|
ClassCount, DcEndpointWriters, DcStatus, DcStatusData, MeWriterStatus, MeWritersData,
|
||||||
MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData,
|
MeWritersSummary, MinimalAllData, MinimalAllPayload, MinimalDcPathData, MinimalMeRuntimeData,
|
||||||
MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData,
|
MinimalQuarantineData, UpstreamDcStatus, UpstreamStatus, UpstreamSummaryData, UpstreamsData,
|
||||||
ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
|
ZeroAllData, ZeroCodeCount, ZeroCoreData, ZeroDesyncData, ZeroMiddleProxyData, ZeroPoolData,
|
||||||
ZeroUpstreamData,
|
ZeroUpstreamData,
|
||||||
@@ -26,6 +26,16 @@ pub(crate) struct MinimalCacheEntry {
|
|||||||
|
|
||||||
pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> ZeroAllData {
|
pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> ZeroAllData {
|
||||||
let telemetry = stats.telemetry_policy();
|
let telemetry = stats.telemetry_policy();
|
||||||
|
let bad_connection_classes = stats
|
||||||
|
.get_connects_bad_class_counts()
|
||||||
|
.into_iter()
|
||||||
|
.map(|(class, total)| ClassCount { class, total })
|
||||||
|
.collect();
|
||||||
|
let handshake_failure_classes = stats
|
||||||
|
.get_handshake_failure_class_counts()
|
||||||
|
.into_iter()
|
||||||
|
.map(|(class, total)| ClassCount { class, total })
|
||||||
|
.collect();
|
||||||
let handshake_error_codes = stats
|
let handshake_error_codes = stats
|
||||||
.get_me_handshake_error_code_counts()
|
.get_me_handshake_error_code_counts()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@@ -38,6 +48,8 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
|
|||||||
uptime_seconds: stats.uptime_secs(),
|
uptime_seconds: stats.uptime_secs(),
|
||||||
connections_total: stats.get_connects_all(),
|
connections_total: stats.get_connects_all(),
|
||||||
connections_bad_total: stats.get_connects_bad(),
|
connections_bad_total: stats.get_connects_bad(),
|
||||||
|
connections_bad_by_class: bad_connection_classes,
|
||||||
|
handshake_failures_by_class: handshake_failure_classes,
|
||||||
handshake_timeouts_total: stats.get_handshake_timeouts(),
|
handshake_timeouts_total: stats.get_handshake_timeouts(),
|
||||||
accept_permit_timeout_total: stats.get_accept_permit_timeout_total(),
|
accept_permit_timeout_total: stats.get_accept_permit_timeout_total(),
|
||||||
configured_users,
|
configured_users,
|
||||||
|
|||||||
@@ -324,38 +324,38 @@ fn record_beobachten_class(
|
|||||||
beobachten.record(class, peer_ip, beobachten_ttl(config));
|
beobachten.record(class, peer_ip, beobachten_ttl(config));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn classify_expected_64_got_0(kind: std::io::ErrorKind) -> Option<&'static str> {
|
||||||
|
match kind {
|
||||||
|
std::io::ErrorKind::UnexpectedEof => Some("expected_64_got_0_unexpected_eof"),
|
||||||
|
std::io::ErrorKind::ConnectionReset => Some("expected_64_got_0_connection_reset"),
|
||||||
|
std::io::ErrorKind::ConnectionAborted => Some("expected_64_got_0_connection_aborted"),
|
||||||
|
std::io::ErrorKind::BrokenPipe => Some("expected_64_got_0_broken_pipe"),
|
||||||
|
std::io::ErrorKind::NotConnected => Some("expected_64_got_0_not_connected"),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn classify_handshake_failure_class(error: &ProxyError) -> &'static str {
|
||||||
|
match error {
|
||||||
|
ProxyError::Io(err) => classify_expected_64_got_0(err.kind()).unwrap_or("other"),
|
||||||
|
ProxyError::Stream(StreamError::UnexpectedEof) => "expected_64_got_0_unexpected_eof",
|
||||||
|
ProxyError::Stream(StreamError::Io(err)) => {
|
||||||
|
classify_expected_64_got_0(err.kind()).unwrap_or("other")
|
||||||
|
}
|
||||||
|
_ => "other",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn record_handshake_failure_class(
|
fn record_handshake_failure_class(
|
||||||
beobachten: &BeobachtenStore,
|
beobachten: &BeobachtenStore,
|
||||||
config: &ProxyConfig,
|
config: &ProxyConfig,
|
||||||
peer_ip: IpAddr,
|
peer_ip: IpAddr,
|
||||||
error: &ProxyError,
|
error: &ProxyError,
|
||||||
) {
|
) {
|
||||||
let class = match error {
|
// Keep beobachten buckets stable while detailed per-kind classification
|
||||||
ProxyError::Io(err)
|
// is tracked in API counters.
|
||||||
if matches!(
|
let class = match classify_handshake_failure_class(error) {
|
||||||
err.kind(),
|
value if value.starts_with("expected_64_got_0_") => "expected_64_got_0",
|
||||||
std::io::ErrorKind::UnexpectedEof
|
|
||||||
| std::io::ErrorKind::ConnectionReset
|
|
||||||
| std::io::ErrorKind::ConnectionAborted
|
|
||||||
| std::io::ErrorKind::BrokenPipe
|
|
||||||
| std::io::ErrorKind::NotConnected
|
|
||||||
) =>
|
|
||||||
{
|
|
||||||
"expected_64_got_0"
|
|
||||||
}
|
|
||||||
ProxyError::Stream(StreamError::UnexpectedEof) => "expected_64_got_0",
|
|
||||||
ProxyError::Stream(StreamError::Io(err))
|
|
||||||
if matches!(
|
|
||||||
err.kind(),
|
|
||||||
std::io::ErrorKind::UnexpectedEof
|
|
||||||
| std::io::ErrorKind::ConnectionReset
|
|
||||||
| std::io::ErrorKind::ConnectionAborted
|
|
||||||
| std::io::ErrorKind::BrokenPipe
|
|
||||||
| std::io::ErrorKind::NotConnected
|
|
||||||
) =>
|
|
||||||
{
|
|
||||||
"expected_64_got_0"
|
|
||||||
}
|
|
||||||
_ => "other",
|
_ => "other",
|
||||||
};
|
};
|
||||||
record_beobachten_class(beobachten, config, peer_ip, class);
|
record_beobachten_class(beobachten, config, peer_ip, class);
|
||||||
@@ -364,7 +364,7 @@ fn record_handshake_failure_class(
|
|||||||
#[inline]
|
#[inline]
|
||||||
fn increment_bad_on_unknown_tls_sni(stats: &Stats, error: &ProxyError) {
|
fn increment_bad_on_unknown_tls_sni(stats: &Stats, error: &ProxyError) {
|
||||||
if matches!(error, ProxyError::UnknownTlsSni) {
|
if matches!(error, ProxyError::UnknownTlsSni) {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("unknown_tls_sni");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -465,7 +465,7 @@ where
|
|||||||
Ok(Ok(info)) => {
|
Ok(Ok(info)) => {
|
||||||
if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs)
|
if !is_trusted_proxy_source(peer.ip(), &config.server.proxy_protocol_trusted_cidrs)
|
||||||
{
|
{
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("proxy_protocol_untrusted");
|
||||||
warn!(
|
warn!(
|
||||||
peer = %peer,
|
peer = %peer,
|
||||||
trusted = ?config.server.proxy_protocol_trusted_cidrs,
|
trusted = ?config.server.proxy_protocol_trusted_cidrs,
|
||||||
@@ -486,13 +486,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("proxy_protocol_invalid_header");
|
||||||
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
|
warn!(peer = %peer, error = %e, "Invalid PROXY protocol header");
|
||||||
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
|
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("proxy_protocol_header_timeout");
|
||||||
warn!(peer = %peer, timeout_ms = proxy_header_timeout.as_millis(), "PROXY protocol header timeout");
|
warn!(peer = %peer, timeout_ms = proxy_header_timeout.as_millis(), "PROXY protocol header timeout");
|
||||||
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
|
record_beobachten_class(&beobachten, &config, peer.ip(), "other");
|
||||||
return Err(ProxyError::InvalidProxyProtocol);
|
return Err(ProxyError::InvalidProxyProtocol);
|
||||||
@@ -582,7 +582,7 @@ where
|
|||||||
// third-party clients or future Telegram versions.
|
// third-party clients or future Telegram versions.
|
||||||
if !tls_clienthello_len_in_bounds(tls_len) {
|
if !tls_clienthello_len_in_bounds(tls_len) {
|
||||||
debug!(peer = %real_peer, tls_len = tls_len, max_tls_len = MAX_TLS_PLAINTEXT_SIZE, "TLS handshake length out of bounds");
|
debug!(peer = %real_peer, tls_len = tls_len, max_tls_len = MAX_TLS_PLAINTEXT_SIZE, "TLS handshake length out of bounds");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("tls_clienthello_len_out_of_bounds");
|
||||||
maybe_apply_mask_reject_delay(&config).await;
|
maybe_apply_mask_reject_delay(&config).await;
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
@@ -602,7 +602,7 @@ where
|
|||||||
Ok(n) => n,
|
Ok(n) => n,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(peer = %real_peer, error = %e, tls_len = tls_len, "TLS ClientHello body read failed; engaging masking fallback");
|
debug!(peer = %real_peer, error = %e, tls_len = tls_len, "TLS ClientHello body read failed; engaging masking fallback");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("tls_clienthello_read_error");
|
||||||
maybe_apply_mask_reject_delay(&config).await;
|
maybe_apply_mask_reject_delay(&config).await;
|
||||||
let initial_len = 5;
|
let initial_len = 5;
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
@@ -620,7 +620,7 @@ where
|
|||||||
|
|
||||||
if body_read < tls_len {
|
if body_read < tls_len {
|
||||||
debug!(peer = %real_peer, got = body_read, expected = tls_len, "Truncated in-range TLS ClientHello; engaging masking fallback");
|
debug!(peer = %real_peer, got = body_read, expected = tls_len, "Truncated in-range TLS ClientHello; engaging masking fallback");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("tls_clienthello_truncated");
|
||||||
maybe_apply_mask_reject_delay(&config).await;
|
maybe_apply_mask_reject_delay(&config).await;
|
||||||
let initial_len = 5 + body_read;
|
let initial_len = 5 + body_read;
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
@@ -644,7 +644,7 @@ where
|
|||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("tls_handshake_bad_client");
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
@@ -684,7 +684,7 @@ where
|
|||||||
wrap_tls_application_record(&pending_plaintext)
|
wrap_tls_application_record(&pending_plaintext)
|
||||||
};
|
};
|
||||||
let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader);
|
let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader);
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("tls_mtproto_bad_client");
|
||||||
debug!(
|
debug!(
|
||||||
peer = %peer,
|
peer = %peer,
|
||||||
"Authenticated TLS session failed MTProto validation; engaging masking fallback"
|
"Authenticated TLS session failed MTProto validation; engaging masking fallback"
|
||||||
@@ -714,7 +714,7 @@ where
|
|||||||
} else {
|
} else {
|
||||||
if !config.general.modes.classic && !config.general.modes.secure {
|
if !config.general.modes.classic && !config.general.modes.secure {
|
||||||
debug!(peer = %real_peer, "Non-TLS modes disabled");
|
debug!(peer = %real_peer, "Non-TLS modes disabled");
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("direct_modes_disabled");
|
||||||
maybe_apply_mask_reject_delay(&config).await;
|
maybe_apply_mask_reject_delay(&config).await;
|
||||||
let (reader, writer) = tokio::io::split(stream);
|
let (reader, writer) = tokio::io::split(stream);
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
@@ -741,7 +741,7 @@ where
|
|||||||
).await {
|
).await {
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("direct_mtproto_bad_client");
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
@@ -778,6 +778,7 @@ where
|
|||||||
Ok(Ok(outcome)) => outcome,
|
Ok(Ok(outcome)) => outcome,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
debug!(peer = %peer, error = %e, "Handshake failed");
|
debug!(peer = %peer, error = %e, "Handshake failed");
|
||||||
|
stats_for_timeout.increment_handshake_failure_class(classify_handshake_failure_class(&e));
|
||||||
record_handshake_failure_class(
|
record_handshake_failure_class(
|
||||||
&beobachten_for_timeout,
|
&beobachten_for_timeout,
|
||||||
&config_for_timeout,
|
&config_for_timeout,
|
||||||
@@ -788,6 +789,7 @@ where
|
|||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
stats_for_timeout.increment_handshake_timeouts();
|
stats_for_timeout.increment_handshake_timeouts();
|
||||||
|
stats_for_timeout.increment_handshake_failure_class("timeout");
|
||||||
debug!(peer = %peer, "Handshake timeout");
|
debug!(peer = %peer, "Handshake timeout");
|
||||||
record_beobachten_class(
|
record_beobachten_class(
|
||||||
&beobachten_for_timeout,
|
&beobachten_for_timeout,
|
||||||
@@ -977,7 +979,8 @@ impl RunningClientHandler {
|
|||||||
self.peer.ip(),
|
self.peer.ip(),
|
||||||
&self.config.server.proxy_protocol_trusted_cidrs,
|
&self.config.server.proxy_protocol_trusted_cidrs,
|
||||||
) {
|
) {
|
||||||
self.stats.increment_connects_bad();
|
self.stats
|
||||||
|
.increment_connects_bad_with_class("proxy_protocol_untrusted");
|
||||||
warn!(
|
warn!(
|
||||||
peer = %self.peer,
|
peer = %self.peer,
|
||||||
trusted = ?self.config.server.proxy_protocol_trusted_cidrs,
|
trusted = ?self.config.server.proxy_protocol_trusted_cidrs,
|
||||||
@@ -1007,7 +1010,8 @@ impl RunningClientHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
self.stats.increment_connects_bad();
|
self.stats
|
||||||
|
.increment_connects_bad_with_class("proxy_protocol_invalid_header");
|
||||||
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
|
warn!(peer = %self.peer, error = %e, "Invalid PROXY protocol header");
|
||||||
record_beobachten_class(
|
record_beobachten_class(
|
||||||
&self.beobachten,
|
&self.beobachten,
|
||||||
@@ -1018,7 +1022,8 @@ impl RunningClientHandler {
|
|||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
self.stats.increment_connects_bad();
|
self.stats
|
||||||
|
.increment_connects_bad_with_class("proxy_protocol_header_timeout");
|
||||||
warn!(
|
warn!(
|
||||||
peer = %self.peer,
|
peer = %self.peer,
|
||||||
timeout_ms = proxy_header_timeout.as_millis(),
|
timeout_ms = proxy_header_timeout.as_millis(),
|
||||||
@@ -1116,6 +1121,7 @@ impl RunningClientHandler {
|
|||||||
Ok(Ok(outcome)) => outcome,
|
Ok(Ok(outcome)) => outcome,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
debug!(peer = %peer_for_log, error = %e, "Handshake failed");
|
debug!(peer = %peer_for_log, error = %e, "Handshake failed");
|
||||||
|
stats.increment_handshake_failure_class(classify_handshake_failure_class(&e));
|
||||||
record_handshake_failure_class(
|
record_handshake_failure_class(
|
||||||
&beobachten_for_timeout,
|
&beobachten_for_timeout,
|
||||||
&config_for_timeout,
|
&config_for_timeout,
|
||||||
@@ -1126,6 +1132,7 @@ impl RunningClientHandler {
|
|||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
stats.increment_handshake_timeouts();
|
stats.increment_handshake_timeouts();
|
||||||
|
stats.increment_handshake_failure_class("timeout");
|
||||||
debug!(peer = %peer_for_log, "Handshake timeout");
|
debug!(peer = %peer_for_log, "Handshake timeout");
|
||||||
record_beobachten_class(
|
record_beobachten_class(
|
||||||
&beobachten_for_timeout,
|
&beobachten_for_timeout,
|
||||||
@@ -1161,7 +1168,8 @@ impl RunningClientHandler {
|
|||||||
// third-party clients or future Telegram versions.
|
// third-party clients or future Telegram versions.
|
||||||
if !tls_clienthello_len_in_bounds(tls_len) {
|
if !tls_clienthello_len_in_bounds(tls_len) {
|
||||||
debug!(peer = %peer, tls_len = tls_len, max_tls_len = MAX_TLS_PLAINTEXT_SIZE, "TLS handshake length out of bounds");
|
debug!(peer = %peer, tls_len = tls_len, max_tls_len = MAX_TLS_PLAINTEXT_SIZE, "TLS handshake length out of bounds");
|
||||||
self.stats.increment_connects_bad();
|
self.stats
|
||||||
|
.increment_connects_bad_with_class("tls_clienthello_len_out_of_bounds");
|
||||||
maybe_apply_mask_reject_delay(&self.config).await;
|
maybe_apply_mask_reject_delay(&self.config).await;
|
||||||
let (reader, writer) = self.stream.into_split();
|
let (reader, writer) = self.stream.into_split();
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
@@ -1181,7 +1189,8 @@ impl RunningClientHandler {
|
|||||||
Ok(n) => n,
|
Ok(n) => n,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(peer = %peer, error = %e, tls_len = tls_len, "TLS ClientHello body read failed; engaging masking fallback");
|
debug!(peer = %peer, error = %e, tls_len = tls_len, "TLS ClientHello body read failed; engaging masking fallback");
|
||||||
self.stats.increment_connects_bad();
|
self.stats
|
||||||
|
.increment_connects_bad_with_class("tls_clienthello_read_error");
|
||||||
maybe_apply_mask_reject_delay(&self.config).await;
|
maybe_apply_mask_reject_delay(&self.config).await;
|
||||||
let (reader, writer) = self.stream.into_split();
|
let (reader, writer) = self.stream.into_split();
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
@@ -1198,7 +1207,8 @@ impl RunningClientHandler {
|
|||||||
|
|
||||||
if body_read < tls_len {
|
if body_read < tls_len {
|
||||||
debug!(peer = %peer, got = body_read, expected = tls_len, "Truncated in-range TLS ClientHello; engaging masking fallback");
|
debug!(peer = %peer, got = body_read, expected = tls_len, "Truncated in-range TLS ClientHello; engaging masking fallback");
|
||||||
self.stats.increment_connects_bad();
|
self.stats
|
||||||
|
.increment_connects_bad_with_class("tls_clienthello_truncated");
|
||||||
maybe_apply_mask_reject_delay(&self.config).await;
|
maybe_apply_mask_reject_delay(&self.config).await;
|
||||||
let initial_len = 5 + body_read;
|
let initial_len = 5 + body_read;
|
||||||
let (reader, writer) = self.stream.into_split();
|
let (reader, writer) = self.stream.into_split();
|
||||||
@@ -1235,7 +1245,7 @@ impl RunningClientHandler {
|
|||||||
{
|
{
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("tls_handshake_bad_client");
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
@@ -1285,7 +1295,7 @@ impl RunningClientHandler {
|
|||||||
};
|
};
|
||||||
let reader =
|
let reader =
|
||||||
tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader);
|
tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader);
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("tls_mtproto_bad_client");
|
||||||
debug!(
|
debug!(
|
||||||
peer = %peer,
|
peer = %peer,
|
||||||
"Authenticated TLS session failed MTProto validation; engaging masking fallback"
|
"Authenticated TLS session failed MTProto validation; engaging masking fallback"
|
||||||
@@ -1332,7 +1342,8 @@ impl RunningClientHandler {
|
|||||||
|
|
||||||
if !self.config.general.modes.classic && !self.config.general.modes.secure {
|
if !self.config.general.modes.classic && !self.config.general.modes.secure {
|
||||||
debug!(peer = %peer, "Non-TLS modes disabled");
|
debug!(peer = %peer, "Non-TLS modes disabled");
|
||||||
self.stats.increment_connects_bad();
|
self.stats
|
||||||
|
.increment_connects_bad_with_class("direct_modes_disabled");
|
||||||
maybe_apply_mask_reject_delay(&self.config).await;
|
maybe_apply_mask_reject_delay(&self.config).await;
|
||||||
let (reader, writer) = self.stream.into_split();
|
let (reader, writer) = self.stream.into_split();
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
@@ -1372,7 +1383,7 @@ impl RunningClientHandler {
|
|||||||
{
|
{
|
||||||
HandshakeResult::Success(result) => result,
|
HandshakeResult::Success(result) => result,
|
||||||
HandshakeResult::BadClient { reader, writer } => {
|
HandshakeResult::BadClient { reader, writer } => {
|
||||||
stats.increment_connects_bad();
|
stats.increment_connects_bad_with_class("direct_mtproto_bad_client");
|
||||||
return Ok(masking_outcome(
|
return Ok(masking_outcome(
|
||||||
reader,
|
reader,
|
||||||
writer,
|
writer,
|
||||||
|
|||||||
@@ -88,6 +88,8 @@ impl Drop for RouteConnectionLease {
|
|||||||
pub struct Stats {
|
pub struct Stats {
|
||||||
connects_all: AtomicU64,
|
connects_all: AtomicU64,
|
||||||
connects_bad: AtomicU64,
|
connects_bad: AtomicU64,
|
||||||
|
connects_bad_classes: DashMap<&'static str, AtomicU64>,
|
||||||
|
handshake_failure_classes: DashMap<&'static str, AtomicU64>,
|
||||||
current_connections_direct: AtomicU64,
|
current_connections_direct: AtomicU64,
|
||||||
current_connections_me: AtomicU64,
|
current_connections_me: AtomicU64,
|
||||||
handshake_timeouts: AtomicU64,
|
handshake_timeouts: AtomicU64,
|
||||||
@@ -518,10 +520,32 @@ impl Stats {
|
|||||||
self.connects_all.fetch_add(1, Ordering::Relaxed);
|
self.connects_all.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn increment_connects_bad(&self) {
|
|
||||||
if self.telemetry_core_enabled() {
|
pub fn increment_connects_bad_with_class(&self, class: &'static str) {
|
||||||
self.connects_bad.fetch_add(1, Ordering::Relaxed);
|
if !self.telemetry_core_enabled() {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
self.connects_bad.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let entry = self
|
||||||
|
.connects_bad_classes
|
||||||
|
.entry(class)
|
||||||
|
.or_insert_with(|| AtomicU64::new(0));
|
||||||
|
entry.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn increment_connects_bad(&self) {
|
||||||
|
self.increment_connects_bad_with_class("other");
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn increment_handshake_failure_class(&self, class: &'static str) {
|
||||||
|
if !self.telemetry_core_enabled() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let entry = self
|
||||||
|
.handshake_failure_classes
|
||||||
|
.entry(class)
|
||||||
|
.or_insert_with(|| AtomicU64::new(0));
|
||||||
|
entry.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
pub fn increment_current_connections_direct(&self) {
|
pub fn increment_current_connections_direct(&self) {
|
||||||
self.current_connections_direct
|
self.current_connections_direct
|
||||||
@@ -1640,6 +1664,37 @@ impl Stats {
|
|||||||
pub fn get_connects_bad(&self) -> u64 {
|
pub fn get_connects_bad(&self) -> u64 {
|
||||||
self.connects_bad.load(Ordering::Relaxed)
|
self.connects_bad.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_connects_bad_class_counts(&self) -> Vec<(String, u64)> {
|
||||||
|
let mut out: Vec<(String, u64)> = self
|
||||||
|
.connects_bad_classes
|
||||||
|
.iter()
|
||||||
|
.map(|entry| {
|
||||||
|
(
|
||||||
|
entry.key().to_string(),
|
||||||
|
entry.value().load(Ordering::Relaxed),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
out.sort_by(|a, b| a.0.cmp(&b.0));
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_handshake_failure_class_counts(&self) -> Vec<(String, u64)> {
|
||||||
|
let mut out: Vec<(String, u64)> = self
|
||||||
|
.handshake_failure_classes
|
||||||
|
.iter()
|
||||||
|
.map(|entry| {
|
||||||
|
(
|
||||||
|
entry.key().to_string(),
|
||||||
|
entry.value().load(Ordering::Relaxed),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
out.sort_by(|a, b| a.0.cmp(&b.0));
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_accept_permit_timeout_total(&self) -> u64 {
|
pub fn get_accept_permit_timeout_total(&self) -> u64 {
|
||||||
self.accept_permit_timeout_total.load(Ordering::Relaxed)
|
self.accept_permit_timeout_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -436,7 +436,10 @@ fn build_client_hello(
|
|||||||
let session_id = if session_id_len == 0 {
|
let session_id = if session_id_len == 0 {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
} else if deterministic {
|
} else if deterministic {
|
||||||
deterministic_bytes(&format!("tls-fetch-session:{sni}:{}", profile.as_str()), session_id_len)
|
deterministic_bytes(
|
||||||
|
&format!("tls-fetch-session:{sni}:{}", profile.as_str()),
|
||||||
|
session_id_len,
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
rng.bytes(session_id_len)
|
rng.bytes(session_id_len)
|
||||||
};
|
};
|
||||||
@@ -1480,17 +1483,13 @@ mod tests {
|
|||||||
out
|
out
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn capture_rustls_client_hello_record(alpn_protocols: &'static [&'static [u8]]) -> Vec<u8> {
|
async fn capture_rustls_client_hello_record(
|
||||||
|
alpn_protocols: &'static [&'static [u8]],
|
||||||
|
) -> Vec<u8> {
|
||||||
let (client, mut server) = tokio::io::duplex(32 * 1024);
|
let (client, mut server) = tokio::io::duplex(32 * 1024);
|
||||||
let fetch_task = tokio::spawn(async move {
|
let fetch_task = tokio::spawn(async move {
|
||||||
fetch_via_rustls_stream(
|
fetch_via_rustls_stream(client, "example.com", "example.com", None, alpn_protocols)
|
||||||
client,
|
.await
|
||||||
"example.com",
|
|
||||||
"example.com",
|
|
||||||
None,
|
|
||||||
alpn_protocols,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut header = [0u8; 5];
|
let mut header = [0u8; 5];
|
||||||
@@ -1507,7 +1506,10 @@ mod tests {
|
|||||||
drop(server);
|
drop(server);
|
||||||
|
|
||||||
let result = fetch_task.await.expect("fetch task must join");
|
let result = fetch_task.await.expect("fetch task must join");
|
||||||
assert!(result.is_err(), "capture task should end with handshake error");
|
assert!(
|
||||||
|
result.is_err(),
|
||||||
|
"capture task should end with handshake error"
|
||||||
|
);
|
||||||
|
|
||||||
let mut record = Vec::with_capacity(5 + body_len);
|
let mut record = Vec::with_capacity(5 + body_len);
|
||||||
record.extend_from_slice(&header);
|
record.extend_from_slice(&header);
|
||||||
@@ -1685,14 +1687,20 @@ mod tests {
|
|||||||
true,
|
true,
|
||||||
);
|
);
|
||||||
let parsed = parse_client_hello_for_test(&hello);
|
let parsed = parse_client_hello_for_test(&hello);
|
||||||
assert_eq!(parsed.session_id.len(), 32, "modern chrome must use non-empty session id");
|
assert_eq!(
|
||||||
|
parsed.session_id.len(),
|
||||||
|
32,
|
||||||
|
"modern chrome must use non-empty session id"
|
||||||
|
);
|
||||||
|
|
||||||
let extension_ids = parsed
|
let extension_ids = parsed
|
||||||
.extensions
|
.extensions
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(ext_type, _)| *ext_type)
|
.map(|(ext_type, _)| *ext_type)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let expected_prefix = [0x0000, 0x000b, 0x000a, 0x0023, 0x000d, 0x002b, 0x002d, 0x0033, 0x0010];
|
let expected_prefix = [
|
||||||
|
0x0000, 0x000b, 0x000a, 0x0023, 0x000d, 0x002b, 0x002d, 0x0033, 0x0010,
|
||||||
|
];
|
||||||
assert!(
|
assert!(
|
||||||
extension_ids.as_slice().starts_with(&expected_prefix),
|
extension_ids.as_slice().starts_with(&expected_prefix),
|
||||||
"unexpected extension order: {extension_ids:?}"
|
"unexpected extension order: {extension_ids:?}"
|
||||||
@@ -1713,13 +1721,20 @@ mod tests {
|
|||||||
"key_share payload is too short"
|
"key_share payload is too short"
|
||||||
);
|
);
|
||||||
let entry_len = u16::from_be_bytes([key_share_data[0], key_share_data[1]]) as usize;
|
let entry_len = u16::from_be_bytes([key_share_data[0], key_share_data[1]]) as usize;
|
||||||
assert_eq!(entry_len, key_share_data.len() - 2, "key_share list length mismatch");
|
assert_eq!(
|
||||||
|
entry_len,
|
||||||
|
key_share_data.len() - 2,
|
||||||
|
"key_share list length mismatch"
|
||||||
|
);
|
||||||
let group = u16::from_be_bytes([key_share_data[2], key_share_data[3]]);
|
let group = u16::from_be_bytes([key_share_data[2], key_share_data[3]]);
|
||||||
let key_len = u16::from_be_bytes([key_share_data[4], key_share_data[5]]) as usize;
|
let key_len = u16::from_be_bytes([key_share_data[4], key_share_data[5]]) as usize;
|
||||||
let key = &key_share_data[6..6 + key_len];
|
let key = &key_share_data[6..6 + key_len];
|
||||||
assert_eq!(group, 0x001d, "key_share group must be x25519");
|
assert_eq!(group, 0x001d, "key_share group must be x25519");
|
||||||
assert_eq!(key_len, 32, "x25519 key length must be 32");
|
assert_eq!(key_len, 32, "x25519 key length must be 32");
|
||||||
assert!(key.iter().any(|b| *b != 0), "x25519 key must not be all zero");
|
assert!(
|
||||||
|
key.iter().any(|b| *b != 0),
|
||||||
|
"x25519 key must not be all zero"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
Reference in New Issue
Block a user