Merge remote-tracking branch 'original/main'

This commit is contained in:
AndreyAkifev 2026-02-16 10:20:53 +07:00
commit b9b8ec529c
14 changed files with 905 additions and 303 deletions

View File

@ -2,9 +2,9 @@ name: Rust
on:
push:
branches: [ main ]
branches: [ "*" ]
pull_request:
branches: [ main ]
branches: [ "*" ]
env:
CARGO_TERM_COLOR: always

View File

@ -2,42 +2,86 @@
**Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as connection pooling, replay protection, detailed statistics, masking from "prying" eyes
## Emergency
### RU
Многие из вас столкнулись с проблемой загрузки медиа из каналов с >100k subs...
## NEWS and EMERGENCY
### ✈️ Telemt 3 is released!
<table>
<tr>
<td width="50%" valign="top">
Мы уже знаем о проблеме: она связана с dc=203 - Telegram CDN и сейчас есть подтверждённое исправление...
### 🇷🇺 RU
🤐 ДОСТУПНО ТОЛЬКО В РЕЛИЗЕ 2.0.0.1 и последующих
15 февраля мы опубликовали `telemt 3` с поддержкой Middle-End Proxy, а значит:
Сейчас оно принимо через добавление в конфиг:
- с функциональными медиа, в том числе с CDN/DC=203
- с Ad-tag — показывайте спонсорский канал и собирайте статистику через официального бота
- с новым подходом к безопасности и асинхронности
- с высокоточной диагностикой криптографии через `ME_DIAG`
Для использования нужно:
1. Версия `telemt` ≥3.0.0
2. Выполнение любого из наборов условий:
- публичный IP для исходящих соединений установлен на интерфейса инстанса с `telemt`
- ЛИБО
- вы используете NAT 1:1 + включили STUN-пробинг
3. В конфиге, в секции `[general]` указать:
```toml
use_middle_proxy = true
```
Если условия из пункта 1 не выполняются:
1. Выключите ME-режим:
- установите `use_middle_proxy = false`
- ЛИБО
- Middle-End Proxy будет выключен автоматически по таймауту, но это займёт больше времени при запуске
2. В конфиге, добавьте в конец:
```toml
[dc_overrides]
"203" = "91.105.192.100:443"
```
Мы работаем над поиском всех адресов для каждого "нестандартного" DC...
Фикс вне конфига будет в релизе 2.0.0.2
Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, реверс-инжиниринге или сетевых расследованиях — мы открыты к идеям и pull requests.
Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, reverse engineering, network forensics - мы открыты к мыслям, предложениям, pull requests
</td>
<td width="50%" valign="top">
### EN
Many of you have encountered issues loading media from channels with over 100k subscribers…
### 🇬🇧 EN
Were already aware of the problem: its related to `dc=203` Telegram CDN and we now have a confirmed fix.
On February 15, we released `telemt 3` with support for Middle-End Proxy, which means:
🤐 AVAILABLE ONLY IN RELEASE 2.0.0.1 and later
- functional media, including CDN/DC=203
- Ad-tag support promote a sponsored channel and collect statistics via Telegram bot
- new approach to security and asynchronicity
- high-precision cryptography diagnostics via `ME_DIAG`
Currently, you can apply it by adding the following to your config:
To use this feature, the following requirements must be met:
1. `telemt` version ≥ 3.0.0
2. One of the following conditions satisfied:
- the instance running `telemt` has a public IP address assigned to its network interface for outbound connections
- OR
- you are using 1:1 NAT and have STUN probing enabled
3. In the config file, under the `[general]` section, specify:
```toml
use_middle_proxy = true
````
If the conditions from step 1 are not satisfied:
1. Disable Middle-End mode:
- set `use_middle_proxy = false`
- OR
- Middle-End Proxy will be disabled automatically after a timeout, but this will increase startup time
2. In the config file, add the following at the end:
```toml
[dc_overrides]
"203" = "91.105.192.100:443"
```
Were working on identifying all addresses for every “nonstandard” DC…
The fix will be included in release 2.0.0.2, no manual config needed.
If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics — we welcome ideas, suggestions, and pull requests.
If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics were open to ideas, suggestions, and pull requests.
</td>
</tr>
</table>
# Features
💥 The configuration structure has changed since version 1.1.0.0. change it in your environment!

View File

@ -5,8 +5,12 @@ services:
restart: unless-stopped
ports:
- "443:443"
# Allow caching 'proxy-secret' in read-only container
working_dir: /run/telemt
volumes:
- ./config.toml:/app/config.toml:ro
- ./config.toml:/run/telemt/config.toml:ro
tmpfs:
- /run/telemt:rw,mode=1777,size=1m
environment:
- RUST_LOG=info
# Uncomment this line if you want to use host network for IPv6, but bridge is default and usually better

View File

@ -3,6 +3,7 @@
use crate::error::{ProxyError, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde::de::Deserializer;
use std::collections::HashMap;
use std::net::IpAddr;
use std::path::Path;
@ -53,6 +54,40 @@ fn default_metrics_whitelist() -> Vec<IpAddr> {
vec!["127.0.0.1".parse().unwrap(), "::1".parse().unwrap()]
}
fn default_unknown_dc_log_path() -> Option<String> {
Some("unknown-dc.txt".to_string())
}
// ============= Custom Deserializers =============
#[derive(Deserialize)]
#[serde(untagged)]
enum OneOrMany {
One(String),
Many(Vec<String>),
}
fn deserialize_dc_overrides<'de, D>(
deserializer: D,
) -> std::result::Result<HashMap<String, Vec<String>>, D::Error>
where
D: Deserializer<'de>,
{
let raw: HashMap<String, OneOrMany> = HashMap::deserialize(deserializer)?;
let mut out = HashMap::new();
for (dc, val) in raw {
let mut addrs = match val {
OneOrMany::One(s) => vec![s],
OneOrMany::Many(v) => v,
};
addrs.retain(|s| !s.trim().is_empty());
if !addrs.is_empty() {
out.insert(dc, addrs);
}
}
Ok(out)
}
// ============= Log Level =============
/// Logging verbosity level
@ -95,6 +130,50 @@ impl LogLevel {
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn dc_overrides_allow_string_and_array() {
let toml = r#"
[dc_overrides]
"201" = "149.154.175.50:443"
"202" = ["149.154.167.51:443", "149.154.175.100:443"]
"#;
let cfg: ProxyConfig = toml::from_str(toml).unwrap();
assert_eq!(cfg.dc_overrides["201"], vec!["149.154.175.50:443"]);
assert_eq!(
cfg.dc_overrides["202"],
vec!["149.154.167.51:443", "149.154.175.100:443"]
);
}
#[test]
fn dc_overrides_inject_dc203_default() {
let toml = r#"
[general]
use_middle_proxy = false
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_dc_override_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert!(cfg
.dc_overrides
.get("203")
.map(|v| v.contains(&"91.105.192.100:443".to_string()))
.unwrap_or(false));
let _ = std::fs::remove_file(path);
}
}
impl std::fmt::Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@ -163,6 +242,14 @@ pub struct GeneralConfig {
#[serde(default)]
pub middle_proxy_nat_stun: Option<String>,
/// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected).
#[serde(default)]
pub stun_iface_mismatch_ignore: bool,
/// Log unknown (non-standard) DC requests to a file (default: unknown-dc.txt). Set to null to disable.
#[serde(default = "default_unknown_dc_log_path")]
pub unknown_dc_log_path: Option<String>,
#[serde(default)]
pub log_level: LogLevel,
@ -183,6 +270,8 @@ impl Default for GeneralConfig {
middle_proxy_nat_ip: None,
middle_proxy_nat_probe: false,
middle_proxy_nat_stun: None,
stun_iface_mismatch_ignore: false,
unknown_dc_log_path: default_unknown_dc_log_path(),
log_level: LogLevel::Normal,
disable_colors: false,
}
@ -499,13 +588,13 @@ pub struct ProxyConfig {
pub show_link: ShowLink,
/// DC address overrides for non-standard DCs (CDN, media, test, etc.)
/// Keys are DC indices as strings, values are "ip:port" addresses.
/// Keys are DC indices as strings, values are one or more \"ip:port\" addresses.
/// Matches the C implementation's `proxy_for <dc_id> <ip>:<port>` config directive.
/// Example in config.toml:
/// [dc_overrides]
/// "203" = "149.154.175.100:443"
#[serde(default)]
pub dc_overrides: HashMap<String, String>,
/// \"203\" = [\"149.154.175.100:443\", \"91.105.192.100:443\"]
#[serde(default, deserialize_with = "deserialize_dc_overrides")]
pub dc_overrides: HashMap<String, Vec<String>>,
/// Default DC index (1-5) for unmapped non-standard DCs.
/// Matches the C implementation's `default <dc_id>` config directive.
@ -599,6 +688,12 @@ impl ProxyConfig {
});
}
// Ensure default DC203 override is present.
config
.dc_overrides
.entry("203".to_string())
.or_insert_with(|| vec!["91.105.192.100:443".to_string()]);
Ok(config)
}

View File

@ -27,7 +27,7 @@ use crate::ip_tracker::UserIpTracker;
use crate::proxy::ClientHandler;
use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool;
use crate::transport::middle_proxy::{MePool, fetch_proxy_config};
use crate::transport::middle_proxy::{MePool, fetch_proxy_config, stun_probe};
use crate::transport::{ListenOptions, UpstreamManager, create_listener};
use crate::util::ip::detect_ip;
use crate::protocol::constants::{TG_MIDDLE_PROXIES_V4, TG_MIDDLE_PROXIES_V6};
@ -183,7 +183,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
let prefer_ipv6 = config.general.prefer_ipv6;
let use_middle_proxy = config.general.use_middle_proxy;
let mut use_middle_proxy = config.general.use_middle_proxy;
let config = Arc::new(config);
let stats = Arc::new(Stats::new());
let rng = Arc::new(SecureRandom::new());
@ -207,6 +207,31 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// Connection concurrency limit
let _max_connections = Arc::new(Semaphore::new(10_000));
// STUN check before choosing transport
if use_middle_proxy {
match stun_probe(config.general.middle_proxy_nat_stun.clone()).await {
Ok(Some(probe)) => {
info!(
local_ip = %probe.local_addr.ip(),
reflected_ip = %probe.reflected_addr.ip(),
"STUN detected public address"
);
if probe.local_addr.ip() != probe.reflected_addr.ip()
&& !config.general.stun_iface_mismatch_ignore
{
warn!(
local_ip = %probe.local_addr.ip(),
reflected_ip = %probe.reflected_addr.ip(),
"STUN/interface IP mismatch; falling back to direct DC (set stun_iface_mismatch_ignore=true to force Middle Proxy)"
);
use_middle_proxy = false;
}
}
Ok(None) => warn!("STUN probe returned no address; continuing"),
Err(e) => warn!(error = %e, "STUN probe failed; continuing"),
}
}
// =====================================================================
// Middle Proxy initialization (if enabled)
// =====================================================================
@ -234,7 +259,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await {
Ok(proxy_secret) => {
info!(
secret_len = proxy_secret.len(),
secret_len = proxy_secret.len() as usize, // ← ЯВНЫЙ ТИП usize
key_sig = format_args!(
"0x{:08x}",
if proxy_secret.len() >= 4 {
@ -330,21 +355,13 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!("Transport: Direct TCP (standard DCs only)");
}
// Startup DC ping (only meaningful in direct mode)
if me_pool.is_none() {
info!("================= Telegram DC Connectivity =================");
let ping_results = upstream_manager.ping_all_dcs(prefer_ipv6).await;
let ping_results = upstream_manager
.ping_all_dcs(prefer_ipv6, &config.dc_overrides)
.await;
for upstream_result in &ping_results {
// Show which IP version is in use and which is fallback
if upstream_result.both_available {
if prefer_ipv6 {
info!(" IPv6 in use and IPv4 is fallback");
} else {
info!(" IPv4 in use and IPv6 is fallback");
}
} else {
let v6_works = upstream_result
.v6_results
.iter()
@ -353,6 +370,14 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.v4_results
.iter()
.any(|r| r.rtt_ms.is_some());
if upstream_result.both_available {
if prefer_ipv6 {
info!(" IPv6 in use and IPv4 is fallback");
} else {
info!(" IPv4 in use and IPv6 is fallback");
}
} else {
if v6_works && !v4_works {
info!(" IPv6 only (IPv4 unavailable)");
} else if v4_works && !v6_works {
@ -365,13 +390,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
info!(" via {}", upstream_result.upstream_name);
info!("============================================================");
// Print IPv6 results first
// Print IPv6 results first (only if IPv6 is available)
if v6_works {
for dc in &upstream_result.v6_results {
let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port());
match &dc.rtt_ms {
Some(rtt) => {
// Align: IPv6 addresses are longer, use fewer tabs
// [2001:b28:f23d:f001::a]:443 = ~28 chars
info!(" DC{} [IPv6] {}:\t\t{:.0} ms", dc.dc_idx, addr_str, rtt);
}
None => {
@ -382,14 +406,14 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}
info!("============================================================");
}
// Print IPv4 results
// Print IPv4 results (only if IPv4 is available)
if v4_works {
for dc in &upstream_result.v4_results {
let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port());
match &dc.rtt_ms {
Some(rtt) => {
// Align: IPv4 addresses are shorter, use more tabs
// 149.154.175.50:443 = ~18 chars
info!(
" DC{} [IPv4] {}:\t\t\t\t{:.0} ms",
dc.dc_idx, addr_str, rtt
@ -558,6 +582,62 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
});
}
#[cfg(unix)]
if let Some(ref unix_path) = config.server.listen_unix_sock {
use tokio::net::UnixListener; // ← добавь импорт, если его нет выше
// Удаляем старые файлы сокета, если они есть (стандартная практика)
let _ = tokio::fs::remove_file(unix_path).await;
let unix_listener = UnixListener::bind(unix_path)?;
info!("Listening on unix:{}", unix_path);
let config = config.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
let buffer_pool = buffer_pool.clone();
let rng = rng.clone();
let me_pool = me_pool.clone();
let ip_tracker = ip_tracker.clone();
tokio::spawn(async move {
let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1));
loop {
match unix_listener.accept().await {
Ok((stream, _)) => {
let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16)); // безопасный порт
let config = config.clone();
let stats = stats.clone();
let upstream_manager = upstream_manager.clone();
let replay_checker = replay_checker.clone();
let buffer_pool = buffer_pool.clone();
let rng = rng.clone();
let me_pool = me_pool.clone();
let ip_tracker = ip_tracker.clone();
tokio::spawn(async move {
if let Err(e) = crate::proxy::client::handle_client_stream(
stream, fake_peer, config, stats,
upstream_manager, replay_checker, buffer_pool, rng,
me_pool, ip_tracker,
).await {
debug!(error = %e, "Unix socket connection error");
}
});
}
Err(e) => {
error!("Unix socket accept error: {}", e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
});
}
match signal::ctrl_c().await {
Ok(()) => info!("Shutting down..."),
Err(e) => error!("Signal error: {}", e),

View File

@ -1,6 +1,8 @@
//! Client Handler
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
@ -8,6 +10,17 @@ use tokio::net::TcpStream;
use tokio::time::timeout;
use tracing::{debug, warn};
/// Post-handshake future (relay phase, runs outside handshake timeout)
type PostHandshakeFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
/// Result of the handshake phase
enum HandshakeOutcome {
/// Handshake succeeded, relay work to do (outside timeout)
NeedsRelay(PostHandshakeFuture),
/// Already fully handled (bad client masking, etc.)
Handled,
}
use crate::config::ProxyConfig;
use crate::crypto::SecureRandom;
use crate::error::{HandshakeResult, ProxyError, Result};
@ -24,6 +37,160 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle
use crate::proxy::masking::handle_bad_client;
use crate::proxy::middle_relay::handle_via_middle_proxy;
pub async fn handle_client_stream<S>(
mut stream: S,
peer: SocketAddr,
config: Arc<ProxyConfig>,
stats: Arc<Stats>,
upstream_manager: Arc<UpstreamManager>,
replay_checker: Arc<ReplayChecker>,
buffer_pool: Arc<BufferPool>,
rng: Arc<SecureRandom>,
me_pool: Option<Arc<MePool>>,
ip_tracker: Arc<UserIpTracker>,
) -> Result<()>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
stats.increment_connects_all();
debug!(peer = %peer, "New connection (generic stream)");
let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake);
let stats_for_timeout = stats.clone();
// For non-TCP streams, use a synthetic local address
let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port)
.parse()
.unwrap_or_else(|_| "0.0.0.0:443".parse().unwrap());
// Phase 1: handshake (with timeout)
let outcome = match timeout(handshake_timeout, async {
let mut first_bytes = [0u8; 5];
stream.read_exact(&mut first_bytes).await?;
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
debug!(peer = %peer, is_tls = is_tls, "Handshake type detected");
if is_tls {
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
if tls_len < 512 {
debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short");
stats.increment_connects_bad();
let (reader, writer) = tokio::io::split(stream);
handle_bad_client(reader, writer, &first_bytes, &config).await;
return Ok(HandshakeOutcome::Handled);
}
let mut handshake = vec![0u8; 5 + tls_len];
handshake[..5].copy_from_slice(&first_bytes);
stream.read_exact(&mut handshake[5..]).await?;
let (read_half, write_half) = tokio::io::split(stream);
let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake(
&handshake, read_half, write_half, peer,
&config, &replay_checker, &rng,
).await {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
};
debug!(peer = %peer, "Reading MTProto handshake through TLS");
let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?;
let mtproto_handshake: [u8; HANDSHAKE_LEN] = mtproto_data[..].try_into()
.map_err(|_| ProxyError::InvalidHandshake("Short MTProto handshake".into()))?;
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
&mtproto_handshake, tls_reader, tls_writer, peer,
&config, &replay_checker, true,
).await {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader: _, writer: _ } => {
stats.increment_connects_bad();
debug!(peer = %peer, "Valid TLS but invalid MTProto handshake");
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
};
Ok(HandshakeOutcome::NeedsRelay(Box::pin(
RunningClientHandler::handle_authenticated_static(
crypto_reader, crypto_writer, success,
upstream_manager, stats, config, buffer_pool, rng, me_pool,
local_addr, peer, ip_tracker.clone(),
),
)))
} else {
if !config.general.modes.classic && !config.general.modes.secure {
debug!(peer = %peer, "Non-TLS modes disabled");
stats.increment_connects_bad();
let (reader, writer) = tokio::io::split(stream);
handle_bad_client(reader, writer, &first_bytes, &config).await;
return Ok(HandshakeOutcome::Handled);
}
let mut handshake = [0u8; HANDSHAKE_LEN];
handshake[..5].copy_from_slice(&first_bytes);
stream.read_exact(&mut handshake[5..]).await?;
let (read_half, write_half) = tokio::io::split(stream);
let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake(
&handshake, read_half, write_half, peer,
&config, &replay_checker, false,
).await {
HandshakeResult::Success(result) => result,
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
};
Ok(HandshakeOutcome::NeedsRelay(Box::pin(
RunningClientHandler::handle_authenticated_static(
crypto_reader,
crypto_writer,
success,
upstream_manager,
stats,
config,
buffer_pool,
rng,
me_pool,
local_addr,
peer,
ip_tracker.clone(),
)
)))
}
}).await {
Ok(Ok(outcome)) => outcome,
Ok(Err(e)) => {
debug!(peer = %peer, error = %e, "Handshake failed");
return Err(e);
}
Err(_) => {
stats_for_timeout.increment_handshake_timeouts();
debug!(peer = %peer, "Handshake timeout");
return Err(ProxyError::TgHandshakeTimeout);
}
};
// Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts)
match outcome {
HandshakeOutcome::NeedsRelay(fut) => fut.await,
HandshakeOutcome::Handled => Ok(()),
}
}
pub struct ClientHandler;
pub struct RunningClientHandler {
@ -72,6 +239,7 @@ impl RunningClientHandler {
self.stats.increment_connects_all();
let peer = self.peer;
let ip_tracker = self.ip_tracker.clone();
debug!(peer = %peer, "New connection");
if let Err(e) = configure_client_socket(
@ -85,31 +253,34 @@ impl RunningClientHandler {
let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake);
let stats = self.stats.clone();
let result = timeout(handshake_timeout, self.do_handshake()).await;
match result {
Ok(Ok(())) => {
debug!(peer = %peer, "Connection handled successfully");
Ok(())
}
// Phase 1: handshake (with timeout)
let outcome = match timeout(handshake_timeout, self.do_handshake()).await {
Ok(Ok(outcome)) => outcome,
Ok(Err(e)) => {
debug!(peer = %peer, error = %e, "Handshake failed");
Err(e)
return Err(e);
}
Err(_) => {
stats.increment_handshake_timeouts();
debug!(peer = %peer, "Handshake timeout");
Err(ProxyError::TgHandshakeTimeout)
return Err(ProxyError::TgHandshakeTimeout);
}
};
// Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts)
match outcome {
HandshakeOutcome::NeedsRelay(fut) => fut.await,
HandshakeOutcome::Handled => Ok(()),
}
}
async fn do_handshake(mut self) -> Result<()> {
async fn do_handshake(mut self) -> Result<HandshakeOutcome> {
let mut first_bytes = [0u8; 5];
self.stream.read_exact(&mut first_bytes).await?;
let is_tls = tls::is_tls_handshake(&first_bytes[..3]);
let peer = self.peer;
let ip_tracker = self.ip_tracker.clone();
debug!(peer = %peer, is_tls = is_tls, "Handshake type detected");
@ -120,8 +291,9 @@ impl RunningClientHandler {
}
}
async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<()> {
async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<HandshakeOutcome> {
let peer = self.peer;
let ip_tracker = self.ip_tracker.clone();
let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize;
@ -132,7 +304,7 @@ impl RunningClientHandler {
self.stats.increment_connects_bad();
let (reader, writer) = self.stream.into_split();
handle_bad_client(reader, writer, &first_bytes, &self.config).await;
return Ok(());
return Ok(HandshakeOutcome::Handled);
}
let mut handshake = vec![0u8; 5 + tls_len];
@ -162,7 +334,7 @@ impl RunningClientHandler {
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
return Ok(());
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
};
@ -191,11 +363,12 @@ impl RunningClientHandler {
} => {
stats.increment_connects_bad();
debug!(peer = %peer, "Valid TLS but invalid MTProto handshake");
return Ok(());
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
};
Ok(HandshakeOutcome::NeedsRelay(Box::pin(
Self::handle_authenticated_static(
crypto_reader,
crypto_writer,
@ -209,19 +382,20 @@ impl RunningClientHandler {
local_addr,
peer,
self.ip_tracker,
)
.await
),
)))
}
async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<()> {
async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<HandshakeOutcome> {
let peer = self.peer;
let ip_tracker = self.ip_tracker.clone();
if !self.config.general.modes.classic && !self.config.general.modes.secure {
debug!(peer = %peer, "Non-TLS modes disabled");
self.stats.increment_connects_bad();
let (reader, writer) = self.stream.into_split();
handle_bad_client(reader, writer, &first_bytes, &self.config).await;
return Ok(());
return Ok(HandshakeOutcome::Handled);
}
let mut handshake = [0u8; HANDSHAKE_LEN];
@ -251,11 +425,12 @@ impl RunningClientHandler {
HandshakeResult::BadClient { reader, writer } => {
stats.increment_connects_bad();
handle_bad_client(reader, writer, &handshake, &config).await;
return Ok(());
return Ok(HandshakeOutcome::Handled);
}
HandshakeResult::Error(e) => return Err(e),
};
Ok(HandshakeOutcome::NeedsRelay(Box::pin(
Self::handle_authenticated_static(
crypto_reader,
crypto_writer,
@ -269,8 +444,8 @@ impl RunningClientHandler {
local_addr,
peer,
self.ip_tracker,
)
.await
),
)))
}
/// Main dispatch after successful handshake.

View File

@ -1,3 +1,5 @@
use std::fs::OpenOptions;
use std::io::Write;
use std::net::SocketAddr;
use std::sync::Arc;
@ -87,17 +89,25 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
let num_dcs = datacenters.len();
let dc_key = dc_idx.to_string();
if let Some(addr_str) = config.dc_overrides.get(&dc_key) {
if let Some(addrs) = config.dc_overrides.get(&dc_key) {
let prefer_v6 = config.general.prefer_ipv6;
let mut parsed = Vec::new();
for addr_str in addrs {
match addr_str.parse::<SocketAddr>() {
Ok(addr) => {
debug!(dc_idx = dc_idx, addr = %addr, "Using DC override from config");
Ok(addr) => parsed.push(addr),
Err(_) => warn!(dc_idx = dc_idx, addr_str = %addr_str, "Invalid DC override address in config, ignoring"),
}
}
if let Some(addr) = parsed
.iter()
.find(|a| a.is_ipv6() == prefer_v6)
.or_else(|| parsed.first())
.copied()
{
debug!(dc_idx = dc_idx, addr = %addr, count = parsed.len(), "Using DC override from config");
return Ok(addr);
}
Err(_) => {
warn!(dc_idx = dc_idx, addr_str = %addr_str,
"Invalid DC override address in config, ignoring");
}
}
}
let abs_dc = dc_idx.unsigned_abs() as usize;
@ -105,6 +115,16 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result<SocketAddr> {
return Ok(SocketAddr::new(datacenters[abs_dc - 1], TG_DATACENTER_PORT));
}
// Unknown DC requested by client without override: log and fall back.
if !config.dc_overrides.contains_key(&dc_key) {
warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster");
if let Some(path) = &config.general.unknown_dc_log_path {
if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
let _ = writeln!(file, "dc_idx={dc_idx}");
}
}
}
let default_dc = config.default_dc.unwrap_or(2) as usize;
let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs {
default_dc - 1

View File

@ -15,6 +15,7 @@ use bytes::Bytes;
pub use health::me_health_monitor;
pub use pool::MePool;
pub use pool_nat::{stun_probe, StunProbeResult};
pub use registry::ConnRegistry;
pub use secret::fetch_proxy_secret;
pub use config_updater::{fetch_proxy_config, me_config_updater};

View File

@ -6,6 +6,17 @@ use crate::error::{ProxyError, Result};
use super::MePool;
#[derive(Debug, Clone, Copy)]
pub struct StunProbeResult {
pub local_addr: std::net::SocketAddr,
pub reflected_addr: std::net::SocketAddr,
}
pub async fn stun_probe(stun_addr: Option<String>) -> Result<Option<StunProbeResult>> {
let stun_addr = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string());
fetch_stun_binding(&stun_addr).await
}
impl MePool {
pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr {
let nat_ip = self
@ -88,10 +99,12 @@ impl MePool {
.unwrap_or_else(|| "stun.l.google.com:19302".to_string());
match fetch_stun_binding(&stun_addr).await {
Ok(sa) => {
if let Some(sa) = sa {
info!(%sa, "NAT probe: reflected address");
if let Some(result) = sa {
info!(local = %result.local_addr, reflected = %result.reflected_addr, "NAT probe: reflected address");
Some(result.reflected_addr)
} else {
None
}
sa
}
Err(e) => {
warn!(error = %e, "NAT probe failed");
@ -128,7 +141,7 @@ async fn fetch_public_ipv4_once(url: &str) -> Result<Option<Ipv4Addr>> {
Ok(ip)
}
async fn fetch_stun_binding(stun_addr: &str) -> Result<Option<std::net::SocketAddr>> {
async fn fetch_stun_binding(stun_addr: &str) -> Result<Option<StunProbeResult>> {
use rand::RngCore;
use tokio::net::UdpSocket;
@ -196,10 +209,17 @@ async fn fetch_stun_binding(stun_addr: &str) -> Result<Option<std::net::SocketAd
} else {
(u16::from_be_bytes(port_bytes), ip_bytes)
};
return Ok(Some(std::net::SocketAddr::new(
let reflected = std::net::SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3])),
port,
)));
);
let local_addr = socket.local_addr().map_err(|e| {
ProxyError::Proxy(format!("STUN local_addr failed: {e}"))
})?;
return Ok(Some(StunProbeResult {
local_addr,
reflected_addr: reflected,
}));
}
_ => {}
}

View File

@ -285,12 +285,17 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::io::ErrorKind;
use tokio::net::TcpListener;
#[tokio::test]
async fn test_pool_basic() {
// Start a test server
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(l) => l,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("bind failed: {e}"),
};
let addr = listener.local_addr().unwrap();
// Accept connections in background
@ -303,7 +308,11 @@ mod tests {
let pool = ConnectionPool::new();
// Get a connection
let conn1 = pool.get(addr).await.unwrap();
let conn1 = match pool.get(addr).await {
Ok(c) => c,
Err(ProxyError::Io(e)) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("connect failed: {e}"),
};
// Return it to pool
pool.put(addr, conn1).await;

View File

@ -205,15 +205,29 @@ pub fn create_listener(addr: SocketAddr, options: &ListenOptions) -> Result<Sock
#[cfg(test)]
mod tests {
use super::*;
use std::io::ErrorKind;
use tokio::net::TcpListener;
#[tokio::test]
async fn test_configure_socket() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(l) => l,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("bind failed: {e}"),
};
let addr = listener.local_addr().unwrap();
let stream = TcpStream::connect(addr).await.unwrap();
configure_tcp_socket(&stream, true, Duration::from_secs(30)).unwrap();
let stream = match TcpStream::connect(addr).await {
Ok(s) => s,
Err(e) if e.kind() == ErrorKind::PermissionDenied => return,
Err(e) => panic!("connect failed: {e}"),
};
if let Err(e) = configure_tcp_socket(&stream, true, Duration::from_secs(30)) {
if e.kind() == ErrorKind::PermissionDenied {
return;
}
panic!("configure_tcp_socket failed: {e}");
}
}
#[test]

View File

@ -2,6 +2,7 @@
//!
//! IPv6/IPv4 connectivity checks with configurable preference.
use std::collections::HashMap;
use std::net::{SocketAddr, IpAddr};
use std::sync::Arc;
use std::time::Duration;
@ -350,7 +351,11 @@ impl UpstreamManager {
/// Ping all Telegram DCs through all upstreams.
/// Tests BOTH IPv6 and IPv4, returns separate results for each.
pub async fn ping_all_dcs(&self, prefer_ipv6: bool) -> Vec<StartupPingResult> {
pub async fn ping_all_dcs(
&self,
prefer_ipv6: bool,
dc_overrides: &HashMap<String, Vec<String>>,
) -> Vec<StartupPingResult> {
let upstreams: Vec<(usize, UpstreamConfig)> = {
let guard = self.upstreams.read().await;
guard.iter().enumerate()
@ -450,6 +455,58 @@ impl UpstreamManager {
v4_results.push(ping_result);
}
// === Ping DC overrides (v4/v6) ===
for (dc_key, addrs) in dc_overrides {
let dc_num: i16 = match dc_key.parse::<i16>() {
Ok(v) if v > 0 => v,
Err(_) => {
warn!(dc = %dc_key, "Invalid dc_overrides key, skipping");
continue;
},
_ => continue,
};
let dc_idx = dc_num as usize;
for addr_str in addrs {
match addr_str.parse::<SocketAddr>() {
Ok(addr) => {
let is_v6 = addr.is_ipv6();
let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(&upstream_config, addr)
).await;
let ping_result = match result {
Ok(Ok(rtt_ms)) => DcPingResult {
dc_idx,
dc_addr: addr,
rtt_ms: Some(rtt_ms),
error: None,
},
Ok(Err(e)) => DcPingResult {
dc_idx,
dc_addr: addr,
rtt_ms: None,
error: Some(e.to_string()),
},
Err(_) => DcPingResult {
dc_idx,
dc_addr: addr,
rtt_ms: None,
error: Some("timeout".to_string()),
},
};
if is_v6 {
v6_results.push(ping_result);
} else {
v4_results.push(ping_result);
}
}
Err(_) => warn!(dc = %dc_idx, addr = %addr_str, "Invalid dc_overrides address, skipping"),
}
}
}
// Check if both IP versions have at least one working DC
let v6_has_working = v6_results.iter().any(|r| r.rtt_ms.is_some());
let v4_has_working = v4_results.iter().any(|r| r.rtt_ms.is_some());

BIN
telemt

Binary file not shown.

View File

@ -1,121 +1,204 @@
"""Telegram datacenter server checker."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from itertools import groupby
from operator import attrgetter
from pathlib import Path
from typing import TYPE_CHECKING
from telethon import TelegramClient
from telethon.tl.functions.help import GetConfigRequest
import asyncio
api_id = ''
api_hash = ''
if TYPE_CHECKING:
from telethon.tl.types import DcOption
async def get_all_servers():
print("🔄 Подключаемся к Telegram...")
client = TelegramClient('session', api_id, api_hash)
API_ID: int = 123456
API_HASH: str = ""
SESSION_NAME: str = "session"
OUTPUT_FILE: Path = Path("telegram_servers.txt")
await client.start()
print("✅ Подключение установлено!\n")
_CONSOLE_FLAG_MAP: dict[str, str] = {
"IPv6": "IPv6",
"MEDIA-ONLY": "🎬 MEDIA-ONLY",
"CDN": "📦 CDN",
"TCPO": "🔒 TCPO",
"STATIC": "📌 STATIC",
}
print("📡 Запрашиваем конфигурацию серверов...")
config = await client(GetConfigRequest())
print(f"📊 Получено серверов: {len(config.dc_options)}\n")
print("="*80)
@dataclass(frozen=True, slots=True)
class DCServer:
"""Typed representation of a Telegram DC server.
# Группируем серверы по DC ID
dc_groups = {}
for dc in config.dc_options:
if dc.id not in dc_groups:
dc_groups[dc.id] = []
dc_groups[dc.id].append(dc)
Attributes:
dc_id: Datacenter identifier.
ip: Server IP address.
port: Server port.
flags: Active flag labels (plain, without emoji).
"""
# Выводим все серверы, сгруппированные по DC
for dc_id in sorted(dc_groups.keys()):
servers = dc_groups[dc_id]
print(f"\n🌐 DATACENTER {dc_id} ({len(servers)} серверов)")
print("-" * 80)
dc_id: int
ip: str
port: int
flags: frozenset[str] = field(default_factory=frozenset)
for dc in servers:
# Собираем флаги
flags = []
if dc.ipv6:
flags.append("IPv6")
if dc.media_only:
flags.append("🎬 MEDIA-ONLY")
if dc.cdn:
flags.append("📦 CDN")
if dc.tcpo_only:
flags.append("🔒 TCPO")
if dc.static:
flags.append("📌 STATIC")
@classmethod
def from_option(cls, dc: DcOption) -> DCServer:
"""Create from a Telethon DcOption.
flags_str = f" [{', '.join(flags)}]" if flags else " [STANDARD]"
Args:
dc: Raw DcOption object.
# Форматируем IP (выравниваем для читаемости)
ip_display = f"{dc.ip_address:45}"
Returns:
Parsed DCServer instance.
"""
checks: dict[str, bool] = {
"IPv6": dc.ipv6,
"MEDIA-ONLY": dc.media_only,
"CDN": dc.cdn,
"TCPO": dc.tcpo_only,
"STATIC": dc.static,
}
return cls(
dc_id=dc.id,
ip=dc.ip_address,
port=dc.port,
flags=frozenset(k for k, v in checks.items() if v),
)
print(f" {ip_display}:{dc.port:5}{flags_str}")
def flags_display(self, *, emoji: bool = False) -> str:
"""Formatted flags string.
# Статистика
print("\n" + "="*80)
print("📈 СТАТИСТИКА:")
print("="*80)
Args:
emoji: Whether to include emoji prefixes.
total = len(config.dc_options)
ipv4_count = sum(1 for dc in config.dc_options if not dc.ipv6)
ipv6_count = sum(1 for dc in config.dc_options if dc.ipv6)
media_count = sum(1 for dc in config.dc_options if dc.media_only)
cdn_count = sum(1 for dc in config.dc_options if dc.cdn)
tcpo_count = sum(1 for dc in config.dc_options if dc.tcpo_only)
static_count = sum(1 for dc in config.dc_options if dc.static)
Returns:
Bracketed flags or '[STANDARD]'.
"""
if not self.flags:
return "[STANDARD]"
labels = sorted(
_CONSOLE_FLAG_MAP[f] if emoji else f for f in self.flags
)
return f"[{', '.join(labels)}]"
print(f" Всего серверов: {total}")
print(f" IPv4 серверы: {ipv4_count}")
print(f" IPv6 серверы: {ipv6_count}")
print(f" Media-only: {media_count}")
print(f" CDN серверы: {cdn_count}")
print(f" TCPO-only: {tcpo_count}")
print(f" Static: {static_count}")
# Дополнительная информация из config
print("\n" + "="*80)
print(" ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ:")
print("="*80)
print(f" Дата конфигурации: {config.date}")
print(f" Expires: {config.expires}")
print(f" Test mode: {config.test_mode}")
print(f" This DC: {config.this_dc}")
class TelegramDCChecker:
"""Fetches and displays Telegram DC configuration.
# Сохраняем в файл
print("\n💾 Сохраняем результаты в файл telegram_servers.txt...")
with open('telegram_servers.txt', 'w', encoding='utf-8') as f:
f.write("TELEGRAM DATACENTER SERVERS\n")
f.write("="*80 + "\n\n")
Attributes:
_client: Telethon client instance.
_servers: Parsed server list.
"""
for dc_id in sorted(dc_groups.keys()):
servers = dc_groups[dc_id]
f.write(f"\nDATACENTER {dc_id} ({len(servers)} servers)\n")
f.write("-" * 80 + "\n")
def __init__(self) -> None:
"""Initialize the checker."""
self._client = TelegramClient(SESSION_NAME, API_ID, API_HASH)
self._servers: list[DCServer] = []
for dc in servers:
flags = []
if dc.ipv6:
flags.append("IPv6")
if dc.media_only:
flags.append("MEDIA-ONLY")
if dc.cdn:
flags.append("CDN")
if dc.tcpo_only:
flags.append("TCPO")
if dc.static:
flags.append("STATIC")
async def run(self) -> None:
"""Connect, fetch config, display and save results."""
print("🔄 Подключаемся к Telegram...") # noqa: T201
try:
await self._client.start()
print("✅ Подключение установлено!\n") # noqa: T201
flags_str = f" [{', '.join(flags)}]" if flags else " [STANDARD]"
f.write(f" {dc.ip_address}:{dc.port}{flags_str}\n")
print("📡 Запрашиваем конфигурацию серверов...") # noqa: T201
config = await self._client(GetConfigRequest())
self._servers = [DCServer.from_option(dc) for dc in config.dc_options]
f.write(f"\n\nTotal servers: {total}\n")
f.write(f"Generated: {config.date}\n")
self._print(config)
self._save(config)
finally:
await self._client.disconnect()
print("\n👋 Отключились от Telegram") # noqa: T201
print("✅ Результаты сохранены в telegram_servers.txt")
def _grouped(self) -> dict[int, list[DCServer]]:
"""Group servers by DC ID.
await client.disconnect()
print("\n👋 Отключились от Telegram")
Returns:
Ordered mapping of DC ID to servers.
"""
ordered = sorted(self._servers, key=attrgetter("dc_id"))
return {k: list(g) for k, g in groupby(ordered, key=attrgetter("dc_id"))}
if __name__ == '__main__':
asyncio.run(get_all_servers())
def _print(self, config: object) -> None:
"""Print results to stdout in original format.
Args:
config: Raw Telegram config.
"""
sep = "=" * 80
dash = "-" * 80
total = len(self._servers)
print(f"📊 Получено серверов: {total}\n") # noqa: T201
print(sep) # noqa: T201
for dc_id, servers in self._grouped().items():
print(f"\n🌐 DATACENTER {dc_id} ({len(servers)} серверов)") # noqa: T201
print(dash) # noqa: T201
for s in servers:
print(f" {s.ip:45}:{s.port:5} {s.flags_display(emoji=True)}") # noqa: T201
ipv4 = total - self._flag_count("IPv6")
print(f"\n{sep}") # noqa: T201
print("📈 СТАТИСТИКА:") # noqa: T201
print(sep) # noqa: T201
print(f" Всего серверов: {total}") # noqa: T201
print(f" IPv4 серверы: {ipv4}") # noqa: T201
print(f" IPv6 серверы: {self._flag_count('IPv6')}") # noqa: T201
print(f" Media-only: {self._flag_count('MEDIA-ONLY')}") # noqa: T201
print(f" CDN серверы: {self._flag_count('CDN')}") # noqa: T201
print(f" TCPO-only: {self._flag_count('TCPO')}") # noqa: T201
print(f" Static: {self._flag_count('STATIC')}") # noqa: T201
print(f"\n{sep}") # noqa: T201
print(" ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ:") # noqa: T201
print(sep) # noqa: T201
print(f" Дата конфигурации: {config.date}") # noqa: T201 # type: ignore[attr-defined]
print(f" Expires: {config.expires}") # noqa: T201 # type: ignore[attr-defined]
print(f" Test mode: {config.test_mode}") # noqa: T201 # type: ignore[attr-defined]
print(f" This DC: {config.this_dc}") # noqa: T201 # type: ignore[attr-defined]
def _flag_count(self, flag: str) -> int:
"""Count servers with a given flag.
Args:
flag: Flag name.
Returns:
Count of matching servers.
"""
return sum(1 for s in self._servers if flag in s.flags)
def _save(self, config: object) -> None:
"""Save results to file in original format.
Args:
config: Raw Telegram config.
"""
parts: list[str] = []
parts.append("TELEGRAM DATACENTER SERVERS\n")
parts.append("=" * 80 + "\n\n")
for dc_id, servers in self._grouped().items():
parts.append(f"\nDATACENTER {dc_id} ({len(servers)} servers)\n")
parts.append("-" * 80 + "\n")
for s in servers:
parts.append(f" {s.ip}:{s.port} {s.flags_display(emoji=False)}\n")
parts.append(f"\n\nTotal servers: {len(self._servers)}\n")
parts.append(f"Generated: {config.date}\n") # type: ignore[attr-defined]
OUTPUT_FILE.write_text("".join(parts), encoding="utf-8")
print(f"\n💾 Сохраняем результаты в файл {OUTPUT_FILE}...") # noqa: T201
print(f"✅ Результаты сохранены в {OUTPUT_FILE}") # noqa: T201
if __name__ == "__main__":
asyncio.run(TelegramDCChecker().run())