diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index f25738c..934ce17 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -618,6 +618,7 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, up_packets = 0 down_packets = 0 start_time = asyncio.get_event_loop().time() + last_recv_time = start_time async def tcp_to_ws(): nonlocal up_bytes, up_packets @@ -644,31 +645,48 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, log.debug("[%s] tcp->ws ended: %s", label, e) async def ws_to_tcp(): - nonlocal down_bytes, down_packets + nonlocal down_bytes, down_packets, last_recv_time try: while True: data = await ws.recv() if data is None: break + last_recv_time = asyncio.get_event_loop().time() n = len(data) _stats.bytes_down += n down_bytes += n down_packets += 1 writer.write(data) - # drain only when kernel buffer is filling up - buf = writer.transport.get_write_buffer_size() - if buf > _SEND_BUF: - await writer.drain() + await writer.drain() except (asyncio.CancelledError, ConnectionError, OSError): return except Exception as e: log.debug("[%s] ws->tcp ended: %s", label, e) + async def ws_keepalive(): + try: + while not ws._closed: + await asyncio.sleep(2) + idle = asyncio.get_event_loop().time() - last_recv_time + if idle >= 2 and not ws._closed: + try: + ws.writer.write( + ws._build_frame(ws.OP_PING, b'', mask=True)) + await ws.writer.drain() + log.debug("[%s] %s WS PING (idle %.1fs)", + label, dc_tag, idle) + except Exception: + break + except asyncio.CancelledError: + pass + + ka_task = asyncio.create_task(ws_keepalive()) tasks = [asyncio.create_task(tcp_to_ws()), asyncio.create_task(ws_to_tcp())] try: await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) finally: + ka_task.cancel() for t in tasks: t.cancel() for t in tasks: diff --git a/stress_test.py b/stress_test.py new file mode 100644 index 0000000..f2fd88c --- /dev/null +++ b/stress_test.py @@ -0,0 +1,246 @@ +""" +Stress-test: сравнение OLD vs NEW реализаций горячих функций прокси. + +Тестируются: + 1. _build_frame — сборка WS-фрейма (masked binary) + 2. _build_frame — сборка WS-фрейма (unmasked) + 3. _socks5_reply — генерация SOCKS5-ответа + 4. _dc_from_init XOR-часть (bytes(a^b for …) vs int.from_bytes) + 5. mask key generation (os.urandom vs PRNG) +""" + +import gc +import os +import random +import struct +import time + +# ── Размеры данных, типичные для Telegram ────────────────────────── +SMALL = 64 # init-пакет / ack +MEDIUM = 1024 # текстовое сообщение +LARGE = 65536 # фото / голосовое + + +# ═══════════════════════════════════════════════════════════════════ +# XOR mask (не менялся — для полноты) +# ═══════════════════════════════════════════════════════════════════ + +def xor_mask(data: bytes, mask: bytes) -> bytes: + if not data: + return data + n = len(data) + mask_rep = (mask * (n // 4 + 1))[:n] + return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') + + +# ═══════════════════════════════════════════════════════════════════ +# _build_frame +# ═══════════════════════════════════════════════════════════════════ + +def build_frame_old(opcode: int, data: bytes, mask: bool = False) -> bytes: + """Старая: bytearray + append/extend + os.urandom.""" + header = bytearray() + header.append(0x80 | opcode) + length = len(data) + mask_bit = 0x80 if mask else 0x00 + + if length < 126: + header.append(mask_bit | length) + elif length < 65536: + header.append(mask_bit | 126) + header.extend(struct.pack('>H', length)) + else: + header.append(mask_bit | 127) + header.extend(struct.pack('>Q', length)) + + if mask: + mask_key = os.urandom(4) + header.extend(mask_key) + return bytes(header) + xor_mask(data, mask_key) + return bytes(header) + data + + +# ── Новая: pre-compiled struct + PRNG ────────────────────────────── +_st_BB = struct.Struct('>BB') +_st_BBH = struct.Struct('>BBH') +_st_BBQ = struct.Struct('>BBQ') +_st_BB4s = struct.Struct('>BB4s') +_st_BBH4s = struct.Struct('>BBH4s') +_st_BBQ4s = struct.Struct('>BBQ4s') + +_mask_rng = random.Random(int.from_bytes(os.urandom(16), 'big')) +_mask_pack = struct.Struct('>I').pack + +def _random_mask_key() -> bytes: + return _mask_pack(_mask_rng.getrandbits(32)) + +def build_frame_new(opcode: int, data: bytes, mask: bool = False) -> bytes: + """Новая: struct.pack + PRNG mask.""" + length = len(data) + fb = 0x80 | opcode + + if not mask: + if length < 126: + return _st_BB.pack(fb, length) + data + if length < 65536: + return _st_BBH.pack(fb, 126, length) + data + return _st_BBQ.pack(fb, 127, length) + data + + mask_key = _random_mask_key() + masked = xor_mask(data, mask_key) + if length < 126: + return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked + if length < 65536: + return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked + return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked + + +# ═══════════════════════════════════════════════════════════════════ +# _socks5_reply +# ═══════════════════════════════════════════════════════════════════ + +def socks5_reply_old(status): + return bytes([0x05, status, 0x00, 0x01]) + b'\x00' * 6 + +_SOCKS5_REPLIES = {s: bytes([0x05, s, 0x00, 0x01, 0, 0, 0, 0, 0, 0]) + for s in (0x00, 0x05, 0x07, 0x08)} + +def socks5_reply_new(status): + return _SOCKS5_REPLIES[status] + + +# ═══════════════════════════════════════════════════════════════════ +# dc_from_init XOR (8 байт keystream ^ data) +# ═══════════════════════════════════════════════════════════════════ + +def dc_xor_old(data8: bytes, ks8: bytes) -> bytes: + """Старая: генераторное выражение.""" + return bytes(a ^ b for a, b in zip(data8, ks8)) + +def dc_xor_new(data8: bytes, ks8: bytes) -> bytes: + """Новая: int.from_bytes.""" + return (int.from_bytes(data8, 'big') ^ int.from_bytes(ks8, 'big')).to_bytes(8, 'big') + + +# ═══════════════════════════════════════════════════════════════════ +# mask key: os.urandom(4) vs PRNG +# ═══════════════════════════════════════════════════════════════════ + +def mask_key_old() -> bytes: + return os.urandom(4) + +def mask_key_new() -> bytes: + return _random_mask_key() + + +# ═══════════════════════════════════════════════════════════════════ +# Бенчмарк +# ═══════════════════════════════════════════════════════════════════ + +def bench(func, args_list: list, iters: int) -> float: + gc.collect() + for i in range(min(100, iters)): + func(*args_list[i % len(args_list)]) + start = time.perf_counter() + for i in range(iters): + func(*args_list[i % len(args_list)]) + elapsed = time.perf_counter() - start + return elapsed / iters * 1_000_000 # мкс + + +def compare(name: str, old_fn, new_fn, args_list: list, iters: int): + t_old = bench(old_fn, args_list, iters) + t_new = bench(new_fn, args_list, iters) + speedup = t_old / t_new if t_new > 0 else float('inf') + marker = '✅' if speedup >= 1.0 else '⚠️' + print(f" {name:.<42s} OLD {t_old:8.3f} мкс | NEW {t_new:8.3f} мкс | {speedup:5.2f}x {marker}") + + +# ═══════════════════════════════════════════════════════════════════ + +def main(): + print("=" * 74) + print(" Stress Test: OLD vs NEW (горячие функции tg_ws_proxy)") + print("=" * 74) + + N = 500_000 + + # # ── 1. _build_frame masked ──────────────────────────────────── + # print(f"\n── _build_frame masked ({N:,} итераций) ──") + # for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]: + # data_list = [(0x2, os.urandom(size), True) for _ in range(1000)] + # compare(f"build_frame masked {label}", + # build_frame_old, build_frame_new, data_list, N) + + # # ── 2. _build_frame unmasked ────────────────────────────────── + # print(f"\n── _build_frame unmasked ({N:,} итераций) ──") + # for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]: + # data_list = [(0x2, os.urandom(size), False) for _ in range(1000)] + # compare(f"build_frame unmasked {label}", + # build_frame_old, build_frame_new, data_list, N) + + # # ── 3. mask key generation ──────────────────────────────────── + # print(f"\n── mask key: os.urandom(4) vs PRNG ({N:,} итераций) ──") + # compare("mask_key", mask_key_old, mask_key_new, [()] * 100, N) + + # # ── 4. _socks5_reply ───────────────────────────────────────── + N2 = 2_000_000 + # print(f"\n── _socks5_reply ({N2:,} итераций) ──") + # compare("socks5_reply", socks5_reply_old, socks5_reply_new, + # [(s,) for s in (0x00, 0x05, 0x07, 0x08)], N2) + + # # ── 5. dc_from_init XOR (8 bytes) ──────────────────────────── + # print(f"\n── dc_xor 8B: generator vs int.from_bytes ({N2:,} итераций) ──") + # compare("dc_xor_8B", dc_xor_old, dc_xor_new, + # [(os.urandom(8), os.urandom(8)) for _ in range(1000)], N2) + + # ── 6. _read_frame struct.unpack vs pre-compiled ───────────── + print(f"\n── struct unpack read-path ({N2:,} итераций) ──") + _st_H_pre = struct.Struct('>H') + _st_Q_pre = struct.Struct('>Q') + h_bufs = [(os.urandom(2),) for _ in range(1000)] + q_bufs = [(os.urandom(8),) for _ in range(1000)] + compare("unpack >H", + lambda b: struct.unpack('>H', b), + lambda b: _st_H_pre.unpack(b), + h_bufs, N2) + compare("unpack >Q", + lambda b: struct.unpack('>Q', b), + lambda b: _st_Q_pre.unpack(b), + q_bufs, N2) + + # ── 7. dc_from_init: 2x unpack vs 1x merged ───────────────── + print(f"\n── dc_from_init unpack: 2 calls vs 1 merged ({N2:,} итераций) ──") + _st_Ih = struct.Struct('