From fed772049b5ae4566983fcbf22ad20d674311145 Mon Sep 17 00:00:00 2001 From: Flowseal Date: Tue, 23 Jun 2026 14:55:25 +0300 Subject: [PATCH] session close reason --- proxy/bridge.py | 23 +++++++++++++++++------ proxy/raw_websocket.py | 25 +++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/proxy/bridge.py b/proxy/bridge.py index d66c3db..7452df6 100644 --- a/proxy/bridge.py +++ b/proxy/bridge.py @@ -282,9 +282,10 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, up_packets = 0 down_packets = 0 start_time = asyncio.get_running_loop().time() + close_reason = 'normal' async def tcp_to_ws(): - nonlocal up_bytes, up_packets + nonlocal up_bytes, up_packets, close_reason try: while True: chunk = await reader.read(65536) @@ -310,17 +311,22 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, await ws.send(parts[0]) else: await ws.send(chunk) - except (asyncio.CancelledError, ConnectionError, OSError): + except asyncio.CancelledError: return + except (ConnectionError, OSError) as e: + close_reason = f"client: {type(e).__name__}" except Exception as e: + close_reason = f"client: {type(e).__name__}: {e}" log.debug("[%s] tcp->ws ended: %s", label, e) async def ws_to_tcp(): - nonlocal down_bytes, down_packets + nonlocal down_bytes, down_packets, close_reason try: while True: data = await ws.recv() if data is None: + if close_reason == 'normal': + close_reason = 'upstream: ws_close' break n = len(data) stats.bytes_down += n @@ -330,9 +336,14 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, data = ctx.clt_enc.update(plain) writer.write(data) await writer.drain() - except (asyncio.CancelledError, ConnectionError, OSError): + except asyncio.CancelledError: return + except (ConnectionError, OSError) as e: + close_reason = f"upstream: {type(e).__name__}" + except asyncio.IncompleteReadError: + close_reason = 'upstream: tcp_reset' except Exception as e: + close_reason = f"upstream: {type(e).__name__}: {e}" log.debug("[%s] ws->tcp ended: %s", label, e) tasks = [asyncio.create_task(tcp_to_ws()), @@ -348,9 +359,9 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label, except BaseException: pass elapsed = asyncio.get_running_loop().time() - start_time - log.info("[%s] %s WS session closed: " + log.info("[%s] %s WS session closed (%s): " "^%s (%d pkts) v%s (%d pkts) in %.1fs", - label, dc_tag, + label, dc_tag, close_reason, human_bytes(up_bytes), up_packets, human_bytes(down_bytes), down_packets, elapsed) diff --git a/proxy/raw_websocket.py b/proxy/raw_websocket.py index 30d07e9..7dc9aa3 100644 --- a/proxy/raw_websocket.py +++ b/proxy/raw_websocket.py @@ -1,5 +1,6 @@ import os import ssl +import logging import base64 import struct import asyncio @@ -8,6 +9,8 @@ import socket as _socket from typing import List, Optional, Tuple from .config import proxy_config +log = logging.getLogger('tg-mtproto-proxy') + _st_BB = struct.Struct('>BB') _st_BBH = struct.Struct('>BBH') @@ -160,6 +163,9 @@ class RawWebSocket: if opcode == self.OP_CLOSE: self._closed = True + code, reason = self._parse_close(payload) + log.debug("WS OP_CLOSE from upstream: code=%s reason=%r", + code, reason) try: self.writer.write(self._build_frame( self.OP_CLOSE, @@ -202,6 +208,25 @@ class RawWebSocket: except Exception: pass + _WS_CLOSE_REASONS = { + 1000: 'normal', 1001: 'going_away', 1002: 'protocol_error', + 1003: 'unsupported_data', 1006: 'abnormal', 1007: 'bad_data', + 1008: 'policy_violation', 1009: 'too_big', 1010: 'missing_extension', + 1011: 'internal_error', + } + + @classmethod + def _parse_close(cls, payload: Optional[bytes]) -> Tuple[Optional[int], str]: + if not payload or len(payload) < 2: + return None, '' + try: + code = int.from_bytes(payload[:2], 'big') + text = payload[2:].decode('utf-8', errors='replace') + name = cls._WS_CLOSE_REASONS.get(code) + return code, f"{text} ({name})" if name else text + except Exception: + return None, '' + @staticmethod def _build_frame(opcode: int, data: bytes, mask: bool = False) -> bytes: