From c8f6f8caf450b039f496388f7c849ff8f5db2f16 Mon Sep 17 00:00:00 2001 From: Flowseal Date: Fri, 26 Jun 2026 18:37:30 +0300 Subject: [PATCH] Fronting fallback --- proxy/pool.py | 8 ++++--- proxy/raw_websocket.py | 9 ++++++-- proxy/stats.py | 2 ++ proxy/tg_ws_proxy.py | 51 +++++++++++++++++++++++++++++++++++++++++- 4 files changed, 64 insertions(+), 6 deletions(-) diff --git a/proxy/pool.py b/proxy/pool.py index 3523bb1..46abd11 100644 --- a/proxy/pool.py +++ b/proxy/pool.py @@ -19,6 +19,7 @@ class _WsPool: def __init__(self): self._idle: Dict[Tuple[int, bool], deque] = {} self._refilling: Set[Tuple[int, bool]] = set() + self.fronting_until: float = 0.0 async def get(self, dc: int, is_media: bool, target_ip: str, domains: List[str] @@ -61,7 +62,7 @@ class _WsPool: if needed <= 0: return tasks = [asyncio.create_task( - self._connect_one(target_ip, domains)) + self._connect_one(target_ip, domains, time.monotonic() < self.fronting_until)) for _ in range(needed)] for t in tasks: try: @@ -76,11 +77,11 @@ class _WsPool: self._refilling.discard(key) @staticmethod - async def _connect_one(target_ip, domains) -> Optional[RawWebSocket]: + async def _connect_one(target_ip, domains, fronting_active) -> Optional[RawWebSocket]: for domain in domains: try: return await RawWebSocket.connect( - target_ip, domain, timeout=8) + target_ip, domain, timeout=8, sni="sprinthost.ru" if fronting_active else None) except WsHandshakeError as exc: if exc.is_redirect: continue @@ -108,6 +109,7 @@ class _WsPool: def reset(self): self._idle.clear() self._refilling.clear() + self.fronting_until = 0.0 class _CfWorkerPool: diff --git a/proxy/raw_websocket.py b/proxy/raw_websocket.py index 7dc9aa3..30d452b 100644 --- a/proxy/raw_websocket.py +++ b/proxy/raw_websocket.py @@ -82,10 +82,14 @@ class RawWebSocket: @staticmethod async def connect(host: str, domain: str, timeout: float = 10.0, - path: str = '/apiws') -> 'RawWebSocket': + path: str = '/apiws', *, + sni: Optional[str] = None) -> 'RawWebSocket': + if sni is None: + sni = domain + reader, writer = await asyncio.wait_for( asyncio.open_connection(host, 443, ssl=_ssl_ctx, - server_hostname=domain), + server_hostname=sni), timeout=min(timeout, 10)) set_sock_opts(writer.transport, proxy_config.buffer_size) @@ -102,6 +106,7 @@ class RawWebSocket: f'Sec-WebSocket-Protocol: binary\r\n' f'\r\n' ) + writer.write(req.encode()) await writer.drain() diff --git a/proxy/stats.py b/proxy/stats.py index 111f29a..b61390c 100644 --- a/proxy/stats.py +++ b/proxy/stats.py @@ -7,6 +7,7 @@ class _Stats: self.connections_ws = 0 self.connections_tcp_fallback = 0 self.connections_cfproxy = 0 + self.connections_fronting = 0 self.connections_bad = 0 self.connections_masked = 0 self.ws_errors = 0 @@ -29,6 +30,7 @@ class _Stats: f"ws={self.connections_ws} " f"tcp_fb={self.connections_tcp_fallback} " f"cf={self.connections_cfproxy} " + f"front={self.connections_fronting} " f"bad={self.connections_bad} " f"masked={self.connections_masked} " f"err={self.ws_errors} " diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 13fdaf5..3b5825c 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -35,10 +35,12 @@ log = logging.getLogger('tg-mtproto-proxy') DC_FAIL_COOLDOWN = 30.0 WS_FAIL_TIMEOUT = 2.0 +FRONTING_COOLDOWN = 1800.0 LISTENER_CHECK_INTERVAL = 5.0 LISTENER_RESTART_DELAY = 1.0 ws_blacklist: Set[str] = set() dc_fail_until: Dict[str, float] = {} +fronting_until: float = 0.0 def _try_handshake(handshake: bytes, secret: bytes) -> Optional[Tuple[int, bool, bytes, bytes]]: @@ -246,6 +248,8 @@ def _build_crypto_ctx(client_dec_prekey_iv, secret, relay_init): async def _handle_client(reader, writer, secret: bytes): + global fronting_until + stats.connections_total += 1 stats.connections_active += 1 peer = writer.get_extra_info('peername') @@ -317,17 +321,37 @@ async def _handle_client(reader, writer, secret: bytes): now = time.monotonic() fail_until = dc_fail_until.get(dc_key, 0) ws_timeout = WS_FAIL_TIMEOUT if now < fail_until else 10.0 + fronting_active = now < fronting_until domains = ws_domains(dc, is_media) target = proxy_config.dc_redirects[dc] ws = None ws_failed_redirect = False + ws_timed_out = False all_redirects = True ws = await ws_pool.get(dc, is_media, target, domains) if ws: log.info("[%s] DC%d%s -> pool hit via %s", label, dc, media_tag, target) + elif fronting_active: + log.info("[%s] DC%d%s -> fronting / Host %s", + label, dc, media_tag, domains[0]) + try: + ws = await RawWebSocket.connect(target, domains[0], + timeout=10.0, + sni="sprinthost.ru") + except Exception as exc: + stats.ws_errors += 1 + log.warning("[%s] DC%d%s fronting failed: %s", + label, dc, media_tag, repr(exc)) + if ws: + stats.connections_fronting += 1 + fronting_until = now + FRONTING_COOLDOWN + ws_pool.fronting_until = fronting_until + else: + fronting_until = 0.0 + ws_pool.fronting_until = 0.0 else: for domain in domains: url = f'wss://{domain}/apiws' @@ -351,12 +375,36 @@ async def _handle_client(reader, writer, secret: bytes): all_redirects = False log.warning("[%s] DC%d%s WS handshake: %s", label, dc, media_tag, exc.status_line) + except asyncio.TimeoutError: + stats.ws_errors += 1 + ws_timed_out = True + log.warning("[%s] DC%d%s WS connect timed out via %s", + label, dc, media_tag, domain) except Exception as exc: stats.ws_errors += 1 all_redirects = False log.warning("[%s] DC%d%s WS connect failed: %s", label, dc, media_tag, repr(exc)) + # Fronting fallback if WS timed out + if ws is None and ws_timed_out and not fronting_active: + log.info("[%s] DC%d%s -> fronting fallback (Host %s)", + label, dc, media_tag, domains[0]) + try: + ws = await RawWebSocket.connect(target, domains[0], + timeout=10.0, + sni="sprinthost.ru") + except Exception as exc: + stats.ws_errors += 1 + log.warning("[%s] DC%d%s fronting failed: %s", + label, dc, media_tag, repr(exc)) + if ws: + fronting_until = now + FRONTING_COOLDOWN + ws_pool.fronting_until = now + FRONTING_COOLDOWN + stats.connections_fronting += 1 + log.info("[%s] DC%d%s fronting OK for %ds", + label, dc, media_tag, int(FRONTING_COOLDOWN)) + # WS failed -> fallback if ws is None: if ws_failed_redirect and all_redirects: @@ -431,7 +479,7 @@ _client_tasks: Set[asyncio.Task] = set() async def _run(stop_event: Optional[asyncio.Event] = None): - global _server_instance, _server_stop_event + global _server_instance, _server_stop_event, fronting_until _server_stop_event = stop_event ws_pool.reset() @@ -439,6 +487,7 @@ async def _run(stop_event: Optional[asyncio.Event] = None): ws_blacklist.clear() dc_fail_until.clear() _client_tasks.clear() + fronting_until = 0.0 if proxy_config.fallback_cfproxy: user = proxy_config.cfproxy_user_domains