# tg-ws-proxy/linux.py
# File-viewer header: 2026-06-23 15:41:49 +03:00 · 323 lines · 9.3 KiB · Python

from __future__ import annotations
import os
import subprocess
import sys
import threading
import time
from typing import Optional
import customtkinter as ctk
import pyperclip
import pystray
from PIL import Image, ImageTk
from proxy import get_link_host
from utils.tray_common import (
APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, LOG_FILE,
acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog,
ensure_ctk_thread, ensure_dirs, load_config, load_icon, log,
maybe_notify_update, quit_ctk, release_lock, restart_proxy,
save_config, start_proxy, stop_proxy, tg_proxy_url,
)
from ui.ctk_tray_ui import (
install_tray_config_buttons, install_tray_config_form,
populate_first_run_window, tray_settings_scroll_and_footer,
validate_config_form,
)
from ui.ctk_theme import (
CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE,
create_ctk_toplevel, ctk_theme_for_platform, main_content_frame,
)
from ui.i18n import set_language, t
# The pystray icon object; stays None until run_tray() creates it.
_tray_icon: Optional[object] = None
# Active configuration; rebound from load_config() in run_tray() and updated
# in place when the settings dialog saves.
_config: dict = {}
# Flipped to True by _on_exit() on the first exit request; a second request
# while it is True terminates the process immediately.
_exiting = False
# dialogs (tkinter messagebox)
def _msgbox(kind: str, text: str, title: str, **kw):
    """Show a tkinter messagebox parented to a temporary hidden root window.

    Args:
        kind: Name of the ``tkinter.messagebox`` function to call, e.g.
            ``"showerror"``, ``"showinfo"`` or ``"askyesno"``.
        text: Message body.
        title: Window title.
        **kw: Extra keyword arguments forwarded to the messagebox function.

    Returns:
        Whatever the selected messagebox function returns.

    Raises:
        AttributeError: If *kind* is not an attribute of ``tkinter.messagebox``.
    """
    import tkinter as _tk
    from tkinter import messagebox as _mb

    root = _tk.Tk()
    try:
        root.withdraw()
        try:
            root.attributes("-topmost", True)
        except Exception:
            # Best effort only: the dialog is still usable if the
            # "-topmost" attribute cannot be set.
            pass
        return getattr(_mb, kind)(title, text, parent=root, **kw)
    finally:
        # Destroy the hidden root even when the dialog call raises, so a
        # failed dialog does not leave a stray Tk instance behind.
        root.destroy()
def _show_error(text: str, title: Optional[str] = None) -> None:
    """Display *text* in an error messagebox.

    A falsy *title* is replaced by the translated ``app.error_title`` string.
    """
    caption = title if title else t("app.error_title")
    _msgbox("showerror", text, caption)
def _show_info(text: str, title: Optional[str] = None) -> None:
    """Display *text* in an informational messagebox.

    A falsy *title* is replaced by the translated ``app.name`` string.
    """
    if not title:
        title = t("app.name")
    _msgbox("showinfo", text, title)
def _ask_yes_no(text: str, title: Optional[str] = None) -> bool:
    """Ask a yes/no question and return the answer coerced to ``bool``.

    A falsy *title* is replaced by the translated ``app.name`` string.
    """
    caption = title or t("app.name")
    answer = _msgbox("askyesno", text, caption)
    return bool(answer)
def _apply_window_icon(root) -> None:
    """Set the application icon, resized to 64x64, on the window *root*.

    Does nothing when ``load_icon()`` returns a falsy value.
    """
    source = load_icon()
    if not source:
        return
    photo = ImageTk.PhotoImage(source.resize((64, 64)))
    # The image is stored on the window object as well as passed to
    # iconphoto(); presumably this keeps a reference alive for Tk.
    root._ctk_icon_photo = photo
    root.iconphoto(False, root._ctk_icon_photo)
# tray callbacks
def _on_open_in_telegram(icon=None, item=None) -> None:
    """Tray action: copy the proxy link to the clipboard and confirm it.

    The URL comes from ``tg_proxy_url(_config)``. On success an info dialog
    with the ``dialog.copy_ok`` text is shown; any exception raised by the
    copy or by that dialog is logged and reported through an error dialog.
    *icon* and *item* are accepted but not used by the body.
    """
    link = tg_proxy_url(_config)
    log.info("Copying %s", link)
    try:
        pyperclip.copy(link)
        confirmation = t("dialog.copy_ok", url=link)
        _show_info(confirmation)
    except Exception as err:
        log.error("Clipboard copy failed: %s", err)
        failure = t("dialog.copy_fail", error=err)
        _show_error(failure)
def _on_copy_link(icon=None, item=None) -> None:
    """Tray action: silently copy the proxy link to the clipboard.

    Nothing is shown on success. A failing copy is logged and reported
    through an error dialog. *icon* and *item* are accepted but not used.
    """
    link = tg_proxy_url(_config)
    log.info("Copying link: %s", link)
    try:
        pyperclip.copy(link)
    except Exception as err:
        log.error("Clipboard copy failed: %s", err)
        failure = t("dialog.copy_fail", error=err)
        _show_error(failure)
def _on_restart(icon=None, item=None) -> None:
    """Tray action: run ``restart_proxy`` on a background daemon thread.

    *icon* and *item* are accepted but not used by the body.
    """

    def _restart_worker() -> None:
        # _config is looked up when the thread runs, not when it is created.
        restart_proxy(_config, _show_error)

    worker = threading.Thread(target=_restart_worker, daemon=True)
    worker.start()
def _on_edit_config(icon=None, item=None) -> None:
    """Tray action: open the settings dialog on a background daemon thread.

    *icon* and *item* are accepted but not used by the body.
    """
    dialog_thread = threading.Thread(target=_edit_config_dialog, daemon=True)
    dialog_thread.start()
def _on_open_logs(icon=None, item=None) -> None:
    """Tray action: open the log file with ``xdg-open``.

    If the log file does not exist, an info dialog with the
    ``dialog.log_not_found`` text is shown instead. If ``xdg-open`` cannot
    be launched, the failure is logged and an error dialog shows the log
    path together with the OS error.
    *icon* and *item* are accepted but not used by the body.
    """
    log.info("Opening log file: %s", LOG_FILE)
    if not LOG_FILE.exists():
        _show_info(t("dialog.log_not_found"))
        return

    # Launch the viewer without the interpreter-specific variables of this
    # process (virtualenv / Python path settings).
    stripped = ("VIRTUAL_ENV", "PYTHONPATH", "PYTHONHOME")
    env = {k: v for k, v in os.environ.items() if k not in stripped}
    try:
        subprocess.Popen(
            ["xdg-open", str(LOG_FILE)],
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            stdin=subprocess.DEVNULL,
            start_new_session=True,
        )
    except OSError as exc:
        # Popen raises OSError (e.g. FileNotFoundError) when xdg-open is
        # missing or not executable; report it instead of letting the
        # exception escape from the tray callback.
        log.error("Failed to open log file %s: %s", LOG_FILE, exc)
        _show_error(f"{LOG_FILE}\n\n{exc}")
def _on_exit(icon=None, item=None) -> None:
    """Tray action: shut the application down.

    The first call marks shutdown as started, asks the CTk side to quit,
    arms a watchdog thread that hard-exits the process after three seconds,
    and stops the tray icon if one was passed. A call made while shutdown
    is already in progress terminates the process immediately.

    Args:
        icon: Tray icon to stop; skipped when falsy.
        item: Accepted but not used by the body.
    """
    global _exiting
    if _exiting:
        # os._exit() never returns, so nothing after it can run.
        os._exit(0)
    _exiting = True
    log.info("User requested exit")
    quit_ctk()

    def _force_exit() -> None:
        # Watchdog: hard-exit if shutdown has not finished after 3 seconds.
        time.sleep(3)
        os._exit(0)

    threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
    if icon:
        icon.stop()
# settings dialog
def _edit_config_dialog() -> None:
    """Show the settings dialog and apply whatever the user saves.

    If ``ensure_ctk_thread`` reports failure, an error dialog with the
    ``dialog.ctk_missing`` text is shown and nothing else happens.
    Otherwise the window is built by the nested ``_build`` callback, which
    is handed to ``ctk_run_dialog``.

    On save, the validated form is compared against the live config:

    * nothing changed: the UI language is restored and the window closes;
    * only UI-level keys changed (appearance, update check, language): the
      config is saved and the tray menu rebuilt;
    * any other key changed: the user is additionally asked whether to
      restart the proxy.

    Cancelling (button or window close) restores the appearance mode and
    language captured when the dialog was opened.
    """
    if not ensure_ctk_thread(ctk, _config.get("appearance", "auto")):
        _show_error(t("dialog.ctk_missing"))
        return

    # The form is populated from a shallow copy rather than the live dict.
    cfg = dict(_config)

    def _build(done: threading.Event) -> None:
        # Builds the window; `done` is set once the dialog has been closed.
        theme = ctk_theme_for_platform()
        w, h = CONFIG_DIALOG_SIZE
        root = create_ctk_toplevel(
            ctk, title=t("app.settings_title"), width=w, height=h, theme=theme,
            after_create=_apply_window_icon,
        )
        fpx, fpy = CONFIG_DIALOG_FRAME_PAD
        frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
        scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)

        def _refresh_tray_menu() -> None:
            # Rebuild the tray menu so its labels use the current language.
            if _tray_icon is not None:
                _tray_icon.menu = _build_menu()

        _original_language = _config.get("language", DEFAULT_CONFIG["language"])
        widgets = install_tray_config_form(
            ctk, scroll, theme, cfg, DEFAULT_CONFIG,
            show_autostart=False,
            on_language_change=_refresh_tray_menu,
        )
        _original_appearance = ctk.get_appearance_mode()

        def _restore_ui_locale() -> None:
            # Undo any language switch made while the dialog was open.
            set_language(_original_language)
            _refresh_tray_menu()

        def _finish() -> None:
            root.destroy()
            done.set()

        def _cancel() -> None:
            ctk.set_appearance_mode(_original_appearance)
            _restore_ui_locale()
            _finish()

        def on_save() -> None:
            from tkinter import messagebox

            merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=False)
            if isinstance(merged, str):
                # A string result is treated as a validation error message.
                messagebox.showerror(t("app.error_title"), merged, parent=root)
                return

            # Keys that affect only the UI and never require a proxy restart.
            _ui_only_keys = {"appearance", "check_updates", "language"}
            config_changed = any(merged.get(k) != _config.get(k) for k in merged)
            proxy_changed = any(
                merged.get(k) != _config.get(k)
                for k in merged
                if k not in _ui_only_keys
            )
            if not config_changed:
                _restore_ui_locale()
                _finish()
                return

            save_config(merged)
            _config.update(merged)
            set_language(merged.get("language", DEFAULT_CONFIG["language"]))
            # NOTE(review): the config has a "secret" key elsewhere in this
            # file, so this may write the secret to the log; confirm that
            # is acceptable.
            log.info("Config saved: %s", merged)
            # Use the guarded helper instead of touching _tray_icon directly,
            # so a missing tray icon cannot raise AttributeError here.
            _refresh_tray_menu()

            if not proxy_changed:
                _finish()
                return

            do_restart = messagebox.askyesno(
                t("dialog.restart_title"),
                t("dialog.restart_body"),
                parent=root,
            )
            _finish()
            if do_restart:
                threading.Thread(
                    target=lambda: restart_proxy(_config, _show_error),
                    daemon=True,
                ).start()

        root.protocol("WM_DELETE_WINDOW", _cancel)
        install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_cancel)

    ctk_run_dialog(_build)
# first run
def _show_first_run() -> None:
    """Show the first-run window once, guarded by ``FIRST_RUN_MARKER``.

    Returns immediately if the marker file already exists. If
    ``ensure_ctk_thread`` reports failure, the marker is created and the
    window is skipped. Otherwise the window is built and run through
    ``ctk_run_dialog``; finishing it creates the marker and, if requested,
    triggers ``_on_open_in_telegram``.
    """
    ensure_dirs()
    if FIRST_RUN_MARKER.exists():
        return

    ctk_ready = ensure_ctk_thread(ctk, _config.get("appearance", "auto"))
    if not ctk_ready:
        FIRST_RUN_MARKER.touch()
        return

    # Connection details are read before the window is built.
    connection = {
        key: _config.get(key, DEFAULT_CONFIG[key])
        for key in ("host", "port", "secret")
    }

    def _build(done: threading.Event) -> None:
        theme = ctk_theme_for_platform()
        width, height = FIRST_RUN_SIZE
        window = create_ctk_toplevel(
            ctk,
            title=t("app.name"),
            width=width,
            height=height,
            theme=theme,
            after_create=_apply_window_icon,
        )

        def on_done(open_tg: bool) -> None:
            FIRST_RUN_MARKER.touch()
            window.destroy()
            done.set()
            if not open_tg:
                return
            _on_open_in_telegram()

        populate_first_run_window(
            ctk,
            window,
            theme,
            host=connection["host"],
            port=connection["port"],
            secret=connection["secret"],
            on_done=on_done,
        )

    ctk_run_dialog(_build)
# tray menu
def _build_menu():
    """Build the tray menu using the current language and configured host/port.

    Returns:
        A ``pystray.Menu`` with the open/copy actions, a separator, the
        restart/settings/logs actions, a separator, and the exit action.
    """
    configured_host = _config.get("host", DEFAULT_CONFIG["host"])
    configured_port = _config.get("port", DEFAULT_CONFIG["port"])
    display_host = get_link_host(configured_host)

    open_label = t("tray.open_telegram", host=display_host, port=configured_port)
    entries = [
        # default=True marks this entry as the menu's default item.
        pystray.MenuItem(open_label, _on_open_in_telegram, default=True),
        pystray.MenuItem(t("tray.copy_link"), _on_copy_link),
        pystray.Menu.SEPARATOR,
        pystray.MenuItem(t("tray.restart"), _on_restart),
        pystray.MenuItem(t("tray.settings"), _on_edit_config),
        pystray.MenuItem(t("tray.logs"), _on_open_logs),
        pystray.Menu.SEPARATOR,
        pystray.MenuItem(t("tray.exit"), _on_exit),
    ]
    return pystray.Menu(*entries)
# entry point
def run_tray() -> None:
    """Load the config, start the proxy and run the tray icon until it stops.

    If ``pystray`` or ``Image`` is ``None``, the proxy is started in console
    mode and the function sleeps in a loop until interrupted from the
    keyboard. Otherwise the proxy is started, the update check, first-run
    window and IPv6 warning are run in that order, and the tray icon's
    ``run()`` is called; once it returns the proxy is stopped.
    """
    global _tray_icon, _config

    _config = load_config()
    bootstrap(_config)

    tray_available = pystray is not None and Image is not None
    if not tray_available:
        log.error("pystray or Pillow not installed; running in console mode")
        start_proxy(_config, _show_error)
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            stop_proxy()
        return

    def _is_exiting() -> bool:
        # Re-read the module flag on every call.
        return _exiting

    start_proxy(_config, _show_error)
    maybe_notify_update(_config, _is_exiting, _ask_yes_no)
    _show_first_run()
    check_ipv6_warning(_show_info)

    icon_image = load_icon()
    tooltip = t("app.name")
    _tray_icon = pystray.Icon(APP_NAME, icon_image, tooltip, menu=_build_menu())
    log.info("Tray icon running")
    _tray_icon.run()

    stop_proxy()
    log.info("Tray app exited")
def main() -> None:
    """Entry point: run the tray app only if the lock can be acquired.

    If ``acquire_lock()`` fails, an info dialog with the
    ``dialog.already_running`` text is shown, titled with the script's base
    name, and the function returns. Otherwise ``run_tray()`` is executed and
    the lock is released afterwards, even if it raises.
    """
    if acquire_lock():
        try:
            run_tray()
        finally:
            release_lock()
        return

    message = t("dialog.already_running")
    script_name = os.path.basename(sys.argv[0])
    _show_info(message, script_name)


if __name__ == "__main__":
    main()