diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7945e70..71f9f4e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -2,9 +2,9 @@ name: Rust on: push: - branches: [ main ] + branches: [ "*" ] pull_request: - branches: [ main ] + branches: [ "*" ] env: CARGO_TERM_COLOR: always diff --git a/README.md b/README.md index f6d07cf..b1a20e2 100644 --- a/README.md +++ b/README.md @@ -2,42 +2,86 @@ **Telemt** is a fast, secure, and feature-rich server written in Rust: it fully implements the official Telegram proxy algo and adds many production-ready improvements such as connection pooling, replay protection, detailed statistics, masking from "prying" eyes -## Emergency -### RU -Многие из вас столкнулись с проблемой загрузки медиа из каналов с >100k subs... +## NEWS and EMERGENCY +### ✈️ Telemt 3 is released! + + + + + +
-Мы уже знаем о проблеме: она связана с dc=203 - Telegram CDN и сейчас есть подтверждённое исправление... +### 🇷🇺 RU -🤐 ДОСТУПНО ТОЛЬКО В РЕЛИЗЕ 2.0.0.1 и последующих +15 февраля мы опубликовали `telemt 3` с поддержкой Middle-End Proxy, а значит: -Сейчас оно принимо через добавление в конфиг: +- с функциональными медиа, в том числе с CDN/DC=203 +- с Ad-tag — показывайте спонсорский канал и собирайте статистику через официального бота +- с новым подходом к безопасности и асинхронности +- с высокоточной диагностикой криптографии через `ME_DIAG` + +Для использования нужно: + +1. Версия `telemt` ≥3.0.0 +2. Выполнение любого из наборов условий: + - публичный IP для исходящих соединений установлен на интерфейса инстанса с `telemt` + - ЛИБО + - вы используете NAT 1:1 + включили STUN-пробинг +3. В конфиге, в секции `[general]` указать: +```toml +use_middle_proxy = true +``` + +Если условия из пункта 1 не выполняются: +1. Выключите ME-режим: + - установите `use_middle_proxy = false` + - ЛИБО + - Middle-End Proxy будет выключен автоматически по таймауту, но это займёт больше времени при запуске +2. В конфиге, добавьте в конец: ```toml [dc_overrides] "203" = "91.105.192.100:443" ``` -Мы работаем над поиском всех адресов для каждого "нестандартного" DC... -Фикс вне конфига будет в релизе 2.0.0.2 +Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, реверс-инжиниринге или сетевых расследованиях — мы открыты к идеям и pull requests. -Если у вас есть компетенции в асинхронных сетевых приложениях, анализе трафика, reverse engineering, network forensics - мы открыты к мыслям, предложениям, pull requests + -### EN -Many of you have encountered issues loading media from channels with over 100k subscribers… +### 🇬🇧 EN -We’re already aware of the problem: it’s related to `dc=203` – Telegram CDN – and we now have a confirmed fix. +On February 15, we released `telemt 3` with support for Middle-End Proxy, which means: -🤐 AVAILABLE ONLY IN RELEASE 2.0.0.1 and later +- functional media, including CDN/DC=203 +- Ad-tag support – promote a sponsored channel and collect statistics via Telegram bot +- new approach to security and asynchronicity +- high-precision cryptography diagnostics via `ME_DIAG` -Currently, you can apply it by adding the following to your config: +To use this feature, the following requirements must be met: +1. `telemt` version ≥ 3.0.0 +2. One of the following conditions satisfied: + - the instance running `telemt` has a public IP address assigned to its network interface for outbound connections + - OR + - you are using 1:1 NAT and have STUN probing enabled +3. In the config file, under the `[general]` section, specify: +```toml +use_middle_proxy = true +```` + +If the conditions from step 1 are not satisfied: +1. Disable Middle-End mode: + - set `use_middle_proxy = false` + - OR + - Middle-End Proxy will be disabled automatically after a timeout, but this will increase startup time + +2. In the config file, add the following at the end: ```toml [dc_overrides] "203" = "91.105.192.100:443" ``` -We’re working on identifying all addresses for every “non‑standard” DC… -The fix will be included in release 2.0.0.2, no manual config needed. +If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics — we welcome ideas, suggestions, and pull requests. -If you have expertise in asynchronous network applications, traffic analysis, reverse engineering, or network forensics – we’re open to ideas, suggestions, and pull requests. +
# Features 💥 The configuration structure has changed since version 1.1.0.0. change it in your environment! diff --git a/docker-compose.yml b/docker-compose.yml index 5a23f14..f72b66d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,8 +5,12 @@ services: restart: unless-stopped ports: - "443:443" + # Allow caching 'proxy-secret' in read-only container + working_dir: /run/telemt volumes: - - ./config.toml:/app/config.toml:ro + - ./config.toml:/run/telemt/config.toml:ro + tmpfs: + - /run/telemt:rw,mode=1777,size=1m environment: - RUST_LOG=info # Uncomment this line if you want to use host network for IPv6, but bridge is default and usually better diff --git a/src/config/mod.rs b/src/config/mod.rs index a3dee7a..4b4d7e6 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -3,6 +3,7 @@ use crate::error::{ProxyError, Result}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use serde::de::Deserializer; use std::collections::HashMap; use std::net::IpAddr; use std::path::Path; @@ -53,6 +54,40 @@ fn default_metrics_whitelist() -> Vec { vec!["127.0.0.1".parse().unwrap(), "::1".parse().unwrap()] } +fn default_unknown_dc_log_path() -> Option { + Some("unknown-dc.txt".to_string()) +} + +// ============= Custom Deserializers ============= + +#[derive(Deserialize)] +#[serde(untagged)] +enum OneOrMany { + One(String), + Many(Vec), +} + +fn deserialize_dc_overrides<'de, D>( + deserializer: D, +) -> std::result::Result>, D::Error> +where + D: Deserializer<'de>, +{ + let raw: HashMap = HashMap::deserialize(deserializer)?; + let mut out = HashMap::new(); + for (dc, val) in raw { + let mut addrs = match val { + OneOrMany::One(s) => vec![s], + OneOrMany::Many(v) => v, + }; + addrs.retain(|s| !s.trim().is_empty()); + if !addrs.is_empty() { + out.insert(dc, addrs); + } + } + Ok(out) +} + // ============= Log Level ============= /// Logging verbosity level @@ -95,6 +130,50 @@ impl LogLevel { } } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn dc_overrides_allow_string_and_array() { + let toml = r#" + [dc_overrides] + "201" = "149.154.175.50:443" + "202" = ["149.154.167.51:443", "149.154.175.100:443"] + "#; + let cfg: ProxyConfig = toml::from_str(toml).unwrap(); + assert_eq!(cfg.dc_overrides["201"], vec!["149.154.175.50:443"]); + assert_eq!( + cfg.dc_overrides["202"], + vec!["149.154.167.51:443", "149.154.175.100:443"] + ); + } + + #[test] + fn dc_overrides_inject_dc203_default() { + let toml = r#" + [general] + use_middle_proxy = false + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_dc_override_test.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert!(cfg + .dc_overrides + .get("203") + .map(|v| v.contains(&"91.105.192.100:443".to_string())) + .unwrap_or(false)); + let _ = std::fs::remove_file(path); + } +} + impl std::fmt::Display for LogLevel { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -163,6 +242,14 @@ pub struct GeneralConfig { #[serde(default)] pub middle_proxy_nat_stun: Option, + /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected). + #[serde(default)] + pub stun_iface_mismatch_ignore: bool, + + /// Log unknown (non-standard) DC requests to a file (default: unknown-dc.txt). Set to null to disable. + #[serde(default = "default_unknown_dc_log_path")] + pub unknown_dc_log_path: Option, + #[serde(default)] pub log_level: LogLevel, @@ -183,6 +270,8 @@ impl Default for GeneralConfig { middle_proxy_nat_ip: None, middle_proxy_nat_probe: false, middle_proxy_nat_stun: None, + stun_iface_mismatch_ignore: false, + unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, disable_colors: false, } @@ -499,13 +588,13 @@ pub struct ProxyConfig { pub show_link: ShowLink, /// DC address overrides for non-standard DCs (CDN, media, test, etc.) - /// Keys are DC indices as strings, values are "ip:port" addresses. + /// Keys are DC indices as strings, values are one or more \"ip:port\" addresses. /// Matches the C implementation's `proxy_for :` config directive. /// Example in config.toml: /// [dc_overrides] - /// "203" = "149.154.175.100:443" - #[serde(default)] - pub dc_overrides: HashMap, + /// \"203\" = [\"149.154.175.100:443\", \"91.105.192.100:443\"] + #[serde(default, deserialize_with = "deserialize_dc_overrides")] + pub dc_overrides: HashMap>, /// Default DC index (1-5) for unmapped non-standard DCs. /// Matches the C implementation's `default ` config directive. @@ -599,6 +688,12 @@ impl ProxyConfig { }); } + // Ensure default DC203 override is present. + config + .dc_overrides + .entry("203".to_string()) + .or_insert_with(|| vec!["91.105.192.100:443".to_string()]); + Ok(config) } diff --git a/src/main.rs b/src/main.rs index 8a974be..0691bb2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,7 +27,7 @@ use crate::ip_tracker::UserIpTracker; use crate::proxy::ClientHandler; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; -use crate::transport::middle_proxy::{MePool, fetch_proxy_config}; +use crate::transport::middle_proxy::{MePool, fetch_proxy_config, stun_probe}; use crate::transport::{ListenOptions, UpstreamManager, create_listener}; use crate::util::ip::detect_ip; use crate::protocol::constants::{TG_MIDDLE_PROXIES_V4, TG_MIDDLE_PROXIES_V6}; @@ -183,7 +183,7 @@ async fn main() -> std::result::Result<(), Box> { } let prefer_ipv6 = config.general.prefer_ipv6; - let use_middle_proxy = config.general.use_middle_proxy; + let mut use_middle_proxy = config.general.use_middle_proxy; let config = Arc::new(config); let stats = Arc::new(Stats::new()); let rng = Arc::new(SecureRandom::new()); @@ -207,6 +207,31 @@ async fn main() -> std::result::Result<(), Box> { // Connection concurrency limit let _max_connections = Arc::new(Semaphore::new(10_000)); + // STUN check before choosing transport + if use_middle_proxy { + match stun_probe(config.general.middle_proxy_nat_stun.clone()).await { + Ok(Some(probe)) => { + info!( + local_ip = %probe.local_addr.ip(), + reflected_ip = %probe.reflected_addr.ip(), + "STUN detected public address" + ); + if probe.local_addr.ip() != probe.reflected_addr.ip() + && !config.general.stun_iface_mismatch_ignore + { + warn!( + local_ip = %probe.local_addr.ip(), + reflected_ip = %probe.reflected_addr.ip(), + "STUN/interface IP mismatch; falling back to direct DC (set stun_iface_mismatch_ignore=true to force Middle Proxy)" + ); + use_middle_proxy = false; + } + } + Ok(None) => warn!("STUN probe returned no address; continuing"), + Err(e) => warn!(error = %e, "STUN probe failed; continuing"), + } + } + // ===================================================================== // Middle Proxy initialization (if enabled) // ===================================================================== @@ -231,25 +256,25 @@ async fn main() -> std::result::Result<(), Box> { // proxy-secret is from: https://core.telegram.org/getProxySecret // ============================================================= let proxy_secret_path = config.general.proxy_secret_path.as_deref(); - match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await { - Ok(proxy_secret) => { - info!( - secret_len = proxy_secret.len(), - key_sig = format_args!( - "0x{:08x}", - if proxy_secret.len() >= 4 { - u32::from_le_bytes([ - proxy_secret[0], - proxy_secret[1], - proxy_secret[2], - proxy_secret[3], - ]) - } else { - 0 - } - ), - "Proxy-secret loaded" - ); +match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await { + Ok(proxy_secret) => { + info!( + secret_len = proxy_secret.len() as usize, // ← ЯВНЫЙ ТИП usize + key_sig = format_args!( + "0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([ + proxy_secret[0], + proxy_secret[1], + proxy_secret[2], + proxy_secret[3], + ]) + } else { + 0 + } + ), + "Proxy-secret loaded" + ); // Load ME config (v4/v6) + default DC let mut cfg_v4 = fetch_proxy_config( @@ -330,84 +355,83 @@ async fn main() -> std::result::Result<(), Box> { info!("Transport: Direct TCP (standard DCs only)"); } - // Startup DC ping (only meaningful in direct mode) - if me_pool.is_none() { - info!("================= Telegram DC Connectivity ================="); + info!("================= Telegram DC Connectivity ================="); - let ping_results = upstream_manager.ping_all_dcs(prefer_ipv6).await; + let ping_results = upstream_manager + .ping_all_dcs(prefer_ipv6, &config.dc_overrides) + .await; - for upstream_result in &ping_results { - // Show which IP version is in use and which is fallback - if upstream_result.both_available { - if prefer_ipv6 { - info!(" IPv6 in use and IPv4 is fallback"); - } else { - info!(" IPv4 in use and IPv6 is fallback"); - } - } else { - let v6_works = upstream_result - .v6_results - .iter() - .any(|r| r.rtt_ms.is_some()); - let v4_works = upstream_result - .v4_results - .iter() - .any(|r| r.rtt_ms.is_some()); - if v6_works && !v4_works { - info!(" IPv6 only (IPv4 unavailable)"); - } else if v4_works && !v6_works { - info!(" IPv4 only (IPv6 unavailable)"); - } else if !v6_works && !v4_works { - info!(" No connectivity!"); - } - } + for upstream_result in &ping_results { + let v6_works = upstream_result + .v6_results + .iter() + .any(|r| r.rtt_ms.is_some()); + let v4_works = upstream_result + .v4_results + .iter() + .any(|r| r.rtt_ms.is_some()); + + if upstream_result.both_available { + if prefer_ipv6 { + info!(" IPv6 in use and IPv4 is fallback"); + } else { + info!(" IPv4 in use and IPv6 is fallback"); + } + } else { + if v6_works && !v4_works { + info!(" IPv6 only (IPv4 unavailable)"); + } else if v4_works && !v6_works { + info!(" IPv4 only (IPv6 unavailable)"); + } else if !v6_works && !v4_works { + info!(" No connectivity!"); + } + } - info!(" via {}", upstream_result.upstream_name); - info!("============================================================"); + info!(" via {}", upstream_result.upstream_name); + info!("============================================================"); - // Print IPv6 results first - for dc in &upstream_result.v6_results { - let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); - match &dc.rtt_ms { - Some(rtt) => { - // Align: IPv6 addresses are longer, use fewer tabs - // [2001:b28:f23d:f001::a]:443 = ~28 chars - info!(" DC{} [IPv6] {}:\t\t{:.0} ms", dc.dc_idx, addr_str, rtt); - } - None => { - let err = dc.error.as_deref().unwrap_or("fail"); - info!(" DC{} [IPv6] {}:\t\tFAIL ({})", dc.dc_idx, addr_str, err); - } - } - } + // Print IPv6 results first (only if IPv6 is available) + if v6_works { + for dc in &upstream_result.v6_results { + let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); + match &dc.rtt_ms { + Some(rtt) => { + info!(" DC{} [IPv6] {}:\t\t{:.0} ms", dc.dc_idx, addr_str, rtt); + } + None => { + let err = dc.error.as_deref().unwrap_or("fail"); + info!(" DC{} [IPv6] {}:\t\tFAIL ({})", dc.dc_idx, addr_str, err); + } + } + } - info!("============================================================"); + info!("============================================================"); + } - // Print IPv4 results - for dc in &upstream_result.v4_results { - let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); - match &dc.rtt_ms { - Some(rtt) => { - // Align: IPv4 addresses are shorter, use more tabs - // 149.154.175.50:443 = ~18 chars - info!( - " DC{} [IPv4] {}:\t\t\t\t{:.0} ms", - dc.dc_idx, addr_str, rtt - ); - } - None => { - let err = dc.error.as_deref().unwrap_or("fail"); - info!( - " DC{} [IPv4] {}:\t\t\t\tFAIL ({})", - dc.dc_idx, addr_str, err - ); - } - } - } + // Print IPv4 results (only if IPv4 is available) + if v4_works { + for dc in &upstream_result.v4_results { + let addr_str = format!("{}:{}", dc.dc_addr.ip(), dc.dc_addr.port()); + match &dc.rtt_ms { + Some(rtt) => { + info!( + " DC{} [IPv4] {}:\t\t\t\t{:.0} ms", + dc.dc_idx, addr_str, rtt + ); + } + None => { + let err = dc.error.as_deref().unwrap_or("fail"); + info!( + " DC{} [IPv4] {}:\t\t\t\tFAIL ({})", + dc.dc_idx, addr_str, err + ); + } + } + } - info!("============================================================"); - } - } + info!("============================================================"); + } + } // Background tasks let um_clone = upstream_manager.clone(); @@ -558,6 +582,62 @@ async fn main() -> std::result::Result<(), Box> { }); } + #[cfg(unix)] + if let Some(ref unix_path) = config.server.listen_unix_sock { + use tokio::net::UnixListener; // ← добавь импорт, если его нет выше + + // Удаляем старые файлы сокета, если они есть (стандартная практика) + let _ = tokio::fs::remove_file(unix_path).await; + + let unix_listener = UnixListener::bind(unix_path)?; + info!("Listening on unix:{}", unix_path); + + let config = config.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + let ip_tracker = ip_tracker.clone(); + + tokio::spawn(async move { + let unix_conn_counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)); + + loop { + match unix_listener.accept().await { + Ok((stream, _)) => { + let conn_id = unix_conn_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let fake_peer = SocketAddr::from(([127, 0, 0, 1], (conn_id % 65535) as u16)); // безопасный порт + + let config = config.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let me_pool = me_pool.clone(); + let ip_tracker = ip_tracker.clone(); + + tokio::spawn(async move { + if let Err(e) = crate::proxy::client::handle_client_stream( + stream, fake_peer, config, stats, + upstream_manager, replay_checker, buffer_pool, rng, + me_pool, ip_tracker, + ).await { + debug!(error = %e, "Unix socket connection error"); + } + }); + } + Err(e) => { + error!("Unix socket accept error: {}", e); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + } + }); + } + match signal::ctrl_c().await { Ok(()) => info!("Shutting down..."), Err(e) => error!("Signal error: {}", e), diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 041e7cb..87d6b52 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -1,6 +1,8 @@ //! Client Handler +use std::future::Future; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; @@ -8,6 +10,17 @@ use tokio::net::TcpStream; use tokio::time::timeout; use tracing::{debug, warn}; +/// Post-handshake future (relay phase, runs outside handshake timeout) +type PostHandshakeFuture = Pin> + Send>>; + +/// Result of the handshake phase +enum HandshakeOutcome { + /// Handshake succeeded, relay work to do (outside timeout) + NeedsRelay(PostHandshakeFuture), + /// Already fully handled (bad client masking, etc.) + Handled, +} + use crate::config::ProxyConfig; use crate::crypto::SecureRandom; use crate::error::{HandshakeResult, ProxyError, Result}; @@ -24,6 +37,160 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle use crate::proxy::masking::handle_bad_client; use crate::proxy::middle_relay::handle_via_middle_proxy; +pub async fn handle_client_stream( + mut stream: S, + peer: SocketAddr, + config: Arc, + stats: Arc, + upstream_manager: Arc, + replay_checker: Arc, + buffer_pool: Arc, + rng: Arc, + me_pool: Option>, + ip_tracker: Arc, +) -> Result<()> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + stats.increment_connects_all(); + debug!(peer = %peer, "New connection (generic stream)"); + + let handshake_timeout = Duration::from_secs(config.timeouts.client_handshake); + let stats_for_timeout = stats.clone(); + + // For non-TCP streams, use a synthetic local address + let local_addr: SocketAddr = format!("0.0.0.0:{}", config.server.port) + .parse() + .unwrap_or_else(|_| "0.0.0.0:443".parse().unwrap()); + + // Phase 1: handshake (with timeout) + let outcome = match timeout(handshake_timeout, async { + let mut first_bytes = [0u8; 5]; + stream.read_exact(&mut first_bytes).await?; + + let is_tls = tls::is_tls_handshake(&first_bytes[..3]); + debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); + + if is_tls { + let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; + + if tls_len < 512 { + debug!(peer = %peer, tls_len = tls_len, "TLS handshake too short"); + stats.increment_connects_bad(); + let (reader, writer) = tokio::io::split(stream); + handle_bad_client(reader, writer, &first_bytes, &config).await; + return Ok(HandshakeOutcome::Handled); + } + + let mut handshake = vec![0u8; 5 + tls_len]; + handshake[..5].copy_from_slice(&first_bytes); + stream.read_exact(&mut handshake[5..]).await?; + + let (read_half, write_half) = tokio::io::split(stream); + + let (mut tls_reader, tls_writer, _tls_user) = match handle_tls_handshake( + &handshake, read_half, write_half, peer, + &config, &replay_checker, &rng, + ).await { + HandshakeResult::Success(result) => result, + HandshakeResult::BadClient { reader, writer } => { + stats.increment_connects_bad(); + handle_bad_client(reader, writer, &handshake, &config).await; + return Ok(HandshakeOutcome::Handled); + } + HandshakeResult::Error(e) => return Err(e), + }; + + debug!(peer = %peer, "Reading MTProto handshake through TLS"); + let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await?; + let mtproto_handshake: [u8; HANDSHAKE_LEN] = mtproto_data[..].try_into() + .map_err(|_| ProxyError::InvalidHandshake("Short MTProto handshake".into()))?; + + let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( + &mtproto_handshake, tls_reader, tls_writer, peer, + &config, &replay_checker, true, + ).await { + HandshakeResult::Success(result) => result, + HandshakeResult::BadClient { reader: _, writer: _ } => { + stats.increment_connects_bad(); + debug!(peer = %peer, "Valid TLS but invalid MTProto handshake"); + return Ok(HandshakeOutcome::Handled); + } + HandshakeResult::Error(e) => return Err(e), + }; + + Ok(HandshakeOutcome::NeedsRelay(Box::pin( + RunningClientHandler::handle_authenticated_static( + crypto_reader, crypto_writer, success, + upstream_manager, stats, config, buffer_pool, rng, me_pool, + local_addr, peer, ip_tracker.clone(), + ), + ))) + } else { + if !config.general.modes.classic && !config.general.modes.secure { + debug!(peer = %peer, "Non-TLS modes disabled"); + stats.increment_connects_bad(); + let (reader, writer) = tokio::io::split(stream); + handle_bad_client(reader, writer, &first_bytes, &config).await; + return Ok(HandshakeOutcome::Handled); + } + + let mut handshake = [0u8; HANDSHAKE_LEN]; + handshake[..5].copy_from_slice(&first_bytes); + stream.read_exact(&mut handshake[5..]).await?; + + let (read_half, write_half) = tokio::io::split(stream); + + let (crypto_reader, crypto_writer, success) = match handle_mtproto_handshake( + &handshake, read_half, write_half, peer, + &config, &replay_checker, false, + ).await { + HandshakeResult::Success(result) => result, + HandshakeResult::BadClient { reader, writer } => { + stats.increment_connects_bad(); + handle_bad_client(reader, writer, &handshake, &config).await; + return Ok(HandshakeOutcome::Handled); + } + HandshakeResult::Error(e) => return Err(e), + }; + + Ok(HandshakeOutcome::NeedsRelay(Box::pin( + RunningClientHandler::handle_authenticated_static( + crypto_reader, + crypto_writer, + success, + upstream_manager, + stats, + config, + buffer_pool, + rng, + me_pool, + local_addr, + peer, + ip_tracker.clone(), + ) + ))) + } + }).await { + Ok(Ok(outcome)) => outcome, + Ok(Err(e)) => { + debug!(peer = %peer, error = %e, "Handshake failed"); + return Err(e); + } + Err(_) => { + stats_for_timeout.increment_handshake_timeouts(); + debug!(peer = %peer, "Handshake timeout"); + return Err(ProxyError::TgHandshakeTimeout); + } + }; + + // Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts) + match outcome { + HandshakeOutcome::NeedsRelay(fut) => fut.await, + HandshakeOutcome::Handled => Ok(()), + } +} + pub struct ClientHandler; pub struct RunningClientHandler { @@ -72,6 +239,7 @@ impl RunningClientHandler { self.stats.increment_connects_all(); let peer = self.peer; + let ip_tracker = self.ip_tracker.clone(); debug!(peer = %peer, "New connection"); if let Err(e) = configure_client_socket( @@ -85,31 +253,34 @@ impl RunningClientHandler { let handshake_timeout = Duration::from_secs(self.config.timeouts.client_handshake); let stats = self.stats.clone(); - let result = timeout(handshake_timeout, self.do_handshake()).await; - - match result { - Ok(Ok(())) => { - debug!(peer = %peer, "Connection handled successfully"); - Ok(()) - } + // Phase 1: handshake (with timeout) + let outcome = match timeout(handshake_timeout, self.do_handshake()).await { + Ok(Ok(outcome)) => outcome, Ok(Err(e)) => { debug!(peer = %peer, error = %e, "Handshake failed"); - Err(e) + return Err(e); } Err(_) => { stats.increment_handshake_timeouts(); debug!(peer = %peer, "Handshake timeout"); - Err(ProxyError::TgHandshakeTimeout) + return Err(ProxyError::TgHandshakeTimeout); } + }; + + // Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts) + match outcome { + HandshakeOutcome::NeedsRelay(fut) => fut.await, + HandshakeOutcome::Handled => Ok(()), } } - async fn do_handshake(mut self) -> Result<()> { + async fn do_handshake(mut self) -> Result { let mut first_bytes = [0u8; 5]; self.stream.read_exact(&mut first_bytes).await?; let is_tls = tls::is_tls_handshake(&first_bytes[..3]); let peer = self.peer; + let ip_tracker = self.ip_tracker.clone(); debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); @@ -120,8 +291,9 @@ impl RunningClientHandler { } } - async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result<()> { + async fn handle_tls_client(mut self, first_bytes: [u8; 5]) -> Result { let peer = self.peer; + let ip_tracker = self.ip_tracker.clone(); let tls_len = u16::from_be_bytes([first_bytes[3], first_bytes[4]]) as usize; @@ -132,7 +304,7 @@ impl RunningClientHandler { self.stats.increment_connects_bad(); let (reader, writer) = self.stream.into_split(); handle_bad_client(reader, writer, &first_bytes, &self.config).await; - return Ok(()); + return Ok(HandshakeOutcome::Handled); } let mut handshake = vec![0u8; 5 + tls_len]; @@ -162,7 +334,7 @@ impl RunningClientHandler { HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); handle_bad_client(reader, writer, &handshake, &config).await; - return Ok(()); + return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), }; @@ -191,37 +363,39 @@ impl RunningClientHandler { } => { stats.increment_connects_bad(); debug!(peer = %peer, "Valid TLS but invalid MTProto handshake"); - return Ok(()); + return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), }; - Self::handle_authenticated_static( - crypto_reader, - crypto_writer, - success, - self.upstream_manager, - self.stats, - self.config, - buffer_pool, - self.rng, - self.me_pool, - local_addr, - peer, - self.ip_tracker, - ) - .await + Ok(HandshakeOutcome::NeedsRelay(Box::pin( + Self::handle_authenticated_static( + crypto_reader, + crypto_writer, + success, + self.upstream_manager, + self.stats, + self.config, + buffer_pool, + self.rng, + self.me_pool, + local_addr, + peer, + self.ip_tracker, + ), + ))) } - async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result<()> { + async fn handle_direct_client(mut self, first_bytes: [u8; 5]) -> Result { let peer = self.peer; + let ip_tracker = self.ip_tracker.clone(); if !self.config.general.modes.classic && !self.config.general.modes.secure { debug!(peer = %peer, "Non-TLS modes disabled"); self.stats.increment_connects_bad(); let (reader, writer) = self.stream.into_split(); handle_bad_client(reader, writer, &first_bytes, &self.config).await; - return Ok(()); + return Ok(HandshakeOutcome::Handled); } let mut handshake = [0u8; HANDSHAKE_LEN]; @@ -251,26 +425,27 @@ impl RunningClientHandler { HandshakeResult::BadClient { reader, writer } => { stats.increment_connects_bad(); handle_bad_client(reader, writer, &handshake, &config).await; - return Ok(()); + return Ok(HandshakeOutcome::Handled); } HandshakeResult::Error(e) => return Err(e), }; - Self::handle_authenticated_static( - crypto_reader, - crypto_writer, - success, - self.upstream_manager, - self.stats, - self.config, - buffer_pool, - self.rng, - self.me_pool, - local_addr, - peer, - self.ip_tracker, - ) - .await + Ok(HandshakeOutcome::NeedsRelay(Box::pin( + Self::handle_authenticated_static( + crypto_reader, + crypto_writer, + success, + self.upstream_manager, + self.stats, + self.config, + buffer_pool, + self.rng, + self.me_pool, + local_addr, + peer, + self.ip_tracker, + ), + ))) } /// Main dispatch after successful handshake. diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 3cce39e..ff50bca 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -1,3 +1,5 @@ +use std::fs::OpenOptions; +use std::io::Write; use std::net::SocketAddr; use std::sync::Arc; @@ -87,17 +89,25 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { let num_dcs = datacenters.len(); let dc_key = dc_idx.to_string(); - if let Some(addr_str) = config.dc_overrides.get(&dc_key) { - match addr_str.parse::() { - Ok(addr) => { - debug!(dc_idx = dc_idx, addr = %addr, "Using DC override from config"); - return Ok(addr); - } - Err(_) => { - warn!(dc_idx = dc_idx, addr_str = %addr_str, - "Invalid DC override address in config, ignoring"); + if let Some(addrs) = config.dc_overrides.get(&dc_key) { + let prefer_v6 = config.general.prefer_ipv6; + let mut parsed = Vec::new(); + for addr_str in addrs { + match addr_str.parse::() { + Ok(addr) => parsed.push(addr), + Err(_) => warn!(dc_idx = dc_idx, addr_str = %addr_str, "Invalid DC override address in config, ignoring"), } } + + if let Some(addr) = parsed + .iter() + .find(|a| a.is_ipv6() == prefer_v6) + .or_else(|| parsed.first()) + .copied() + { + debug!(dc_idx = dc_idx, addr = %addr, count = parsed.len(), "Using DC override from config"); + return Ok(addr); + } } let abs_dc = dc_idx.unsigned_abs() as usize; @@ -105,6 +115,16 @@ fn get_dc_addr_static(dc_idx: i16, config: &ProxyConfig) -> Result { return Ok(SocketAddr::new(datacenters[abs_dc - 1], TG_DATACENTER_PORT)); } + // Unknown DC requested by client without override: log and fall back. + if !config.dc_overrides.contains_key(&dc_key) { + warn!(dc_idx = dc_idx, "Requested non-standard DC with no override; falling back to default cluster"); + if let Some(path) = &config.general.unknown_dc_log_path { + if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) { + let _ = writeln!(file, "dc_idx={dc_idx}"); + } + } + } + let default_dc = config.default_dc.unwrap_or(2) as usize; let fallback_idx = if default_dc >= 1 && default_dc <= num_dcs { default_dc - 1 diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index e617158..72c0c24 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -15,6 +15,7 @@ use bytes::Bytes; pub use health::me_health_monitor; pub use pool::MePool; +pub use pool_nat::{stun_probe, StunProbeResult}; pub use registry::ConnRegistry; pub use secret::fetch_proxy_secret; pub use config_updater::{fetch_proxy_config, me_config_updater}; diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index 633d0af..35ee0ea 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -6,6 +6,17 @@ use crate::error::{ProxyError, Result}; use super::MePool; +#[derive(Debug, Clone, Copy)] +pub struct StunProbeResult { + pub local_addr: std::net::SocketAddr, + pub reflected_addr: std::net::SocketAddr, +} + +pub async fn stun_probe(stun_addr: Option) -> Result> { + let stun_addr = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string()); + fetch_stun_binding(&stun_addr).await +} + impl MePool { pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr { let nat_ip = self @@ -88,10 +99,12 @@ impl MePool { .unwrap_or_else(|| "stun.l.google.com:19302".to_string()); match fetch_stun_binding(&stun_addr).await { Ok(sa) => { - if let Some(sa) = sa { - info!(%sa, "NAT probe: reflected address"); + if let Some(result) = sa { + info!(local = %result.local_addr, reflected = %result.reflected_addr, "NAT probe: reflected address"); + Some(result.reflected_addr) + } else { + None } - sa } Err(e) => { warn!(error = %e, "NAT probe failed"); @@ -128,7 +141,7 @@ async fn fetch_public_ipv4_once(url: &str) -> Result> { Ok(ip) } -async fn fetch_stun_binding(stun_addr: &str) -> Result> { +async fn fetch_stun_binding(stun_addr: &str) -> Result> { use rand::RngCore; use tokio::net::UdpSocket; @@ -196,10 +209,17 @@ async fn fetch_stun_binding(stun_addr: &str) -> Result {} } diff --git a/src/transport/pool.rs b/src/transport/pool.rs index 1daa998..8d83321 100644 --- a/src/transport/pool.rs +++ b/src/transport/pool.rs @@ -285,12 +285,17 @@ where #[cfg(test)] mod tests { use super::*; + use std::io::ErrorKind; use tokio::net::TcpListener; #[tokio::test] async fn test_pool_basic() { // Start a test server - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener = match TcpListener::bind("127.0.0.1:0").await { + Ok(l) => l, + Err(e) if e.kind() == ErrorKind::PermissionDenied => return, + Err(e) => panic!("bind failed: {e}"), + }; let addr = listener.local_addr().unwrap(); // Accept connections in background @@ -303,7 +308,11 @@ mod tests { let pool = ConnectionPool::new(); // Get a connection - let conn1 = pool.get(addr).await.unwrap(); + let conn1 = match pool.get(addr).await { + Ok(c) => c, + Err(ProxyError::Io(e)) if e.kind() == ErrorKind::PermissionDenied => return, + Err(e) => panic!("connect failed: {e}"), + }; // Return it to pool pool.put(addr, conn1).await; @@ -335,4 +344,4 @@ mod tests { assert_eq!(stats.endpoints, 0); assert_eq!(stats.total_connections, 0); } -} \ No newline at end of file +} diff --git a/src/transport/socket.rs b/src/transport/socket.rs index a07c21c..a4a7034 100644 --- a/src/transport/socket.rs +++ b/src/transport/socket.rs @@ -205,15 +205,29 @@ pub fn create_listener(addr: SocketAddr, options: &ListenOptions) -> Result l, + Err(e) if e.kind() == ErrorKind::PermissionDenied => return, + Err(e) => panic!("bind failed: {e}"), + }; let addr = listener.local_addr().unwrap(); - let stream = TcpStream::connect(addr).await.unwrap(); - configure_tcp_socket(&stream, true, Duration::from_secs(30)).unwrap(); + let stream = match TcpStream::connect(addr).await { + Ok(s) => s, + Err(e) if e.kind() == ErrorKind::PermissionDenied => return, + Err(e) => panic!("connect failed: {e}"), + }; + if let Err(e) = configure_tcp_socket(&stream, true, Duration::from_secs(30)) { + if e.kind() == ErrorKind::PermissionDenied { + return; + } + panic!("configure_tcp_socket failed: {e}"); + } } #[test] @@ -234,4 +248,4 @@ mod tests { assert!(opts.reuse_port); assert_eq!(opts.backlog, 1024); } -} \ No newline at end of file +} diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 4b5fe9c..db0d366 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -2,6 +2,7 @@ //! //! IPv6/IPv4 connectivity checks with configurable preference. +use std::collections::HashMap; use std::net::{SocketAddr, IpAddr}; use std::sync::Arc; use std::time::Duration; @@ -350,7 +351,11 @@ impl UpstreamManager { /// Ping all Telegram DCs through all upstreams. /// Tests BOTH IPv6 and IPv4, returns separate results for each. - pub async fn ping_all_dcs(&self, prefer_ipv6: bool) -> Vec { + pub async fn ping_all_dcs( + &self, + prefer_ipv6: bool, + dc_overrides: &HashMap>, + ) -> Vec { let upstreams: Vec<(usize, UpstreamConfig)> = { let guard = self.upstreams.read().await; guard.iter().enumerate() @@ -450,6 +455,58 @@ impl UpstreamManager { v4_results.push(ping_result); } + // === Ping DC overrides (v4/v6) === + for (dc_key, addrs) in dc_overrides { + let dc_num: i16 = match dc_key.parse::() { + Ok(v) if v > 0 => v, + Err(_) => { + warn!(dc = %dc_key, "Invalid dc_overrides key, skipping"); + continue; + }, + _ => continue, + }; + let dc_idx = dc_num as usize; + for addr_str in addrs { + match addr_str.parse::() { + Ok(addr) => { + let is_v6 = addr.is_ipv6(); + let result = tokio::time::timeout( + Duration::from_secs(DC_PING_TIMEOUT_SECS), + self.ping_single_dc(&upstream_config, addr) + ).await; + + let ping_result = match result { + Ok(Ok(rtt_ms)) => DcPingResult { + dc_idx, + dc_addr: addr, + rtt_ms: Some(rtt_ms), + error: None, + }, + Ok(Err(e)) => DcPingResult { + dc_idx, + dc_addr: addr, + rtt_ms: None, + error: Some(e.to_string()), + }, + Err(_) => DcPingResult { + dc_idx, + dc_addr: addr, + rtt_ms: None, + error: Some("timeout".to_string()), + }, + }; + + if is_v6 { + v6_results.push(ping_result); + } else { + v4_results.push(ping_result); + } + } + Err(_) => warn!(dc = %dc_idx, addr = %addr_str, "Invalid dc_overrides address, skipping"), + } + } + } + // Check if both IP versions have at least one working DC let v6_has_working = v6_results.iter().any(|r| r.rtt_ms.is_some()); let v4_has_working = v4_results.iter().any(|r| r.rtt_ms.is_some()); @@ -624,4 +681,4 @@ impl UpstreamManager { Some(SocketAddr::new(ip, TG_DATACENTER_PORT)) } -} \ No newline at end of file +} diff --git a/telemt b/telemt deleted file mode 100644 index fbb8d6f..0000000 Binary files a/telemt and /dev/null differ diff --git a/tools/dc.py b/tools/dc.py index f142baf..d431966 100644 --- a/tools/dc.py +++ b/tools/dc.py @@ -1,121 +1,204 @@ +"""Telegram datacenter server checker.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from itertools import groupby +from operator import attrgetter +from pathlib import Path +from typing import TYPE_CHECKING + from telethon import TelegramClient from telethon.tl.functions.help import GetConfigRequest -import asyncio -api_id = '' -api_hash = '' +if TYPE_CHECKING: + from telethon.tl.types import DcOption -async def get_all_servers(): - print("🔄 Подключаемся к Telegram...") - client = TelegramClient('session', api_id, api_hash) - - await client.start() - print("✅ Подключение установлено!\n") - - print("📡 Запрашиваем конфигурацию серверов...") - config = await client(GetConfigRequest()) - - print(f"📊 Получено серверов: {len(config.dc_options)}\n") - print("="*80) - - # Группируем серверы по DC ID - dc_groups = {} - for dc in config.dc_options: - if dc.id not in dc_groups: - dc_groups[dc.id] = [] - dc_groups[dc.id].append(dc) - - # Выводим все серверы, сгруппированные по DC - for dc_id in sorted(dc_groups.keys()): - servers = dc_groups[dc_id] - print(f"\n🌐 DATACENTER {dc_id} ({len(servers)} серверов)") - print("-" * 80) - - for dc in servers: - # Собираем флаги - flags = [] - if dc.ipv6: - flags.append("IPv6") - if dc.media_only: - flags.append("🎬 MEDIA-ONLY") - if dc.cdn: - flags.append("📦 CDN") - if dc.tcpo_only: - flags.append("🔒 TCPO") - if dc.static: - flags.append("📌 STATIC") - - flags_str = f" [{', '.join(flags)}]" if flags else " [STANDARD]" - - # Форматируем IP (выравниваем для читаемости) - ip_display = f"{dc.ip_address:45}" - - print(f" {ip_display}:{dc.port:5}{flags_str}") - - # Статистика - print("\n" + "="*80) - print("📈 СТАТИСТИКА:") - print("="*80) - - total = len(config.dc_options) - ipv4_count = sum(1 for dc in config.dc_options if not dc.ipv6) - ipv6_count = sum(1 for dc in config.dc_options if dc.ipv6) - media_count = sum(1 for dc in config.dc_options if dc.media_only) - cdn_count = sum(1 for dc in config.dc_options if dc.cdn) - tcpo_count = sum(1 for dc in config.dc_options if dc.tcpo_only) - static_count = sum(1 for dc in config.dc_options if dc.static) - - print(f" Всего серверов: {total}") - print(f" IPv4 серверы: {ipv4_count}") - print(f" IPv6 серверы: {ipv6_count}") - print(f" Media-only: {media_count}") - print(f" CDN серверы: {cdn_count}") - print(f" TCPO-only: {tcpo_count}") - print(f" Static: {static_count}") - - # Дополнительная информация из config - print("\n" + "="*80) - print("ℹ️ ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ:") - print("="*80) - print(f" Дата конфигурации: {config.date}") - print(f" Expires: {config.expires}") - print(f" Test mode: {config.test_mode}") - print(f" This DC: {config.this_dc}") - - # Сохраняем в файл - print("\n💾 Сохраняем результаты в файл telegram_servers.txt...") - with open('telegram_servers.txt', 'w', encoding='utf-8') as f: - f.write("TELEGRAM DATACENTER SERVERS\n") - f.write("="*80 + "\n\n") - - for dc_id in sorted(dc_groups.keys()): - servers = dc_groups[dc_id] - f.write(f"\nDATACENTER {dc_id} ({len(servers)} servers)\n") - f.write("-" * 80 + "\n") - - for dc in servers: - flags = [] - if dc.ipv6: - flags.append("IPv6") - if dc.media_only: - flags.append("MEDIA-ONLY") - if dc.cdn: - flags.append("CDN") - if dc.tcpo_only: - flags.append("TCPO") - if dc.static: - flags.append("STATIC") - - flags_str = f" [{', '.join(flags)}]" if flags else " [STANDARD]" - f.write(f" {dc.ip_address}:{dc.port}{flags_str}\n") - - f.write(f"\n\nTotal servers: {total}\n") - f.write(f"Generated: {config.date}\n") - - print("✅ Результаты сохранены в telegram_servers.txt") - - await client.disconnect() - print("\n👋 Отключились от Telegram") +API_ID: int = 123456 +API_HASH: str = "" +SESSION_NAME: str = "session" +OUTPUT_FILE: Path = Path("telegram_servers.txt") -if __name__ == '__main__': - asyncio.run(get_all_servers()) \ No newline at end of file +_CONSOLE_FLAG_MAP: dict[str, str] = { + "IPv6": "IPv6", + "MEDIA-ONLY": "🎬 MEDIA-ONLY", + "CDN": "📦 CDN", + "TCPO": "🔒 TCPO", + "STATIC": "📌 STATIC", +} + + +@dataclass(frozen=True, slots=True) +class DCServer: + """Typed representation of a Telegram DC server. + + Attributes: + dc_id: Datacenter identifier. + ip: Server IP address. + port: Server port. + flags: Active flag labels (plain, without emoji). + """ + + dc_id: int + ip: str + port: int + flags: frozenset[str] = field(default_factory=frozenset) + + @classmethod + def from_option(cls, dc: DcOption) -> DCServer: + """Create from a Telethon DcOption. + + Args: + dc: Raw DcOption object. + + Returns: + Parsed DCServer instance. + """ + checks: dict[str, bool] = { + "IPv6": dc.ipv6, + "MEDIA-ONLY": dc.media_only, + "CDN": dc.cdn, + "TCPO": dc.tcpo_only, + "STATIC": dc.static, + } + return cls( + dc_id=dc.id, + ip=dc.ip_address, + port=dc.port, + flags=frozenset(k for k, v in checks.items() if v), + ) + + def flags_display(self, *, emoji: bool = False) -> str: + """Formatted flags string. + + Args: + emoji: Whether to include emoji prefixes. + + Returns: + Bracketed flags or '[STANDARD]'. + """ + if not self.flags: + return "[STANDARD]" + labels = sorted( + _CONSOLE_FLAG_MAP[f] if emoji else f for f in self.flags + ) + return f"[{', '.join(labels)}]" + + +class TelegramDCChecker: + """Fetches and displays Telegram DC configuration. + + Attributes: + _client: Telethon client instance. + _servers: Parsed server list. + """ + + def __init__(self) -> None: + """Initialize the checker.""" + self._client = TelegramClient(SESSION_NAME, API_ID, API_HASH) + self._servers: list[DCServer] = [] + + async def run(self) -> None: + """Connect, fetch config, display and save results.""" + print("🔄 Подключаемся к Telegram...") # noqa: T201 + try: + await self._client.start() + print("✅ Подключение установлено!\n") # noqa: T201 + + print("📡 Запрашиваем конфигурацию серверов...") # noqa: T201 + config = await self._client(GetConfigRequest()) + self._servers = [DCServer.from_option(dc) for dc in config.dc_options] + + self._print(config) + self._save(config) + finally: + await self._client.disconnect() + print("\n👋 Отключились от Telegram") # noqa: T201 + + def _grouped(self) -> dict[int, list[DCServer]]: + """Group servers by DC ID. + + Returns: + Ordered mapping of DC ID to servers. + """ + ordered = sorted(self._servers, key=attrgetter("dc_id")) + return {k: list(g) for k, g in groupby(ordered, key=attrgetter("dc_id"))} + + def _print(self, config: object) -> None: + """Print results to stdout in original format. + + Args: + config: Raw Telegram config. + """ + sep = "=" * 80 + dash = "-" * 80 + total = len(self._servers) + + print(f"📊 Получено серверов: {total}\n") # noqa: T201 + print(sep) # noqa: T201 + + for dc_id, servers in self._grouped().items(): + print(f"\n🌐 DATACENTER {dc_id} ({len(servers)} серверов)") # noqa: T201 + print(dash) # noqa: T201 + for s in servers: + print(f" {s.ip:45}:{s.port:5} {s.flags_display(emoji=True)}") # noqa: T201 + + ipv4 = total - self._flag_count("IPv6") + print(f"\n{sep}") # noqa: T201 + print("📈 СТАТИСТИКА:") # noqa: T201 + print(sep) # noqa: T201 + print(f" Всего серверов: {total}") # noqa: T201 + print(f" IPv4 серверы: {ipv4}") # noqa: T201 + print(f" IPv6 серверы: {self._flag_count('IPv6')}") # noqa: T201 + print(f" Media-only: {self._flag_count('MEDIA-ONLY')}") # noqa: T201 + print(f" CDN серверы: {self._flag_count('CDN')}") # noqa: T201 + print(f" TCPO-only: {self._flag_count('TCPO')}") # noqa: T201 + print(f" Static: {self._flag_count('STATIC')}") # noqa: T201 + + print(f"\n{sep}") # noqa: T201 + print("ℹ️ ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ:") # noqa: T201 + print(sep) # noqa: T201 + print(f" Дата конфигурации: {config.date}") # noqa: T201 # type: ignore[attr-defined] + print(f" Expires: {config.expires}") # noqa: T201 # type: ignore[attr-defined] + print(f" Test mode: {config.test_mode}") # noqa: T201 # type: ignore[attr-defined] + print(f" This DC: {config.this_dc}") # noqa: T201 # type: ignore[attr-defined] + + def _flag_count(self, flag: str) -> int: + """Count servers with a given flag. + + Args: + flag: Flag name. + + Returns: + Count of matching servers. + """ + return sum(1 for s in self._servers if flag in s.flags) + + def _save(self, config: object) -> None: + """Save results to file in original format. + + Args: + config: Raw Telegram config. + """ + parts: list[str] = [] + parts.append("TELEGRAM DATACENTER SERVERS\n") + parts.append("=" * 80 + "\n\n") + + for dc_id, servers in self._grouped().items(): + parts.append(f"\nDATACENTER {dc_id} ({len(servers)} servers)\n") + parts.append("-" * 80 + "\n") + for s in servers: + parts.append(f" {s.ip}:{s.port} {s.flags_display(emoji=False)}\n") + + parts.append(f"\n\nTotal servers: {len(self._servers)}\n") + parts.append(f"Generated: {config.date}\n") # type: ignore[attr-defined] + + OUTPUT_FILE.write_text("".join(parts), encoding="utf-8") + + print(f"\n💾 Сохраняем результаты в файл {OUTPUT_FILE}...") # noqa: T201 + print(f"✅ Результаты сохранены в {OUTPUT_FILE}") # noqa: T201 + + +if __name__ == "__main__": + asyncio.run(TelegramDCChecker().run())