From c4a044542cb29854155cf5e355ca695ca1b744f1 Mon Sep 17 00:00:00 2001 From: Flowseal Date: Sun, 29 Mar 2026 15:21:45 +0300 Subject: [PATCH] fixes --- proxy/tg_ws_proxy.py | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 0346699..dc245dd 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -484,6 +484,7 @@ def _ws_domains(dc: int, is_media) -> List[str]: class Stats: def __init__(self): self.connections_total = 0 + self.connections_active = 0 self.connections_ws = 0 self.connections_tcp_fallback = 0 self.connections_bad = 0 @@ -497,7 +498,9 @@ class Stats: pool_total = self.pool_hits + self.pool_misses pool_s = (f"{self.pool_hits}/{pool_total}" if pool_total else "n/a") - return (f"total={self.connections_total} ws={self.connections_ws} " + return (f"total={self.connections_total} " + f"active={self.connections_active} " + f"ws={self.connections_ws} " f"tcp_fb={self.connections_tcp_fallback} " f"bad={self.connections_bad} " f"err={self.ws_errors} " @@ -528,7 +531,8 @@ class _WsPool: while bucket: ws, created = bucket.popleft() age = now - created - if age > self.WS_POOL_MAX_AGE or ws._closed: + if (age > self.WS_POOL_MAX_AGE or ws._closed + or ws.writer.transport.is_closing()): asyncio.create_task(self._quiet_close(ws)) continue _stats.pool_hits += 1 @@ -618,7 +622,7 @@ async def _bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, down_bytes = 0 up_packets = 0 down_packets = 0 - start_time = asyncio.get_event_loop().time() + start_time = asyncio.get_running_loop().time() async def tcp_to_ws(): nonlocal up_bytes, up_packets @@ -684,7 +688,7 @@ async def _bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, await t except BaseException: pass - elapsed = asyncio.get_event_loop().time() - start_time + elapsed = asyncio.get_running_loop().time() - start_time log.info("[%s] %s WS session closed: " "^%s (%d pkts) v%s (%d pkts) in %.1fs", label, dc_tag, @@ -782,6 +786,7 @@ def _fallback_ip(dc: int) -> Optional[str]: async def _handle_client(reader, writer, secret: bytes): _stats.connections_total += 1 + _stats.connections_active += 1 peer = writer.get_extra_info('peername') label = f"{peer[0]}:{peer[1]}" if peer else "?" @@ -869,10 +874,12 @@ async def _handle_client(reader, writer, secret: bytes): if dc not in proxy_config.dc_redirects or dc_key in ws_blacklist: fallback_dst = _fallback_ip(dc) if fallback_dst: - log.info("[%s] DC%d not in config -> TCP fallback %s:443" - if dc not in proxy_config.dc_redirects else - "[%s] DC%d%s WS blacklisted -> TCP fallback %s:443", - label, dc, fallback_dst) + if dc not in proxy_config.dc_redirects: + log.info("[%s] DC%d not in config -> TCP fallback %s:443", + label, dc, fallback_dst) + else: + log.info("[%s] DC%d%s WS blacklisted -> TCP fallback %s:443", + label, dc, media_tag, fallback_dst) await _tcp_fallback(reader, writer, fallback_dst, 443, relay_init, label, dc=dc, is_media=is_media, @@ -991,8 +998,9 @@ async def _handle_client(reader, writer, secret: bytes): else: log.error("[%s] unexpected OS error: %s", label, exc) except Exception as exc: - log.error("[%s] unexpected: %s", label, exc.with_traceback()) + log.error("[%s] unexpected: %s", label, exc, exc_info=True) finally: + _stats.connections_active -= 1 try: writer.close() except BaseException: @@ -1007,9 +1015,10 @@ async def _run(stop_event: Optional[asyncio.Event] = None): global _server_instance, _server_stop_event _server_stop_event = stop_event - print(proxy_config.secret) + secret_bytes = bytes.fromhex(proxy_config.secret) + def client_cb(r, w): - asyncio.create_task(_handle_client(r, w, bytes.fromhex(proxy_config.secret))) + asyncio.create_task(_handle_client(r, w, secret_bytes)) server = await asyncio.start_server(client_cb, proxy_config.host, proxy_config.port) _server_instance = server @@ -1155,19 +1164,19 @@ def main(): log.error("Secret must be exactly 32 hex characters") sys.exit(1) try: - secret = bytes.fromhex(secret_hex) + bytes.fromhex(secret_hex) except ValueError: log.error("Secret must be valid hex") sys.exit(1) else: - secret = os.urandom(16).hex() - log.info("Generated secret: %s", secret.hex()) + secret_hex = os.urandom(16).hex() + log.info("Generated secret: %s", secret_hex) global proxy_config proxy_config = ProxyConfig( port=args.port, host=args.host, - secret=secret, + secret=secret_hex, dc_redirects=dc_redirects, buffer_size=max(4, args.buf_kb) * 1024, pool_size=max(0, args.pool_size) @@ -1194,7 +1203,7 @@ def main(): root.addHandler(fh) try: - asyncio.run(_run(args.port, dc_redirects, secret, host=args.host)) + asyncio.run(_run()) except KeyboardInterrupt: log.info("Shutting down. Final stats: %s", _stats.summary())