Mobiles media fix, optimizations

This commit is contained in:
Flowseal 2026-03-12 19:36:02 +03:00
parent 6147cda356
commit f744e93de6
1 changed files with 145 additions and 22 deletions

View File

@ -32,24 +32,34 @@ _TG_RANGES = [
struct.unpack('!I', _socket.inet_aton('91.108.255.255'))[0]),
]
_IP_TO_DC: Dict[str, int] = {
# IP -> (dc_id, is_media)
_IP_TO_DC: Dict[str, Tuple[int, bool]] = {
# DC1
'149.154.175.50': 1, '149.154.175.51': 1, '149.154.175.54': 1,
'149.154.175.50': (1, False), '149.154.175.51': (1, False),
'149.154.175.53': (1, False), '149.154.175.54': (1, False),
'149.154.175.52': (1, True),
# DC2
'149.154.167.41': 2,
'149.154.167.50': 2, '149.154.167.51': 2, '149.154.167.220': 2,
'149.154.167.41': (2, False), '149.154.167.50': (2, False),
'149.154.167.51': (2, False), '149.154.167.220': (2, False),
'95.161.76.100': (2, False),
'149.154.167.151': (2, True), '149.154.167.222': (2, True),
'149.154.167.223': (2, True),
# DC3
'149.154.175.100': 3, '149.154.175.101': 3,
'149.154.175.100': (3, False), '149.154.175.101': (3, False),
'149.154.175.102': (3, True),
# DC4
'149.154.167.91': 4, '149.154.167.92': 4,
'149.154.167.91': (4, False), '149.154.167.92': (4, False),
'149.154.164.250': (4, True), '149.154.166.120': (4, True),
'149.154.166.121': (4, True), '149.154.167.118': (4, True),
'149.154.165.111': (4, True),
# DC5
'91.108.56.100': 5,
'91.108.56.126': 5, '91.108.56.101': 5, '91.108.56.116': 5,
'91.108.56.100': (5, False), '91.108.56.101': (5, False),
'91.108.56.116': (5, False), '91.108.56.126': (5, False),
'149.154.171.5': (5, False),
'91.108.56.102': (5, True), '91.108.56.128': (5, True),
'91.108.56.151': (5, True),
# DC203
'91.105.192.100': 203,
# Media DCs
# '149.154.167.151': 2, '149.154.167.223': 2,
# '149.154.166.120': 4, '149.154.166.121': 4,
'91.105.192.100': (203, False),
}
_dc_opt: Dict[int, Optional[str]] = {}
@ -86,10 +96,9 @@ class WsHandshakeError(Exception):
def _xor_mask(data: bytes, mask: bytes) -> bytes:
if not data:
return data
a = bytearray(data)
for i in range(len(a)):
a[i] ^= mask[i & 3]
return bytes(a)
n = len(data)
mask_rep = (mask * (n // 4 + 1))[:n]
return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big')
class RawWebSocket:
@ -193,6 +202,15 @@ class RawWebSocket:
self.writer.write(frame)
await self.writer.drain()
async def send_batch(self, parts: List[bytes]):
"""Send multiple binary frames with a single drain (less overhead)."""
if self._closed:
raise ConnectionError("WebSocket closed")
for part in parts:
frame = self._build_frame(self.OP_BINARY, part, mask=True)
self.writer.write(frame)
await self.writer.drain()
async def recv(self) -> Optional[bytes]:
"""
Receive the next data frame. Handles ping/pong/close
@ -343,6 +361,85 @@ def _dc_from_init(data: bytes) -> Tuple[Optional[int], bool]:
return None, False
def _patch_init_dc(data: bytes, dc: int) -> bytes:
"""
Patch dc_id in the 64-byte MTProto init packet.
Mobile clients with useSecret=0 leave bytes 60-61 as random.
The WS relay needs a valid dc_id to route correctly.
"""
if len(data) < 64:
return data
new_dc = struct.pack('<h', dc)
try:
key_raw = bytes(data[8:40])
iv = bytes(data[40:56])
cipher = Cipher(algorithms.AES(key_raw), modes.CTR(iv))
enc = cipher.encryptor()
ks = enc.update(b'\x00' * 64) + enc.finalize()
patched = bytearray(data[:64])
patched[60] = ks[60] ^ new_dc[0]
patched[61] = ks[61] ^ new_dc[1]
log.debug("init patched: dc_id -> %d", dc)
if len(data) > 64:
return bytes(patched) + data[64:]
return bytes(patched)
except Exception:
return data
class _MsgSplitter:
"""
Splits client TCP data into individual MTProto abridged-protocol
messages so each can be sent as a separate WebSocket frame.
The Telegram WS relay processes one MTProto message per WS frame.
Mobile clients batches multiple messages in a single TCP write (e.g.
msgs_ack + req_DH_params). If sent as one WS frame, the relay
only processes the first message DH handshake never completes.
"""
def __init__(self, init_data: bytes):
key_raw = bytes(init_data[8:40])
iv = bytes(init_data[40:56])
cipher = Cipher(algorithms.AES(key_raw), modes.CTR(iv))
self._dec = cipher.encryptor()
self._dec.update(b'\x00' * 64) # skip init packet
def split(self, chunk: bytes) -> List[bytes]:
"""Decrypt to find message boundaries, return split ciphertext."""
plain = self._dec.update(chunk)
boundaries = []
pos = 0
while pos < len(plain):
first = plain[pos]
if first == 0x7f:
if pos + 4 > len(plain):
break
msg_len = (
struct.unpack_from('<I', plain, pos + 1)[0] & 0xFFFFFF
) * 4
pos += 4
else:
msg_len = first * 4
pos += 1
if msg_len == 0 or pos + msg_len > len(plain):
break
pos += msg_len
boundaries.append(pos)
if len(boundaries) <= 1:
return [chunk]
parts = []
prev = 0
for b in boundaries:
parts.append(chunk[prev:b])
prev = b
if prev < len(chunk):
parts.append(chunk[prev:])
return parts
def _ws_domains(dc: int, is_media) -> List[str]:
"""
Return domain names to try for WebSocket connection to a DC.
@ -381,7 +478,8 @@ _stats = Stats()
async def _bridge_ws(reader, writer, ws: RawWebSocket, label,
dc=None, dst=None, port=None, is_media=False):
dc=None, dst=None, port=None, is_media=False,
splitter: _MsgSplitter = None):
"""Bidirectional TCP <-> WebSocket forwarding."""
dc_tag = f"DC{dc}{'m' if is_media else ''}" if dc else "DC?"
dst_tag = f"{dst}:{port}" if dst else "?"
@ -396,13 +494,20 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label,
nonlocal up_bytes, up_packets
try:
while True:
chunk = await reader.read(65536)
chunk = await reader.read(131072)
if not chunk:
break
_stats.bytes_up += len(chunk)
up_bytes += len(chunk)
up_packets += 1
await ws.send(chunk)
if splitter:
parts = splitter.split(chunk)
if len(parts) > 1:
await ws.send_batch(parts)
else:
await ws.send(parts[0])
else:
await ws.send(chunk)
except (asyncio.CancelledError, ConnectionError, OSError):
return
except Exception as e:
@ -419,7 +524,10 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label,
down_bytes += len(data)
down_packets += 1
writer.write(data)
await writer.drain()
# drain only when kernel buffer is filling up
buf = writer.transport.get_write_buffer_size()
if buf > 262144:
await writer.drain()
except (asyncio.CancelledError, ConnectionError, OSError):
return
except Exception as e:
@ -639,8 +747,15 @@ async def _handle_client(reader, writer):
# -- Extract DC ID --
dc, is_media = _dc_from_init(init)
init_patched = False
# Android (may be ios too) with useSecret=0 has random dc_id bytes — patch it
if dc is None and dst in _IP_TO_DC:
dc = _IP_TO_DC.get(dst)
dc, is_media = _IP_TO_DC.get(dst)
if is_media and dc > 0: dc = -dc
if dc in _dc_opt:
init = _patch_init_dc(init, dc)
init_patched = True
if dc is None or dc not in _dc_opt:
log.warning("[%s] unknown DC%s for %s:%d -> TCP passthrough",
@ -745,12 +860,20 @@ async def _handle_client(reader, writer):
_dc_fail_until.pop(dc_key, None)
_stats.connections_ws += 1
splitter = None
if init_patched:
try:
splitter = _MsgSplitter(init)
except Exception:
pass
# Send the buffered init packet
await ws.send(init)
# Bidirectional bridge
await _bridge_ws(reader, writer, ws, label,
dc=dc, dst=dst, port=port, is_media=is_media)
dc=dc, dst=dst, port=port, is_media=is_media,
splitter=splitter)
except asyncio.TimeoutError:
log.warning("[%s] timeout during SOCKS5 handshake", label)