From a40d6929e585229fcff27306e42bc0218d8d97aa Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Mar 2026 22:41:17 +0300 Subject: [PATCH] Upstream-driver getProxyConfig and getProxyConfig Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/load.rs | 15 +- src/maestro/helpers.rs | 7 +- src/maestro/me_startup.rs | 5 +- src/proxy/handshake.rs | 3 +- src/transport/middle_proxy/config_updater.rs | 49 +++-- src/transport/middle_proxy/http_fetch.rs | 184 +++++++++++++++++++ src/transport/middle_proxy/mod.rs | 6 +- src/transport/middle_proxy/secret.rs | 38 ++-- 8 files changed, 266 insertions(+), 41 deletions(-) create mode 100644 src/transport/middle_proxy/http_fetch.rs diff --git a/src/config/load.rs b/src/config/load.rs index 8355fb4..2c46766 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -1267,10 +1267,7 @@ mod tests { cfg.server.proxy_protocol_trusted_cidrs, default_proxy_protocol_trusted_cidrs() ); - assert_eq!( - cfg.censorship.unknown_sni_action, - UnknownSniAction::Drop - ); + assert_eq!(cfg.censorship.unknown_sni_action, UnknownSniAction::Drop); assert_eq!(cfg.server.api.listen, default_api_listen()); assert_eq!(cfg.server.api.whitelist, default_api_whitelist()); assert_eq!( @@ -1493,7 +1490,10 @@ mod tests { "#, ) .unwrap(); - assert_eq!(cfg_default.censorship.unknown_sni_action, UnknownSniAction::Drop); + assert_eq!( + cfg_default.censorship.unknown_sni_action, + UnknownSniAction::Drop + ); let cfg_mask: ProxyConfig = toml::from_str( r#" @@ -1506,7 +1506,10 @@ mod tests { "#, ) .unwrap(); - assert_eq!(cfg_mask.censorship.unknown_sni_action, UnknownSniAction::Mask); + assert_eq!( + cfg_mask.censorship.unknown_sni_action, + UnknownSniAction::Mask + ); } #[test] diff --git a/src/maestro/helpers.rs b/src/maestro/helpers.rs index 35f796f..032460c 100644 --- a/src/maestro/helpers.rs +++ b/src/maestro/helpers.rs @@ -8,8 +8,10 @@ use tracing::{debug, error, info, warn}; use crate::cli; use crate::config::ProxyConfig; +use crate::transport::UpstreamManager; use crate::transport::middle_proxy::{ - ProxyConfigData, fetch_proxy_config_with_raw, load_proxy_config_cache, save_proxy_config_cache, + ProxyConfigData, fetch_proxy_config_with_raw_via_upstream, load_proxy_config_cache, + save_proxy_config_cache, }; pub(crate) fn resolve_runtime_config_path( @@ -288,9 +290,10 @@ pub(crate) async fn load_startup_proxy_config_snapshot( cache_path: Option<&str>, me2dc_fallback: bool, label: &'static str, + upstream: Option>, ) -> Option { loop { - match fetch_proxy_config_with_raw(url).await { + match fetch_proxy_config_with_raw_via_upstream(url, upstream.clone()).await { Ok((cfg, raw)) => { if !cfg.map.is_empty() { if let Some(path) = cache_path diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 022f8ae..b1e605c 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -63,9 +63,10 @@ pub(crate) async fn initialize_me_pool( let proxy_secret_path = config.general.proxy_secret_path.as_deref(); let pool_size = config.general.middle_proxy_pool_size.max(1); let proxy_secret = loop { - match crate::transport::middle_proxy::fetch_proxy_secret( + match crate::transport::middle_proxy::fetch_proxy_secret_with_upstream( proxy_secret_path, config.general.proxy_secret_len_max, + Some(upstream_manager.clone()), ) .await { @@ -129,6 +130,7 @@ pub(crate) async fn initialize_me_pool( config.general.proxy_config_v4_cache_path.as_deref(), me2dc_fallback, "getProxyConfig", + Some(upstream_manager.clone()), ) .await; if cfg_v4.is_some() { @@ -160,6 +162,7 @@ pub(crate) async fn initialize_me_pool( config.general.proxy_config_v6_cache_path.as_deref(), me2dc_fallback, "getProxyConfigV6", + Some(upstream_manager.clone()), ) .await; if cfg_v6.is_some() { diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 8b8c4a3..9d48fe9 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -667,7 +667,8 @@ where let cached = if config.censorship.tls_emulation { if let Some(cache) = tls_cache.as_ref() { - let selected_domain = matched_tls_domain.unwrap_or(config.censorship.tls_domain.as_str()); + let selected_domain = + matched_tls_domain.unwrap_or(config.censorship.tls_domain.as_str()); let cached_entry = cache.get(selected_domain).await; let use_full_cert_payload = cache .take_full_cert_budget_for_ip( diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 8e5a701..9819e8d 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -11,17 +11,19 @@ use tracing::{debug, info, warn}; use crate::config::ProxyConfig; use crate::error::Result; +use crate::transport::UpstreamManager; +use super::http_fetch::https_get; use super::MePool; use super::rotation::{MeReinitTrigger, enqueue_reinit_trigger}; -use super::secret::download_proxy_secret_with_max_len; +use super::secret::download_proxy_secret_with_max_len_via_upstream; use super::selftest::record_timeskew_sample; use std::time::SystemTime; -async fn retry_fetch(url: &str) -> Option { +async fn retry_fetch(url: &str, upstream: Option>) -> Option { let delays = [1u64, 5, 15]; for (i, d) in delays.iter().enumerate() { - match fetch_proxy_config(url).await { + match fetch_proxy_config_via_upstream(url, upstream.clone()).await { Ok(cfg) => return Some(cfg), Err(e) => { if i == delays.len() - 1 { @@ -96,13 +98,17 @@ pub async fn save_proxy_config_cache(path: &str, raw_text: &str) -> Result<()> { } pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, String)> { - let resp = reqwest::get(url).await.map_err(|e| { - crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")) - })?; - let http_status = resp.status().as_u16(); + fetch_proxy_config_with_raw_via_upstream(url, None).await +} - if let Some(date) = resp.headers().get(reqwest::header::DATE) - && let Ok(date_str) = date.to_str() +pub async fn fetch_proxy_config_with_raw_via_upstream( + url: &str, + upstream: Option>, +) -> Result<(ProxyConfigData, String)> { + let resp = https_get(url, upstream).await?; + let http_status = resp.status; + + if let Some(date_str) = resp.date_header.as_deref() && let Ok(server_time) = httpdate::parse_http_date(date_str) && let Ok(skew) = SystemTime::now() .duration_since(server_time) @@ -123,9 +129,7 @@ pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, } } - let text = resp.text().await.map_err(|e| { - crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")) - })?; + let text = String::from_utf8_lossy(&resp.body).into_owned(); let parsed = parse_proxy_config_text(&text, http_status); Ok((parsed, text)) } @@ -261,7 +265,14 @@ fn parse_proxy_line(line: &str) -> Option<(i32, IpAddr, u16)> { } pub async fn fetch_proxy_config(url: &str) -> Result { - fetch_proxy_config_with_raw(url) + fetch_proxy_config_via_upstream(url, None).await +} + +pub async fn fetch_proxy_config_via_upstream( + url: &str, + upstream: Option>, +) -> Result { + fetch_proxy_config_with_raw_via_upstream(url, upstream) .await .map(|(parsed, _raw)| parsed) } @@ -300,6 +311,7 @@ async fn run_update_cycle( state: &mut UpdaterState, reinit_tx: &mpsc::Sender, ) { + let upstream = pool.upstream.clone(); pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, @@ -354,7 +366,7 @@ async fn run_update_cycle( let mut maps_changed = false; let mut ready_v4: Option<(ProxyConfigData, u64)> = None; - let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await; + let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig", upstream.clone()).await; if let Some(cfg_v4) = cfg_v4 && snapshot_passes_guards(cfg, &cfg_v4, "getProxyConfig") { @@ -378,7 +390,7 @@ async fn run_update_cycle( } let mut ready_v6: Option<(ProxyConfigData, u64)> = None; - let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await; + let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6", upstream.clone()).await; if let Some(cfg_v6) = cfg_v6 && snapshot_passes_guards(cfg, &cfg_v6, "getProxyConfigV6") { @@ -456,7 +468,12 @@ async fn run_update_cycle( pool.reset_stun_state(); if cfg.general.proxy_secret_rotate_runtime { - match download_proxy_secret_with_max_len(cfg.general.proxy_secret_len_max).await { + match download_proxy_secret_with_max_len_via_upstream( + cfg.general.proxy_secret_len_max, + upstream, + ) + .await + { Ok(secret) => { let secret_hash = hash_secret(&secret); let stable_hits = state.secret.observe(secret_hash); diff --git a/src/transport/middle_proxy/http_fetch.rs b/src/transport/middle_proxy/http_fetch.rs new file mode 100644 index 0000000..c1bb4f6 --- /dev/null +++ b/src/transport/middle_proxy/http_fetch.rs @@ -0,0 +1,184 @@ +use std::sync::Arc; +use std::time::Duration; + +use http_body_util::{BodyExt, Empty}; +use hyper::header::{CONNECTION, DATE, HOST, USER_AGENT}; +use hyper::{Method, Request}; +use hyper_util::rt::TokioIo; +use rustls::pki_types::ServerName; +use tokio::net::TcpStream; +use tokio::time::timeout; +use tokio_rustls::TlsConnector; +use tracing::debug; + +use crate::error::{ProxyError, Result}; +use crate::network::dns_overrides::resolve_socket_addr; +use crate::transport::{UpstreamManager, UpstreamStream}; + +const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +const HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); + +pub(crate) struct HttpsGetResponse { + pub(crate) status: u16, + pub(crate) date_header: Option, + pub(crate) body: Vec, +} + +fn build_tls_client_config() -> Arc { + let mut root_store = rustls::RootCertStore::empty(); + root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + let config = rustls::ClientConfig::builder() + .with_root_certificates(root_store) + .with_no_client_auth(); + Arc::new(config) +} + +fn extract_host_port_path(url: &str) -> Result<(String, u16, String)> { + let parsed = url::Url::parse(url) + .map_err(|e| ProxyError::Proxy(format!("invalid URL '{url}': {e}")))?; + if parsed.scheme() != "https" { + return Err(ProxyError::Proxy(format!( + "unsupported URL scheme '{}': only https is supported", + parsed.scheme() + ))); + } + + let host = parsed + .host_str() + .ok_or_else(|| ProxyError::Proxy(format!("URL has no host: {url}")))? + .to_string(); + let port = parsed + .port_or_known_default() + .ok_or_else(|| ProxyError::Proxy(format!("URL has no known port: {url}")))?; + + let mut path = parsed.path().to_string(); + if path.is_empty() { + path.push('/'); + } + if let Some(query) = parsed.query() { + path.push('?'); + path.push_str(query); + } + + Ok((host, port, path)) +} + +async fn resolve_target_addr(host: &str, port: u16) -> Result { + if let Some(addr) = resolve_socket_addr(host, port) { + return Ok(addr); + } + + let addrs: Vec = tokio::net::lookup_host((host, port)) + .await + .map_err(|e| ProxyError::Proxy(format!("DNS resolve failed for {host}:{port}: {e}")))? + .collect(); + + if let Some(addr) = addrs.iter().copied().find(|addr| addr.is_ipv4()) { + return Ok(addr); + } + + addrs + .first() + .copied() + .ok_or_else(|| ProxyError::Proxy(format!("DNS returned no addresses for {host}:{port}"))) +} + +async fn connect_https_transport( + host: &str, + port: u16, + upstream: Option>, +) -> Result { + if let Some(manager) = upstream { + let target = resolve_target_addr(host, port).await?; + return timeout(HTTP_CONNECT_TIMEOUT, manager.connect(target, None, None)) + .await + .map_err(|_| { + ProxyError::Proxy(format!("upstream connect timeout for {host}:{port}")) + })? + .map_err(|e| { + ProxyError::Proxy(format!( + "upstream connect failed for {host}:{port}: {e}" + )) + }); + } + + if let Some(addr) = resolve_socket_addr(host, port) { + let stream = timeout(HTTP_CONNECT_TIMEOUT, TcpStream::connect(addr)) + .await + .map_err(|_| ProxyError::Proxy(format!("connect timeout for {host}:{port}")))? + .map_err(|e| ProxyError::Proxy(format!("connect failed for {host}:{port}: {e}")))?; + return Ok(UpstreamStream::Tcp(stream)); + } + + let stream = timeout(HTTP_CONNECT_TIMEOUT, TcpStream::connect((host, port))) + .await + .map_err(|_| ProxyError::Proxy(format!("connect timeout for {host}:{port}")))? + .map_err(|e| ProxyError::Proxy(format!("connect failed for {host}:{port}: {e}")))?; + Ok(UpstreamStream::Tcp(stream)) +} + +pub(crate) async fn https_get( + url: &str, + upstream: Option>, +) -> Result { + let (host, port, path_and_query) = extract_host_port_path(url)?; + let stream = connect_https_transport(&host, port, upstream).await?; + + let server_name = ServerName::try_from(host.clone()) + .map_err(|_| ProxyError::Proxy(format!("invalid TLS server name: {host}")))?; + let connector = TlsConnector::from(build_tls_client_config()); + let tls_stream = timeout(HTTP_REQUEST_TIMEOUT, connector.connect(server_name, stream)) + .await + .map_err(|_| ProxyError::Proxy(format!("TLS handshake timeout for {host}:{port}")))? + .map_err(|e| ProxyError::Proxy(format!("TLS handshake failed for {host}:{port}: {e}")))?; + + let (mut sender, connection) = hyper::client::conn::http1::handshake(TokioIo::new(tls_stream)) + .await + .map_err(|e| ProxyError::Proxy(format!("HTTP handshake failed for {host}:{port}: {e}")))?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + debug!(error = %e, "HTTPS fetch connection task failed"); + } + }); + + let host_header = if port == 443 { + host.clone() + } else { + format!("{host}:{port}") + }; + + let request = Request::builder() + .method(Method::GET) + .uri(path_and_query) + .header(HOST, host_header) + .header(USER_AGENT, "telemt-middle-proxy/1") + .header(CONNECTION, "close") + .body(Empty::::new()) + .map_err(|e| ProxyError::Proxy(format!("build HTTP request failed for {url}: {e}")))?; + + let response = timeout(HTTP_REQUEST_TIMEOUT, sender.send_request(request)) + .await + .map_err(|_| ProxyError::Proxy(format!("HTTP request timeout for {url}")))? + .map_err(|e| ProxyError::Proxy(format!("HTTP request failed for {url}: {e}")))?; + + let status = response.status().as_u16(); + let date_header = response + .headers() + .get(DATE) + .and_then(|value| value.to_str().ok()) + .map(|value| value.to_string()); + + let body = timeout(HTTP_REQUEST_TIMEOUT, response.into_body().collect()) + .await + .map_err(|_| ProxyError::Proxy(format!("HTTP body read timeout for {url}")))? + .map_err(|e| ProxyError::Proxy(format!("HTTP body read failed for {url}: {e}")))? + .to_bytes() + .to_vec(); + + Ok(HttpsGetResponse { + status, + date_header, + body, + }) +} diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 5536869..3a3642a 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -4,6 +4,7 @@ mod codec; mod config_updater; mod handshake; mod health; +mod http_fetch; #[cfg(test)] #[path = "tests/health_adversarial_tests.rs"] mod health_adversarial_tests; @@ -44,7 +45,8 @@ use bytes::Bytes; #[allow(unused_imports)] pub use config_updater::{ - ProxyConfigData, fetch_proxy_config, fetch_proxy_config_with_raw, load_proxy_config_cache, + ProxyConfigData, fetch_proxy_config, fetch_proxy_config_via_upstream, + fetch_proxy_config_with_raw, fetch_proxy_config_with_raw_via_upstream, load_proxy_config_cache, me_config_updater, save_proxy_config_cache, }; pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog}; @@ -57,7 +59,7 @@ pub use pool::MePool; pub use pool_nat::{detect_public_ip, stun_probe}; pub use registry::ConnRegistry; pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task}; -pub use secret::fetch_proxy_secret; +pub use secret::{fetch_proxy_secret, fetch_proxy_secret_with_upstream}; pub(crate) use selftest::{bnd_snapshot, timeskew_snapshot, upstream_bnd_snapshots}; pub use wire::proto_flags_for_tag; diff --git a/src/transport/middle_proxy/secret.rs b/src/transport/middle_proxy/secret.rs index 504270a..450c80a 100644 --- a/src/transport/middle_proxy/secret.rs +++ b/src/transport/middle_proxy/secret.rs @@ -1,8 +1,11 @@ use httpdate; +use std::sync::Arc; use std::time::SystemTime; use tracing::{debug, info, warn}; +use super::http_fetch::https_get; use super::selftest::record_timeskew_sample; +use crate::transport::UpstreamManager; use crate::error::{ProxyError, Result}; pub const PROXY_SECRET_MIN_LEN: usize = 32; @@ -34,10 +37,19 @@ pub(super) fn validate_proxy_secret_len(data_len: usize, max_len: usize) -> Resu /// Fetch Telegram proxy-secret binary. pub async fn fetch_proxy_secret(cache_path: Option<&str>, max_len: usize) -> Result> { + fetch_proxy_secret_with_upstream(cache_path, max_len, None).await +} + +/// Fetch Telegram proxy-secret binary, optionally through upstream routing. +pub async fn fetch_proxy_secret_with_upstream( + cache_path: Option<&str>, + max_len: usize, + upstream: Option>, +) -> Result> { let cache = cache_path.unwrap_or("proxy-secret"); // 1) Try fresh download first. - match download_proxy_secret_with_max_len(max_len).await { + match download_proxy_secret_with_max_len_via_upstream(max_len, upstream).await { Ok(data) => { if let Err(e) = tokio::fs::write(cache, &data).await { warn!(error = %e, "Failed to cache proxy-secret (non-fatal)"); @@ -77,19 +89,23 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>, max_len: usize) -> Res } pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result> { - let resp = reqwest::get("https://core.telegram.org/getProxySecret") - .await - .map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {e}")))?; + download_proxy_secret_with_max_len_via_upstream(max_len, None).await +} - if !resp.status().is_success() { +pub async fn download_proxy_secret_with_max_len_via_upstream( + max_len: usize, + upstream: Option>, +) -> Result> { + let resp = https_get("https://core.telegram.org/getProxySecret", upstream).await?; + + if !(200..=299).contains(&resp.status) { return Err(ProxyError::Proxy(format!( "proxy-secret download HTTP {}", - resp.status() + resp.status ))); } - if let Some(date) = resp.headers().get(reqwest::header::DATE) - && let Ok(date_str) = date.to_str() + if let Some(date_str) = resp.date_header.as_deref() && let Ok(server_time) = httpdate::parse_http_date(date_str) && let Ok(skew) = SystemTime::now() .duration_since(server_time) @@ -110,11 +126,7 @@ pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result