fix: остановка прокси из трея, пул WS и проверка обновлений (#443)

This commit is contained in:
deexsed
2026-03-27 10:54:33 +03:00
committed by GitHub
parent b3a9bc6a8f
commit 0d11062c92
3 changed files with 193 additions and 49 deletions

View File

@@ -4,6 +4,7 @@ import argparse
import asyncio
import base64
import logging
from collections import deque
import logging.handlers
import os
import socket as _socket
@@ -559,12 +560,15 @@ class Stats:
self.pool_misses = 0
def summary(self) -> str:
pool_total = self.pool_hits + self.pool_misses
pool_s = (
f"{self.pool_hits}/{pool_total}" if pool_total else "n/a")
return (f"total={self.connections_total} ws={self.connections_ws} "
f"tcp_fb={self.connections_tcp_fallback} "
f"http_skip={self.connections_http_rejected} "
f"pass={self.connections_passthrough} "
f"err={self.ws_errors} "
f"pool={self.pool_hits}/{self.pool_hits+self.pool_misses} "
f"pool={pool_s} "
f"up={_human_bytes(self.bytes_up)} "
f"down={_human_bytes(self.bytes_down)}")
@@ -574,7 +578,7 @@ _stats = Stats()
class _WsPool:
def __init__(self):
self._idle: Dict[Tuple[int, bool], list] = {}
self._idle: Dict[Tuple[int, bool], deque] = {}
self._refilling: Set[Tuple[int, bool]] = set()
async def get(self, dc: int, is_media: bool,
@@ -583,9 +587,12 @@ class _WsPool:
key = (dc, is_media)
now = time.monotonic()
bucket = self._idle.get(key, [])
bucket = self._idle.get(key)
if bucket is None:
bucket = deque()
self._idle[key] = bucket
while bucket:
ws, created = bucket.pop(0)
ws, created = bucket.popleft()
age = now - created
if age > _WS_POOL_MAX_AGE or ws._closed:
asyncio.create_task(self._quiet_close(ws))
@@ -609,7 +616,7 @@ class _WsPool:
async def _refill(self, key, target_ip, domains):
dc, is_media = key
try:
bucket = self._idle.setdefault(key, [])
bucket = self._idle.setdefault(key, deque())
needed = _WS_POOL_SIZE - len(bucket)
if needed <= 0:
return
@@ -1140,34 +1147,50 @@ async def _run(port: int, dc_opt: Dict[int, Optional[str]],
log.info("=" * 60)
async def log_stats():
while True:
await asyncio.sleep(60)
bl = ', '.join(
f'DC{d}{"m" if m else ""}'
for d, m in sorted(_ws_blacklist)) or 'none'
log.info("stats: %s | ws_bl: %s", _stats.summary(), bl)
try:
while True:
await asyncio.sleep(60)
bl = ', '.join(
f'DC{d}{"m" if m else ""}'
for d, m in sorted(_ws_blacklist)) or 'none'
log.info("stats: %s | ws_bl: %s", _stats.summary(), bl)
except asyncio.CancelledError:
raise
asyncio.create_task(log_stats())
log_stats_task = asyncio.create_task(log_stats())
await _ws_pool.warmup(dc_opt)
if stop_event:
async def wait_stop():
await stop_event.wait()
server.close()
me = asyncio.current_task()
for task in list(asyncio.all_tasks()):
if task is not me:
task.cancel()
try:
await server.wait_closed()
except asyncio.CancelledError:
pass
asyncio.create_task(wait_stop())
async with server:
try:
async with server:
if stop_event:
serve_task = asyncio.create_task(server.serve_forever())
stop_task = asyncio.create_task(stop_event.wait())
done, _pending = await asyncio.wait(
(serve_task, stop_task),
return_when=asyncio.FIRST_COMPLETED,
)
if stop_task in done:
server.close()
await server.wait_closed()
if not serve_task.done():
serve_task.cancel()
try:
await serve_task
except asyncio.CancelledError:
pass
else:
stop_task.cancel()
try:
await stop_task
except asyncio.CancelledError:
pass
else:
await server.serve_forever()
finally:
log_stats_task.cancel()
try:
await server.serve_forever()
await log_stats_task
except asyncio.CancelledError:
pass
_server_instance = None