mirror of
https://github.com/Flowseal/tg-ws-proxy.git
synced 2026-05-22 23:41:44 +03:00
keepalive for stale mitigation
This commit is contained in:
@@ -618,6 +618,7 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label,
|
||||
up_packets = 0
|
||||
down_packets = 0
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
last_recv_time = start_time
|
||||
|
||||
async def tcp_to_ws():
|
||||
nonlocal up_bytes, up_packets
|
||||
@@ -644,31 +645,48 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label,
|
||||
log.debug("[%s] tcp->ws ended: %s", label, e)
|
||||
|
||||
async def ws_to_tcp():
|
||||
nonlocal down_bytes, down_packets
|
||||
nonlocal down_bytes, down_packets, last_recv_time
|
||||
try:
|
||||
while True:
|
||||
data = await ws.recv()
|
||||
if data is None:
|
||||
break
|
||||
last_recv_time = asyncio.get_event_loop().time()
|
||||
n = len(data)
|
||||
_stats.bytes_down += n
|
||||
down_bytes += n
|
||||
down_packets += 1
|
||||
writer.write(data)
|
||||
# drain only when kernel buffer is filling up
|
||||
buf = writer.transport.get_write_buffer_size()
|
||||
if buf > _SEND_BUF:
|
||||
await writer.drain()
|
||||
await writer.drain()
|
||||
except (asyncio.CancelledError, ConnectionError, OSError):
|
||||
return
|
||||
except Exception as e:
|
||||
log.debug("[%s] ws->tcp ended: %s", label, e)
|
||||
|
||||
async def ws_keepalive():
|
||||
try:
|
||||
while not ws._closed:
|
||||
await asyncio.sleep(2)
|
||||
idle = asyncio.get_event_loop().time() - last_recv_time
|
||||
if idle >= 2 and not ws._closed:
|
||||
try:
|
||||
ws.writer.write(
|
||||
ws._build_frame(ws.OP_PING, b'', mask=True))
|
||||
await ws.writer.drain()
|
||||
log.debug("[%s] %s WS PING (idle %.1fs)",
|
||||
label, dc_tag, idle)
|
||||
except Exception:
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
ka_task = asyncio.create_task(ws_keepalive())
|
||||
tasks = [asyncio.create_task(tcp_to_ws()),
|
||||
asyncio.create_task(ws_to_tcp())]
|
||||
try:
|
||||
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
finally:
|
||||
ka_task.cancel()
|
||||
for t in tasks:
|
||||
t.cancel()
|
||||
for t in tasks:
|
||||
|
||||
Reference in New Issue
Block a user