diff --git a/linux.py b/linux.py index 8311040..1c96d63 100644 --- a/linux.py +++ b/linux.py @@ -1,398 +1,44 @@ from __future__ import annotations -import asyncio as _asyncio -import json -import logging -import logging.handlers import os import subprocess import sys import threading -import webbrowser import time -from pathlib import Path -from typing import Dict, Optional +from typing import Optional import customtkinter as ctk -import psutil import pyperclip import pystray -from PIL import Image, ImageDraw, ImageFont +from PIL import Image, ImageTk import proxy.tg_ws_proxy as tg_ws_proxy -from proxy.tg_ws_proxy import proxy_config -from proxy import __version__ -from utils.default_config import default_tray_config +from utils.tray_common import ( + APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, LOG_FILE, + acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog, + ensure_ctk_thread, ensure_dirs, load_config, load_icon, log, + maybe_notify_update, quit_ctk, release_lock, restart_proxy, + save_config, start_proxy, stop_proxy, tg_proxy_url, +) from ui.ctk_tray_ui import ( - install_tray_config_buttons, - install_tray_config_form, - populate_first_run_window, - tray_settings_scroll_and_footer, + install_tray_config_buttons, install_tray_config_form, + populate_first_run_window, tray_settings_scroll_and_footer, validate_config_form, ) from ui.ctk_theme import ( - CONFIG_DIALOG_FRAME_PAD, - CONFIG_DIALOG_SIZE, - FIRST_RUN_SIZE, - create_ctk_toplevel, - ctk_theme_for_platform, - main_content_frame, + CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE, + create_ctk_toplevel, ctk_theme_for_platform, main_content_frame, ) -APP_NAME = "TgWsProxy" -APP_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME -CONFIG_FILE = APP_DIR / "config.json" -LOG_FILE = APP_DIR / "proxy.log" -FIRST_RUN_MARKER = APP_DIR / ".first_run_done" -IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned" - - -DEFAULT_CONFIG = default_tray_config() - - -_proxy_thread: Optional[threading.Thread] = None -_async_stop: Optional[object] = None _tray_icon: Optional[object] = None _config: dict = {} -_exiting: bool = False -_lock_file_path: Optional[Path] = None +_exiting = False -_ctk_root = None -_ctk_root_ready = threading.Event() +# dialogs (tkinter messagebox) -log = logging.getLogger("tg-ws-tray") - -def _same_process(lock_meta: dict, proc: psutil.Process) -> bool: - try: - lock_ct = float(lock_meta.get("create_time", 0.0)) - proc_ct = float(proc.create_time()) - if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0: - return False - except Exception: - return False - - try: - cmdline = proc.cmdline() - for arg in cmdline: - if "linux.py" in arg: - return True - except Exception: - pass - - frozen = bool(getattr(sys, "frozen", False)) - if frozen: - return APP_NAME.lower() in proc.name().lower() - - return False - - -def _release_lock(): - global _lock_file_path - if not _lock_file_path: - return - try: - _lock_file_path.unlink(missing_ok=True) - except Exception: - pass - _lock_file_path = None - - -def _acquire_lock() -> bool: - global _lock_file_path - _ensure_dirs() - lock_files = list(APP_DIR.glob("*.lock")) - - for f in lock_files: - pid = None - meta: dict = {} - - try: - pid = int(f.stem) - except Exception: - f.unlink(missing_ok=True) - continue - - try: - raw = f.read_text(encoding="utf-8").strip() - if raw: - meta = json.loads(raw) - except Exception: - meta = {} - - try: - proc = psutil.Process(pid) - if _same_process(meta, proc): - return False - except Exception: - pass - - f.unlink(missing_ok=True) - - lock_file = APP_DIR / f"{os.getpid()}.lock" - try: - proc = psutil.Process(os.getpid()) - payload = { - "create_time": proc.create_time(), - } - lock_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8") - except Exception: - lock_file.touch() - - _lock_file_path = lock_file - return True - - -def _ensure_dirs(): - APP_DIR.mkdir(parents=True, exist_ok=True) - - -def load_config() -> dict: - _ensure_dirs() - if CONFIG_FILE.exists(): - try: - with open(CONFIG_FILE, "r", encoding="utf-8") as f: - data = json.load(f) - for k, v in DEFAULT_CONFIG.items(): - data.setdefault(k, v) - return data - except Exception as exc: - log.warning("Failed to load config: %s", exc) - return dict(DEFAULT_CONFIG) - - -def save_config(cfg: dict): - _ensure_dirs() - with open(CONFIG_FILE, "w", encoding="utf-8") as f: - json.dump(cfg, f, indent=2, ensure_ascii=False) - - -def setup_logging(verbose: bool = False, log_max_mb: float = 5): - _ensure_dirs() - root = logging.getLogger() - root.setLevel(logging.DEBUG if verbose else logging.INFO) - - fh = logging.handlers.RotatingFileHandler( - str(LOG_FILE), - maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024), - backupCount=0, - encoding='utf-8', - ) - fh.setLevel(logging.DEBUG) - fh.setFormatter( - logging.Formatter( - "%(asctime)s %(levelname)-5s %(name)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - ) - root.addHandler(fh) - - if not getattr(sys, "frozen", False): - ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.DEBUG if verbose else logging.INFO) - ch.setFormatter( - logging.Formatter( - "%(asctime)s %(levelname)-5s %(message)s", datefmt="%H:%M:%S" - ) - ) - root.addHandler(ch) - - -def _make_icon_image(size: int = 64): - if Image is None: - raise RuntimeError("Pillow is required for tray icon") - img = Image.new("RGBA", (size, size), (0, 0, 0, 0)) - draw = ImageDraw.Draw(img) - - margin = 2 - draw.ellipse( - [margin, margin, size - margin, size - margin], fill=(0, 136, 204, 255) - ) - - try: - font = ImageFont.truetype( - "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", - size=int(size * 0.55), - ) - except Exception: - try: - font = ImageFont.truetype( - "/usr/share/fonts/TTF/DejaVuSans-Bold.ttf", size=int(size * 0.55) - ) - except Exception: - font = ImageFont.load_default() - bbox = draw.textbbox((0, 0), "T", font=font) - tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] - tx = (size - tw) // 2 - bbox[0] - ty = (size - th) // 2 - bbox[1] - draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font) - - return img - - -def _load_icon(): - icon_path = Path(__file__).parent / "icon.ico" - if icon_path.exists() and Image: - try: - return Image.open(str(icon_path)) - except Exception: - pass - return _make_icon_image() - - -def _apply_linux_ctk_window_icon(root) -> None: - """PhotoImage храним на root — иначе GC может убрать картинку до закрытия окна.""" - icon_img = _load_icon() - if icon_img: - from PIL import ImageTk - - root._ctk_icon_photo = ImageTk.PhotoImage(icon_img.resize((64, 64))) - root.iconphoto(False, root._ctk_icon_photo) - - -def _ensure_ctk_thread() -> bool: - """Start the persistent hidden CTk root in its own thread (once).""" - global _ctk_root - if _ctk_root_ready.is_set(): - return True - - def _run(): - global _ctk_root - from ui.ctk_theme import ( - apply_ctk_appearance, - _install_tkinter_variable_del_guard, - ) - _install_tkinter_variable_del_guard() - apply_ctk_appearance(ctk) - _ctk_root = ctk.CTk() - _ctk_root.withdraw() - _ctk_root_ready.set() - _ctk_root.mainloop() - - threading.Thread(target=_run, daemon=True, name="ctk-root").start() - _ctk_root_ready.wait(timeout=5.0) - return _ctk_root is not None - - -def _ctk_run_dialog(build_fn) -> None: - """Schedule build_fn(done_event) on the CTk thread and block until done_event is set.""" - if _ctk_root is None: - return - done = threading.Event() - - def _invoke(): - try: - build_fn(done) - except Exception: - log.exception("CTk dialog failed") - done.set() - - _ctk_root.after(0, _invoke) - done.wait() - - -def _run_proxy_thread(): - global _async_stop - - loop = _asyncio.new_event_loop() - _asyncio.set_event_loop(loop) - - stop_ev = _asyncio.Event() - _async_stop = (loop, stop_ev) - - try: - loop.run_until_complete( - tg_ws_proxy._run(stop_event=stop_ev) - ) - except Exception as exc: - log.error("Proxy thread crashed: %s", exc) - if "Address already in use" in str(exc): - _show_error( - "Не удалось запустить прокси:\nПорт уже используется другим приложением.\n\nЗакройте приложение, использующее этот порт, или измените порт в настройках прокси и перезапустите." - ) - finally: - loop.close() - _async_stop = None - - -def start_proxy(): - global _proxy_thread, _config - if _proxy_thread and _proxy_thread.is_alive(): - log.info("Proxy already running") - return - - cfg = _config - port = cfg.get("port", DEFAULT_CONFIG["port"]) - host = cfg.get("host", DEFAULT_CONFIG["host"]) - secret = cfg.get("secret", DEFAULT_CONFIG["secret"]) - dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]) - buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"]) - pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]) - - try: - dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list) - except ValueError as e: - log.error("Bad config dc_ip: %s", e) - _show_error(f"Ошибка конфигурации:\n{e}") - return - - proxy_config.port = port - proxy_config.host = host - proxy_config.secret = secret - proxy_config.dc_redirects = dc_redirects - proxy_config.buffer_size = max(4, buf_kb) * 1024 - proxy_config.pool_size = max(0, pool_size) - - log.info("Starting proxy on %s:%d ...", host, port) - - _proxy_thread = threading.Thread( - target=_run_proxy_thread, - daemon=True, - name="proxy", - ) - _proxy_thread.start() - - -def stop_proxy(): - global _proxy_thread, _async_stop - if _async_stop: - loop, stop_ev = _async_stop - loop.call_soon_threadsafe(stop_ev.set) - if _proxy_thread: - _proxy_thread.join(timeout=2) - _proxy_thread = None - log.info("Proxy stopped") - - -def restart_proxy(): - log.info("Restarting proxy...") - stop_proxy() - time.sleep(0.3) - start_proxy() - - -def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"): - import tkinter as _tk - from tkinter import messagebox as _mb - - root = _tk.Tk() - root.withdraw() - _mb.showerror(title, text, parent=root) - root.destroy() - - -def _show_info(text: str, title: str = "TG WS Proxy"): - import tkinter as _tk - from tkinter import messagebox as _mb - - root = _tk.Tk() - root.withdraw() - _mb.showinfo(title, text, parent=root) - root.destroy() - - -def _ask_yes_no_dialog(text: str, title: str = "TG WS Proxy") -> bool: +def _msgbox(kind: str, text: str, title: str, **kw): import tkinter as _tk from tkinter import messagebox as _mb @@ -402,178 +48,142 @@ def _ask_yes_no_dialog(text: str, title: str = "TG WS Proxy") -> bool: root.attributes("-topmost", True) except Exception: pass - r = _mb.askyesno(title, text, parent=root) + result = getattr(_mb, kind)(title, text, parent=root, **kw) root.destroy() - return bool(r) + return result -def _maybe_notify_update_async(): - def _work(): - time.sleep(1.5) - if _exiting: - return - if not _config.get("check_updates", True): - return - try: - from utils.update_check import RELEASES_PAGE_URL, get_status, run_check - run_check(__version__) - st = get_status() - if not st.get("has_update"): - return - url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL - ver = st.get("latest") or "?" - text = ( - f"Доступна новая версия: {ver}\n\n" - f"Открыть страницу релиза в браузере?" - ) - if _ask_yes_no_dialog(text, "TG WS Proxy — обновление"): - webbrowser.open(url) - except Exception as exc: - log.debug("Update check failed: %s", exc) - - threading.Thread(target=_work, daemon=True, name="update-check").start() +def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None: + _msgbox("showerror", text, title) -def _on_open_in_telegram(icon=None, item=None): - host = _config.get("host", DEFAULT_CONFIG["host"]) - port = _config.get("port", DEFAULT_CONFIG["port"]) - secret = _config.get("secret", DEFAULT_CONFIG["secret"]) +def _show_info(text: str, title: str = "TG WS Proxy") -> None: + _msgbox("showinfo", text, title) - url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}" + +def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool: + return bool(_msgbox("askyesno", text, title)) + + +def _apply_window_icon(root) -> None: + icon_img = load_icon() + if icon_img: + root._ctk_icon_photo = ImageTk.PhotoImage(icon_img.resize((64, 64))) + root.iconphoto(False, root._ctk_icon_photo) + + +# tray callbacks + + +def _on_open_in_telegram(icon=None, item=None) -> None: + url = tg_proxy_url(_config) log.info("Copying %s", url) - try: pyperclip.copy(url) _show_info( - f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}", - "TG WS Proxy", + f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}" ) except Exception as exc: log.error("Clipboard copy failed: %s", exc) _show_error(f"Не удалось скопировать ссылку:\n{exc}") -def _on_restart(icon=None, item=None): - threading.Thread(target=restart_proxy, daemon=True).start() +def _on_restart(icon=None, item=None) -> None: + threading.Thread( + target=lambda: restart_proxy(_config, _show_error), daemon=True + ).start() -def _on_edit_config(icon=None, item=None): +def _on_edit_config(icon=None, item=None) -> None: threading.Thread(target=_edit_config_dialog, daemon=True).start() -def _edit_config_dialog(): - if not _ensure_ctk_thread(): - _show_error("customtkinter не установлен.") - return - - cfg = dict(_config) - - def _build(done: threading.Event): - theme = ctk_theme_for_platform() - w, h = CONFIG_DIALOG_SIZE - root = create_ctk_toplevel( - ctk, - title="TG WS Proxy — Настройки", - width=w, - height=h, - theme=theme, - after_create=_apply_linux_ctk_window_icon, - ) - - fpx, fpy = CONFIG_DIALOG_FRAME_PAD - frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy) - scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme) - widgets = install_tray_config_form( - ctk, scroll, theme, cfg, DEFAULT_CONFIG, - show_autostart=False, - ) - - def _finish(): - root.destroy() - done.set() - - def on_save(): - merged = validate_config_form( - widgets, DEFAULT_CONFIG, include_autostart=False) - if isinstance(merged, str): - _show_error(merged) - return - - new_cfg = merged - save_config(new_cfg) - _config.update(new_cfg) - log.info("Config saved: %s", new_cfg) - _tray_icon.menu = _build_menu() - - from tkinter import messagebox - do_restart = messagebox.askyesno( - "Перезапустить?", - "Настройки сохранены.\n\nПерезапустить прокси сейчас?", - parent=root) - _finish() - if do_restart: - threading.Thread( - target=restart_proxy, daemon=True).start() - - def on_cancel(): - _finish() - - root.protocol("WM_DELETE_WINDOW", on_cancel) - install_tray_config_buttons( - ctk, footer, theme, on_save=on_save, on_cancel=on_cancel) - - _ctk_run_dialog(_build) - - -def _on_open_logs(icon=None, item=None): +def _on_open_logs(icon=None, item=None) -> None: log.info("Opening log file: %s", LOG_FILE) if LOG_FILE.exists(): - env = os.environ.copy() - env.pop("VIRTUAL_ENV", None) - env.pop("PYTHONPATH", None) - env.pop("PYTHONHOME", None) - + env = {k: v for k, v in os.environ.items() if k not in ("VIRTUAL_ENV", "PYTHONPATH", "PYTHONHOME")} subprocess.Popen( - ["xdg-open", str(LOG_FILE)], - env=env, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - stdin=subprocess.DEVNULL, - start_new_session=True, + ["xdg-open", str(LOG_FILE)], env=env, + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, start_new_session=True, ) else: - _show_info("Файл логов ещё не создан.", "TG WS Proxy") + _show_info("Файл логов ещё не создан.") -def _on_exit(icon=None, item=None): +def _on_exit(icon=None, item=None) -> None: global _exiting if _exiting: os._exit(0) return _exiting = True log.info("User requested exit") - - if _ctk_root is not None: - try: - _ctk_root.after(0, _ctk_root.quit) - except Exception: - pass - - def _force_exit(): - time.sleep(3) - os._exit(0) - - threading.Thread(target=_force_exit, daemon=True, name="force-exit").start() - + quit_ctk() + threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start() if icon: icon.stop() -def _show_first_run(): - _ensure_dirs() +# settings dialog + + +def _edit_config_dialog() -> None: + if not ensure_ctk_thread(ctk): + _show_error("customtkinter не установлен.") + return + + cfg = dict(_config) + + def _build(done: threading.Event) -> None: + theme = ctk_theme_for_platform() + w, h = CONFIG_DIALOG_SIZE + root = create_ctk_toplevel( + ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme, + after_create=_apply_window_icon, + ) + fpx, fpy = CONFIG_DIALOG_FRAME_PAD + frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy) + scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme) + widgets = install_tray_config_form(ctk, scroll, theme, cfg, DEFAULT_CONFIG, show_autostart=False) + + def _finish() -> None: + root.destroy() + done.set() + + def on_save() -> None: + merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=False) + if isinstance(merged, str): + _show_error(merged) + return + save_config(merged) + _config.update(merged) + log.info("Config saved: %s", merged) + _tray_icon.menu = _build_menu() + + from tkinter import messagebox + do_restart = messagebox.askyesno( + "Перезапустить?", + "Настройки сохранены.\n\nПерезапустить прокси сейчас?", + parent=root, + ) + _finish() + if do_restart: + threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start() + + root.protocol("WM_DELETE_WINDOW", _finish) + install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish) + + ctk_run_dialog(_build) + + +# first run + + +def _show_first_run() -> None: + ensure_dirs() if FIRST_RUN_MARKER.exists(): return - if not _ensure_ctk_thread(): + if not ensure_ctk_thread(ctk): FIRST_RUN_MARKER.touch() return @@ -581,90 +191,35 @@ def _show_first_run(): port = _config.get("port", DEFAULT_CONFIG["port"]) secret = _config.get("secret", DEFAULT_CONFIG["secret"]) - def _build(done: threading.Event): + def _build(done: threading.Event) -> None: theme = ctk_theme_for_platform() w, h = FIRST_RUN_SIZE root = create_ctk_toplevel( - ctk, - title="TG WS Proxy", - width=w, - height=h, - theme=theme, - after_create=_apply_linux_ctk_window_icon, + ctk, title="TG WS Proxy", width=w, height=h, theme=theme, + after_create=_apply_window_icon, ) - def on_done(open_tg: bool): + def on_done(open_tg: bool) -> None: FIRST_RUN_MARKER.touch() root.destroy() done.set() if open_tg: _on_open_in_telegram() - populate_first_run_window( - ctk, root, theme, host=host, port=port, secret=secret, - on_done=on_done) + populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done) - _ctk_run_dialog(_build) + ctk_run_dialog(_build) -def _has_ipv6_enabled() -> bool: - import socket as _sock - - try: - addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6) - for addr in addrs: - ip = addr[4][0] - if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"): - return True - except Exception: - pass - try: - s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM) - s.bind(("::1", 0)) - s.close() - return True - except Exception: - return False - - -def _check_ipv6_warning(): - _ensure_dirs() - if IPV6_WARN_MARKER.exists(): - return - if not _has_ipv6_enabled(): - return - - IPV6_WARN_MARKER.touch() - - threading.Thread(target=_show_ipv6_dialog, daemon=True).start() - - -def _show_ipv6_dialog(): - _show_info( - "На вашем компьютере включена поддержка подключения по IPv6.\n\n" - "Telegram может пытаться подключаться через IPv6, " - "что не поддерживается и может привести к ошибкам.\n\n" - "Если прокси не работает или в логах присутствуют ошибки, " - "связанные с попытками подключения по IPv6 - " - "попробуйте отключить в настройках прокси Telegram попытку соединения " - "по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 " - "в системе.\n\n" - "Это предупреждение будет показано только один раз.", - "TG WS Proxy", - ) +# tray menu def _build_menu(): - if pystray is None: - return None host = _config.get("host", DEFAULT_CONFIG["host"]) port = _config.get("port", DEFAULT_CONFIG["port"]) link_host = tg_ws_proxy.get_link_host(host) - return pystray.Menu( - pystray.MenuItem( - f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True - ), + pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True), pystray.Menu.SEPARATOR, pystray.MenuItem("Перезапустить прокси", _on_restart), pystray.MenuItem("Настройки...", _on_edit_config), @@ -674,27 +229,18 @@ def _build_menu(): ) -def run_tray(): +# entry point + + +def run_tray() -> None: global _tray_icon, _config _config = load_config() - save_config(_config) - - if LOG_FILE.exists(): - try: - LOG_FILE.unlink() - except Exception: - pass - - setup_logging(_config.get("verbose", False), - log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])) - log.info("TG WS Proxy версия %s, tray app starting", __version__) - log.info("Config: %s", _config) - log.info("Log file: %s", LOG_FILE) + bootstrap(_config) if pystray is None or Image is None: log.error("pystray or Pillow not installed; running in console mode") - start_proxy() + start_proxy(_config, _show_error) try: while True: time.sleep(1) @@ -702,16 +248,12 @@ def run_tray(): stop_proxy() return - start_proxy() - - _maybe_notify_update_async() - + start_proxy(_config, _show_error) + maybe_notify_update(_config, lambda: _exiting, _ask_yes_no) _show_first_run() - _check_ipv6_warning() - - icon_image = _load_icon() - _tray_icon = pystray.Icon(APP_NAME, icon_image, "TG WS Proxy", menu=_build_menu()) + check_ipv6_warning(_show_info) + _tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu()) log.info("Tray icon running") _tray_icon.run() @@ -719,15 +261,14 @@ def run_tray(): log.info("Tray app exited") -def main(): - if not _acquire_lock(): +def main() -> None: + if not acquire_lock("linux.py"): _show_info("Приложение уже запущено.", os.path.basename(sys.argv[0])) return - try: run_tray() finally: - _release_lock() + release_lock() if __name__ == "__main__": diff --git a/macos.py b/macos.py index 8141e15..a2379f1 100644 --- a/macos.py +++ b/macos.py @@ -1,18 +1,13 @@ from __future__ import annotations -import json -import logging -import logging.handlers import os -import psutil import subprocess import sys import threading import time import webbrowser -import asyncio as _asyncio from pathlib import Path -from typing import Dict, Optional +from typing import Optional try: import rumps @@ -30,261 +25,135 @@ except ImportError: pyperclip = None import proxy.tg_ws_proxy as tg_ws_proxy -from proxy.tg_ws_proxy import proxy_config from proxy import __version__ -from utils.default_config import default_tray_config +from utils.tray_common import ( + APP_DIR, APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IPV6_WARN_MARKER, + LOG_FILE, acquire_lock, apply_proxy_config, ensure_dirs, load_config, + log, release_lock, save_config, setup_logging, stop_proxy, tg_proxy_url, +) -APP_NAME = "TgWsProxy" -APP_DIR = Path.home() / "Library" / "Application Support" / APP_NAME -CONFIG_FILE = APP_DIR / "config.json" -LOG_FILE = APP_DIR / "proxy.log" -FIRST_RUN_MARKER = APP_DIR / ".first_run_done" -IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned" MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png" -DEFAULT_CONFIG = default_tray_config() - _proxy_thread: Optional[threading.Thread] = None _async_stop: Optional[object] = None _app: Optional[object] = None _config: dict = {} _exiting: bool = False -_lock_file_path: Optional[Path] = None -log = logging.getLogger("tg-ws-tray") +# osascript dialogs -# Single-instance lock - -def _same_process(lock_meta: dict, proc: psutil.Process) -> bool: - try: - lock_ct = float(lock_meta.get("create_time", 0.0)) - proc_ct = float(proc.create_time()) - if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0: - return False - except Exception: - return False - - frozen = bool(getattr(sys, "frozen", False)) - if frozen: - return APP_NAME.lower() in proc.name().lower() - return False +def _esc(text: str) -> str: + return text.replace("\\", "\\\\").replace('"', '\\"') -def _release_lock(): - global _lock_file_path - if not _lock_file_path: - return - try: - _lock_file_path.unlink(missing_ok=True) - except Exception: - pass - _lock_file_path = None +def _osascript(script: str) -> str: + r = subprocess.run(["osascript", "-e", script], capture_output=True, text=True) + return r.stdout.strip() -def _acquire_lock() -> bool: - global _lock_file_path - _ensure_dirs() - lock_files = list(APP_DIR.glob("*.lock")) - - for f in lock_files: - pid = None - meta: dict = {} - - try: - pid = int(f.stem) - except Exception: - f.unlink(missing_ok=True) - continue - - try: - raw = f.read_text(encoding="utf-8").strip() - if raw: - meta = json.loads(raw) - except Exception: - meta = {} - - try: - proc = psutil.Process(pid) - if _same_process(meta, proc): - return False - except Exception: - pass - - f.unlink(missing_ok=True) - - lock_file = APP_DIR / f"{os.getpid()}.lock" - try: - proc = psutil.Process(os.getpid()) - payload = {"create_time": proc.create_time()} - lock_file.write_text(json.dumps(payload, ensure_ascii=False), - encoding="utf-8") - except Exception: - lock_file.touch() - - _lock_file_path = lock_file - return True - - -# Filesystem helpers - -def _ensure_dirs(): - APP_DIR.mkdir(parents=True, exist_ok=True) - - -def load_config() -> dict: - _ensure_dirs() - if CONFIG_FILE.exists(): - try: - with open(CONFIG_FILE, "r", encoding="utf-8") as f: - data = json.load(f) - for k, v in DEFAULT_CONFIG.items(): - data.setdefault(k, v) - return data - except Exception as exc: - log.warning("Failed to load config: %s", exc) - return dict(DEFAULT_CONFIG) - - -def save_config(cfg: dict): - _ensure_dirs() - with open(CONFIG_FILE, "w", encoding="utf-8") as f: - json.dump(cfg, f, indent=2, ensure_ascii=False) - - -def setup_logging(verbose: bool = False, log_max_mb: float = 5): - _ensure_dirs() - root = logging.getLogger() - root.setLevel(logging.DEBUG if verbose else logging.INFO) - - fh = logging.handlers.RotatingFileHandler( - str(LOG_FILE), - maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024), - backupCount=0, - encoding='utf-8', +def _show_error(text: str, title: str = "TG WS Proxy") -> None: + _osascript( + f'display dialog "{_esc(text)}" with title "{_esc(title)}" ' + f'buttons {{"OK"}} default button "OK" with icon stop' ) - fh.setLevel(logging.DEBUG) - fh.setFormatter(logging.Formatter( - "%(asctime)s %(levelname)-5s %(name)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S")) - root.addHandler(fh) - - if not getattr(sys, "frozen", False): - ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.DEBUG if verbose else logging.INFO) - ch.setFormatter(logging.Formatter( - "%(asctime)s %(levelname)-5s %(message)s", - datefmt="%H:%M:%S")) - root.addHandler(ch) -# Menubar icon +def _show_info(text: str, title: str = "TG WS Proxy") -> None: + _osascript( + f'display dialog "{_esc(text)}" with title "{_esc(title)}" ' + f'buttons {{"OK"}} default button "OK" with icon note' + ) + + +def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool: + return _ask_yes_no_close(text, title) is True + + +def _ask_yes_no_close(text: str, title: str = "TG WS Proxy") -> Optional[bool]: + r = subprocess.run( + [ + "osascript", "-e", + f'button returned of (display dialog "{_esc(text)}" ' + f'with title "{_esc(title)}" ' + f'buttons {{"Закрыть", "Нет", "Да"}} ' + f'default button "Да" cancel button "Закрыть" with icon note)', + ], + capture_output=True, text=True, + ) + if r.returncode != 0: + return None + btn = r.stdout.strip() + if btn == "Да": + return True + if btn == "Нет": + return False + return None + + +def _osascript_input(prompt: str, default: str, title: str = "TG WS Proxy") -> Optional[str]: + r = subprocess.run( + [ + "osascript", "-e", + f'text returned of (display dialog "{_esc(prompt)}" ' + f'default answer "{_esc(default)}" ' + f'with title "{_esc(title)}" ' + f'buttons {{"Закрыть", "OK"}} ' + f'default button "OK" cancel button "Закрыть")', + ], + capture_output=True, text=True, + ) + if r.returncode != 0: + return None + return r.stdout.rstrip("\r\n") + + +# menubar icon + def _make_menubar_icon(size: int = 44): if Image is None: return None img = Image.new("RGBA", (size, size), (0, 0, 0, 0)) draw = ImageDraw.Draw(img) - margin = size // 11 - draw.ellipse([margin, margin, size - margin, size - margin], - fill=(0, 0, 0, 255)) - + draw.ellipse([margin, margin, size - margin, size - margin], fill=(0, 0, 0, 255)) try: - font = ImageFont.truetype( - "/System/Library/Fonts/Helvetica.ttc", - size=int(size * 0.55)) + font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", size=int(size * 0.55)) except Exception: font = ImageFont.load_default() - bbox = draw.textbbox((0, 0), "T", font=font) tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] - tx = (size - tw) // 2 - bbox[0] - ty = (size - th) // 2 - bbox[1] - draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font) + draw.text( + ((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]), + "T", fill=(255, 255, 255, 255), font=font, + ) return img -# Generate menubar icon PNG if it does not exist. -def _ensure_menubar_icon(): + +def _ensure_menubar_icon() -> None: if MENUBAR_ICON_PATH.exists(): return - _ensure_dirs() + ensure_dirs() img = _make_menubar_icon(44) if img: img.save(str(MENUBAR_ICON_PATH), "PNG") -# Native macOS dialogs +# proxy lifecycle (macOS-local) -def _escape_osascript_text(text: str) -> str: - return text.replace('\\', '\\\\').replace('"', '\\"') +import asyncio as _asyncio -def _osascript(script: str) -> str: - r = subprocess.run( - ['osascript', '-e', script], - capture_output=True, text=True) - return r.stdout.strip() - - -def _show_error(text: str, title: str = "TG WS Proxy"): - text_esc = _escape_osascript_text(text) - title_esc = _escape_osascript_text(title) - _osascript( - f'display dialog "{text_esc}" with title "{title_esc}" ' - f'buttons {{"OK"}} default button "OK" with icon stop') - - -def _show_info(text: str, title: str = "TG WS Proxy"): - text_esc = _escape_osascript_text(text) - title_esc = _escape_osascript_text(title) - _osascript( - f'display dialog "{text_esc}" with title "{title_esc}" ' - f'buttons {{"OK"}} default button "OK" with icon note') - - -def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool: - result = _ask_yes_no_close(text, title) - return result is True - - -def _ask_yes_no_close(text: str, - title: str = "TG WS Proxy") -> Optional[bool]: - text_esc = _escape_osascript_text(text) - title_esc = _escape_osascript_text(title) - r = subprocess.run( - ['osascript', '-e', - f'button returned of (display dialog "{text_esc}" ' - f'with title "{title_esc}" ' - f'buttons {{"Закрыть", "Нет", "Да"}} ' - f'default button "Да" cancel button "Закрыть" with icon note)'], - capture_output=True, text=True) - if r.returncode != 0: - return None - - result = r.stdout.strip() - if result == "Да": - return True - if result == "Нет": - return False - return None - - -# Proxy lifecycle - -def _run_proxy_thread(): +def _run_proxy_thread() -> None: global _async_stop - loop = _asyncio.new_event_loop() _asyncio.set_event_loop(loop) - stop_ev = _asyncio.Event() _async_stop = (loop, stop_ev) - try: - loop.run_until_complete( - tg_ws_proxy._run(stop_event=stop_ev)) + loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev)) except Exception as exc: log.error("Proxy thread crashed: %s", exc) if "Address already in use" in str(exc): @@ -292,49 +161,28 @@ def _run_proxy_thread(): "Не удалось запустить прокси:\n" "Порт уже используется другим приложением.\n\n" "Закройте приложение, использующее этот порт, " - "или измените порт в настройках прокси и перезапустите.") + "или измените порт в настройках прокси и перезапустите." + ) finally: loop.close() _async_stop = None -def start_proxy(): - global _proxy_thread, _config +def _start_proxy() -> None: + global _proxy_thread if _proxy_thread and _proxy_thread.is_alive(): log.info("Proxy already running") return - - cfg = _config - port = cfg.get("port", DEFAULT_CONFIG["port"]) - host = cfg.get("host", DEFAULT_CONFIG["host"]) - secret = cfg.get("secret", DEFAULT_CONFIG["secret"]) - dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]) - buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"]) - pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]) - - try: - dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list) - except ValueError as e: - log.error("Bad config dc_ip: %s", e) - _show_error(f"Ошибка конфигурации:\n{e}") + if not apply_proxy_config(_config): + _show_error("Ошибка конфигурации DC → IP.") return - - proxy_config.port = port - proxy_config.host = host - proxy_config.secret = secret - proxy_config.dc_redirects = dc_redirects - proxy_config.buffer_size = max(4, buf_kb) * 1024 - proxy_config.pool_size = max(0, pool_size) - - log.info("Starting proxy on %s:%d ...", host, port) - - _proxy_thread = threading.Thread( - target=_run_proxy_thread, - daemon=True, name="proxy") + pc = tg_ws_proxy.proxy_config + log.info("Starting proxy on %s:%d ...", pc.host, pc.port) + _proxy_thread = threading.Thread(target=_run_proxy_thread, daemon=True, name="proxy") _proxy_thread.start() -def stop_proxy(): +def _stop_proxy() -> None: global _proxy_thread, _async_stop if _async_stop: loop, stop_ev = _async_stop @@ -345,24 +193,21 @@ def stop_proxy(): log.info("Proxy stopped") -def restart_proxy(): +def _restart_proxy() -> None: log.info("Restarting proxy...") - stop_proxy() + _stop_proxy() time.sleep(0.3) - start_proxy() + _start_proxy() -# Menu callbacks +# menu callbacks -def _on_open_in_telegram(_=None): - host = _config.get("host", DEFAULT_CONFIG["host"]) - port = _config.get("port", DEFAULT_CONFIG["port"]) - secret = _config.get("secret", DEFAULT_CONFIG["secret"]) - url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}" +def _on_open_in_telegram(_=None) -> None: + url = tg_proxy_url(_config) log.info("Opening %s", url) try: - result = subprocess.call(['open', url]) + result = subprocess.call(["open", url]) if result != 0: raise RuntimeError("open command failed") except Exception: @@ -376,67 +221,45 @@ def _on_open_in_telegram(_=None): if pyperclip: pyperclip.copy(url) else: - subprocess.run(['pbcopy'], input=url.encode(), - check=True) + subprocess.run(["pbcopy"], input=url.encode(), check=True) _show_info( "Не удалось открыть Telegram автоматически.\n\n" - f"Ссылка скопирована в буфер обмена:\n{url}") + f"Ссылка скопирована в буфер обмена:\n{url}" + ) except Exception as exc: log.error("Clipboard copy failed: %s", exc) _show_error(f"Не удалось скопировать ссылку:\n{exc}") -def _on_restart(_=None): - def _do_restart(): +def _on_restart(_=None) -> None: + def _do(): global _config _config = load_config() if _app: _app.update_menu_title() - restart_proxy() + _restart_proxy() - threading.Thread(target=_do_restart, daemon=True).start() + threading.Thread(target=_do, daemon=True).start() -def _on_open_logs(_=None): +def _on_open_logs(_=None) -> None: log.info("Opening log file: %s", LOG_FILE) if LOG_FILE.exists(): - subprocess.call(['open', str(LOG_FILE)]) + subprocess.call(["open", str(LOG_FILE)]) else: _show_info("Файл логов ещё не создан.") -# Show a native text input dialog. Returns None if cancelled. -def _osascript_input(prompt: str, default: str, - title: str = "TG WS Proxy") -> Optional[str]: - prompt_esc = _escape_osascript_text(prompt) - default_esc = _escape_osascript_text(default) - title_esc = _escape_osascript_text(title) - r = subprocess.run( - ['osascript', '-e', - f'text returned of (display dialog "{prompt_esc}" ' - f'default answer "{default_esc}" ' - f'with title "{title_esc}" ' - f'buttons {{"Закрыть", "OK"}} ' - f'default button "OK" cancel button "Закрыть")'], - capture_output=True, text=True) - if r.returncode != 0: - return None - return r.stdout.rstrip("\r\n") - -def _on_edit_config(_=None): +def _on_edit_config(_=None) -> None: threading.Thread(target=_edit_config_dialog, daemon=True).start() def _check_updates_menu_title() -> str: on = bool(_config.get("check_updates", True)) - return ( - "✓ Проверять обновления при запуске" - if on - else "Проверять обновления при запуске (выкл)" - ) + return "✓ Проверять обновления при запуске" if on else "Проверять обновления при запуске (выкл)" -def _toggle_check_updates(_=None): +def _toggle_check_updates(_=None) -> None: global _config _config["check_updates"] = not bool(_config.get("check_updates", True)) save_config(_config) @@ -444,12 +267,15 @@ def _toggle_check_updates(_=None): _app._check_updates_item.title = _check_updates_menu_title() -def _on_open_release_page(_=None): +def _on_open_release_page(_=None) -> None: from utils.update_check import RELEASES_PAGE_URL webbrowser.open(RELEASES_PAGE_URL) -def _maybe_notify_update_async(): +# update check + + +def _maybe_notify_update_async() -> None: def _work(): time.sleep(1.5) if _exiting: @@ -465,8 +291,7 @@ def _maybe_notify_update_async(): url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL ver = st.get("latest") or "?" if _ask_yes_no( - f"Доступна новая версия: {ver}\n\n" - f"Открыть страницу релиза в браузере?", + f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?", "TG WS Proxy — обновление", ): webbrowser.open(url) @@ -476,18 +301,16 @@ def _maybe_notify_update_async(): threading.Thread(target=_work, daemon=True, name="update-check").start() -# Settings via native macOS dialogs -def _edit_config_dialog(): +# settings dialog + + +def _edit_config_dialog() -> None: cfg = load_config() - # Host - host = _osascript_input( - "IP-адрес прокси:", - cfg.get("host", DEFAULT_CONFIG["host"])) + host = _osascript_input("IP-адрес прокси:", cfg.get("host", DEFAULT_CONFIG["host"])) if host is None: return host = host.strip() - import socket as _sock try: _sock.inet_aton(host) @@ -495,10 +318,7 @@ def _edit_config_dialog(): _show_error("Некорректный IP-адрес.") return - # Port - port_str = _osascript_input( - "Порт прокси:", - str(cfg.get("port", DEFAULT_CONFIG["port"]))) + port_str = _osascript_input("Порт прокси:", str(cfg.get("port", DEFAULT_CONFIG["port"]))) if port_str is None: return try: @@ -509,10 +329,9 @@ def _edit_config_dialog(): _show_error("Порт должен быть числом 1-65535") return - # Secret secret_str = _osascript_input( - "MTProto Secret (32 hex символа):", - cfg.get("secret", DEFAULT_CONFIG["secret"])) + "MTProto Secret (32 hex символа):", cfg.get("secret", DEFAULT_CONFIG["secret"]) + ) if secret_str is None: return secret_str = secret_str.strip().lower() @@ -520,42 +339,39 @@ def _edit_config_dialog(): _show_error("Secret должен быть строкой из 32 шестнадцатеричных символов.") return - # DC-IP mappings dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])) dc_str = _osascript_input( "DC → IP маппинги (через запятую, формат DC:IP):\n" "Например: 2:149.154.167.220, 4:149.154.167.220", - dc_default) + dc_default, + ) if dc_str is None: return - dc_lines = [s.strip() for s in dc_str.replace(',', '\n').splitlines() - if s.strip()] + dc_lines = [s.strip() for s in dc_str.replace(",", "\n").splitlines() if s.strip()] try: tg_ws_proxy.parse_dc_ip_list(dc_lines) except ValueError as e: _show_error(str(e)) return - # Verbose verbose = _ask_yes_no_close("Включить подробное логирование (verbose)?") if verbose is None: return - # Advanced settings adv_str = _osascript_input( "Расширенные настройки (буфер KB, WS пул, лог MB):\n" "Формат: buf_kb,pool_size,log_max_mb", f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])}," f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])}," - f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}") + f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}", + ) if adv_str is None: return adv = {} if adv_str: - parts = [s.strip() for s in adv_str.split(',')] - keys = [("buf_kb", int), ("pool_size", int), - ("log_max_mb", float)] + parts = [s.strip() for s in adv_str.split(",")] + keys = [("buf_kb", int), ("pool_size", int), ("log_max_mb", float)] for i, (k, typ) in enumerate(keys): if i < len(parts): try: @@ -572,6 +388,7 @@ def _edit_config_dialog(): "buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])), "pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])), "log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])), + "check_updates": cfg.get("check_updates", True), } save_config(new_cfg) log.info("Config saved: %s", new_cfg) @@ -581,23 +398,22 @@ def _edit_config_dialog(): if _app: _app.update_menu_title() - if _ask_yes_no_close( - "Настройки сохранены.\n\nПерезапустить прокси сейчас?"): - restart_proxy() + if _ask_yes_no_close("Настройки сохранены.\n\nПерезапустить прокси сейчас?"): + _restart_proxy() -# First-run & IPv6 dialogs +# first run & ipv6 -def _show_first_run(): - _ensure_dirs() + +def _show_first_run() -> None: + ensure_dirs() if FIRST_RUN_MARKER.exists(): return host = _config.get("host", DEFAULT_CONFIG["host"]) port = _config.get("port", DEFAULT_CONFIG["port"]) secret = _config.get("secret", DEFAULT_CONFIG["secret"]) - - tg_url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}" + tg_url = tg_proxy_url(_config) text = ( f"Прокси запущен и работает в строке меню.\n\n" @@ -613,49 +429,48 @@ def _show_first_run(): ) FIRST_RUN_MARKER.touch() - if _ask_yes_no(text, "TG WS Proxy"): _on_open_in_telegram() -def _has_ipv6_enabled() -> bool: - import socket as _sock - try: - addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6) - for addr in addrs: - ip = addr[4][0] - if ip and not ip.startswith('::1') and not ip.startswith('fe80::1'): - return True - except Exception: - pass - try: - s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM) - s.bind(('::1', 0)) - s.close() - return True - except Exception: - return False - - -def _check_ipv6_warning(): - _ensure_dirs() +def _check_ipv6_warning() -> None: + ensure_dirs() if IPV6_WARN_MARKER.exists(): return - if not _has_ipv6_enabled(): + + import socket as _sock + has = False + try: + for addr in _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6): + ip = addr[4][0] + if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"): + has = True + break + except Exception: + pass + if not has: + try: + s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM) + s.bind(("::1", 0)) + s.close() + has = True + except Exception: + pass + if not has: return IPV6_WARN_MARKER.touch() - _show_info( "На вашем компьютере включена поддержка подключения по IPv6.\n\n" "Telegram может пытаться подключаться через IPv6, " "что не поддерживается и может привести к ошибкам.\n\n" "Если прокси не работает, попробуйте отключить " "попытку соединения по IPv6 в настройках прокси Telegram.\n\n" - "Это предупреждение будет показано только один раз.") + "Это предупреждение будет показано только один раз." + ) -# rumps menubar app +# rumps app _TgWsProxyAppBase = rumps.App if rumps else object @@ -663,34 +478,25 @@ _TgWsProxyAppBase = rumps.App if rumps else object class TgWsProxyApp(_TgWsProxyAppBase): def __init__(self): _ensure_menubar_icon() - icon_path = (str(MENUBAR_ICON_PATH) - if MENUBAR_ICON_PATH.exists() else None) + icon_path = str(MENUBAR_ICON_PATH) if MENUBAR_ICON_PATH.exists() else None host = _config.get("host", DEFAULT_CONFIG["host"]) port = _config.get("port", DEFAULT_CONFIG["port"]) link_host = tg_ws_proxy.get_link_host(host) self._open_tg_item = rumps.MenuItem( - f"Открыть в Telegram ({link_host}:{port})", - callback=_on_open_in_telegram) - self._restart_item = rumps.MenuItem( - "Перезапустить прокси", - callback=_on_restart) - self._settings_item = rumps.MenuItem( - "Настройки...", - callback=_on_edit_config) - self._logs_item = rumps.MenuItem( - "Открыть логи", - callback=_on_open_logs) + f"Открыть в Telegram ({link_host}:{port})", callback=_on_open_in_telegram + ) + self._restart_item = rumps.MenuItem("Перезапустить прокси", callback=_on_restart) + self._settings_item = rumps.MenuItem("Настройки...", callback=_on_edit_config) + self._logs_item = rumps.MenuItem("Открыть логи", callback=_on_open_logs) self._release_page_item = rumps.MenuItem( - "Страница релиза на GitHub…", - callback=_on_open_release_page) + "Страница релиза на GitHub…", callback=_on_open_release_page + ) self._check_updates_item = rumps.MenuItem( - _check_updates_menu_title(), - callback=_toggle_check_updates) - self._version_item = rumps.MenuItem( - f"Версия {__version__}", - callback=lambda _: None) + _check_updates_menu_title(), callback=_toggle_check_updates + ) + self._version_item = rumps.MenuItem(f"Версия {__version__}", callback=lambda _: None) super().__init__( "TG WS Proxy", @@ -708,18 +514,20 @@ class TgWsProxyApp(_TgWsProxyAppBase): self._check_updates_item, None, self._version_item, - ]) + ], + ) - def update_menu_title(self): + def update_menu_title(self) -> None: host = _config.get("host", DEFAULT_CONFIG["host"]) port = _config.get("port", DEFAULT_CONFIG["port"]) link_host = tg_ws_proxy.get_link_host(host) - - self._open_tg_item.title = ( - f"Открыть в Telegram ({link_host}:{port})") + self._open_tg_item.title = f"Открыть в Telegram ({link_host}:{port})" -def run_menubar(): +# entry point + + +def run_menubar() -> None: global _app, _config _config = load_config() @@ -731,26 +539,26 @@ def run_menubar(): except Exception: pass - setup_logging(_config.get("verbose", False), - log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])) + setup_logging( + _config.get("verbose", False), + log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]), + ) log.info("TG WS Proxy версия %s, menubar app starting", __version__) log.info("Config: %s", _config) log.info("Log file: %s", LOG_FILE) if rumps is None or Image is None: log.error("rumps or Pillow not installed; running in console mode") - start_proxy() + _start_proxy() try: while True: time.sleep(1) except KeyboardInterrupt: - stop_proxy() + _stop_proxy() return - start_proxy() - + _start_proxy() _maybe_notify_update_async() - _show_first_run() _check_ipv6_warning() @@ -758,19 +566,18 @@ def run_menubar(): log.info("Menubar app running") _app.run() - stop_proxy() + _stop_proxy() log.info("Menubar app exited") -def main(): - if not _acquire_lock(): +def main() -> None: + if not acquire_lock("macos.py"): _show_info("Приложение уже запущено.") return - try: run_menubar() finally: - _release_lock() + release_lock() if __name__ == "__main__": diff --git a/ui/ctk_theme.py b/ui/ctk_theme.py index f814150..ad76fad 100644 --- a/ui/ctk_theme.py +++ b/ui/ctk_theme.py @@ -1,8 +1,3 @@ -""" -Общая светлая тема и фабрика окон CustomTkinter для tray-приложений (Windows / Linux). -Цвета и отступы задаются в одном месте — правки темы не дублируются по платформам. -""" - from __future__ import annotations import sys @@ -13,11 +8,7 @@ from typing import Any, Callable, Optional, Tuple _tk_variable_del_guard_installed = False -def _install_tkinter_variable_del_guard() -> None: - """ - Убирает «Exception ignored» при выходе процесса: Tcl уже разрушен, а GC ещё - вызывает Variable.__del__ (StringVar и т.д.) — напр. окно CTk в фоновом потоке. - """ +def install_tkinter_variable_del_guard() -> None: global _tk_variable_del_guard_installed if _tk_variable_del_guard_installed: return @@ -32,7 +23,6 @@ def _install_tkinter_variable_del_guard() -> None: tkinter.Variable.__del__ = _safe_variable_del # type: ignore[assignment] _tk_variable_del_guard_installed = True -# Размеры и отступы (единые для диалогов настроек и первого запуска) CONFIG_DIALOG_SIZE: Tuple[int, int] = (460, 560) CONFIG_DIALOG_FRAME_PAD: Tuple[int, int] = (20, 14) FIRST_RUN_SIZE: Tuple[int, int] = (520, 440) @@ -41,8 +31,6 @@ FIRST_RUN_FRAME_PAD: Tuple[int, int] = (28, 24) @dataclass(frozen=True) class CtkTheme: - """Палитра Telegram-style и семейства шрифтов для UI и моноширинного текста.""" - tg_blue: str = "#3390ec" tg_blue_hover: str = "#2b7cd4" bg: str = "#ffffff" @@ -71,34 +59,6 @@ def center_ctk_geometry(root: Any, width: int, height: int) -> None: root.geometry(f"{width}x{height}+{(sw - width) // 2}+{(sh - height) // 2}") -def create_ctk_root( - ctk: Any, - *, - title: str, - width: int, - height: int, - theme: CtkTheme, - topmost: bool = True, - after_create: Optional[Callable[[Any], None]] = None, -) -> Any: - """ - Создаёт CTk: глобальная тема, заголовок, без ресайза, по центру экрана, фон из палитры. - after_create — опционально: установка иконки окна (различается по ОС). - """ - _install_tkinter_variable_del_guard() - apply_ctk_appearance(ctk) - root = ctk.CTk() - root.title(title) - root.resizable(False, False) - if topmost: - root.attributes("-topmost", True) - center_ctk_geometry(root, width, height) - root.configure(fg_color=theme.bg) - if after_create: - after_create(root) - return root - - def create_ctk_toplevel( ctk: Any, *, @@ -119,7 +79,17 @@ def create_ctk_toplevel( root.lift() root.focus_force() if after_create: - after_create(root) + _after_id = root.after(300, lambda: after_create(root)) + _orig_destroy = root.destroy + + def _safe_destroy(): + try: + root.after_cancel(_after_id) + except Exception: + pass + _orig_destroy() + + root.destroy = _safe_destroy return root diff --git a/ui/ctk_tooltip.py b/ui/ctk_tooltip.py index 16c6da7..d6d74ed 100644 --- a/ui/ctk_tooltip.py +++ b/ui/ctk_tooltip.py @@ -1,7 +1,3 @@ -""" -Всплывающие подсказки для CustomTkinter / tk: задержка, Toplevel без рамки, wrap. -""" - from __future__ import annotations import tkinter as tk @@ -9,8 +5,6 @@ from typing import Any, List, Optional class CtkTooltip: - """Показ текста при наведении на виджет.""" - def __init__( self, widget: Any, @@ -31,6 +25,8 @@ class CtkTooltip: widget.bind("", self._on_destroy, add="+") def _schedule(self, _event: Any = None) -> None: + if self.widget is None: + return self._cancel_after() self._after_id = self.widget.after(self.delay_ms, self._show) @@ -89,6 +85,7 @@ class CtkTooltip: def _on_destroy(self, _event: Any = None) -> None: self._hide() + self.widget = None def _is_windows() -> bool: @@ -104,11 +101,9 @@ def attach_ctk_tooltip( delay_ms: int = 450, wraplength: int = 320, ) -> None: - """Повесить подсказку на виджет (CTk или tk).""" CtkTooltip(widget, text, delay_ms=delay_ms, wraplength=wraplength) def attach_tooltip_to_widgets(widgets: List[Any], text: str, **kwargs: Any) -> None: - """Одна и та же подсказка на несколько виджетов (подпись + поле).""" for w in widgets: attach_ctk_tooltip(w, text, **kwargs) diff --git a/ui/ctk_tray_ui.py b/ui/ctk_tray_ui.py index cd26981..95e85fd 100644 --- a/ui/ctk_tray_ui.py +++ b/ui/ctk_tray_ui.py @@ -1,8 +1,3 @@ -""" -Общая разметка CustomTkinter для tray (Windows / Linux): настройки и первый запуск. -Логика сохранения и колбэки остаются в платформенных модулях. -""" - from __future__ import annotations import os @@ -21,7 +16,6 @@ from ui.ctk_theme import ( ) from ui.ctk_tooltip import attach_ctk_tooltip, attach_tooltip_to_widgets -# Подсказки для формы настроек (новые пользователи) _TIP_HOST = ( "Адрес, на котором прокси принимает подключения.\n" "Обычно 127.0.0.1 — локальная сеть, 0.0.0.0 - все интерфейсы" @@ -30,9 +24,7 @@ _TIP_PORT = ( "Порт прокси. В Telegram Desktop в настройках прокси должен быть " "указан тот же порт" ) -_TIP_SECRET = ( - "Секретный ключ для авторизации клиентов\n" -) +_TIP_SECRET = "Секретный ключ для авторизации клиентов" _TIP_DC = ( "Соответствие номера датацентра Telegram (DC) и IP-адреса сервера.\n" "Каждая строка: «номер:IP», например 2:149.154.167.220. " @@ -57,14 +49,60 @@ _TIP_AUTOSTART = ( "Запускать TG WS Proxy при входе в Windows. " "Если вы переместите программу в другую папку, автозапуск сбросится" ) -_TIP_CHECK_UPDATES = ( - "При запуске проверять наличие обновлений" -) +_TIP_CHECK_UPDATES = "При запуске проверять наличие обновлений" _TIP_SAVE = "Сохранить настройки" _TIP_CANCEL = "Закрыть окно без сохранения изменений" -# Внутренняя ширина полей относительно ширины окна настроек (см. CONFIG_DIALOG_SIZE) -_CONFIG_FORM_INNER_WIDTH = 396 +_INNER_W = 396 + + +def _entry(ctk, parent, theme, *, var=None, width=0, height=36, radius=10, **kw): + opts = dict( + font=(theme.ui_font_family, 13), corner_radius=radius, + fg_color=theme.bg, border_color=theme.field_border, + border_width=1, text_color=theme.text_primary, + ) + if var is not None: + opts["textvariable"] = var + if width: + opts["width"] = width + opts["height"] = height + opts.update(kw) + return ctk.CTkEntry(parent, **opts) + + +def _checkbox(ctk, parent, theme, text, variable): + return ctk.CTkCheckBox( + parent, text=text, variable=variable, + font=(theme.ui_font_family, 13), text_color=theme.text_primary, + fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover, + corner_radius=6, border_width=2, border_color=theme.field_border, + ) + + +def _label(ctk, parent, theme, text, *, size=12, bold=False, secondary=True, **kw): + weight = "bold" if bold else "normal" + return ctk.CTkLabel( + parent, text=text, + font=(theme.ui_font_family, size, weight), + text_color=theme.text_secondary if secondary else theme.text_primary, + anchor="w", **kw, + ) + + +def _labeled_entry(ctk, parent, theme, label_text, value, *, tip="", width=0, pack_fill=False): + col = ctk.CTkFrame(parent, fg_color="transparent") + lbl = _label(ctk, col, theme, label_text) + lbl.pack(anchor="w", pady=(0, 2)) + var = ctk.StringVar(value=str(value)) + ent = _entry(ctk, col, theme, var=var, width=width) + if pack_fill: + ent.pack(fill="x") + else: + ent.pack(anchor="w") + if tip: + attach_tooltip_to_widgets([lbl, ent, col], tip) + return col, var def tray_settings_scroll_and_footer( @@ -72,10 +110,6 @@ def tray_settings_scroll_and_footer( content_parent: Any, theme: CtkTheme, ) -> Tuple[Any, Any]: - """ - Нижняя панель под кнопки и прокручиваемая область для формы (форма не обрезает кнопки). - Возвращает (scroll_frame, footer_frame). - """ footer = ctk.CTkFrame(content_parent, fg_color=theme.bg) footer.pack(side="bottom", fill="x") scroll = ctk.CTkScrollableFrame( @@ -97,22 +131,12 @@ def _config_section( *, bottom_spacer: int = 6, ) -> Any: - """Заголовок секции и карточка с рамкой для группировки полей.""" wrap = ctk.CTkFrame(parent, fg_color="transparent") wrap.pack(fill="x", pady=(0, bottom_spacer)) - ctk.CTkLabel( - wrap, - text=title, - font=(theme.ui_font_family, 12, "bold"), - text_color=theme.text_primary, - anchor="w", - ).pack(anchor="w", pady=(0, 2)) + _label(ctk, wrap, theme, title, secondary=False, bold=True).pack(anchor="w", pady=(0, 2)) card = ctk.CTkFrame( - wrap, - fg_color=theme.field_bg, - corner_radius=10, - border_width=1, - border_color=theme.field_border, + wrap, fg_color=theme.field_bg, corner_radius=10, + border_width=1, border_color=theme.field_border, ) card.pack(fill="x") inner = ctk.CTkFrame(card, fg_color="transparent") @@ -143,153 +167,67 @@ def install_tray_config_form( show_autostart: bool = False, autostart_value: bool = False, ) -> TrayConfigFormWidgets: - """Поля настроек прокси внутри уже созданного `frame`.""" header = ctk.CTkFrame(frame, fg_color="transparent") header.pack(fill="x", pady=(0, 2)) ctk.CTkLabel( - header, - text="Настройки прокси", + header, text="Настройки прокси", font=(theme.ui_font_family, 17, "bold"), - text_color=theme.text_primary, - anchor="w", + text_color=theme.text_primary, anchor="w", ).pack(side="left") ctk.CTkLabel( - header, - text=f"v{__version__}", + header, text=f"v{__version__}", font=(theme.ui_font_family, 12), - text_color=theme.text_secondary, - anchor="e", + text_color=theme.text_secondary, anchor="e", ).pack(side="right") - inner_w = _CONFIG_FORM_INNER_WIDTH - conn = _config_section(ctk, frame, theme, "Подключение MTProto") host_row = ctk.CTkFrame(conn, fg_color="transparent") host_row.pack(fill="x") - host_col = ctk.CTkFrame(host_row, fg_color="transparent") + host_col, host_var = _labeled_entry( + ctk, host_row, theme, "IP-адрес", + cfg.get("host", default_config["host"]), + tip=_TIP_HOST, width=160, pack_fill=True, + ) host_col.pack(side="left", fill="x", expand=True, padx=(0, 10)) - host_lbl = ctk.CTkLabel( - host_col, - text="IP-адрес", - font=(theme.ui_font_family, 12), - text_color=theme.text_secondary, - anchor="w", - ) - host_lbl.pack(anchor="w", pady=(0, 2)) - host_var = ctk.StringVar(value=cfg.get("host", default_config["host"])) - host_entry = ctk.CTkEntry( - host_col, - textvariable=host_var, - width=160, - height=36, - font=(theme.ui_font_family, 13), - corner_radius=10, - fg_color=theme.bg, - border_color=theme.field_border, - border_width=1, - text_color=theme.text_primary, - ) - host_entry.pack(fill="x", pady=(0, 0)) - attach_tooltip_to_widgets([host_lbl, host_entry, host_col], _TIP_HOST) - port_col = ctk.CTkFrame(host_row, fg_color="transparent") + port_col, port_var = _labeled_entry( + ctk, host_row, theme, "Порт", + cfg.get("port", default_config["port"]), + tip=_TIP_PORT, width=100, + ) port_col.pack(side="left") - port_lbl = ctk.CTkLabel( - port_col, - text="Порт", - font=(theme.ui_font_family, 12), - text_color=theme.text_secondary, - anchor="w", - ) - port_lbl.pack(anchor="w", pady=(0, 2)) - port_var = ctk.StringVar(value=str(cfg.get("port", default_config["port"]))) - port_entry = ctk.CTkEntry( - port_col, - textvariable=port_var, - width=100, - height=36, - font=(theme.ui_font_family, 13), - corner_radius=10, - fg_color=theme.bg, - border_color=theme.field_border, - border_width=1, - text_color=theme.text_primary, - ) - port_entry.pack(anchor="w") - attach_tooltip_to_widgets([port_lbl, port_entry, port_col], _TIP_PORT) secret_row = ctk.CTkFrame(conn, fg_color="transparent") secret_row.pack(fill="x") - secret_col = ctk.CTkFrame(secret_row, fg_color="transparent") + secret_col, secret_var = _labeled_entry( + ctk, secret_row, theme, "Secret", + cfg.get("secret", default_config["secret"]), + tip=_TIP_SECRET, width=160, pack_fill=True, + ) secret_col.pack(side="left", fill="x", expand=True, padx=(0, 10)) - secret_lbl = ctk.CTkLabel( - secret_col, - text="Secret", - font=(theme.ui_font_family, 12), - text_color=theme.text_secondary, - anchor="w", - ) - secret_lbl.pack(anchor="w", pady=(0, 2)) - secret_var = ctk.StringVar(value=cfg.get("secret", default_config["secret"])) - secret_entry = ctk.CTkEntry( - secret_col, - textvariable=secret_var, - width=160, - height=36, - font=(theme.ui_font_family, 13), - corner_radius=10, - fg_color=theme.bg, - border_color=theme.field_border, - border_width=1, - text_color=theme.text_primary, - ) - secret_entry.pack(fill="x", pady=(0, 0)) - attach_tooltip_to_widgets([secret_lbl, secret_entry, secret_col], _TIP_SECRET) regen_col = ctk.CTkFrame(secret_row, fg_color="transparent") regen_col.pack(side="left", anchor="s") - ctk.CTkLabel( - regen_col, - text="", - font=(theme.ui_font_family, 12), - ).pack(pady=(0, 2)) + ctk.CTkLabel(regen_col, text="", font=(theme.ui_font_family, 12)).pack(pady=(0, 2)) ctk.CTkButton( - regen_col, - text="↺", - width=36, - height=36, - font=(theme.ui_font_family, 18), - corner_radius=10, - fg_color=theme.tg_blue, - hover_color=theme.tg_blue_hover, - text_color="#ffffff", - border_width=1, - border_color=theme.field_border, + regen_col, text="↺", width=36, height=36, + font=(theme.ui_font_family, 18), corner_radius=10, + fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover, + text_color="#ffffff", border_width=1, border_color=theme.field_border, command=lambda: secret_var.set(os.urandom(16).hex()), ).pack() dc_inner = _config_section(ctk, frame, theme, "Датацентры Telegram (DC → IP)") - dc_lbl = ctk.CTkLabel( - dc_inner, - text="По одному правилу на строку, формат: номер:IP", - font=(theme.ui_font_family, 11), - text_color=theme.text_secondary, - anchor="w", - ) + dc_lbl = _label(ctk, dc_inner, theme, "По одному правилу на строку, формат: номер:IP", size=11) dc_lbl.pack(anchor="w", pady=(0, 4)) dc_textbox = ctk.CTkTextbox( - dc_inner, - width=inner_w, - height=88, - font=(theme.mono_font_family, 12), - corner_radius=10, - fg_color=theme.bg, - border_color=theme.field_border, - border_width=1, - text_color=theme.text_primary, + dc_inner, width=_INNER_W, height=88, + font=(theme.mono_font_family, 12), corner_radius=10, + fg_color=theme.bg, border_color=theme.field_border, + border_width=1, text_color=theme.text_primary, ) dc_textbox.pack(fill="x") dc_textbox.insert("1.0", "\n".join(cfg.get("dc_ip", default_config["dc_ip"]))) @@ -298,18 +236,7 @@ def install_tray_config_form( log_inner = _config_section(ctk, frame, theme, "Логи и производительность") verbose_var = ctk.BooleanVar(value=cfg.get("verbose", False)) - verbose_cb = ctk.CTkCheckBox( - log_inner, - text="Подробное логирование (verbose)", - variable=verbose_var, - font=(theme.ui_font_family, 13), - text_color=theme.text_primary, - fg_color=theme.tg_blue, - hover_color=theme.tg_blue_hover, - corner_radius=6, - border_width=2, - border_color=theme.field_border, - ) + verbose_cb = _checkbox(ctk, log_inner, theme, "Подробное логирование (verbose)", verbose_var) verbose_cb.pack(anchor="w", pady=(0, 6)) attach_ctk_tooltip(verbose_cb, _TIP_VERBOSE) @@ -321,33 +248,17 @@ def install_tray_config_form( ("Пул WebSocket-сессий (по умолчанию 4)", "pool_size", _TIP_POOL), ("Макс. размер лога, МБ (по умолчанию 5)", "log_max_mb", _TIP_LOG_MB), ] - for lbl, key, tip in adv_rows: - col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent") - col_frame.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5)) - adv_l = ctk.CTkLabel( - col_frame, - text=lbl, - font=(theme.ui_font_family, 11), - text_color=theme.text_secondary, - anchor="w", - ) + for label_text, key, tip in adv_rows: + col = ctk.CTkFrame(adv_frame, fg_color="transparent") + col.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5)) + adv_l = _label(ctk, col, theme, label_text, size=11) adv_l.pack(anchor="w", pady=(0, 2)) - adv_e = ctk.CTkEntry( - col_frame, - width=inner_w, - height=32, - font=(theme.ui_font_family, 13), - corner_radius=8, - fg_color=theme.bg, - border_color=theme.field_border, - border_width=1, - text_color=theme.text_primary, - textvariable=ctk.StringVar( - value=str(cfg.get(key, default_config[key])) - ), + adv_e = _entry( + ctk, col, theme, width=_INNER_W, height=32, radius=8, + textvariable=ctk.StringVar(value=str(cfg.get(key, default_config[key]))), ) adv_e.pack(fill="x") - attach_tooltip_to_widgets([adv_l, adv_e, col_frame], tip) + attach_tooltip_to_widgets([adv_l, adv_e, col], tip) adv_entries = list(adv_frame.winfo_children()) adv_keys = ("buf_kb", "pool_size", "log_max_mb") @@ -355,22 +266,9 @@ def install_tray_config_form( upd_inner = _config_section(ctk, frame, theme, "Обновления") st = get_status() check_updates_var = ctk.BooleanVar( - value=bool( - cfg.get("check_updates", default_config.get("check_updates", True)) - ) - ) - upd_cb = ctk.CTkCheckBox( - upd_inner, - text="Проверять обновления при запуске", - variable=check_updates_var, - font=(theme.ui_font_family, 13), - text_color=theme.text_primary, - fg_color=theme.tg_blue, - hover_color=theme.tg_blue_hover, - corner_radius=6, - border_width=2, - border_color=theme.field_border, + value=bool(cfg.get("check_updates", default_config.get("check_updates", True))) ) + upd_cb = _checkbox(ctk, upd_inner, theme, "Проверять обновления при запуске", check_updates_var) upd_cb.pack(anchor="w", pady=(0, 6)) attach_ctk_tooltip(upd_cb, _TIP_CHECK_UPDATES) @@ -391,73 +289,38 @@ def install_tray_config_form( else: upd_status = "Установлена последняя известная версия с GitHub." - ctk.CTkLabel( - upd_inner, - text=upd_status, - font=(theme.ui_font_family, 11), - text_color=theme.text_secondary, - anchor="w", - justify="left", - wraplength=inner_w, - ).pack(anchor="w", pady=(0, 8)) + _label(ctk, upd_inner, theme, upd_status, size=11, + justify="left", wraplength=_INNER_W).pack(anchor="w", pady=(0, 8)) rel_url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL - open_rel_btn = ctk.CTkButton( - upd_inner, - text="Открыть страницу релиза", - height=32, - font=(theme.ui_font_family, 13), - corner_radius=8, - fg_color=theme.field_bg, - hover_color=theme.field_border, - text_color=theme.text_primary, - border_width=1, + ctk.CTkButton( + upd_inner, text="Открыть страницу релиза", height=32, + font=(theme.ui_font_family, 13), corner_radius=8, + fg_color=theme.field_bg, hover_color=theme.field_border, + text_color=theme.text_primary, border_width=1, border_color=theme.field_border, command=lambda u=rel_url: webbrowser.open(u), - ) - open_rel_btn.pack(anchor="w") + ).pack(anchor="w") autostart_var = None if show_autostart: - sys_inner = _config_section( - ctk, frame, theme, "Запуск Windows", bottom_spacer=4 - ) + sys_inner = _config_section(ctk, frame, theme, "Запуск Windows", bottom_spacer=4) autostart_var = ctk.BooleanVar(value=autostart_value) - as_cb = ctk.CTkCheckBox( - sys_inner, - text="Автозапуск при включении компьютера", - variable=autostart_var, - font=(theme.ui_font_family, 13), - text_color=theme.text_primary, - fg_color=theme.tg_blue, - hover_color=theme.tg_blue_hover, - corner_radius=6, - border_width=2, - border_color=theme.field_border, - ) + as_cb = _checkbox(ctk, sys_inner, theme, "Автозапуск при включении компьютера", autostart_var) as_cb.pack(anchor="w", pady=(0, 4)) - as_hint = ctk.CTkLabel( - sys_inner, - text="Если переместить программу в другую папку, запись автозапуска может сброситься.", - font=(theme.ui_font_family, 11), - text_color=theme.text_secondary, - anchor="w", - justify="left", - wraplength=inner_w, + as_hint = _label( + ctk, sys_inner, theme, + "Если переместить программу в другую папку, запись автозапуска может сброситься.", + size=11, justify="left", wraplength=_INNER_W, ) as_hint.pack(anchor="w") attach_tooltip_to_widgets([as_cb, as_hint], _TIP_AUTOSTART) return TrayConfigFormWidgets( - host_var=host_var, - port_var=port_var, - secret_var=secret_var, - dc_textbox=dc_textbox, - verbose_var=verbose_var, - adv_entries=adv_entries, - adv_keys=adv_keys, - autostart_var=autostart_var, - check_updates_var=check_updates_var, + host_var=host_var, port_var=port_var, secret_var=secret_var, + dc_textbox=dc_textbox, verbose_var=verbose_var, + adv_entries=adv_entries, adv_keys=adv_keys, + autostart_var=autostart_var, check_updates_var=check_updates_var, ) @@ -466,7 +329,6 @@ def merge_adv_from_form( base: Dict[str, Any], default_config: dict, ) -> None: - """Дополняет base значениями buf_kb / pool_size / log_max_mb (in-place).""" for i, key in enumerate(widgets.adv_keys): col_frame = widgets.adv_entries[i] entry = col_frame.winfo_children()[1] @@ -485,9 +347,6 @@ def validate_config_form( *, include_autostart: bool, ) -> Union[dict, str]: - """ - Возвращает словарь полей конфига или строку ошибки для показа пользователю. - """ import socket as _sock host_val = widgets.host_var.get().strip() @@ -578,9 +437,6 @@ def populate_first_run_window( secret: str, on_done: Callable[[bool], None], ) -> None: - """ - Содержимое окна первого запуска. on_done(open_in_telegram) — по «Начать» и по закрытию окна. - """ tg_url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}" fpx, fpy = FIRST_RUN_FRAME_PAD frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy) @@ -620,12 +476,8 @@ def populate_first_run_window( corner_radius=0).pack(fill="x", pady=(0, 12)) auto_var = ctk.BooleanVar(value=True) - ctk.CTkCheckBox(frame, text="Открыть прокси в Telegram сейчас", - variable=auto_var, font=(theme.ui_font_family, 13), - text_color=theme.text_primary, - fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover, - corner_radius=6, border_width=2, - border_color=theme.field_border).pack(anchor="w", pady=(0, 16)) + _checkbox(ctk, frame, theme, "Открыть прокси в Telegram сейчас", + auto_var).pack(anchor="w", pady=(0, 16)) def on_ok(): on_done(auto_var.get()) diff --git a/utils/default_config.py b/utils/default_config.py index c1152a9..cb893f6 100644 --- a/utils/default_config.py +++ b/utils/default_config.py @@ -21,7 +21,6 @@ _TRAY_DEFAULTS_COMMON: Dict[str, Any] = { def default_tray_config() -> Dict[str, Any]: - """Новая копия конфига по умолчанию для текущей ОС.""" cfg = dict(_TRAY_DEFAULTS_COMMON) cfg["secret"] = os.urandom(16).hex() diff --git a/utils/tray_common.py b/utils/tray_common.py new file mode 100644 index 0000000..4e1fc98 --- /dev/null +++ b/utils/tray_common.py @@ -0,0 +1,460 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import logging.handlers +import os +import socket as _socket +import sys +import threading +import time +from pathlib import Path +from typing import Any, Callable, Dict, Optional, Tuple + +import psutil + +import proxy.tg_ws_proxy as tg_ws_proxy +from proxy import __version__ +from utils.default_config import default_tray_config + +log = logging.getLogger("tg-ws-tray") + +APP_NAME = "TgWsProxy" + + +def _app_dir() -> Path: + if sys.platform == "win32": + return Path(os.environ.get("APPDATA", Path.home())) / APP_NAME + if sys.platform == "darwin": + return Path.home() / "Library" / "Application Support" / APP_NAME + return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME + + +APP_DIR = _app_dir() +CONFIG_FILE = APP_DIR / "config.json" +LOG_FILE = APP_DIR / "proxy.log" +FIRST_RUN_MARKER = APP_DIR / ".first_run_done" +IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned" + +DEFAULT_CONFIG: Dict[str, Any] = default_tray_config() + +IS_FROZEN = bool(getattr(sys, "frozen", False)) + + +def ensure_dirs() -> None: + APP_DIR.mkdir(parents=True, exist_ok=True) + + +# single-instance lock + +_lock_file_path: Optional[Path] = None + + +def _same_process(meta: dict, proc: psutil.Process, script_hint: str) -> bool: + try: + lock_ct = float(meta.get("create_time", 0.0)) + if lock_ct > 0 and abs(lock_ct - proc.create_time()) > 1.0: + return False + except Exception: + return False + if IS_FROZEN: + return APP_NAME.lower() in proc.name().lower() + try: + for arg in proc.cmdline(): + if script_hint in arg: + return True + except Exception: + pass + return False + + +def acquire_lock(script_hint: str = "") -> bool: + global _lock_file_path + ensure_dirs() + for f in list(APP_DIR.glob("*.lock")): + try: + pid = int(f.stem) + except Exception: + f.unlink(missing_ok=True) + continue + meta: dict = {} + try: + raw = f.read_text(encoding="utf-8").strip() + if raw: + meta = json.loads(raw) + except Exception: + pass + try: + if _same_process(meta, psutil.Process(pid), script_hint): + return False + except Exception: + pass + f.unlink(missing_ok=True) + + lock_file = APP_DIR / f"{os.getpid()}.lock" + try: + proc = psutil.Process(os.getpid()) + lock_file.write_text( + json.dumps({"create_time": proc.create_time()}, ensure_ascii=False), + encoding="utf-8", + ) + except Exception: + lock_file.touch() + _lock_file_path = lock_file + return True + + +def release_lock() -> None: + global _lock_file_path + if _lock_file_path: + try: + _lock_file_path.unlink(missing_ok=True) + except Exception: + pass + _lock_file_path = None + + +# config + +def load_config() -> dict: + ensure_dirs() + if CONFIG_FILE.exists(): + try: + with open(CONFIG_FILE, "r", encoding="utf-8") as f: + data = json.load(f) + for k, v in DEFAULT_CONFIG.items(): + data.setdefault(k, v) + return data + except Exception as exc: + log.warning("Failed to load config: %s", exc) + return dict(DEFAULT_CONFIG) + + +def save_config(cfg: dict) -> None: + ensure_dirs() + with open(CONFIG_FILE, "w", encoding="utf-8") as f: + json.dump(cfg, f, indent=2, ensure_ascii=False) + + +# logging + +_LOG_FMT_FILE = "%(asctime)s %(levelname)-5s %(name)s %(message)s" +_LOG_FMT_CONSOLE = "%(asctime)s %(levelname)-5s %(message)s" + + +def setup_logging(verbose: bool = False, log_max_mb: float = 5) -> None: + ensure_dirs() + level = logging.DEBUG if verbose else logging.INFO + root = logging.getLogger() + root.setLevel(level) + + fh = logging.handlers.RotatingFileHandler( + str(LOG_FILE), + maxBytes=max(32 * 1024, int(log_max_mb * 1024 * 1024)), + backupCount=0, + encoding="utf-8", + ) + fh.setLevel(logging.DEBUG) + fh.setFormatter(logging.Formatter(_LOG_FMT_FILE, datefmt="%Y-%m-%d %H:%M:%S")) + root.addHandler(fh) + + if not IS_FROZEN: + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(level) + ch.setFormatter(logging.Formatter(_LOG_FMT_CONSOLE, datefmt="%H:%M:%S")) + root.addHandler(ch) + + +# icon + +def make_icon_image(size: int = 64, *, color: Tuple[int, ...] = (0, 136, 204, 255)): + from PIL import Image, ImageDraw, ImageFont + + img = Image.new("RGBA", (size, size), (0, 0, 0, 0)) + draw = ImageDraw.Draw(img) + margin = 2 + draw.ellipse([margin, margin, size - margin, size - margin], fill=color) + + for path in _font_paths(): + try: + font = ImageFont.truetype(path, size=int(size * 0.55)) + break + except Exception: + continue + else: + font = ImageFont.load_default() + + bbox = draw.textbbox((0, 0), "T", font=font) + tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] + draw.text( + ((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]), + "T", + fill=(255, 255, 255, 255), + font=font, + ) + return img + + +def _font_paths(): + if sys.platform == "win32": + return ["arial.ttf"] + if sys.platform == "darwin": + return ["/System/Library/Fonts/Helvetica.ttc"] + return [ + "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", + "/usr/share/fonts/TTF/DejaVuSans-Bold.ttf", + ] + + +def load_icon(): + from PIL import Image + + icon_path = Path(__file__).parents[1] / "icon.ico" + if icon_path.exists(): + try: + return Image.open(str(icon_path)) + except Exception: + pass + return make_icon_image(64) + + +# proxy lifecycle + +_proxy_thread: Optional[threading.Thread] = None +_async_stop: Optional[Tuple[asyncio.AbstractEventLoop, asyncio.Event]] = None + + +def _run_proxy_thread(on_port_busy: Callable[[str], None]) -> None: + global _async_stop + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + stop_ev = asyncio.Event() + _async_stop = (loop, stop_ev) + + try: + loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev)) + except Exception as exc: + log.error("Proxy thread crashed: %s", exc) + if "Address already in use" in str(exc) or "10048" in str(exc): + on_port_busy( + "Не удалось запустить прокси:\n" + "Порт уже используется другим приложением.\n\n" + "Закройте приложение, использующее этот порт, " + "или измените порт в настройках прокси и перезапустите." + ) + finally: + loop.close() + _async_stop = None + + +def apply_proxy_config(cfg: dict) -> bool: + dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]) + try: + dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list) + except ValueError as e: + log.error("Bad config dc_ip: %s", e) + return False + + pc = tg_ws_proxy.proxy_config + pc.port = cfg.get("port", DEFAULT_CONFIG["port"]) + pc.host = cfg.get("host", DEFAULT_CONFIG["host"]) + pc.secret = cfg.get("secret", DEFAULT_CONFIG["secret"]) + pc.dc_redirects = dc_redirects + pc.buffer_size = max(4, cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])) * 1024 + pc.pool_size = max(0, cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])) + return True + + +def start_proxy(cfg: dict, on_error: Callable[[str], None]) -> None: + global _proxy_thread + if _proxy_thread and _proxy_thread.is_alive(): + log.info("Proxy already running") + return + + if not apply_proxy_config(cfg): + on_error("Ошибка конфигурации DC → IP.") + return + + pc = tg_ws_proxy.proxy_config + log.info("Starting proxy on %s:%d ...", pc.host, pc.port) + _proxy_thread = threading.Thread( + target=_run_proxy_thread, args=(on_error,), daemon=True, name="proxy" + ) + _proxy_thread.start() + + +def stop_proxy() -> None: + global _proxy_thread, _async_stop + if _async_stop: + loop, stop_ev = _async_stop + loop.call_soon_threadsafe(stop_ev.set) + if _proxy_thread: + _proxy_thread.join(timeout=5) + _proxy_thread = None + log.info("Proxy stopped") + + +def restart_proxy(cfg: dict, on_error: Callable[[str], None]) -> None: + log.info("Restarting proxy...") + stop_proxy() + time.sleep(0.3) + start_proxy(cfg, on_error) + + +def tg_proxy_url(cfg: dict) -> str: + host = cfg.get("host", DEFAULT_CONFIG["host"]) + port = cfg.get("port", DEFAULT_CONFIG["port"]) + secret = cfg.get("secret", DEFAULT_CONFIG["secret"]) + link_host = tg_ws_proxy.get_link_host(host) + return f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}" + + +_IPV6_WARNING = ( + "На вашем компьютере включена поддержка подключения по IPv6.\n\n" + "Telegram может пытаться подключаться через IPv6, " + "что не поддерживается и может привести к ошибкам.\n\n" + "Если прокси не работает или в логах присутствуют ошибки, " + "связанные с попытками подключения по IPv6 - " + "попробуйте отключить в настройках прокси Telegram попытку соединения " + "по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 " + "в системе.\n\n" + "Это предупреждение будет показано только один раз." +) + + +def _has_ipv6() -> bool: + try: + for addr in _socket.getaddrinfo(_socket.gethostname(), None, _socket.AF_INET6): + ip = addr[4][0] + if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"): + return True + except Exception: + pass + try: + s = _socket.socket(_socket.AF_INET6, _socket.SOCK_STREAM) + s.bind(("::1", 0)) + s.close() + return True + except Exception: + return False + + +def check_ipv6_warning(show_info: Callable[[str, str], None]) -> None: + ensure_dirs() + if IPV6_WARN_MARKER.exists() or not _has_ipv6(): + return + IPV6_WARN_MARKER.touch() + threading.Thread( + target=lambda: show_info(_IPV6_WARNING, "TG WS Proxy"), + daemon=True, + ).start() + + +# update check + +def maybe_notify_update( + cfg: dict, + is_exiting: Callable[[], bool], + ask_open: Callable[[str, str], bool], +) -> None: + if not cfg.get("check_updates", True): + return + + def _work(): + time.sleep(1.5) + if is_exiting(): + return + try: + from utils.update_check import RELEASES_PAGE_URL, get_status, run_check + import webbrowser + + run_check(__version__) + st = get_status() + if not st.get("has_update"): + return + url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL + ver = st.get("latest") or "?" + if ask_open( + f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?", + "TG WS Proxy — обновление", + ): + webbrowser.open(url) + except Exception as exc: + log.debug("Update check failed: %s", exc) + + threading.Thread(target=_work, daemon=True, name="update-check").start() + + +# ctk thread (windows / linux) + +_ctk_root: Any = None +_ctk_root_ready = threading.Event() + + +def ensure_ctk_thread(ctk: Any) -> bool: + global _ctk_root + if ctk is None: + return False + if _ctk_root_ready.is_set(): + return True + + def _run(): + global _ctk_root + from ui.ctk_theme import apply_ctk_appearance, install_tkinter_variable_del_guard + + install_tkinter_variable_del_guard() + apply_ctk_appearance(ctk) + _ctk_root = ctk.CTk() + _ctk_root.withdraw() + _ctk_root_ready.set() + _ctk_root.mainloop() + + threading.Thread(target=_run, daemon=True, name="ctk-root").start() + _ctk_root_ready.wait(timeout=5.0) + return _ctk_root is not None + + +def ctk_run_dialog(build_fn: Callable[[threading.Event], None]) -> None: + if _ctk_root is None: + return + done = threading.Event() + + def _invoke(): + try: + build_fn(done) + except Exception: + log.exception("CTk dialog failed") + done.set() + + _ctk_root.after(0, _invoke) + done.wait() + import gc + gc.collect() + + +def quit_ctk() -> None: + if _ctk_root is not None: + try: + _ctk_root.after(0, _ctk_root.quit) + except Exception: + pass + + +# common bootstrap + +def bootstrap(cfg: dict) -> None: + save_config(cfg) + if LOG_FILE.exists(): + try: + LOG_FILE.unlink() + except Exception: + pass + setup_logging( + cfg.get("verbose", False), + log_max_mb=cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]), + ) + log.info("TG WS Proxy версия %s starting", __version__) + log.info("Config: %s", cfg) + log.info("Log file: %s", LOG_FILE) diff --git a/windows.py b/windows.py index c140d81..d612bd0 100644 --- a/windows.py +++ b/windows.py @@ -1,20 +1,14 @@ from __future__ import annotations import ctypes -import ipaddress -import json -import logging -import logging.handlers import os -import winreg -import psutil import sys import threading import time import webbrowser -import asyncio as _asyncio +import winreg from pathlib import Path -from typing import Dict, Optional +from typing import Optional try: import pyperclip @@ -32,201 +26,62 @@ except ImportError: ctk = None try: - from PIL import Image, ImageDraw, ImageFont + from PIL import Image except ImportError: - Image = ImageDraw = ImageFont = None + Image = None import proxy.tg_ws_proxy as tg_ws_proxy -from proxy.tg_ws_proxy import proxy_config -from proxy import __version__ -from utils.default_config import default_tray_config +from utils.tray_common import ( + APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IS_FROZEN, LOG_FILE, + acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog, + ensure_ctk_thread, ensure_dirs, load_config, load_icon, log, + maybe_notify_update, quit_ctk, release_lock, restart_proxy, + save_config, start_proxy, stop_proxy, tg_proxy_url, +) from ui.ctk_tray_ui import ( - install_tray_config_buttons, - install_tray_config_form, - populate_first_run_window, - tray_settings_scroll_and_footer, + install_tray_config_buttons, install_tray_config_form, + populate_first_run_window, tray_settings_scroll_and_footer, validate_config_form, ) from ui.ctk_theme import ( - CONFIG_DIALOG_FRAME_PAD, - CONFIG_DIALOG_SIZE, - FIRST_RUN_SIZE, - create_ctk_toplevel, - ctk_theme_for_platform, - main_content_frame, + CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE, + create_ctk_toplevel, ctk_theme_for_platform, main_content_frame, ) - -IS_FROZEN = bool(getattr(sys, "frozen", False)) - -APP_NAME = "TgWsProxy" -APP_DIR = Path(os.environ.get("APPDATA", Path.home())) / APP_NAME -CONFIG_FILE = APP_DIR / "config.json" -LOG_FILE = APP_DIR / "proxy.log" -FIRST_RUN_MARKER = APP_DIR / ".first_run_done" -IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned" - - -DEFAULT_CONFIG = default_tray_config() - - -_proxy_thread: Optional[threading.Thread] = None -_async_stop: Optional[object] = None _tray_icon: Optional[object] = None _config: dict = {} -_exiting: bool = False -_lock_file_path: Optional[Path] = None +_exiting = False -_ctk_root = None -_ctk_root_ready = threading.Event() +ICON_PATH = str(Path(__file__).parent / "icon.ico") -log = logging.getLogger("tg-ws-tray") +# win32 dialogs -_user32 = ctypes.windll.user32 -_user32.MessageBoxW.argtypes = [ - ctypes.c_void_p, - ctypes.c_wchar_p, - ctypes.c_wchar_p, - ctypes.c_uint, -] -_user32.MessageBoxW.restype = ctypes.c_int +_u32 = ctypes.windll.user32 +_u32.MessageBoxW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint] +_u32.MessageBoxW.restype = ctypes.c_int + +_MB_OK_ERR = 0x10 +_MB_OK_INFO = 0x40 +_MB_YESNO_Q = 0x24 +_IDYES = 6 -def _same_process(lock_meta: dict, proc: psutil.Process) -> bool: - try: - lock_ct = float(lock_meta.get("create_time", 0.0)) - proc_ct = float(proc.create_time()) - if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0: - return False - except Exception: - return False - - try: - for arg in proc.cmdline(): - if "windows.py" in arg: - return True - except Exception: - pass - - frozen = bool(getattr(sys, "frozen", False)) - if frozen: - return ( - os.path.basename(sys.executable).lower() == proc.name().lower() - ) - - return False +def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None: + _u32.MessageBoxW(None, text, title, _MB_OK_ERR) -def _release_lock(): - global _lock_file_path - if not _lock_file_path: - return - try: - _lock_file_path.unlink(missing_ok=True) - except Exception: - pass - _lock_file_path = None +def _show_info(text: str, title: str = "TG WS Proxy") -> None: + _u32.MessageBoxW(None, text, title, _MB_OK_INFO) -def _acquire_lock() -> bool: - global _lock_file_path - _ensure_dirs() - lock_files = list(APP_DIR.glob("*.lock")) - - for f in lock_files: - pid = None - meta: dict = {} - - try: - pid = int(f.stem) - except Exception: - f.unlink(missing_ok=True) - continue - - try: - raw = f.read_text(encoding="utf-8").strip() - if raw: - meta = json.loads(raw) - except Exception: - meta = {} - - try: - proc = psutil.Process(pid) - if _same_process(meta, proc): - return False - except Exception: - pass - - f.unlink(missing_ok=True) - - lock_file = APP_DIR / f"{os.getpid()}.lock" - try: - proc = psutil.Process(os.getpid()) - payload = { - "create_time": proc.create_time(), - } - lock_file.write_text(json.dumps(payload, ensure_ascii=False), - encoding="utf-8") - except Exception: - lock_file.touch() - - _lock_file_path = lock_file - return True +def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool: + return _u32.MessageBoxW(None, text, title, _MB_YESNO_Q) == _IDYES -def _ensure_dirs(): - APP_DIR.mkdir(parents=True, exist_ok=True) +# autostart (registry) - -def load_config() -> dict: - _ensure_dirs() - if CONFIG_FILE.exists(): - try: - with open(CONFIG_FILE, "r", encoding="utf-8") as f: - data = json.load(f) - for k, v in DEFAULT_CONFIG.items(): - data.setdefault(k, v) - return data - except Exception as exc: - log.warning("Failed to load config: %s", exc) - return dict(DEFAULT_CONFIG) - - -def save_config(cfg: dict): - _ensure_dirs() - with open(CONFIG_FILE, "w", encoding="utf-8") as f: - json.dump(cfg, f, indent=2, ensure_ascii=False) - - -def setup_logging(verbose: bool = False, log_max_mb: float = 5): - _ensure_dirs() - root = logging.getLogger() - root.setLevel(logging.DEBUG if verbose else logging.INFO) - - fh = logging.handlers.RotatingFileHandler( - str(LOG_FILE), - maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024), - backupCount=0, - encoding='utf-8', - ) - fh.setLevel(logging.DEBUG) - fh.setFormatter(logging.Formatter( - "%(asctime)s %(levelname)-5s %(name)s %(message)s", - datefmt="%Y-%m-%d %H:%M:%S")) - root.addHandler(fh) - - if not getattr(sys, "frozen", False): - ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.DEBUG if verbose else logging.INFO) - ch.setFormatter(logging.Formatter( - "%(asctime)s %(levelname)-5s %(message)s", - datefmt="%H:%M:%S")) - root.addHandler(ch) - - -def _autostart_reg_name() -> str: - return APP_NAME +_RUN_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run" def _supports_autostart() -> bool: @@ -239,297 +94,94 @@ def _autostart_command() -> str: def is_autostart_enabled() -> bool: try: - with winreg.OpenKey( - winreg.HKEY_CURRENT_USER, - r"Software\Microsoft\Windows\CurrentVersion\Run", - 0, - winreg.KEY_READ, - ) as k: - val, _ = winreg.QueryValueEx(k, _autostart_reg_name()) - stored = str(val).strip() - expected = _autostart_command().strip() - return stored == expected - except FileNotFoundError: - return False - except OSError: + with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _RUN_KEY, 0, winreg.KEY_READ) as k: + val, _ = winreg.QueryValueEx(k, APP_NAME) + return str(val).strip() == _autostart_command().strip() + except (FileNotFoundError, OSError): return False def set_autostart_enabled(enabled: bool) -> None: try: - with winreg.CreateKey( - winreg.HKEY_CURRENT_USER, - r"Software\Microsoft\Windows\CurrentVersion\Run", - ) as k: + with winreg.CreateKey(winreg.HKEY_CURRENT_USER, _RUN_KEY) as k: if enabled: - winreg.SetValueEx( - k, - _autostart_reg_name(), - 0, - winreg.REG_SZ, - _autostart_command(), - ) + winreg.SetValueEx(k, APP_NAME, 0, winreg.REG_SZ, _autostart_command()) else: try: - winreg.DeleteValue(k, _autostart_reg_name()) + winreg.DeleteValue(k, APP_NAME) except FileNotFoundError: pass except OSError as exc: log.error("Failed to update autostart: %s", exc) _show_error( "Не удалось изменить автозапуск.\n\n" - "Попробуйте запустить приложение от имени пользователя с правами на реестр.\n\n" - f"Ошибка: {exc}" + "Попробуйте запустить приложение от имени пользователя " + f"с правами на реестр.\n\nОшибка: {exc}" ) -def _make_icon_image(size: int = 64): - if Image is None: - raise RuntimeError("Pillow is required for tray icon") - img = Image.new("RGBA", (size, size), (0, 0, 0, 0)) - draw = ImageDraw.Draw(img) - - margin = 2 - draw.ellipse([margin, margin, size - margin, size - margin], - fill=(0, 136, 204, 255)) - - try: - font = ImageFont.truetype("arial.ttf", size=int(size * 0.55)) - except Exception: - font = ImageFont.load_default() - bbox = draw.textbbox((0, 0), "T", font=font) - tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] - tx = (size - tw) // 2 - bbox[0] - ty = (size - th) // 2 - bbox[1] - draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font) +# tray callbacks - return img - - -def _load_icon(): - icon_path = Path(__file__).parent / "icon.ico" - if icon_path.exists() and Image: - try: - return Image.open(str(icon_path)) - except Exception: - pass - return _make_icon_image() - - - -def _run_proxy_thread(): - global _async_stop - - loop = _asyncio.new_event_loop() - _asyncio.set_event_loop(loop) - - stop_ev = _asyncio.Event() - _async_stop = (loop, stop_ev) - - try: - loop.run_until_complete( - tg_ws_proxy._run(stop_event=stop_ev)) - except Exception as exc: - log.error("Proxy thread crashed: %s", exc) - if "10048" in str(exc) or "Address already in use" in str(exc): - _show_error("Не удалось запустить прокси:\nПорт уже используется другим приложением.\n\nЗакройте приложение, использующее этот порт, или измените порт в настройках прокси и перезапустите.") - finally: - loop.close() - _async_stop = None - - -def start_proxy(): - global _proxy_thread, _config - if _proxy_thread and _proxy_thread.is_alive(): - log.info("Proxy already running") - return - - cfg = _config - port = cfg.get("port", DEFAULT_CONFIG["port"]) - host = cfg.get("host", DEFAULT_CONFIG["host"]) - secret = cfg.get("secret", DEFAULT_CONFIG["secret"]) - dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]) - buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"]) - pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]) - - try: - dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list) - except ValueError as e: - log.error("Bad config dc_ip: %s", e) - _show_error(f"Ошибка конфигурации:\n{e}") - return - - proxy_config.port = port - proxy_config.host = host - proxy_config.secret = secret - proxy_config.dc_redirects = dc_redirects - proxy_config.buffer_size = max(4, buf_kb) * 1024 - proxy_config.pool_size = max(0, pool_size) - - log.info("Starting proxy on %s:%d ...", host, port) - - _proxy_thread = threading.Thread( - target=_run_proxy_thread, - daemon=True, name="proxy") - _proxy_thread.start() - - -def stop_proxy(): - global _proxy_thread, _async_stop - if _async_stop: - loop, stop_ev = _async_stop - loop.call_soon_threadsafe(stop_ev.set) - if _proxy_thread: - _proxy_thread.join(timeout=5) - if _proxy_thread.is_alive(): - log.warning( - "Proxy thread did not finish within timeout; " - "the process may still exit shortly") - _proxy_thread = None - log.info("Proxy stopped") - - -def restart_proxy(): - log.info("Restarting proxy...") - stop_proxy() - time.sleep(0.3) - start_proxy() - - -def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"): - _user32.MessageBoxW(None, text, title, 0x10) - - -def _show_info(text: str, title: str = "TG WS Proxy"): - _user32.MessageBoxW(None, text, title, 0x40) - - -def _ask_open_release_page(latest_version: str, url: str) -> bool: - """Win32 Yes/No: открыть страницу релиза.""" - MB_YESNO = 0x4 - MB_ICONQUESTION = 0x20 - IDYES = 6 - text = ( - f"Доступна новая версия: {latest_version}\n\n" - f"Открыть страницу релиза в браузере?" - ) - r = _user32.MessageBoxW( - None, - text, - "TG WS Proxy — обновление", - MB_YESNO | MB_ICONQUESTION, - ) - return r == IDYES - - -def _maybe_notify_update_async(): - """ - Фоновая проверка GitHub Releases и уведомление (не блокирует трей). - """ - def _work(): - time.sleep(1.5) - if _exiting: - return - if not _config.get("check_updates", True): - return - try: - from utils.update_check import RELEASES_PAGE_URL, get_status, run_check - run_check(__version__) - st = get_status() - if not st.get("has_update"): - return - url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL - ver = st.get("latest") or "?" - if _ask_open_release_page(str(ver), url): - webbrowser.open(url) - except Exception as exc: - log.debug("Update check failed: %s", exc) - - threading.Thread(target=_work, daemon=True, name="update-check").start() - - -def _ensure_ctk_thread() -> bool: - """Start the persistent hidden CTk root in its own thread (once).""" - global _ctk_root - if ctk is None: - return False - if _ctk_root_ready.is_set(): - return True - - def _run(): - global _ctk_root - from ui.ctk_theme import ( - apply_ctk_appearance, - _install_tkinter_variable_del_guard, - ) - _install_tkinter_variable_del_guard() - apply_ctk_appearance(ctk) - _ctk_root = ctk.CTk() - _ctk_root.withdraw() - _ctk_root_ready.set() - _ctk_root.mainloop() - - threading.Thread(target=_run, daemon=True, name="ctk-root").start() - _ctk_root_ready.wait(timeout=5.0) - return _ctk_root is not None - - -def _ctk_run_dialog(build_fn) -> None: - """Schedule build_fn(done_event) on the CTk thread and block until done_event is set.""" - if _ctk_root is None: - return - done = threading.Event() - - def _invoke(): - try: - build_fn(done) - except Exception: - log.exception("CTk dialog failed") - done.set() - - _ctk_root.after(0, _invoke) - done.wait() - - -def _on_open_in_telegram(icon=None, item=None): - host = _config.get("host", DEFAULT_CONFIG["host"]) - port = _config.get("port", DEFAULT_CONFIG["port"]) - secret = _config.get("secret", DEFAULT_CONFIG["secret"]) - - url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}" +def _on_open_in_telegram(icon=None, item=None) -> None: + url = tg_proxy_url(_config) log.info("Opening %s", url) try: - result = webbrowser.open(url) - if not result: - raise RuntimeError("webbrowser.open returned False") + if not webbrowser.open(url): + raise RuntimeError except Exception: log.info("Browser open failed, copying to clipboard") if pyperclip is None: _show_error( "Не удалось открыть Telegram автоматически.\n\n" - f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}") + f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}" + ) return try: pyperclip.copy(url) _show_info( - f"Не удалось открыть Telegram автоматически.\n\n" - f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}", - "TG WS Proxy") + "Не удалось открыть Telegram автоматически.\n\n" + f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}" + ) except Exception as exc: log.error("Clipboard copy failed: %s", exc) _show_error(f"Не удалось скопировать ссылку:\n{exc}") -def _on_restart(icon=None, item=None): - threading.Thread(target=restart_proxy, daemon=True).start() +def _on_restart(icon=None, item=None) -> None: + threading.Thread( + target=lambda: restart_proxy(_config, _show_error), daemon=True + ).start() -def _on_edit_config(icon=None, item=None): +def _on_edit_config(icon=None, item=None) -> None: threading.Thread(target=_edit_config_dialog, daemon=True).start() -def _edit_config_dialog(): - if not _ensure_ctk_thread(): +def _on_open_logs(icon=None, item=None) -> None: + log.info("Opening log file: %s", LOG_FILE) + if LOG_FILE.exists(): + os.startfile(str(LOG_FILE)) + else: + _show_info("Файл логов ещё не создан.") + + +def _on_exit(icon=None, item=None) -> None: + global _exiting + if _exiting: + os._exit(0) + return + _exiting = True + log.info("User requested exit") + quit_ctk() + threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start() + if icon: + icon.stop() + + +# settings dialog + +def _edit_config_dialog() -> None: + if not ensure_ctk_thread(ctk): _show_error("customtkinter не установлен.") return @@ -538,113 +190,64 @@ def _edit_config_dialog(): if _supports_autostart() and not cfg["autostart"]: set_autostart_enabled(False) - def _build(done: threading.Event): + def _build(done: threading.Event) -> None: theme = ctk_theme_for_platform() w, h = CONFIG_DIALOG_SIZE if _supports_autostart(): h += 100 - icon_path = str(Path(__file__).parent / "icon.ico") root = create_ctk_toplevel( - ctk, - title="TG WS Proxy — Настройки", - width=w, - height=h, - theme=theme, - after_create=lambda r: r.iconbitmap(icon_path), + ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme, + after_create=lambda r: r.iconbitmap(ICON_PATH), ) - fpx, fpy = CONFIG_DIALOG_FRAME_PAD frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy) scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme) widgets = install_tray_config_form( - ctk, - scroll, - theme, - cfg, - DEFAULT_CONFIG, + ctk, scroll, theme, cfg, DEFAULT_CONFIG, show_autostart=_supports_autostart(), autostart_value=cfg.get("autostart", False), ) - def _finish(): + def _finish() -> None: root.destroy() done.set() - def on_save(): - merged = validate_config_form( - widgets, - DEFAULT_CONFIG, - include_autostart=_supports_autostart(), - ) + def on_save() -> None: + merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=_supports_autostart()) if isinstance(merged, str): _show_error(merged) return - - new_cfg = merged - save_config(new_cfg) - _config.update(new_cfg) - log.info("Config saved: %s", new_cfg) + save_config(merged) + _config.update(merged) + log.info("Config saved: %s", merged) if _supports_autostart(): - set_autostart_enabled(bool(new_cfg.get("autostart", False))) + set_autostart_enabled(bool(merged.get("autostart", False))) _tray_icon.menu = _build_menu() from tkinter import messagebox do_restart = messagebox.askyesno( "Перезапустить?", "Настройки сохранены.\n\nПерезапустить прокси сейчас?", - parent=root) + parent=root, + ) _finish() if do_restart: - threading.Thread( - target=restart_proxy, daemon=True).start() + threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start() - def on_cancel(): - _finish() + root.protocol("WM_DELETE_WINDOW", _finish) + install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish) - root.protocol("WM_DELETE_WINDOW", on_cancel) - install_tray_config_buttons( - ctk, footer, theme, on_save=on_save, on_cancel=on_cancel) - - _ctk_run_dialog(_build) + ctk_run_dialog(_build) -def _on_open_logs(icon=None, item=None): - log.info("Opening log file: %s", LOG_FILE) - if LOG_FILE.exists(): - os.startfile(str(LOG_FILE)) - else: - _show_info("Файл логов ещё не создан.", "TG WS Proxy") +# first run - -def _on_exit(icon=None, item=None): - global _exiting - if _exiting: - os._exit(0) - return - _exiting = True - log.info("User requested exit") - - if _ctk_root is not None: - try: - _ctk_root.after(0, _ctk_root.quit) - except Exception: - pass - - def _force_exit(): - time.sleep(3) - os._exit(0) - threading.Thread(target=_force_exit, daemon=True, name="force-exit").start() - - if icon: - icon.stop() - - -def _show_first_run(): - _ensure_dirs() +def _show_first_run() -> None: + ensure_dirs() if FIRST_RUN_MARKER.exists(): return - if not _ensure_ctk_thread(): + if not ensure_ctk_thread(ctk): FIRST_RUN_MARKER.touch() return @@ -652,84 +255,27 @@ def _show_first_run(): port = _config.get("port", DEFAULT_CONFIG["port"]) secret = _config.get("secret", DEFAULT_CONFIG["secret"]) - def _build(done: threading.Event): + def _build(done: threading.Event) -> None: theme = ctk_theme_for_platform() - icon_path = str(Path(__file__).parent / "icon.ico") w, h = FIRST_RUN_SIZE root = create_ctk_toplevel( - ctk, - title="TG WS Proxy", - width=w, - height=h, - theme=theme, - after_create=lambda r: r.iconbitmap(icon_path), + ctk, title="TG WS Proxy", width=w, height=h, theme=theme, + after_create=lambda r: r.iconbitmap(ICON_PATH), ) - def on_done(open_tg: bool): + def on_done(open_tg: bool) -> None: FIRST_RUN_MARKER.touch() root.destroy() done.set() if open_tg: _on_open_in_telegram() - populate_first_run_window( - ctk, root, theme, host=host, port=port, secret=secret, - on_done=on_done) + populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done) - _ctk_run_dialog(_build) + ctk_run_dialog(_build) -def _has_ipv6_enabled() -> bool: - import socket as _sock - try: - addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6) - for addr in addrs: - ip = addr[4][0] - if not ip or ip.startswith("::1"): - continue - try: - if ipaddress.IPv6Address(ip).is_link_local: - continue - except ValueError: - if ip.startswith("fe80:"): - continue - return True - except Exception: - pass - try: - s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM) - s.bind(('::1', 0)) - s.close() - return True - except Exception: - return False - - -def _check_ipv6_warning(): - _ensure_dirs() - if IPV6_WARN_MARKER.exists(): - return - if not _has_ipv6_enabled(): - return - - IPV6_WARN_MARKER.touch() - - threading.Thread(target=_show_ipv6_dialog, daemon=True).start() - - -def _show_ipv6_dialog(): - _show_info( - "На вашем компьютере включена поддержка подключения по IPv6.\n\n" - "Telegram может пытаться подключаться через IPv6, " - "что не поддерживается и может привести к ошибкам.\n\n" - "Если прокси не работает или в логах присутствуют ошибки, " - "связанные с попытками подключения по IPv6 - " - "попробуйте отключить в настройках прокси Telegram попытку соединения " - "по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 " - "в системе.\n\n" - "Это предупреждение будет показано только один раз.", - "TG WS Proxy") - +# tray menu def _build_menu(): if pystray is None: @@ -737,12 +283,8 @@ def _build_menu(): host = _config.get("host", DEFAULT_CONFIG["host"]) port = _config.get("port", DEFAULT_CONFIG["port"]) link_host = tg_ws_proxy.get_link_host(host) - return pystray.Menu( - pystray.MenuItem( - f"Открыть в Telegram ({link_host}:{port})", - _on_open_in_telegram, - default=True), + pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True), pystray.Menu.SEPARATOR, pystray.MenuItem("Перезапустить прокси", _on_restart), pystray.MenuItem("Настройки...", _on_edit_config), @@ -752,29 +294,17 @@ def _build_menu(): ) -def run_tray(): +# entry point + +def run_tray() -> None: global _tray_icon, _config _config = load_config() - save_config(_config) - - if LOG_FILE.exists(): - try: - LOG_FILE.unlink() - except Exception: - pass - - setup_logging(_config.get("verbose", False), - log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])) - log.info("TG WS Proxy версия %s, tray app starting", __version__) - log.info("Config: %s", _config) - log.info("Log file: %s", LOG_FILE) + bootstrap(_config) if pystray is None or Image is None or ctk is None: - log.error( - "pystray, Pillow or customtkinter not installed; " - "running in console mode") - start_proxy() + log.error("pystray, Pillow or customtkinter not installed; running in console mode") + start_proxy(_config, _show_error) try: while True: time.sleep(1) @@ -782,20 +312,12 @@ def run_tray(): stop_proxy() return - start_proxy() - - _maybe_notify_update_async() - + start_proxy(_config, _show_error) + maybe_notify_update(_config, lambda: _exiting, _ask_yes_no) _show_first_run() - _check_ipv6_warning() - - icon_image = _load_icon() - _tray_icon = pystray.Icon( - APP_NAME, - icon_image, - "TG WS Proxy", - menu=_build_menu()) + check_ipv6_warning(_show_info) + _tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu()) log.info("Tray icon running") _tray_icon.run() @@ -803,15 +325,14 @@ def run_tray(): log.info("Tray app exited") -def main(): - if not _acquire_lock(): +def main() -> None: + if not acquire_lock("windows.py"): _show_info("Приложение уже запущено.", os.path.basename(sys.argv[0])) return - try: run_tray() finally: - _release_lock() + release_lock() if __name__ == "__main__":