tg-ws-proxy/utils/tray_proxy_runner.py

178 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Запуск asyncio-прокси в отдельном потоке (общий для tray entrypoints)."""
from __future__ import annotations
import asyncio as _asyncio
import threading
import time
from typing import Any, Callable, Dict, Mapping, Optional, Tuple
import proxy.tg_ws_proxy as tg_ws_proxy
from utils.tray_diagnostics import try_bind_listen_socket
from utils.tray_proxy_state import ProxyRuntimeState
ProxyStopState = Tuple[Any, Any] # (loop, Event)
class ProxyThreadRunner:
"""Управляет потоком с tg_ws_proxy._run и корректной остановкой."""
def __init__(
self,
*,
default_config: Mapping[str, Any],
get_config: Callable[[], Dict[str, Any]],
log: Any,
show_error: Callable[[str], None],
join_timeout: float = 2.0,
warn_on_join_stuck: bool = False,
runtime_state: Optional[ProxyRuntimeState] = None,
check_port_before_start: bool = True,
) -> None:
self._default = dict(default_config)
self._get_config = get_config
self._log = log
self._show_error = show_error
self._join_timeout = join_timeout
self._warn_on_join_stuck = warn_on_join_stuck
self._runtime_state = runtime_state
self._check_port_before_start = check_port_before_start
self._thread: Optional[threading.Thread] = None
self._async_stop: Optional[ProxyStopState] = None
@property
def async_stop(self) -> Optional[ProxyStopState]:
return self._async_stop
def _run_proxy_thread(
self,
port: int,
dc_opt: Dict[int, str],
verbose: bool,
host: str = "127.0.0.1",
) -> None:
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
self._async_stop = (loop, stop_ev)
had_exception = False
def _on_listening() -> None:
if self._runtime_state is not None:
self._runtime_state.set_listening()
try:
loop.run_until_complete(
tg_ws_proxy._run(
port,
dc_opt,
stop_event=stop_ev,
host=host,
on_listening=_on_listening,
)
)
except Exception as exc:
had_exception = True
self._log.error("Proxy thread crashed: %s", exc)
msg = str(exc)
winerr = getattr(exc, "winerror", None)
port_busy = (
"Address already in use" in msg
or winerr in (10048, 10013)
or "10048" in msg
or "10013" in msg
)
if self._runtime_state is not None:
self._runtime_state.set_error(
"Порт занят" if port_busy else msg
)
if port_busy:
self._show_error(
"Не удалось запустить прокси:\n"
"Порт уже используется (возможно, уже открыт TG WS Proxy).\n\n"
"Закройте второй экземпляр или другое приложение на этом порту, "
"или смените порт в настройках."
)
finally:
loop.close()
self._async_stop = None
if self._runtime_state is not None:
self._runtime_state.mark_idle_after_thread(
had_exception=had_exception
)
def start(self) -> None:
if self._thread and self._thread.is_alive():
self._log.info("Proxy already running")
return
cfg = self._get_config()
port = cfg.get("port", self._default["port"])
host = cfg.get("host", self._default["host"])
dc_ip_list = cfg.get("dc_ip", self._default["dc_ip"])
verbose = bool(cfg.get("verbose", False))
if self._runtime_state is not None:
self._runtime_state.reset_for_start()
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
self._log.error("Bad config dc_ip: %s", e)
if self._runtime_state is not None:
self._runtime_state.set_error(str(e))
self._show_error(f"Ошибка конфигурации:\n{e}")
return
if self._check_port_before_start:
ok, err = try_bind_listen_socket(host, port)
if not ok:
self._log.warning("Port bind probe failed: %s", err)
if self._runtime_state is not None:
self._runtime_state.set_error(err)
self._show_error(
"Не удалось запустить прокси:\n"
f"{err}\n\n"
"Измените порт в настройках или закройте программу, "
"занимающую этот порт."
)
return
self._log.info("Starting proxy on %s:%d ...", host, port)
buf_kb = cfg.get("buf_kb", self._default["buf_kb"])
pool_size = cfg.get("pool_size", self._default["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
self._thread = threading.Thread(
target=self._run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True,
name="proxy",
)
self._thread.start()
def stop(self) -> None:
if self._runtime_state is not None:
self._runtime_state.set_stopping()
if self._async_stop:
loop, stop_ev = self._async_stop
loop.call_soon_threadsafe(stop_ev.set)
if self._thread:
self._thread.join(timeout=self._join_timeout)
if self._warn_on_join_stuck and self._thread.is_alive():
self._log.warning(
"Proxy thread did not finish within timeout; "
"the process may still exit shortly"
)
self._thread = None
self._log.info("Proxy stopped")
def restart(self) -> None:
self._log.info("Restarting proxy...")
self.stop()
time.sleep(0.3)
self.start()