session close reason

This commit is contained in:
Flowseal
2026-06-23 14:55:25 +03:00
parent 91d39a5ebe
commit fed772049b
2 changed files with 42 additions and 6 deletions
+17 -6
View File
@@ -282,9 +282,10 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
up_packets = 0
down_packets = 0
start_time = asyncio.get_running_loop().time()
close_reason = 'normal'
async def tcp_to_ws():
nonlocal up_bytes, up_packets
nonlocal up_bytes, up_packets, close_reason
try:
while True:
chunk = await reader.read(65536)
@@ -310,17 +311,22 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
await ws.send(parts[0])
else:
await ws.send(chunk)
except (asyncio.CancelledError, ConnectionError, OSError):
except asyncio.CancelledError:
return
except (ConnectionError, OSError) as e:
close_reason = f"client: {type(e).__name__}"
except Exception as e:
close_reason = f"client: {type(e).__name__}: {e}"
log.debug("[%s] tcp->ws ended: %s", label, e)
async def ws_to_tcp():
nonlocal down_bytes, down_packets
nonlocal down_bytes, down_packets, close_reason
try:
while True:
data = await ws.recv()
if data is None:
if close_reason == 'normal':
close_reason = 'upstream: ws_close'
break
n = len(data)
stats.bytes_down += n
@@ -330,9 +336,14 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
data = ctx.clt_enc.update(plain)
writer.write(data)
await writer.drain()
except (asyncio.CancelledError, ConnectionError, OSError):
except asyncio.CancelledError:
return
except (ConnectionError, OSError) as e:
close_reason = f"upstream: {type(e).__name__}"
except asyncio.IncompleteReadError:
close_reason = 'upstream: tcp_reset'
except Exception as e:
close_reason = f"upstream: {type(e).__name__}: {e}"
log.debug("[%s] ws->tcp ended: %s", label, e)
tasks = [asyncio.create_task(tcp_to_ws()),
@@ -348,9 +359,9 @@ async def bridge_ws_reencrypt(reader, writer, ws: RawWebSocket, label,
except BaseException:
pass
elapsed = asyncio.get_running_loop().time() - start_time
log.info("[%s] %s WS session closed: "
log.info("[%s] %s WS session closed (%s): "
"^%s (%d pkts) v%s (%d pkts) in %.1fs",
label, dc_tag,
label, dc_tag, close_reason,
human_bytes(up_bytes), up_packets,
human_bytes(down_bytes), down_packets,
elapsed)
+25
View File
@@ -1,5 +1,6 @@
import os
import ssl
import logging
import base64
import struct
import asyncio
@@ -8,6 +9,8 @@ import socket as _socket
from typing import List, Optional, Tuple
from .config import proxy_config
log = logging.getLogger('tg-mtproto-proxy')
_st_BB = struct.Struct('>BB')
_st_BBH = struct.Struct('>BBH')
@@ -160,6 +163,9 @@ class RawWebSocket:
if opcode == self.OP_CLOSE:
self._closed = True
code, reason = self._parse_close(payload)
log.debug("WS OP_CLOSE from upstream: code=%s reason=%r",
code, reason)
try:
self.writer.write(self._build_frame(
self.OP_CLOSE,
@@ -202,6 +208,25 @@ class RawWebSocket:
except Exception:
pass
_WS_CLOSE_REASONS = {
1000: 'normal', 1001: 'going_away', 1002: 'protocol_error',
1003: 'unsupported_data', 1006: 'abnormal', 1007: 'bad_data',
1008: 'policy_violation', 1009: 'too_big', 1010: 'missing_extension',
1011: 'internal_error',
}
@classmethod
def _parse_close(cls, payload: Optional[bytes]) -> Tuple[Optional[int], str]:
if not payload or len(payload) < 2:
return None, ''
try:
code = int.from_bytes(payload[:2], 'big')
text = payload[2:].decode('utf-8', errors='replace')
name = cls._WS_CLOSE_REASONS.get(code)
return code, f"{text} ({name})" if name else text
except Exception:
return None, ''
@staticmethod
def _build_frame(opcode: int, data: bytes,
mask: bool = False) -> bytes: