tg-ws-proxy/utils/tray_common.py

538 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import json
import logging
import logging.handlers
import os
import socket as _socket
import sys
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple
import psutil
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.default_config import default_tray_config
from utils.tray_proxy_state import ProxyRuntimeState
log = logging.getLogger("tg-ws-tray")
APP_NAME = "TgWsProxy"
def _app_dir() -> Path:
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / APP_NAME
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
APP_DIR = _app_dir()
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done_mtproto"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG: Dict[str, Any] = default_tray_config()
IS_FROZEN = bool(getattr(sys, "frozen", False))
def ensure_dirs() -> None:
APP_DIR.mkdir(parents=True, exist_ok=True)
# single-instance lock
_lock_file_path: Optional[Path] = None
def _same_process(meta: dict, proc: psutil.Process, script_hint: str) -> bool:
try:
lock_ct = float(meta.get("create_time", 0.0))
if lock_ct > 0 and abs(lock_ct - proc.create_time()) > 1.0:
return False
except Exception:
return False
if IS_FROZEN:
try:
return os.path.basename(sys.executable).lower() == proc.name().lower()
except Exception:
return APP_NAME.lower() in proc.name().lower()
try:
for arg in proc.cmdline():
if script_hint in arg:
return True
except Exception:
pass
try:
proc_exe = proc.exe()
if proc_exe and sys.executable:
if os.path.normcase(os.path.abspath(proc_exe)) == os.path.normcase(
os.path.abspath(sys.executable)
):
try:
joined = " ".join(proc.cmdline())
except Exception:
joined = ""
if script_hint and script_hint.lower() in joined.lower():
return True
except Exception:
pass
return False
def acquire_lock(script_hint: str = "") -> bool:
global _lock_file_path
ensure_dirs()
for f in list(APP_DIR.glob("*.lock")):
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
meta: dict = {}
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
pass
if not psutil.pid_exists(pid):
f.unlink(missing_ok=True)
continue
try:
proc = psutil.Process(pid)
except psutil.NoSuchProcess:
f.unlink(missing_ok=True)
continue
except Exception:
# PID жив, но не удалось получить процесс: безопаснее считать lock занятым.
if psutil.pid_exists(pid):
return False
f.unlink(missing_ok=True)
continue
# Если lock записан старым процессом (PID reused) — считаем lock устаревшим.
try:
lock_ct = float(meta.get("create_time", 0.0))
if lock_ct > 0 and abs(lock_ct - proc.create_time()) > 1.0:
f.unlink(missing_ok=True)
continue
except Exception:
pass
try:
if _same_process(meta, proc, script_hint):
return False
except Exception:
pass
# PID живой, но мы не уверены, что это "наш" процесс:
# не удаляем lock, чтобы не запускать второй экземпляр.
try:
if proc.is_running():
return False
except Exception:
return False
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
lock_file.write_text(
json.dumps({"create_time": proc.create_time()}, ensure_ascii=False),
encoding="utf-8",
)
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def release_lock() -> None:
global _lock_file_path
if _lock_file_path:
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
# config
def load_config() -> dict:
ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict) -> None:
ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
# logging
_LOG_FMT_FILE = "%(asctime)s %(levelname)-5s %(name)s %(message)s"
_LOG_FMT_CONSOLE = "%(asctime)s %(levelname)-5s %(message)s"
def setup_logging(verbose: bool = False, log_max_mb: float = 5) -> None:
ensure_dirs()
level = logging.DEBUG if verbose else logging.INFO
root = logging.getLogger()
root.setLevel(level)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, int(log_max_mb * 1024 * 1024)),
backupCount=0,
encoding="utf-8",
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(_LOG_FMT_FILE, datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not IS_FROZEN:
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(level)
ch.setFormatter(logging.Formatter(_LOG_FMT_CONSOLE, datefmt="%H:%M:%S"))
root.addHandler(ch)
# icon
def make_icon_image(size: int = 64, *, color: Tuple[int, ...] = (0, 136, 204, 255)):
from PIL import Image, ImageDraw, ImageFont
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin], fill=color)
for path in _font_paths():
try:
font = ImageFont.truetype(path, size=int(size * 0.55))
break
except Exception:
continue
else:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T",
fill=(255, 255, 255, 255),
font=font,
)
return img
def _font_paths():
if sys.platform == "win32":
return ["arial.ttf"]
if sys.platform == "darwin":
return ["/System/Library/Fonts/Helvetica.ttc"]
return [
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf",
]
def load_icon():
from PIL import Image
icon_path = Path(__file__).parents[1] / "icon.ico"
if icon_path.exists():
try:
return Image.open(str(icon_path))
except Exception:
pass
return make_icon_image(64)
# proxy lifecycle
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[Tuple[asyncio.AbstractEventLoop, asyncio.Event]] = None
_proxy_state = ProxyRuntimeState()
def _run_proxy_thread(on_port_busy: Callable[[str], None]) -> None:
global _async_stop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stop_ev = asyncio.Event()
_async_stop = (loop, stop_ev)
had_exception = False
def _on_listening() -> None:
_proxy_state.set_listening()
try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev, on_listening=_on_listening))
except Exception as exc:
had_exception = True
log.error("Proxy thread crashed: %s", exc)
msg = str(exc)
if "Address already in use" in msg or "10048" in msg or "10013" in msg:
_proxy_state.set_error("Порт занят")
on_port_busy(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
)
else:
_proxy_state.set_error(msg)
finally:
loop.close()
_async_stop = None
_proxy_state.mark_idle_after_thread(had_exception=had_exception)
def apply_proxy_config(cfg: dict) -> bool:
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
return False
pc = tg_ws_proxy.proxy_config
pc.port = cfg.get("port", DEFAULT_CONFIG["port"])
pc.host = cfg.get("host", DEFAULT_CONFIG["host"])
pc.secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
pc.dc_redirects = dc_redirects
pc.buffer_size = max(4, cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])) * 1024
pc.pool_size = max(0, cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]))
return True
def start_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
global _proxy_thread
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
_proxy_state.reset_for_start()
if not apply_proxy_config(cfg):
_proxy_state.set_error("Ошибка конфигурации DC -> IP.")
on_error("Ошибка конфигурации DC → IP.")
return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread, args=(on_error,), daemon=True, name="proxy"
)
_proxy_thread.start()
def stop_proxy() -> None:
global _proxy_thread, _async_stop
_proxy_state.set_stopping()
if _proxy_thread and _proxy_thread.is_alive() and _async_stop is None:
# Коротко ждем инициализации _async_stop в только что запущенном потоке,
# чтобы корректно отправить stop_event и не получить двойной запуск.
for _ in range(50):
if _async_stop is not None or not _proxy_thread.is_alive():
break
time.sleep(0.01)
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=5)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy(cfg, on_error)
def get_proxy_state() -> ProxyRuntimeState:
return _proxy_state
def tg_proxy_url(cfg: dict) -> str:
host = cfg.get("host", DEFAULT_CONFIG["host"])
port = cfg.get("port", DEFAULT_CONFIG["port"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
link_host = tg_ws_proxy.get_link_host(host)
return f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
_IPV6_WARNING = (
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз."
)
def _has_ipv6() -> bool:
try:
for addr in _socket.getaddrinfo(_socket.gethostname(), None, _socket.AF_INET6):
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _socket.socket(_socket.AF_INET6, _socket.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def check_ipv6_warning(show_info: Callable[[str, str], None]) -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists() or not _has_ipv6():
return
IPV6_WARN_MARKER.touch()
threading.Thread(
target=lambda: show_info(_IPV6_WARNING, "TG WS Proxy"),
daemon=True,
).start()
# update check
def maybe_notify_update(
cfg: dict,
is_exiting: Callable[[], bool],
ask_open: Callable[[str, str], bool],
) -> None:
if not cfg.get("check_updates", True):
return
def _work():
time.sleep(1.5)
if is_exiting():
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
import webbrowser
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if ask_open(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
# ctk thread (windows / linux)
_ctk_root: Any = None
_ctk_root_ready = threading.Event()
def ensure_ctk_thread(ctk: Any) -> bool:
global _ctk_root
if ctk is None:
return False
if _ctk_root_ready.is_set():
return True
def _run():
global _ctk_root
from ui.ctk_theme import apply_ctk_appearance, install_tkinter_variable_del_guard
install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
_ctk_root = ctk.CTk()
_ctk_root.withdraw()
_ctk_root_ready.set()
_ctk_root.mainloop()
threading.Thread(target=_run, daemon=True, name="ctk-root").start()
_ctk_root_ready.wait(timeout=5.0)
return _ctk_root is not None
def ctk_run_dialog(build_fn: Callable[[threading.Event], None]) -> None:
if _ctk_root is None:
return
done = threading.Event()
def _invoke():
try:
build_fn(done)
except Exception:
log.exception("CTk dialog failed")
done.set()
_ctk_root.after(0, _invoke)
done.wait()
import gc
gc.collect()
def quit_ctk() -> None:
if _ctk_root is not None:
try:
_ctk_root.after(0, _ctk_root.quit)
except Exception:
pass
# common bootstrap
def bootstrap(cfg: dict) -> None:
save_config(cfg)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(
cfg.get("verbose", False),
log_max_mb=cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
log.info("TG WS Proxy версия %s starting", __version__)
log.info("Config: %s", cfg)
log.info("Log file: %s", LOG_FILE)