From 6a80ca85e3c639fe7d55c888585135de0878760e Mon Sep 17 00:00:00 2001 From: Flowseal Date: Thu, 19 Mar 2026 22:07:47 +0300 Subject: [PATCH 1/5] Optimizations --- proxy/tg_ws_proxy.py | 80 ++++++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 8bd0c45..0d63920 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -85,6 +85,8 @@ _dc_fail_until: Dict[Tuple[int, bool], float] = {} _DC_FAIL_COOLDOWN = 30.0 # seconds to keep reduced WS timeout after failure _WS_FAIL_TIMEOUT = 2.0 # quick-retry timeout after a recent WS failure +_ZERO_64 = b'\x00' * 64 + _ssl_ctx = ssl.create_default_context() _ssl_ctx.check_hostname = False @@ -129,6 +131,15 @@ def _xor_mask(data: bytes, mask: bytes) -> bytes: return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') +# Pre-compiled struct formats for WS frame building +_st_BB = struct.Struct('>BB') +_st_BBH = struct.Struct('>BBH') +_st_BBQ = struct.Struct('>BBQ') +_st_BB4s = struct.Struct('>BB4s') +_st_BBH4s = struct.Struct('>BBH4s') +_st_BBQ4s = struct.Struct('>BBQ4s') + + class RawWebSocket: """ Lightweight WebSocket client over asyncio reader/writer streams. @@ -302,25 +313,23 @@ class RawWebSocket: @staticmethod def _build_frame(opcode: int, data: bytes, mask: bool = False) -> bytes: - header = bytearray() - header.append(0x80 | opcode) # FIN=1 + opcode length = len(data) - mask_bit = 0x80 if mask else 0x00 + fb = 0x80 | opcode + if not mask: + if length < 126: + return _st_BB.pack(fb, length) + data + if length < 65536: + return _st_BBH.pack(fb, 126, length) + data + return _st_BBQ.pack(fb, 127, length) + data + + mask_key = os.urandom(4) + masked = _xor_mask(data, mask_key) if length < 126: - header.append(mask_bit | length) - elif length < 65536: - header.append(mask_bit | 126) - header.extend(struct.pack('>H', length)) - else: - header.append(mask_bit | 127) - header.extend(struct.pack('>Q', length)) - - if mask: - mask_key = os.urandom(4) - header.extend(mask_key) - return bytes(header) + _xor_mask(data, mask_key) - return bytes(header) + data + return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked + if length < 65536: + return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked + return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked async def _read_frame(self) -> Tuple[int, bytes]: hdr = await self.reader.readexactly(2) @@ -375,8 +384,8 @@ def _dc_from_init(data: bytes) -> Tuple[Optional[int], bool]: iv = bytes(data[40:56]) cipher = Cipher(algorithms.AES(key), modes.CTR(iv)) encryptor = cipher.encryptor() - keystream = encryptor.update(b'\x00' * 64) + encryptor.finalize() - plain = bytes(a ^ b for a, b in zip(data[56:64], keystream[56:64])) + keystream = encryptor.update(_ZERO_64) + encryptor.finalize() + plain = (int.from_bytes(data[56:64], 'big') ^ int.from_bytes(keystream[56:64], 'big')).to_bytes(8, 'big') proto = struct.unpack(' bytes: iv = bytes(data[40:56]) cipher = Cipher(algorithms.AES(key_raw), modes.CTR(iv)) enc = cipher.encryptor() - ks = enc.update(b'\x00' * 64) + enc.finalize() + ks = enc.update(_ZERO_64) + enc.finalize() patched = bytearray(data[:64]) patched[60] = ks[60] ^ new_dc[0] patched[61] = ks[61] ^ new_dc[1] @@ -434,7 +443,7 @@ class _MsgSplitter: iv = bytes(init_data[40:56]) cipher = Cipher(algorithms.AES(key_raw), modes.CTR(iv)) self._dec = cipher.encryptor() - self._dec.update(b'\x00' * 64) # skip init packet + self._dec.update(_ZERO_64) # skip init packet def split(self, chunk: bytes) -> List[bytes]: """Decrypt to find message boundaries, return split ciphertext.""" @@ -617,8 +626,9 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, chunk = await reader.read(65536) if not chunk: break - _stats.bytes_up += len(chunk) - up_bytes += len(chunk) + n = len(chunk) + _stats.bytes_up += n + up_bytes += n up_packets += 1 if splitter: parts = splitter.split(chunk) @@ -640,8 +650,9 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, data = await ws.recv() if data is None: break - _stats.bytes_down += len(data) - down_bytes += len(data) + n = len(data) + _stats.bytes_down += n + down_bytes += n down_packets += 1 writer.write(data) # drain only when kernel buffer is filling up @@ -687,26 +698,27 @@ async def _bridge_tcp(reader, writer, remote_reader, remote_writer, label, dc=None, dst=None, port=None, is_media=False): """Bidirectional TCP <-> TCP forwarding (for fallback).""" - async def forward(src, dst_w, tag): + async def forward(src, dst_w, is_up): try: while True: data = await src.read(65536) if not data: break - if 'up' in tag: - _stats.bytes_up += len(data) + n = len(data) + if is_up: + _stats.bytes_up += n else: - _stats.bytes_down += len(data) + _stats.bytes_down += n dst_w.write(data) await dst_w.drain() except asyncio.CancelledError: pass except Exception as e: - log.debug("[%s] %s ended: %s", label, tag, e) + log.debug("[%s] forward ended: %s", label, e) tasks = [ - asyncio.create_task(forward(reader, remote_writer, 'up')), - asyncio.create_task(forward(remote_reader, writer, 'down')), + asyncio.create_task(forward(reader, remote_writer, True)), + asyncio.create_task(forward(remote_reader, writer, False)), ] try: await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) @@ -747,8 +759,12 @@ async def _pipe(r, w): pass +_SOCKS5_REPLIES = {s: bytes([0x05, s, 0x00, 0x01, 0, 0, 0, 0, 0, 0]) + for s in (0x00, 0x05, 0x07, 0x08)} + + def _socks5_reply(status): - return bytes([0x05, status, 0x00, 0x01]) + b'\x00' * 6 + return _SOCKS5_REPLIES[status] async def _tcp_fallback(reader, writer, dst, port, init, label, From c1452c23da03f143883112ab545dea5add71723c Mon Sep 17 00:00:00 2001 From: Flowseal Date: Fri, 20 Mar 2026 22:57:15 +0300 Subject: [PATCH 2/5] Optimizations --- proxy/tg_ws_proxy.py | 54 ++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 0d63920..f25738c 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -131,13 +131,19 @@ def _xor_mask(data: bytes, mask: bytes) -> bytes: return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') -# Pre-compiled struct formats for WS frame building +# Pre-compiled struct formats _st_BB = struct.Struct('>BB') _st_BBH = struct.Struct('>BBH') _st_BBQ = struct.Struct('>BBQ') _st_BB4s = struct.Struct('>BB4s') _st_BBH4s = struct.Struct('>BBH4s') _st_BBQ4s = struct.Struct('>BBQ4s') +_st_H = struct.Struct('>H') +_st_Q = struct.Struct('>Q') +_st_I_net = struct.Struct('!I') +_st_Ih = struct.Struct(' Tuple[int, bytes]: hdr = await self.reader.readexactly(2) opcode = hdr[0] & 0x0F - is_masked = bool(hdr[1] & 0x80) length = hdr[1] & 0x7F if length == 126: - length = struct.unpack('>H', - await self.reader.readexactly(2))[0] + length = _st_H.unpack( + await self.reader.readexactly(2))[0] elif length == 127: - length = struct.unpack('>Q', - await self.reader.readexactly(8))[0] + length = _st_Q.unpack( + await self.reader.readexactly(8))[0] - if is_masked: + if hdr[1] & 0x80: mask_key = await self.reader.readexactly(4) payload = await self.reader.readexactly(length) return opcode, _xor_mask(payload, mask_key) @@ -363,7 +368,7 @@ def _human_bytes(n: int) -> str: def _is_telegram_ip(ip: str) -> bool: try: - n = struct.unpack('!I', _socket.inet_aton(ip))[0] + n = _st_I_net.unpack(_socket.inet_aton(ip))[0] return any(lo <= n <= hi for lo, hi in _TG_RANGES) except OSError: return False @@ -380,17 +385,14 @@ def _dc_from_init(data: bytes) -> Tuple[Optional[int], bool]: Returns (dc_id, is_media). """ try: - key = bytes(data[8:40]) - iv = bytes(data[40:56]) - cipher = Cipher(algorithms.AES(key), modes.CTR(iv)) + cipher = Cipher(algorithms.AES(data[8:40]), modes.CTR(data[40:56])) encryptor = cipher.encryptor() - keystream = encryptor.update(_ZERO_64) + encryptor.finalize() + keystream = encryptor.update(_ZERO_64) plain = (int.from_bytes(data[56:64], 'big') ^ int.from_bytes(keystream[56:64], 'big')).to_bytes(8, 'big') - proto = struct.unpack(' bytes: new_dc = struct.pack(' len(plain): + if pos + 4 > plain_len: break msg_len = ( - struct.unpack_from(' len(plain): + if msg_len == 0 or pos + msg_len > plain_len: break pos += msg_len boundaries.append(pos) @@ -832,7 +832,7 @@ async def _handle_client(reader, writer): writer.close() return - port = struct.unpack('!H', await reader.readexactly(2))[0] + port = _st_H.unpack(await reader.readexactly(2))[0] if ':' in dst: log.error( From ed85e2a284ab17bf1703a3a7e76fb52df6452faa Mon Sep 17 00:00:00 2001 From: Flowseal Date: Sat, 21 Mar 2026 09:26:34 +0300 Subject: [PATCH 3/5] keepalive for stale mitigation --- proxy/tg_ws_proxy.py | 28 ++++- stress_test.py | 246 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+), 5 deletions(-) create mode 100644 stress_test.py diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index f25738c..934ce17 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -618,6 +618,7 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, up_packets = 0 down_packets = 0 start_time = asyncio.get_event_loop().time() + last_recv_time = start_time async def tcp_to_ws(): nonlocal up_bytes, up_packets @@ -644,31 +645,48 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, log.debug("[%s] tcp->ws ended: %s", label, e) async def ws_to_tcp(): - nonlocal down_bytes, down_packets + nonlocal down_bytes, down_packets, last_recv_time try: while True: data = await ws.recv() if data is None: break + last_recv_time = asyncio.get_event_loop().time() n = len(data) _stats.bytes_down += n down_bytes += n down_packets += 1 writer.write(data) - # drain only when kernel buffer is filling up - buf = writer.transport.get_write_buffer_size() - if buf > _SEND_BUF: - await writer.drain() + await writer.drain() except (asyncio.CancelledError, ConnectionError, OSError): return except Exception as e: log.debug("[%s] ws->tcp ended: %s", label, e) + async def ws_keepalive(): + try: + while not ws._closed: + await asyncio.sleep(2) + idle = asyncio.get_event_loop().time() - last_recv_time + if idle >= 2 and not ws._closed: + try: + ws.writer.write( + ws._build_frame(ws.OP_PING, b'', mask=True)) + await ws.writer.drain() + log.debug("[%s] %s WS PING (idle %.1fs)", + label, dc_tag, idle) + except Exception: + break + except asyncio.CancelledError: + pass + + ka_task = asyncio.create_task(ws_keepalive()) tasks = [asyncio.create_task(tcp_to_ws()), asyncio.create_task(ws_to_tcp())] try: await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) finally: + ka_task.cancel() for t in tasks: t.cancel() for t in tasks: diff --git a/stress_test.py b/stress_test.py new file mode 100644 index 0000000..f2fd88c --- /dev/null +++ b/stress_test.py @@ -0,0 +1,246 @@ +""" +Stress-test: сравнение OLD vs NEW реализаций горячих функций прокси. + +Тестируются: + 1. _build_frame — сборка WS-фрейма (masked binary) + 2. _build_frame — сборка WS-фрейма (unmasked) + 3. _socks5_reply — генерация SOCKS5-ответа + 4. _dc_from_init XOR-часть (bytes(a^b for …) vs int.from_bytes) + 5. mask key generation (os.urandom vs PRNG) +""" + +import gc +import os +import random +import struct +import time + +# ── Размеры данных, типичные для Telegram ────────────────────────── +SMALL = 64 # init-пакет / ack +MEDIUM = 1024 # текстовое сообщение +LARGE = 65536 # фото / голосовое + + +# ═══════════════════════════════════════════════════════════════════ +# XOR mask (не менялся — для полноты) +# ═══════════════════════════════════════════════════════════════════ + +def xor_mask(data: bytes, mask: bytes) -> bytes: + if not data: + return data + n = len(data) + mask_rep = (mask * (n // 4 + 1))[:n] + return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') + + +# ═══════════════════════════════════════════════════════════════════ +# _build_frame +# ═══════════════════════════════════════════════════════════════════ + +def build_frame_old(opcode: int, data: bytes, mask: bool = False) -> bytes: + """Старая: bytearray + append/extend + os.urandom.""" + header = bytearray() + header.append(0x80 | opcode) + length = len(data) + mask_bit = 0x80 if mask else 0x00 + + if length < 126: + header.append(mask_bit | length) + elif length < 65536: + header.append(mask_bit | 126) + header.extend(struct.pack('>H', length)) + else: + header.append(mask_bit | 127) + header.extend(struct.pack('>Q', length)) + + if mask: + mask_key = os.urandom(4) + header.extend(mask_key) + return bytes(header) + xor_mask(data, mask_key) + return bytes(header) + data + + +# ── Новая: pre-compiled struct + PRNG ────────────────────────────── +_st_BB = struct.Struct('>BB') +_st_BBH = struct.Struct('>BBH') +_st_BBQ = struct.Struct('>BBQ') +_st_BB4s = struct.Struct('>BB4s') +_st_BBH4s = struct.Struct('>BBH4s') +_st_BBQ4s = struct.Struct('>BBQ4s') + +_mask_rng = random.Random(int.from_bytes(os.urandom(16), 'big')) +_mask_pack = struct.Struct('>I').pack + +def _random_mask_key() -> bytes: + return _mask_pack(_mask_rng.getrandbits(32)) + +def build_frame_new(opcode: int, data: bytes, mask: bool = False) -> bytes: + """Новая: struct.pack + PRNG mask.""" + length = len(data) + fb = 0x80 | opcode + + if not mask: + if length < 126: + return _st_BB.pack(fb, length) + data + if length < 65536: + return _st_BBH.pack(fb, 126, length) + data + return _st_BBQ.pack(fb, 127, length) + data + + mask_key = _random_mask_key() + masked = xor_mask(data, mask_key) + if length < 126: + return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked + if length < 65536: + return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked + return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked + + +# ═══════════════════════════════════════════════════════════════════ +# _socks5_reply +# ═══════════════════════════════════════════════════════════════════ + +def socks5_reply_old(status): + return bytes([0x05, status, 0x00, 0x01]) + b'\x00' * 6 + +_SOCKS5_REPLIES = {s: bytes([0x05, s, 0x00, 0x01, 0, 0, 0, 0, 0, 0]) + for s in (0x00, 0x05, 0x07, 0x08)} + +def socks5_reply_new(status): + return _SOCKS5_REPLIES[status] + + +# ═══════════════════════════════════════════════════════════════════ +# dc_from_init XOR (8 байт keystream ^ data) +# ═══════════════════════════════════════════════════════════════════ + +def dc_xor_old(data8: bytes, ks8: bytes) -> bytes: + """Старая: генераторное выражение.""" + return bytes(a ^ b for a, b in zip(data8, ks8)) + +def dc_xor_new(data8: bytes, ks8: bytes) -> bytes: + """Новая: int.from_bytes.""" + return (int.from_bytes(data8, 'big') ^ int.from_bytes(ks8, 'big')).to_bytes(8, 'big') + + +# ═══════════════════════════════════════════════════════════════════ +# mask key: os.urandom(4) vs PRNG +# ═══════════════════════════════════════════════════════════════════ + +def mask_key_old() -> bytes: + return os.urandom(4) + +def mask_key_new() -> bytes: + return _random_mask_key() + + +# ═══════════════════════════════════════════════════════════════════ +# Бенчмарк +# ═══════════════════════════════════════════════════════════════════ + +def bench(func, args_list: list, iters: int) -> float: + gc.collect() + for i in range(min(100, iters)): + func(*args_list[i % len(args_list)]) + start = time.perf_counter() + for i in range(iters): + func(*args_list[i % len(args_list)]) + elapsed = time.perf_counter() - start + return elapsed / iters * 1_000_000 # мкс + + +def compare(name: str, old_fn, new_fn, args_list: list, iters: int): + t_old = bench(old_fn, args_list, iters) + t_new = bench(new_fn, args_list, iters) + speedup = t_old / t_new if t_new > 0 else float('inf') + marker = '✅' if speedup >= 1.0 else '⚠️' + print(f" {name:.<42s} OLD {t_old:8.3f} мкс | NEW {t_new:8.3f} мкс | {speedup:5.2f}x {marker}") + + +# ═══════════════════════════════════════════════════════════════════ + +def main(): + print("=" * 74) + print(" Stress Test: OLD vs NEW (горячие функции tg_ws_proxy)") + print("=" * 74) + + N = 500_000 + + # # ── 1. _build_frame masked ──────────────────────────────────── + # print(f"\n── _build_frame masked ({N:,} итераций) ──") + # for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]: + # data_list = [(0x2, os.urandom(size), True) for _ in range(1000)] + # compare(f"build_frame masked {label}", + # build_frame_old, build_frame_new, data_list, N) + + # # ── 2. _build_frame unmasked ────────────────────────────────── + # print(f"\n── _build_frame unmasked ({N:,} итераций) ──") + # for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]: + # data_list = [(0x2, os.urandom(size), False) for _ in range(1000)] + # compare(f"build_frame unmasked {label}", + # build_frame_old, build_frame_new, data_list, N) + + # # ── 3. mask key generation ──────────────────────────────────── + # print(f"\n── mask key: os.urandom(4) vs PRNG ({N:,} итераций) ──") + # compare("mask_key", mask_key_old, mask_key_new, [()] * 100, N) + + # # ── 4. _socks5_reply ───────────────────────────────────────── + N2 = 2_000_000 + # print(f"\n── _socks5_reply ({N2:,} итераций) ──") + # compare("socks5_reply", socks5_reply_old, socks5_reply_new, + # [(s,) for s in (0x00, 0x05, 0x07, 0x08)], N2) + + # # ── 5. dc_from_init XOR (8 bytes) ──────────────────────────── + # print(f"\n── dc_xor 8B: generator vs int.from_bytes ({N2:,} итераций) ──") + # compare("dc_xor_8B", dc_xor_old, dc_xor_new, + # [(os.urandom(8), os.urandom(8)) for _ in range(1000)], N2) + + # ── 6. _read_frame struct.unpack vs pre-compiled ───────────── + print(f"\n── struct unpack read-path ({N2:,} итераций) ──") + _st_H_pre = struct.Struct('>H') + _st_Q_pre = struct.Struct('>Q') + h_bufs = [(os.urandom(2),) for _ in range(1000)] + q_bufs = [(os.urandom(8),) for _ in range(1000)] + compare("unpack >H", + lambda b: struct.unpack('>H', b), + lambda b: _st_H_pre.unpack(b), + h_bufs, N2) + compare("unpack >Q", + lambda b: struct.unpack('>Q', b), + lambda b: _st_Q_pre.unpack(b), + q_bufs, N2) + + # ── 7. dc_from_init: 2x unpack vs 1x merged ───────────────── + print(f"\n── dc_from_init unpack: 2 calls vs 1 merged ({N2:,} итераций) ──") + _st_Ih = struct.Struct(' Date: Sun, 22 Mar 2026 02:54:03 +0300 Subject: [PATCH 4/5] logrotate #366; configurable pool and buffer sizes --- linux.py | 106 +++++++++++++++++++++++++++---------------- macos.py | 46 +++++++++++++++++-- proxy/tg_ws_proxy.py | 45 +++++++++++++++--- windows.py | 60 ++++++++++++++++++++++-- 4 files changed, 204 insertions(+), 53 deletions(-) diff --git a/linux.py b/linux.py index 9b2ff9e..664c948 100644 --- a/linux.py +++ b/linux.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio as _asyncio import json import logging +import logging.handlers import os import subprocess import sys @@ -32,6 +33,9 @@ DEFAULT_CONFIG = { "host": "127.0.0.1", "dc_ip": ["2:149.154.167.220", "4:149.154.167.220"], "verbose": False, + "log_max_mb": 5, + "buf_kb": 256, + "pool_size": 4, } @@ -149,12 +153,17 @@ def save_config(cfg: dict): json.dump(cfg, f, indent=2, ensure_ascii=False) -def setup_logging(verbose: bool = False): +def setup_logging(verbose: bool = False, log_max_mb: float = 5): _ensure_dirs() root = logging.getLogger() root.setLevel(logging.DEBUG if verbose else logging.INFO) - fh = logging.FileHandler(str(LOG_FILE), encoding="utf-8") + fh = logging.handlers.RotatingFileHandler( + str(LOG_FILE), + maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024), + backupCount=0, + encoding='utf-8', + ) fh.setLevel(logging.DEBUG) fh.setFormatter( logging.Formatter( @@ -261,6 +270,13 @@ def start_proxy(): return log.info("Starting proxy on %s:%d ...", host, port) + + buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"]) + pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]) + tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024 + tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF + tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size) + _proxy_thread = threading.Thread( target=_run_proxy_thread, args=(port, dc_opt, verbose, host), @@ -363,7 +379,7 @@ def _edit_config_dialog(): TEXT_SECONDARY = "#707579" FONT_FAMILY = "Sans" - w, h = 420, 480 + w, h = 420, 540 sw = root.winfo_screenwidth() sh = root.winfo_screenheight() root.geometry(f"{w}x{h}+{(sw - w) // 2}+{(sh - h) // 2}") @@ -455,14 +471,29 @@ def _edit_config_dialog(): border_color=FIELD_BORDER, ).pack(anchor="w", pady=(0, 8)) - # Info label - ctk.CTkLabel( - frame, - text="Изменения вступят в силу после перезапуска прокси.", - font=(FONT_FAMILY, 11), - text_color=TEXT_SECONDARY, - anchor="w", - ).pack(anchor="w", pady=(0, 16)) + # Advanced: buf_kb, pool_size, log_max_mb + adv_frame = ctk.CTkFrame(frame, fg_color="transparent") + adv_frame.pack(anchor="w", fill="x", pady=(4, 8)) + + for col, (lbl, key, w_) in enumerate([ + ("Буфер (KB, 256 default)", "buf_kb", 120), + ("WS пулов (4 default)", "pool_size", 120), + ("Log size (MB, 5 def)", "log_max_mb", 120), + ]): + col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent") + col_frame.pack(side="left", padx=(0, 10)) + ctk.CTkLabel(col_frame, text=lbl, font=(FONT_FAMILY, 11), + text_color=TEXT_SECONDARY, anchor="w").pack(anchor="w") + ctk.CTkEntry(col_frame, width=w_, height=30, font=(FONT_FAMILY, 12), + corner_radius=8, fg_color=FIELD_BG, + border_color=FIELD_BORDER, border_width=1, + text_color=TEXT_PRIMARY, + textvariable=ctk.StringVar( + value=str(cfg.get(key, DEFAULT_CONFIG[key])) + )).pack(anchor="w") + + _adv_entries = list(adv_frame.winfo_children()) + _adv_keys = ["buf_kb", "pool_size", "log_max_mb"] def on_save(): import socket as _sock @@ -499,6 +530,17 @@ def _edit_config_dialog(): "dc_ip": lines, "verbose": verbose_var.get(), } + + for i, key in enumerate(_adv_keys): + col_frame = _adv_entries[i] + entry = col_frame.winfo_children()[1] + try: + val = float(entry.get().strip()) + if key in ("buf_kb", "pool_size"): + val = int(val) + new_cfg[key] = val + except ValueError: + new_cfg[key] = DEFAULT_CONFIG[key] save_config(new_cfg) _config.update(new_cfg) log.info("Config saved: %s", new_cfg) @@ -521,33 +563,18 @@ def _edit_config_dialog(): root.destroy() btn_frame = ctk.CTkFrame(frame, fg_color="transparent") - btn_frame.pack(fill="x") - ctk.CTkButton( - btn_frame, - text="Сохранить", - width=140, - height=38, - font=(FONT_FAMILY, 14, "bold"), - corner_radius=10, - fg_color=TG_BLUE, - hover_color=TG_BLUE_HOVER, - text_color="#ffffff", - command=on_save, - ).pack(side="left", padx=(0, 10)) - ctk.CTkButton( - btn_frame, - text="Отмена", - width=140, - height=38, - font=(FONT_FAMILY, 14), - corner_radius=10, - fg_color=FIELD_BG, - hover_color=FIELD_BORDER, - text_color=TEXT_PRIMARY, - border_width=1, - border_color=FIELD_BORDER, - command=on_cancel, - ).pack(side="left") + btn_frame.pack(fill="x", pady=(20, 0)) + ctk.CTkButton(btn_frame, text="Сохранить", height=38, + font=(FONT_FAMILY, 14, "bold"), corner_radius=10, + fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER, + text_color="#ffffff", + command=on_save).pack(side="left", fill="x", expand=True, padx=(0, 8)) + ctk.CTkButton(btn_frame, text="Отмена", height=38, + font=(FONT_FAMILY, 14), corner_radius=10, + fg_color=FIELD_BG, hover_color=FIELD_BORDER, + text_color=TEXT_PRIMARY, border_width=1, + border_color=FIELD_BORDER, + command=on_cancel).pack(side="right", fill="x", expand=True) root.mainloop() @@ -798,7 +825,8 @@ def run_tray(): except Exception: pass - setup_logging(_config.get("verbose", False)) + setup_logging(_config.get("verbose", False), + log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])) log.info("TG WS Proxy tray app starting") log.info("Config: %s", _config) log.info("Log file: %s", LOG_FILE) diff --git a/macos.py b/macos.py index e1806cf..8241b38 100644 --- a/macos.py +++ b/macos.py @@ -2,6 +2,7 @@ from __future__ import annotations import json import logging +import logging.handlers import os import psutil import subprocess @@ -43,6 +44,9 @@ DEFAULT_CONFIG = { "host": "127.0.0.1", "dc_ip": ["2:149.154.167.220", "4:149.154.167.220"], "verbose": False, + "log_max_mb": 5, + "buf_kb": 256, + "pool_size": 4, } _proxy_thread: Optional[threading.Thread] = None @@ -153,12 +157,17 @@ def save_config(cfg: dict): json.dump(cfg, f, indent=2, ensure_ascii=False) -def setup_logging(verbose: bool = False): +def setup_logging(verbose: bool = False, log_max_mb: float = 5): _ensure_dirs() root = logging.getLogger() root.setLevel(logging.DEBUG if verbose else logging.INFO) - fh = logging.FileHandler(str(LOG_FILE), encoding="utf-8") + fh = logging.handlers.RotatingFileHandler( + str(LOG_FILE), + maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024), + backupCount=0, + encoding='utf-8', + ) fh.setLevel(logging.DEBUG) fh.setFormatter(logging.Formatter( "%(asctime)s %(levelname)-5s %(name)s %(message)s", @@ -290,6 +299,13 @@ def start_proxy(): return log.info("Starting proxy on %s:%d ...", host, port) + + buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"]) + pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]) + tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024 + tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF + tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size) + _proxy_thread = threading.Thread( target=_run_proxy_thread, args=(port, dc_opt, verbose, host), @@ -438,11 +454,34 @@ def _edit_config_dialog(): # Verbose verbose = _ask_yes_no("Включить подробное логирование (verbose)?") + # Advanced settings + adv_str = _osascript_input( + "Расширенные настройки (буфер KB, WS пул, лог MB):\n" + "Формат: buf_kb,pool_size,log_max_mb", + f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])}," + f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])}," + f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}") + + adv = {} + if adv_str: + parts = [s.strip() for s in adv_str.split(',')] + keys = [("buf_kb", int), ("pool_size", int), + ("log_max_mb", float)] + for i, (k, typ) in enumerate(keys): + if i < len(parts): + try: + adv[k] = typ(parts[i]) + except ValueError: + pass + new_cfg = { "host": host, "port": port, "dc_ip": dc_lines, "verbose": verbose, + "buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])), + "pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])), + "log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])), } save_config(new_cfg) log.info("Config saved: %s", new_cfg) @@ -581,7 +620,8 @@ def run_menubar(): except Exception: pass - setup_logging(_config.get("verbose", False)) + setup_logging(_config.get("verbose", False), + log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])) log.info("TG WS Proxy menubar app starting") log.info("Config: %s", _config) log.info("Log file: %s", LOG_FILE) diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 934ce17..0c5427b 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -4,6 +4,7 @@ import argparse import asyncio import base64 import logging +import logging.handlers import os import socket as _socket import ssl @@ -144,6 +145,7 @@ _st_I_net = struct.Struct('!I') _st_Ih = struct.Struct('= 2 and not ws._closed: try: - ws.writer.write( - ws._build_frame(ws.OP_PING, b'', mask=True)) + ws.writer.write(_WS_PING_FRAME) await ws.writer.drain() log.debug("[%s] %s WS PING (idle %.1fs)", label, dc_tag, idle) @@ -1156,6 +1158,16 @@ def main(): ' --dc-ip 2:149.154.167.220') ap.add_argument('-v', '--verbose', action='store_true', help='Debug logging') + ap.add_argument('--log-file', type=str, default=None, metavar='PATH', + help='Log to file with rotation (default: stderr only)') + ap.add_argument('--log-max-mb', type=float, default=5, metavar='MB', + help='Max log file size in MB before rotation (default 5)') + ap.add_argument('--log-backups', type=int, default=0, metavar='N', + help='Number of rotated log files to keep (default 0)') + ap.add_argument('--buf-kb', type=int, default=256, metavar='KB', + help='Socket send/recv buffer size in KB (default 256)') + ap.add_argument('--pool-size', type=int, default=4, metavar='N', + help='WS connection pool size per DC (default 4, min 0)') args = ap.parse_args() if not args.dc_ip: @@ -1167,11 +1179,30 @@ def main(): log.error(str(e)) sys.exit(1) - logging.basicConfig( - level=logging.DEBUG if args.verbose else logging.INFO, - format='%(asctime)s %(levelname)-5s %(message)s', - datefmt='%H:%M:%S', - ) + log_level = logging.DEBUG if args.verbose else logging.INFO + log_fmt = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s', + datefmt='%H:%M:%S') + root = logging.getLogger() + root.setLevel(log_level) + + console = logging.StreamHandler() + console.setFormatter(log_fmt) + root.addHandler(console) + + if args.log_file: + fh = logging.handlers.RotatingFileHandler( + args.log_file, + maxBytes=max(32 * 1024, args.log_max_mb * 1024 * 1024), + backupCount=max(0, args.log_backups), + encoding='utf-8', + ) + fh.setFormatter(log_fmt) + root.addHandler(fh) + + global _RECV_BUF, _SEND_BUF, _WS_POOL_SIZE + _RECV_BUF = max(4, args.buf_kb) * 1024 + _SEND_BUF = _RECV_BUF + _WS_POOL_SIZE = max(0, args.pool_size) try: asyncio.run(_run(args.port, dc_opt, host=args.host)) diff --git a/windows.py b/windows.py index 568fce0..6eaad3f 100644 --- a/windows.py +++ b/windows.py @@ -3,6 +3,7 @@ from __future__ import annotations import ctypes import json import logging +import logging.handlers import os import winreg import psutil @@ -38,6 +39,9 @@ DEFAULT_CONFIG = { "dc_ip": ["2:149.154.167.220", "4:149.154.167.220"], "verbose": False, "autostart": False, + "log_max_mb": 5, + "buf_kb": 256, + "pool_size": 4, } @@ -148,12 +152,17 @@ def save_config(cfg: dict): json.dump(cfg, f, indent=2, ensure_ascii=False) -def setup_logging(verbose: bool = False): +def setup_logging(verbose: bool = False, log_max_mb: float = 5): _ensure_dirs() root = logging.getLogger() root.setLevel(logging.DEBUG if verbose else logging.INFO) - fh = logging.FileHandler(str(LOG_FILE), encoding="utf-8") + fh = logging.handlers.RotatingFileHandler( + str(LOG_FILE), + maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024), + backupCount=0, + encoding='utf-8', + ) fh.setLevel(logging.DEBUG) fh.setFormatter(logging.Formatter( "%(asctime)s %(levelname)-5s %(name)s %(message)s", @@ -301,6 +310,13 @@ def start_proxy(): return log.info("Starting proxy on %s:%d ...", host, port) + + buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"]) + pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]) + tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024 + tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF + tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size) + _proxy_thread = threading.Thread( target=_run_proxy_thread, args=(port, dc_opt, verbose, host), @@ -395,7 +411,7 @@ def _edit_config_dialog(): TEXT_SECONDARY = "#707579" FONT_FAMILY = "Segoe UI" - w, h = 420, 460 + w, h = 420, 540 if _supports_autostart(): h += 70 @@ -450,6 +466,30 @@ def _edit_config_dialog(): corner_radius=6, border_width=2, border_color=FIELD_BORDER).pack(anchor="w", pady=(0, 8)) + # Advanced: buf_kb, pool_size, log_max_mb + adv_frame = ctk.CTkFrame(frame, fg_color="transparent") + adv_frame.pack(anchor="w", fill="x", pady=(4, 8)) + + for col, (lbl, key, w_) in enumerate([ + ("Буфер (KB, 256 default)", "buf_kb", 120), + ("WS пулов (4 default)", "pool_size", 120), + ("Log size (MB, 5 def)", "log_max_mb", 120), + ]): + col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent") + col_frame.pack(side="left", padx=(0, 10)) + ctk.CTkLabel(col_frame, text=lbl, font=(FONT_FAMILY, 11), + text_color=TEXT_SECONDARY, anchor="w").pack(anchor="w") + ctk.CTkEntry(col_frame, width=w_, height=30, font=(FONT_FAMILY, 12), + corner_radius=8, fg_color=FIELD_BG, + border_color=FIELD_BORDER, border_width=1, + text_color=TEXT_PRIMARY, + textvariable=ctk.StringVar( + value=str(cfg.get(key, DEFAULT_CONFIG[key])) + )).pack(anchor="w") + + _adv_entries = list(adv_frame.winfo_children()) + _adv_keys = ["buf_kb", "pool_size", "log_max_mb"] + autostart_var = None if _supports_autostart(): autostart_var = ctk.BooleanVar(value=cfg["autostart"]) @@ -495,6 +535,17 @@ def _edit_config_dialog(): "verbose": verbose_var.get(), "autostart": (autostart_var.get() if autostart_var is not None else False), } + + for i, key in enumerate(_adv_keys): + col_frame = _adv_entries[i] + entry = col_frame.winfo_children()[1] + try: + val = float(entry.get().strip()) + if key in ("buf_kb", "pool_size"): + val = int(val) + new_cfg[key] = val + except ValueError: + new_cfg[key] = DEFAULT_CONFIG[key] save_config(new_cfg) _config.update(new_cfg) log.info("Config saved: %s", new_cfg) @@ -740,7 +791,8 @@ def run_tray(): except Exception: pass - setup_logging(_config.get("verbose", False)) + setup_logging(_config.get("verbose", False), + log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])) log.info("TG WS Proxy tray app starting") log.info("Config: %s", _config) log.info("Log file: %s", LOG_FILE) From afb7c5f56d9ac3195214fa56423daf58a377792a Mon Sep 17 00:00:00 2001 From: Flowseal Date: Sun, 22 Mar 2026 08:00:14 +0300 Subject: [PATCH 5/5] revert keepalive mechanism --- proxy/tg_ws_proxy.py | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 0c5427b..b6e5539 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -145,7 +145,6 @@ _st_I_net = struct.Struct('!I') _st_Ih = struct.Struct('ws ended: %s", label, e) async def ws_to_tcp(): - nonlocal down_bytes, down_packets, last_recv_time + nonlocal down_bytes, down_packets try: while True: data = await ws.recv() if data is None: break - last_recv_time = asyncio.get_event_loop().time() n = len(data) _stats.bytes_down += n down_bytes += n @@ -666,29 +663,11 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label, except Exception as e: log.debug("[%s] ws->tcp ended: %s", label, e) - async def ws_keepalive(): - try: - while not ws._closed: - await asyncio.sleep(2) - idle = asyncio.get_event_loop().time() - last_recv_time - if idle >= 2 and not ws._closed: - try: - ws.writer.write(_WS_PING_FRAME) - await ws.writer.drain() - log.debug("[%s] %s WS PING (idle %.1fs)", - label, dc_tag, idle) - except Exception: - break - except asyncio.CancelledError: - pass - - ka_task = asyncio.create_task(ws_keepalive()) tasks = [asyncio.create_task(tcp_to_ws()), asyncio.create_task(ws_to_tcp())] try: await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) finally: - ka_task.cancel() for t in tasks: t.cancel() for t in tasks: