From f744e93de66cf66aa36666b66887c12030568bc7 Mon Sep 17 00:00:00 2001 From: Flowseal Date: Thu, 12 Mar 2026 19:36:02 +0300 Subject: [PATCH] Mobiles media fix, optimizations --- proxy/tg_ws_proxy.py | 167 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 145 insertions(+), 22 deletions(-) diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index b6895c9..6f2dd66 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -32,24 +32,34 @@ _TG_RANGES = [ struct.unpack('!I', _socket.inet_aton('91.108.255.255'))[0]), ] -_IP_TO_DC: Dict[str, int] = { +# IP -> (dc_id, is_media) +_IP_TO_DC: Dict[str, Tuple[int, bool]] = { # DC1 - '149.154.175.50': 1, '149.154.175.51': 1, '149.154.175.54': 1, + '149.154.175.50': (1, False), '149.154.175.51': (1, False), + '149.154.175.53': (1, False), '149.154.175.54': (1, False), + '149.154.175.52': (1, True), # DC2 - '149.154.167.41': 2, - '149.154.167.50': 2, '149.154.167.51': 2, '149.154.167.220': 2, + '149.154.167.41': (2, False), '149.154.167.50': (2, False), + '149.154.167.51': (2, False), '149.154.167.220': (2, False), + '95.161.76.100': (2, False), + '149.154.167.151': (2, True), '149.154.167.222': (2, True), + '149.154.167.223': (2, True), # DC3 - '149.154.175.100': 3, '149.154.175.101': 3, + '149.154.175.100': (3, False), '149.154.175.101': (3, False), + '149.154.175.102': (3, True), # DC4 - '149.154.167.91': 4, '149.154.167.92': 4, + '149.154.167.91': (4, False), '149.154.167.92': (4, False), + '149.154.164.250': (4, True), '149.154.166.120': (4, True), + '149.154.166.121': (4, True), '149.154.167.118': (4, True), + '149.154.165.111': (4, True), # DC5 - '91.108.56.100': 5, - '91.108.56.126': 5, '91.108.56.101': 5, '91.108.56.116': 5, + '91.108.56.100': (5, False), '91.108.56.101': (5, False), + '91.108.56.116': (5, False), '91.108.56.126': (5, False), + '149.154.171.5': (5, False), + '91.108.56.102': (5, True), '91.108.56.128': (5, True), + '91.108.56.151': (5, True), # DC203 - '91.105.192.100': 203, - # Media DCs - # '149.154.167.151': 2, '149.154.167.223': 2, - # '149.154.166.120': 4, '149.154.166.121': 4, + '91.105.192.100': (203, False), } _dc_opt: Dict[int, Optional[str]] = {} @@ -86,10 +96,9 @@ class WsHandshakeError(Exception): def _xor_mask(data: bytes, mask: bytes) -> bytes: if not data: return data - a = bytearray(data) - for i in range(len(a)): - a[i] ^= mask[i & 3] - return bytes(a) + n = len(data) + mask_rep = (mask * (n // 4 + 1))[:n] + return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') class RawWebSocket: @@ -193,6 +202,15 @@ class RawWebSocket: self.writer.write(frame) await self.writer.drain() + async def send_batch(self, parts: List[bytes]): + """Send multiple binary frames with a single drain (less overhead).""" + if self._closed: + raise ConnectionError("WebSocket closed") + for part in parts: + frame = self._build_frame(self.OP_BINARY, part, mask=True) + self.writer.write(frame) + await self.writer.drain() + async def recv(self) -> Optional[bytes]: """ Receive the next data frame. Handles ping/pong/close @@ -343,6 +361,85 @@ def _dc_from_init(data: bytes) -> Tuple[Optional[int], bool]: return None, False +def _patch_init_dc(data: bytes, dc: int) -> bytes: + """ + Patch dc_id in the 64-byte MTProto init packet. + + Mobile clients with useSecret=0 leave bytes 60-61 as random. + The WS relay needs a valid dc_id to route correctly. + """ + if len(data) < 64: + return data + + new_dc = struct.pack(' %d", dc) + if len(data) > 64: + return bytes(patched) + data[64:] + return bytes(patched) + except Exception: + return data + + +class _MsgSplitter: + """ + Splits client TCP data into individual MTProto abridged-protocol + messages so each can be sent as a separate WebSocket frame. + + The Telegram WS relay processes one MTProto message per WS frame. + Mobile clients batches multiple messages in a single TCP write (e.g. + msgs_ack + req_DH_params). If sent as one WS frame, the relay + only processes the first message — DH handshake never completes. + """ + + def __init__(self, init_data: bytes): + key_raw = bytes(init_data[8:40]) + iv = bytes(init_data[40:56]) + cipher = Cipher(algorithms.AES(key_raw), modes.CTR(iv)) + self._dec = cipher.encryptor() + self._dec.update(b'\x00' * 64) # skip init packet + + def split(self, chunk: bytes) -> List[bytes]: + """Decrypt to find message boundaries, return split ciphertext.""" + plain = self._dec.update(chunk) + boundaries = [] + pos = 0 + while pos < len(plain): + first = plain[pos] + if first == 0x7f: + if pos + 4 > len(plain): + break + msg_len = ( + struct.unpack_from(' len(plain): + break + pos += msg_len + boundaries.append(pos) + if len(boundaries) <= 1: + return [chunk] + parts = [] + prev = 0 + for b in boundaries: + parts.append(chunk[prev:b]) + prev = b + if prev < len(chunk): + parts.append(chunk[prev:]) + return parts + + def _ws_domains(dc: int, is_media) -> List[str]: """ Return domain names to try for WebSocket connection to a DC. @@ -381,7 +478,8 @@ _stats = Stats() async def _bridge_ws(reader, writer, ws: RawWebSocket, label, - dc=None, dst=None, port=None, is_media=False): + dc=None, dst=None, port=None, is_media=False, + splitter: _MsgSplitter = None): """Bidirectional TCP <-> WebSocket forwarding.""" dc_tag = f"DC{dc}{'m' if is_media else ''}" if dc else "DC?" dst_tag = f"{dst}:{port}" if dst else "?" @@ -396,13 +494,20 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, nonlocal up_bytes, up_packets try: while True: - chunk = await reader.read(65536) + chunk = await reader.read(131072) if not chunk: break _stats.bytes_up += len(chunk) up_bytes += len(chunk) up_packets += 1 - await ws.send(chunk) + if splitter: + parts = splitter.split(chunk) + if len(parts) > 1: + await ws.send_batch(parts) + else: + await ws.send(parts[0]) + else: + await ws.send(chunk) except (asyncio.CancelledError, ConnectionError, OSError): return except Exception as e: @@ -419,7 +524,10 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, down_bytes += len(data) down_packets += 1 writer.write(data) - await writer.drain() + # drain only when kernel buffer is filling up + buf = writer.transport.get_write_buffer_size() + if buf > 262144: + await writer.drain() except (asyncio.CancelledError, ConnectionError, OSError): return except Exception as e: @@ -639,8 +747,15 @@ async def _handle_client(reader, writer): # -- Extract DC ID -- dc, is_media = _dc_from_init(init) + init_patched = False + + # Android (may be ios too) with useSecret=0 has random dc_id bytes — patch it if dc is None and dst in _IP_TO_DC: - dc = _IP_TO_DC.get(dst) + dc, is_media = _IP_TO_DC.get(dst) + if is_media and dc > 0: dc = -dc + if dc in _dc_opt: + init = _patch_init_dc(init, dc) + init_patched = True if dc is None or dc not in _dc_opt: log.warning("[%s] unknown DC%s for %s:%d -> TCP passthrough", @@ -745,12 +860,20 @@ async def _handle_client(reader, writer): _dc_fail_until.pop(dc_key, None) _stats.connections_ws += 1 + splitter = None + if init_patched: + try: + splitter = _MsgSplitter(init) + except Exception: + pass + # Send the buffered init packet await ws.send(init) # Bidirectional bridge await _bridge_ws(reader, writer, ws, label, - dc=dc, dst=dst, port=port, is_media=is_media) + dc=dc, dst=dst, port=port, is_media=is_media, + splitter=splitter) except asyncio.TimeoutError: log.warning("[%s] timeout during SOCKS5 handshake", label)