diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 4d1a0b8..e23065f 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -4,6 +4,7 @@ import argparse import asyncio import base64 import logging +from collections import deque import logging.handlers import os import socket as _socket @@ -559,12 +560,15 @@ class Stats: self.pool_misses = 0 def summary(self) -> str: + pool_total = self.pool_hits + self.pool_misses + pool_s = ( + f"{self.pool_hits}/{pool_total}" if pool_total else "n/a") return (f"total={self.connections_total} ws={self.connections_ws} " f"tcp_fb={self.connections_tcp_fallback} " f"http_skip={self.connections_http_rejected} " f"pass={self.connections_passthrough} " f"err={self.ws_errors} " - f"pool={self.pool_hits}/{self.pool_hits+self.pool_misses} " + f"pool={pool_s} " f"up={_human_bytes(self.bytes_up)} " f"down={_human_bytes(self.bytes_down)}") @@ -574,7 +578,7 @@ _stats = Stats() class _WsPool: def __init__(self): - self._idle: Dict[Tuple[int, bool], list] = {} + self._idle: Dict[Tuple[int, bool], deque] = {} self._refilling: Set[Tuple[int, bool]] = set() async def get(self, dc: int, is_media: bool, @@ -583,9 +587,12 @@ class _WsPool: key = (dc, is_media) now = time.monotonic() - bucket = self._idle.get(key, []) + bucket = self._idle.get(key) + if bucket is None: + bucket = deque() + self._idle[key] = bucket while bucket: - ws, created = bucket.pop(0) + ws, created = bucket.popleft() age = now - created if age > _WS_POOL_MAX_AGE or ws._closed: asyncio.create_task(self._quiet_close(ws)) @@ -609,7 +616,7 @@ class _WsPool: async def _refill(self, key, target_ip, domains): dc, is_media = key try: - bucket = self._idle.setdefault(key, []) + bucket = self._idle.setdefault(key, deque()) needed = _WS_POOL_SIZE - len(bucket) if needed <= 0: return @@ -1140,34 +1147,50 @@ async def _run(port: int, dc_opt: Dict[int, Optional[str]], log.info("=" * 60) async def log_stats(): - while True: - await asyncio.sleep(60) - bl = ', '.join( - f'DC{d}{"m" if m else ""}' - for d, m in sorted(_ws_blacklist)) or 'none' - log.info("stats: %s | ws_bl: %s", _stats.summary(), bl) + try: + while True: + await asyncio.sleep(60) + bl = ', '.join( + f'DC{d}{"m" if m else ""}' + for d, m in sorted(_ws_blacklist)) or 'none' + log.info("stats: %s | ws_bl: %s", _stats.summary(), bl) + except asyncio.CancelledError: + raise - asyncio.create_task(log_stats()) + log_stats_task = asyncio.create_task(log_stats()) await _ws_pool.warmup(dc_opt) - if stop_event: - async def wait_stop(): - await stop_event.wait() - server.close() - me = asyncio.current_task() - for task in list(asyncio.all_tasks()): - if task is not me: - task.cancel() - try: - await server.wait_closed() - except asyncio.CancelledError: - pass - asyncio.create_task(wait_stop()) - - async with server: + try: + async with server: + if stop_event: + serve_task = asyncio.create_task(server.serve_forever()) + stop_task = asyncio.create_task(stop_event.wait()) + done, _pending = await asyncio.wait( + (serve_task, stop_task), + return_when=asyncio.FIRST_COMPLETED, + ) + if stop_task in done: + server.close() + await server.wait_closed() + if not serve_task.done(): + serve_task.cancel() + try: + await serve_task + except asyncio.CancelledError: + pass + else: + stop_task.cancel() + try: + await stop_task + except asyncio.CancelledError: + pass + else: + await server.serve_forever() + finally: + log_stats_task.cancel() try: - await server.serve_forever() + await log_stats_task except asyncio.CancelledError: pass _server_instance = None diff --git a/ui/ctk_tray_ui.py b/ui/ctk_tray_ui.py index 5661827..fc5b63e 100644 --- a/ui/ctk_tray_ui.py +++ b/ui/ctk_tray_ui.py @@ -51,7 +51,7 @@ _TIP_LOG_MB = ( ) _TIP_AUTOSTART = ( "Запускать TG WS Proxy при входе в Windows. " - "Если вы переместите программу в другую папку, автозапуска сбросится" + "Если вы переместите программу в другую папку, автозапуск сбросится" ) _TIP_CHECK_UPDATES = ( "При запуске проверять наличие обновлений" diff --git a/utils/update_check.py b/utils/update_check.py index 76c8770..026dd41 100644 --- a/utils/update_check.py +++ b/utils/update_check.py @@ -1,11 +1,18 @@ """ Минимальная проверка новой версии через GitHub Releases API (без сторонних зависимостей). + +Ограничение частоты запросов: не чаще одного раза в час на машину (кэш в каталоге +данных приложения). Поддерживается If-None-Match (ETag) для ответа 304. """ from __future__ import annotations import json +import os +import sys +import time from itertools import zip_longest -from typing import Any, Dict, Optional +from pathlib import Path +from typing import Any, Dict, Optional, Tuple from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen @@ -13,6 +20,9 @@ REPO = "Flowseal/tg-ws-proxy" RELEASES_LATEST_API = f"https://api.github.com/repos/{REPO}/releases/latest" RELEASES_PAGE_URL = f"https://github.com/{REPO}/releases/latest" +# Не чаще одного полного запроса к API в час (без учёта 304 с тем же ETag). +_MIN_FETCH_INTERVAL_SEC = 3600.0 + _state: Dict[str, Any] = { "checked": False, "has_update": False, @@ -23,6 +33,39 @@ _state: Dict[str, Any] = { } +def _cache_file() -> Optional[Path]: + try: + if sys.platform == "win32": + root = Path(os.environ.get("APPDATA", str(Path.home()))) / "TgWsProxy" + elif sys.platform == "darwin": + root = Path.home() / "Library/Application Support/TgWsProxy" + else: + xdg = os.environ.get("XDG_CONFIG_HOME") + root = (Path(xdg).expanduser() if xdg else Path.home() / ".config") / "TgWsProxy" + root.mkdir(parents=True, exist_ok=True) + return root / ".update_check_cache.json" + except OSError: + return None + + +def _load_cache(path: Optional[Path]) -> Dict[str, Any]: + if not path or not path.is_file(): + return {} + try: + return json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {} + + +def _save_cache(path: Optional[Path], data: Dict[str, Any]) -> None: + if not path: + return + try: + path.write_text(json.dumps(data), encoding="utf-8") + except OSError: + pass + + def _parse_version_tuple(s: str) -> tuple: s = (s or "").strip().lstrip("vV") if not s: @@ -52,18 +95,56 @@ def _version_gt(a: str, b: str) -> bool: return False -def fetch_latest_release(timeout: float = 12.0) -> Optional[dict]: +def _apply_release_tag( + tag: str, html_url: str, current_version: str, +) -> None: + global _state + if not tag: + _state["has_update"] = False + _state["ahead_of_release"] = False + _state["latest"] = None + _state["html_url"] = html_url.strip() or RELEASES_PAGE_URL + return + latest_clean = tag.lstrip("vV") + cur = (current_version or "").strip().lstrip("vV") + _state["latest"] = latest_clean + _state["html_url"] = html_url.strip() or RELEASES_PAGE_URL + _state["has_update"] = _version_gt(latest_clean, cur) + _state["ahead_of_release"] = bool(latest_clean) and _version_gt( + cur, latest_clean + ) + + +def fetch_latest_release( + timeout: float = 12.0, + etag: Optional[str] = None, +) -> Tuple[Optional[dict], Optional[str], int]: + """ + GET releases/latest. Возвращает (data или None при 304, etag или None, HTTP-код). + """ + headers = { + "Accept": "application/vnd.github+json", + "User-Agent": "tg-ws-proxy-update-check", + } + if etag: + headers["If-None-Match"] = etag req = Request( RELEASES_LATEST_API, - headers={ - "Accept": "application/vnd.github+json", - "User-Agent": "tg-ws-proxy-update-check", - }, + headers=headers, method="GET", ) - with urlopen(req, timeout=timeout) as resp: - raw = resp.read().decode("utf-8", errors="replace") - return json.loads(raw) + try: + with urlopen(req, timeout=timeout) as resp: + code = getattr(resp, "status", None) or resp.getcode() + new_etag = resp.headers.get("ETag") + raw = resp.read().decode("utf-8", errors="replace") + return json.loads(raw), new_etag, int(code) + except HTTPError as e: + if e.code == 304: + hdrs = e.headers + new_etag = hdrs.get("ETag") if hdrs else None + return None, new_etag or etag, 304 + raise def run_check(current_version: str) -> None: @@ -71,8 +152,41 @@ def run_check(current_version: str) -> None: global _state _state["checked"] = True _state["error"] = None + + cache_path = _cache_file() + cache = _load_cache(cache_path) + now = time.time() + last_attempt = float(cache.get("last_attempt_at") or 0) + + if last_attempt and (now - last_attempt) < _MIN_FETCH_INTERVAL_SEC: + tag = (cache.get("tag_name") or "").strip() + if tag: + _apply_release_tag(tag, cache.get("html_url") or "", current_version) + return + err = cache.get("last_error") + _state["error"] = ( + err if err else "Проверка обновлений отложена (интервал между запросами)." + ) + _state["has_update"] = False + _state["ahead_of_release"] = False + _state["latest"] = None + _state["html_url"] = RELEASES_PAGE_URL + return + + etag = (cache.get("etag") or "").strip() or None try: - data = fetch_latest_release() + data, new_etag, code = fetch_latest_release(etag=etag) + cache["last_attempt_at"] = now + if code == 304: + tag = (cache.get("tag_name") or "").strip() + url = (cache.get("html_url") or "").strip() or RELEASES_PAGE_URL + _apply_release_tag(tag, url, current_version) + if new_etag: + cache["etag"] = new_etag + _save_cache(cache_path, cache) + return + + assert data is not None tag = (data.get("tag_name") or "").strip() html_url = (data.get("html_url") or "").strip() or RELEASES_PAGE_URL if not tag: @@ -80,17 +194,24 @@ def run_check(current_version: str) -> None: _state["ahead_of_release"] = False _state["latest"] = None _state["html_url"] = html_url - return - latest_clean = tag.lstrip("vV") - cur = (current_version or "").strip().lstrip("vV") - _state["latest"] = latest_clean - _state["html_url"] = html_url - _state["has_update"] = _version_gt(latest_clean, cur) - _state["ahead_of_release"] = bool(latest_clean) and _version_gt( - cur, latest_clean - ) + else: + _apply_release_tag(tag, html_url, current_version) + if new_etag: + cache["etag"] = new_etag + cache["tag_name"] = tag + cache["html_url"] = html_url + cache.pop("last_error", None) + _save_cache(cache_path, cache) except (HTTPError, URLError, OSError, TimeoutError, ValueError, json.JSONDecodeError) as e: - _state["error"] = str(e) + cache["last_attempt_at"] = now + msg = str(e) + if isinstance(e, HTTPError) and e.code == 403: + msg = ( + "GitHub API вернул 403 (лимит или доступ). Повторите позже." + ) + cache["last_error"] = msg + _save_cache(cache_path, cache) + _state["error"] = msg _state["has_update"] = False _state["ahead_of_release"] = False _state["latest"] = None