ctk refactoring

This commit is contained in:
Flowseal 2026-03-29 15:21:56 +03:00
parent c4a044542c
commit 46426c45b0
8 changed files with 1041 additions and 1896 deletions

725
linux.py
View File

@ -1,398 +1,44 @@
from __future__ import annotations
import asyncio as _asyncio
import json
import logging
import logging.handlers
import os
import subprocess
import sys
import threading
import webbrowser
import time
from pathlib import Path
from typing import Dict, Optional
from typing import Optional
import customtkinter as ctk
import psutil
import pyperclip
import pystray
from PIL import Image, ImageDraw, ImageFont
from PIL import Image, ImageTk
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy.tg_ws_proxy import proxy_config
from proxy import __version__
from utils.default_config import default_tray_config
from utils.tray_common import (
APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, LOG_FILE,
acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog,
ensure_ctk_thread, ensure_dirs, load_config, load_icon, log,
maybe_notify_update, quit_ctk, release_lock, restart_proxy,
save_config, start_proxy, stop_proxy, tg_proxy_url,
)
from ui.ctk_tray_ui import (
install_tray_config_buttons,
install_tray_config_form,
populate_first_run_window,
tray_settings_scroll_and_footer,
install_tray_config_buttons, install_tray_config_form,
populate_first_run_window, tray_settings_scroll_and_footer,
validate_config_form,
)
from ui.ctk_theme import (
CONFIG_DIALOG_FRAME_PAD,
CONFIG_DIALOG_SIZE,
FIRST_RUN_SIZE,
create_ctk_toplevel,
ctk_theme_for_platform,
main_content_frame,
CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE,
create_ctk_toplevel, ctk_theme_for_platform, main_content_frame,
)
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG = default_tray_config()
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_tray_icon: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
_exiting = False
_ctk_root = None
_ctk_root_ready = threading.Event()
# dialogs (tkinter messagebox)
log = logging.getLogger("tg-ws-tray")
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
try:
cmdline = proc.cmdline()
for arg in cmdline:
if "linux.py" in arg:
return True
except Exception:
pass
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {
"create_time": proc.create_time(),
}
lock_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s", datefmt="%H:%M:%S"
)
)
root.addHandler(ch)
def _make_icon_image(size: int = 64):
if Image is None:
raise RuntimeError("Pillow is required for tray icon")
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse(
[margin, margin, size - margin, size - margin], fill=(0, 136, 204, 255)
)
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
size=int(size * 0.55),
)
except Exception:
try:
font = ImageFont.truetype(
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf", size=int(size * 0.55)
)
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _load_icon():
icon_path = Path(__file__).parent / "icon.ico"
if icon_path.exists() and Image:
try:
return Image.open(str(icon_path))
except Exception:
pass
return _make_icon_image()
def _apply_linux_ctk_window_icon(root) -> None:
"""PhotoImage храним на root — иначе GC может убрать картинку до закрытия окна."""
icon_img = _load_icon()
if icon_img:
from PIL import ImageTk
root._ctk_icon_photo = ImageTk.PhotoImage(icon_img.resize((64, 64)))
root.iconphoto(False, root._ctk_icon_photo)
def _ensure_ctk_thread() -> bool:
"""Start the persistent hidden CTk root in its own thread (once)."""
global _ctk_root
if _ctk_root_ready.is_set():
return True
def _run():
global _ctk_root
from ui.ctk_theme import (
apply_ctk_appearance,
_install_tkinter_variable_del_guard,
)
_install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
_ctk_root = ctk.CTk()
_ctk_root.withdraw()
_ctk_root_ready.set()
_ctk_root.mainloop()
threading.Thread(target=_run, daemon=True, name="ctk-root").start()
_ctk_root_ready.wait(timeout=5.0)
return _ctk_root is not None
def _ctk_run_dialog(build_fn) -> None:
"""Schedule build_fn(done_event) on the CTk thread and block until done_event is set."""
if _ctk_root is None:
return
done = threading.Event()
def _invoke():
try:
build_fn(done)
except Exception:
log.exception("CTk dialog failed")
done.set()
_ctk_root.after(0, _invoke)
done.wait()
def _run_proxy_thread():
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(stop_event=stop_ev)
)
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
_show_error(
"Не удалось запустить прокси:\nПорт уже используется другим приложением.\n\nЗакройте приложение, использующее этот порт, или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
proxy_config.port = port
proxy_config.host = host
proxy_config.secret = secret
proxy_config.dc_redirects = dc_redirects
proxy_config.buffer_size = max(4, buf_kb) * 1024
proxy_config.pool_size = max(0, pool_size)
log.info("Starting proxy on %s:%d ...", host, port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
daemon=True,
name="proxy",
)
_proxy_thread.start()
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=2)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy():
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showerror(title, text, parent=root)
root.destroy()
def _show_info(text: str, title: str = "TG WS Proxy"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showinfo(title, text, parent=root)
root.destroy()
def _ask_yes_no_dialog(text: str, title: str = "TG WS Proxy") -> bool:
def _msgbox(kind: str, text: str, title: str, **kw):
import tkinter as _tk
from tkinter import messagebox as _mb
@ -402,178 +48,142 @@ def _ask_yes_no_dialog(text: str, title: str = "TG WS Proxy") -> bool:
root.attributes("-topmost", True)
except Exception:
pass
r = _mb.askyesno(title, text, parent=root)
result = getattr(_mb, kind)(title, text, parent=root, **kw)
root.destroy()
return bool(r)
return result
def _maybe_notify_update_async():
def _work():
time.sleep(1.5)
if _exiting:
return
if not _config.get("check_updates", True):
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
text = (
f"Доступна новая версия: {ver}\n\n"
f"Открыть страницу релиза в браузере?"
)
if _ask_yes_no_dialog(text, "TG WS Proxy — обновление"):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None:
_msgbox("showerror", text, title)
def _on_open_in_telegram(icon=None, item=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_msgbox("showinfo", text, title)
url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}"
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return bool(_msgbox("askyesno", text, title))
def _apply_window_icon(root) -> None:
icon_img = load_icon()
if icon_img:
root._ctk_icon_photo = ImageTk.PhotoImage(icon_img.resize((64, 64)))
root.iconphoto(False, root._ctk_icon_photo)
# tray callbacks
def _on_open_in_telegram(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
log.info("Copying %s", url)
try:
pyperclip.copy(url)
_show_info(
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy",
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}"
)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
threading.Thread(target=restart_proxy, daemon=True).start()
def _on_restart(icon=None, item=None) -> None:
threading.Thread(
target=lambda: restart_proxy(_config, _show_error), daemon=True
).start()
def _on_edit_config(icon=None, item=None):
def _on_edit_config(icon=None, item=None) -> None:
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _edit_config_dialog():
if not _ensure_ctk_thread():
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
def _build(done: threading.Event):
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
root = create_ctk_toplevel(
ctk,
title="TG WS Proxy — Настройки",
width=w,
height=h,
theme=theme,
after_create=_apply_linux_ctk_window_icon,
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk, scroll, theme, cfg, DEFAULT_CONFIG,
show_autostart=False,
)
def _finish():
root.destroy()
done.set()
def on_save():
merged = validate_config_form(
widgets, DEFAULT_CONFIG, include_autostart=False)
if isinstance(merged, str):
_show_error(merged)
return
new_cfg = merged
save_config(new_cfg)
_config.update(new_cfg)
log.info("Config saved: %s", new_cfg)
_tray_icon.menu = _build_menu()
from tkinter import messagebox
do_restart = messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root)
_finish()
if do_restart:
threading.Thread(
target=restart_proxy, daemon=True).start()
def on_cancel():
_finish()
root.protocol("WM_DELETE_WINDOW", on_cancel)
install_tray_config_buttons(
ctk, footer, theme, on_save=on_save, on_cancel=on_cancel)
_ctk_run_dialog(_build)
def _on_open_logs(icon=None, item=None):
def _on_open_logs(icon=None, item=None) -> None:
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
env = os.environ.copy()
env.pop("VIRTUAL_ENV", None)
env.pop("PYTHONPATH", None)
env.pop("PYTHONHOME", None)
env = {k: v for k, v in os.environ.items() if k not in ("VIRTUAL_ENV", "PYTHONPATH", "PYTHONHOME")}
subprocess.Popen(
["xdg-open", str(LOG_FILE)],
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
start_new_session=True,
["xdg-open", str(LOG_FILE)], env=env,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL, start_new_session=True,
)
else:
_show_info("Файл логов ещё не создан.", "TG WS Proxy")
_show_info("Файл логов ещё не создан.")
def _on_exit(icon=None, item=None):
def _on_exit(icon=None, item=None) -> None:
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
if _ctk_root is not None:
try:
_ctk_root.after(0, _ctk_root.quit)
except Exception:
pass
def _force_exit():
time.sleep(3)
os._exit(0)
threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
quit_ctk()
threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start()
if icon:
icon.stop()
def _show_first_run():
_ensure_dirs()
# settings dialog
def _edit_config_dialog() -> None:
if not ensure_ctk_thread(ctk):
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
root = create_ctk_toplevel(
ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme,
after_create=_apply_window_icon,
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(ctk, scroll, theme, cfg, DEFAULT_CONFIG, show_autostart=False)
def _finish() -> None:
root.destroy()
done.set()
def on_save() -> None:
merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=False)
if isinstance(merged, str):
_show_error(merged)
return
save_config(merged)
_config.update(merged)
log.info("Config saved: %s", merged)
_tray_icon.menu = _build_menu()
from tkinter import messagebox
do_restart = messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
)
_finish()
if do_restart:
threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start()
root.protocol("WM_DELETE_WINDOW", _finish)
install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish)
ctk_run_dialog(_build)
# first run
def _show_first_run() -> None:
ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
if not _ensure_ctk_thread():
if not ensure_ctk_thread(ctk):
FIRST_RUN_MARKER.touch()
return
@ -581,90 +191,35 @@ def _show_first_run():
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
def _build(done: threading.Event):
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = FIRST_RUN_SIZE
root = create_ctk_toplevel(
ctk,
title="TG WS Proxy",
width=w,
height=h,
theme=theme,
after_create=_apply_linux_ctk_window_icon,
ctk, title="TG WS Proxy", width=w, height=h, theme=theme,
after_create=_apply_window_icon,
)
def on_done(open_tg: bool):
def on_done(open_tg: bool) -> None:
FIRST_RUN_MARKER.touch()
root.destroy()
done.set()
if open_tg:
_on_open_in_telegram()
populate_first_run_window(
ctk, root, theme, host=host, port=port, secret=secret,
on_done=on_done)
populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done)
_ctk_run_dialog(_build)
ctk_run_dialog(_build)
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
threading.Thread(target=_show_ipv6_dialog, daemon=True).start()
def _show_ipv6_dialog():
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy",
)
# tray menu
def _build_menu():
if pystray is None:
return None
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
return pystray.Menu(
pystray.MenuItem(
f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True
),
pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Перезапустить прокси", _on_restart),
pystray.MenuItem("Настройки...", _on_edit_config),
@ -674,27 +229,18 @@ def _build_menu():
)
def run_tray():
# entry point
def run_tray() -> None:
global _tray_icon, _config
_config = load_config()
save_config(_config)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, tray app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
bootstrap(_config)
if pystray is None or Image is None:
log.error("pystray or Pillow not installed; running in console mode")
start_proxy()
start_proxy(_config, _show_error)
try:
while True:
time.sleep(1)
@ -702,16 +248,12 @@ def run_tray():
stop_proxy()
return
start_proxy()
_maybe_notify_update_async()
start_proxy(_config, _show_error)
maybe_notify_update(_config, lambda: _exiting, _ask_yes_no)
_show_first_run()
_check_ipv6_warning()
icon_image = _load_icon()
_tray_icon = pystray.Icon(APP_NAME, icon_image, "TG WS Proxy", menu=_build_menu())
check_ipv6_warning(_show_info)
_tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu())
log.info("Tray icon running")
_tray_icon.run()
@ -719,15 +261,14 @@ def run_tray():
log.info("Tray app exited")
def main():
if not _acquire_lock():
def main() -> None:
if not acquire_lock("linux.py"):
_show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
return
try:
run_tray()
finally:
_release_lock()
release_lock()
if __name__ == "__main__":

579
macos.py
View File

@ -1,18 +1,13 @@
from __future__ import annotations
import json
import logging
import logging.handlers
import os
import psutil
import subprocess
import sys
import threading
import time
import webbrowser
import asyncio as _asyncio
from pathlib import Path
from typing import Dict, Optional
from typing import Optional
try:
import rumps
@ -30,261 +25,135 @@ except ImportError:
pyperclip = None
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy.tg_ws_proxy import proxy_config
from proxy import __version__
from utils.default_config import default_tray_config
from utils.tray_common import (
APP_DIR, APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IPV6_WARN_MARKER,
LOG_FILE, acquire_lock, apply_proxy_config, ensure_dirs, load_config,
log, release_lock, save_config, setup_logging, stop_proxy, tg_proxy_url,
)
APP_NAME = "TgWsProxy"
APP_DIR = Path.home() / "Library" / "Application Support" / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png"
DEFAULT_CONFIG = default_tray_config()
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_app: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
log = logging.getLogger("tg-ws-tray")
# osascript dialogs
# Single-instance lock
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _esc(text: str) -> str:
return text.replace("\\", "\\\\").replace('"', '\\"')
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _osascript(script: str) -> str:
r = subprocess.run(["osascript", "-e", script], capture_output=True, text=True)
return r.stdout.strip()
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {"create_time": proc.create_time()}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# Filesystem helpers
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
def _show_error(text: str, title: str = "TG WS Proxy") -> None:
_osascript(
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon stop'
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"))
root.addHandler(ch)
# Menubar icon
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_osascript(
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon note'
)
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return _ask_yes_no_close(text, title) is True
def _ask_yes_no_close(text: str, title: str = "TG WS Proxy") -> Optional[bool]:
r = subprocess.run(
[
"osascript", "-e",
f'button returned of (display dialog "{_esc(text)}" '
f'with title "{_esc(title)}" '
f'buttons {{"Закрыть", "Нет", "Да"}} '
f'default button "Да" cancel button "Закрыть" with icon note)',
],
capture_output=True, text=True,
)
if r.returncode != 0:
return None
btn = r.stdout.strip()
if btn == "Да":
return True
if btn == "Нет":
return False
return None
def _osascript_input(prompt: str, default: str, title: str = "TG WS Proxy") -> Optional[str]:
r = subprocess.run(
[
"osascript", "-e",
f'text returned of (display dialog "{_esc(prompt)}" '
f'default answer "{_esc(default)}" '
f'with title "{_esc(title)}" '
f'buttons {{"Закрыть", "OK"}} '
f'default button "OK" cancel button "Закрыть")',
],
capture_output=True, text=True,
)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
# menubar icon
def _make_menubar_icon(size: int = 44):
if Image is None:
return None
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = size // 11
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 0, 0, 255))
draw.ellipse([margin, margin, size - margin, size - margin], fill=(0, 0, 0, 255))
try:
font = ImageFont.truetype(
"/System/Library/Fonts/Helvetica.ttc",
size=int(size * 0.55))
font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T", fill=(255, 255, 255, 255), font=font,
)
return img
# Generate menubar icon PNG if it does not exist.
def _ensure_menubar_icon():
def _ensure_menubar_icon() -> None:
if MENUBAR_ICON_PATH.exists():
return
_ensure_dirs()
ensure_dirs()
img = _make_menubar_icon(44)
if img:
img.save(str(MENUBAR_ICON_PATH), "PNG")
# Native macOS dialogs
# proxy lifecycle (macOS-local)
def _escape_osascript_text(text: str) -> str:
return text.replace('\\', '\\\\').replace('"', '\\"')
import asyncio as _asyncio
def _osascript(script: str) -> str:
r = subprocess.run(
['osascript', '-e', script],
capture_output=True, text=True)
return r.stdout.strip()
def _show_error(text: str, title: str = "TG WS Proxy"):
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon stop')
def _show_info(text: str, title: str = "TG WS Proxy"):
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon note')
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
result = _ask_yes_no_close(text, title)
return result is True
def _ask_yes_no_close(text: str,
title: str = "TG WS Proxy") -> Optional[bool]:
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
r = subprocess.run(
['osascript', '-e',
f'button returned of (display dialog "{text_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Закрыть", "Нет", "Да"}} '
f'default button "Да" cancel button "Закрыть" with icon note)'],
capture_output=True, text=True)
if r.returncode != 0:
return None
result = r.stdout.strip()
if result == "Да":
return True
if result == "Нет":
return False
return None
# Proxy lifecycle
def _run_proxy_thread():
def _run_proxy_thread() -> None:
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(stop_event=stop_ev))
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
@ -292,49 +161,28 @@ def _run_proxy_thread():
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите.")
"или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
def _start_proxy() -> None:
global _proxy_thread
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
if not apply_proxy_config(_config):
_show_error("Ошибка конфигурации DC → IP.")
return
proxy_config.port = port
proxy_config.host = host
proxy_config.secret = secret
proxy_config.dc_redirects = dc_redirects
proxy_config.buffer_size = max(4, buf_kb) * 1024
proxy_config.pool_size = max(0, pool_size)
log.info("Starting proxy on %s:%d ...", host, port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
daemon=True, name="proxy")
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(target=_run_proxy_thread, daemon=True, name="proxy")
_proxy_thread.start()
def stop_proxy():
def _stop_proxy() -> None:
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
@ -345,24 +193,21 @@ def stop_proxy():
log.info("Proxy stopped")
def restart_proxy():
def _restart_proxy() -> None:
log.info("Restarting proxy...")
stop_proxy()
_stop_proxy()
time.sleep(0.3)
start_proxy()
_start_proxy()
# Menu callbacks
# menu callbacks
def _on_open_in_telegram(_=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}"
def _on_open_in_telegram(_=None) -> None:
url = tg_proxy_url(_config)
log.info("Opening %s", url)
try:
result = subprocess.call(['open', url])
result = subprocess.call(["open", url])
if result != 0:
raise RuntimeError("open command failed")
except Exception:
@ -376,67 +221,45 @@ def _on_open_in_telegram(_=None):
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(['pbcopy'], input=url.encode(),
check=True)
subprocess.run(["pbcopy"], input=url.encode(), check=True)
_show_info(
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена:\n{url}")
f"Ссылка скопирована в буфер обмена:\n{url}"
)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(_=None):
def _do_restart():
def _on_restart(_=None) -> None:
def _do():
global _config
_config = load_config()
if _app:
_app.update_menu_title()
restart_proxy()
_restart_proxy()
threading.Thread(target=_do_restart, daemon=True).start()
threading.Thread(target=_do, daemon=True).start()
def _on_open_logs(_=None):
def _on_open_logs(_=None) -> None:
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
subprocess.call(['open', str(LOG_FILE)])
subprocess.call(["open", str(LOG_FILE)])
else:
_show_info("Файл логов ещё не создан.")
# Show a native text input dialog. Returns None if cancelled.
def _osascript_input(prompt: str, default: str,
title: str = "TG WS Proxy") -> Optional[str]:
prompt_esc = _escape_osascript_text(prompt)
default_esc = _escape_osascript_text(default)
title_esc = _escape_osascript_text(title)
r = subprocess.run(
['osascript', '-e',
f'text returned of (display dialog "{prompt_esc}" '
f'default answer "{default_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Закрыть", "OK"}} '
f'default button "OK" cancel button "Закрыть")'],
capture_output=True, text=True)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
def _on_edit_config(_=None):
def _on_edit_config(_=None) -> None:
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _check_updates_menu_title() -> str:
on = bool(_config.get("check_updates", True))
return (
"✓ Проверять обновления при запуске"
if on
else "Проверять обновления при запуске (выкл)"
)
return "✓ Проверять обновления при запуске" if on else "Проверять обновления при запуске (выкл)"
def _toggle_check_updates(_=None):
def _toggle_check_updates(_=None) -> None:
global _config
_config["check_updates"] = not bool(_config.get("check_updates", True))
save_config(_config)
@ -444,12 +267,15 @@ def _toggle_check_updates(_=None):
_app._check_updates_item.title = _check_updates_menu_title()
def _on_open_release_page(_=None):
def _on_open_release_page(_=None) -> None:
from utils.update_check import RELEASES_PAGE_URL
webbrowser.open(RELEASES_PAGE_URL)
def _maybe_notify_update_async():
# update check
def _maybe_notify_update_async() -> None:
def _work():
time.sleep(1.5)
if _exiting:
@ -465,8 +291,7 @@ def _maybe_notify_update_async():
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if _ask_yes_no(
f"Доступна новая версия: {ver}\n\n"
f"Открыть страницу релиза в браузере?",
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
@ -476,18 +301,16 @@ def _maybe_notify_update_async():
threading.Thread(target=_work, daemon=True, name="update-check").start()
# Settings via native macOS dialogs
def _edit_config_dialog():
# settings dialog
def _edit_config_dialog() -> None:
cfg = load_config()
# Host
host = _osascript_input(
"IP-адрес прокси:",
cfg.get("host", DEFAULT_CONFIG["host"]))
host = _osascript_input("IP-адрес прокси:", cfg.get("host", DEFAULT_CONFIG["host"]))
if host is None:
return
host = host.strip()
import socket as _sock
try:
_sock.inet_aton(host)
@ -495,10 +318,7 @@ def _edit_config_dialog():
_show_error("Некорректный IP-адрес.")
return
# Port
port_str = _osascript_input(
"Порт прокси:",
str(cfg.get("port", DEFAULT_CONFIG["port"])))
port_str = _osascript_input("Порт прокси:", str(cfg.get("port", DEFAULT_CONFIG["port"])))
if port_str is None:
return
try:
@ -509,10 +329,9 @@ def _edit_config_dialog():
_show_error("Порт должен быть числом 1-65535")
return
# Secret
secret_str = _osascript_input(
"MTProto Secret (32 hex символа):",
cfg.get("secret", DEFAULT_CONFIG["secret"]))
"MTProto Secret (32 hex символа):", cfg.get("secret", DEFAULT_CONFIG["secret"])
)
if secret_str is None:
return
secret_str = secret_str.strip().lower()
@ -520,42 +339,39 @@ def _edit_config_dialog():
_show_error("Secret должен быть строкой из 32 шестнадцатеричных символов.")
return
# DC-IP mappings
dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]))
dc_str = _osascript_input(
"DC → IP маппинги (через запятую, формат DC:IP):\n"
"Например: 2:149.154.167.220, 4:149.154.167.220",
dc_default)
dc_default,
)
if dc_str is None:
return
dc_lines = [s.strip() for s in dc_str.replace(',', '\n').splitlines()
if s.strip()]
dc_lines = [s.strip() for s in dc_str.replace(",", "\n").splitlines() if s.strip()]
try:
tg_ws_proxy.parse_dc_ip_list(dc_lines)
except ValueError as e:
_show_error(str(e))
return
# Verbose
verbose = _ask_yes_no_close("Включить подробное логирование (verbose)?")
if verbose is None:
return
# Advanced settings
adv_str = _osascript_input(
"Расширенные настройки (буфер KB, WS пул, лог MB):\n"
"Формат: buf_kb,pool_size,log_max_mb",
f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])},"
f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])},"
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}")
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}",
)
if adv_str is None:
return
adv = {}
if adv_str:
parts = [s.strip() for s in adv_str.split(',')]
keys = [("buf_kb", int), ("pool_size", int),
("log_max_mb", float)]
parts = [s.strip() for s in adv_str.split(",")]
keys = [("buf_kb", int), ("pool_size", int), ("log_max_mb", float)]
for i, (k, typ) in enumerate(keys):
if i < len(parts):
try:
@ -572,6 +388,7 @@ def _edit_config_dialog():
"buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])),
"pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])),
"log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])),
"check_updates": cfg.get("check_updates", True),
}
save_config(new_cfg)
log.info("Config saved: %s", new_cfg)
@ -581,23 +398,22 @@ def _edit_config_dialog():
if _app:
_app.update_menu_title()
if _ask_yes_no_close(
"Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
restart_proxy()
if _ask_yes_no_close("Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
_restart_proxy()
# First-run & IPv6 dialogs
# first run & ipv6
def _show_first_run():
_ensure_dirs()
def _show_first_run() -> None:
ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
tg_url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}"
tg_url = tg_proxy_url(_config)
text = (
f"Прокси запущен и работает в строке меню.\n\n"
@ -613,49 +429,48 @@ def _show_first_run():
)
FIRST_RUN_MARKER.touch()
if _ask_yes_no(text, "TG WS Proxy"):
_on_open_in_telegram()
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith('::1') and not ip.startswith('fe80::1'):
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(('::1', 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
def _check_ipv6_warning() -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
import socket as _sock
has = False
try:
for addr in _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6):
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
has = True
break
except Exception:
pass
if not has:
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
has = True
except Exception:
pass
if not has:
return
IPV6_WARN_MARKER.touch()
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает, попробуйте отключить "
"попытку соединения по IPv6 в настройках прокси Telegram.\n\n"
"Это предупреждение будет показано только один раз.")
"Это предупреждение будет показано только один раз."
)
# rumps menubar app
# rumps app
_TgWsProxyAppBase = rumps.App if rumps else object
@ -663,34 +478,25 @@ _TgWsProxyAppBase = rumps.App if rumps else object
class TgWsProxyApp(_TgWsProxyAppBase):
def __init__(self):
_ensure_menubar_icon()
icon_path = (str(MENUBAR_ICON_PATH)
if MENUBAR_ICON_PATH.exists() else None)
icon_path = str(MENUBAR_ICON_PATH) if MENUBAR_ICON_PATH.exists() else None
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
self._open_tg_item = rumps.MenuItem(
f"Открыть в Telegram ({link_host}:{port})",
callback=_on_open_in_telegram)
self._restart_item = rumps.MenuItem(
"Перезапустить прокси",
callback=_on_restart)
self._settings_item = rumps.MenuItem(
"Настройки...",
callback=_on_edit_config)
self._logs_item = rumps.MenuItem(
"Открыть логи",
callback=_on_open_logs)
f"Открыть в Telegram ({link_host}:{port})", callback=_on_open_in_telegram
)
self._restart_item = rumps.MenuItem("Перезапустить прокси", callback=_on_restart)
self._settings_item = rumps.MenuItem("Настройки...", callback=_on_edit_config)
self._logs_item = rumps.MenuItem("Открыть логи", callback=_on_open_logs)
self._release_page_item = rumps.MenuItem(
"Страница релиза на GitHub…",
callback=_on_open_release_page)
"Страница релиза на GitHub…", callback=_on_open_release_page
)
self._check_updates_item = rumps.MenuItem(
_check_updates_menu_title(),
callback=_toggle_check_updates)
self._version_item = rumps.MenuItem(
f"Версия {__version__}",
callback=lambda _: None)
_check_updates_menu_title(), callback=_toggle_check_updates
)
self._version_item = rumps.MenuItem(f"Версия {__version__}", callback=lambda _: None)
super().__init__(
"TG WS Proxy",
@ -708,18 +514,20 @@ class TgWsProxyApp(_TgWsProxyAppBase):
self._check_updates_item,
None,
self._version_item,
])
],
)
def update_menu_title(self):
def update_menu_title(self) -> None:
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
self._open_tg_item.title = (
f"Открыть в Telegram ({link_host}:{port})")
self._open_tg_item.title = f"Открыть в Telegram ({link_host}:{port})"
def run_menubar():
# entry point
def run_menubar() -> None:
global _app, _config
_config = load_config()
@ -731,26 +539,26 @@ def run_menubar():
except Exception:
pass
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
setup_logging(
_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
log.info("TG WS Proxy версия %s, menubar app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if rumps is None or Image is None:
log.error("rumps or Pillow not installed; running in console mode")
start_proxy()
_start_proxy()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_proxy()
_stop_proxy()
return
start_proxy()
_start_proxy()
_maybe_notify_update_async()
_show_first_run()
_check_ipv6_warning()
@ -758,19 +566,18 @@ def run_menubar():
log.info("Menubar app running")
_app.run()
stop_proxy()
_stop_proxy()
log.info("Menubar app exited")
def main():
if not _acquire_lock():
def main() -> None:
if not acquire_lock("macos.py"):
_show_info("Приложение уже запущено.")
return
try:
run_menubar()
finally:
_release_lock()
release_lock()
if __name__ == "__main__":

View File

@ -1,8 +1,3 @@
"""
Общая светлая тема и фабрика окон CustomTkinter для tray-приложений (Windows / Linux).
Цвета и отступы задаются в одном месте правки темы не дублируются по платформам.
"""
from __future__ import annotations
import sys
@ -13,11 +8,7 @@ from typing import Any, Callable, Optional, Tuple
_tk_variable_del_guard_installed = False
def _install_tkinter_variable_del_guard() -> None:
"""
Убирает «Exception ignored» при выходе процесса: Tcl уже разрушен, а GC ещё
вызывает Variable.__del__ (StringVar и т.д.) напр. окно CTk в фоновом потоке.
"""
def install_tkinter_variable_del_guard() -> None:
global _tk_variable_del_guard_installed
if _tk_variable_del_guard_installed:
return
@ -32,7 +23,6 @@ def _install_tkinter_variable_del_guard() -> None:
tkinter.Variable.__del__ = _safe_variable_del # type: ignore[assignment]
_tk_variable_del_guard_installed = True
# Размеры и отступы (единые для диалогов настроек и первого запуска)
CONFIG_DIALOG_SIZE: Tuple[int, int] = (460, 560)
CONFIG_DIALOG_FRAME_PAD: Tuple[int, int] = (20, 14)
FIRST_RUN_SIZE: Tuple[int, int] = (520, 440)
@ -41,8 +31,6 @@ FIRST_RUN_FRAME_PAD: Tuple[int, int] = (28, 24)
@dataclass(frozen=True)
class CtkTheme:
"""Палитра Telegram-style и семейства шрифтов для UI и моноширинного текста."""
tg_blue: str = "#3390ec"
tg_blue_hover: str = "#2b7cd4"
bg: str = "#ffffff"
@ -71,34 +59,6 @@ def center_ctk_geometry(root: Any, width: int, height: int) -> None:
root.geometry(f"{width}x{height}+{(sw - width) // 2}+{(sh - height) // 2}")
def create_ctk_root(
ctk: Any,
*,
title: str,
width: int,
height: int,
theme: CtkTheme,
topmost: bool = True,
after_create: Optional[Callable[[Any], None]] = None,
) -> Any:
"""
Создаёт CTk: глобальная тема, заголовок, без ресайза, по центру экрана, фон из палитры.
after_create опционально: установка иконки окна (различается по ОС).
"""
_install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
root = ctk.CTk()
root.title(title)
root.resizable(False, False)
if topmost:
root.attributes("-topmost", True)
center_ctk_geometry(root, width, height)
root.configure(fg_color=theme.bg)
if after_create:
after_create(root)
return root
def create_ctk_toplevel(
ctk: Any,
*,
@ -119,7 +79,17 @@ def create_ctk_toplevel(
root.lift()
root.focus_force()
if after_create:
after_create(root)
_after_id = root.after(300, lambda: after_create(root))
_orig_destroy = root.destroy
def _safe_destroy():
try:
root.after_cancel(_after_id)
except Exception:
pass
_orig_destroy()
root.destroy = _safe_destroy
return root

View File

@ -1,7 +1,3 @@
"""
Всплывающие подсказки для CustomTkinter / tk: задержка, Toplevel без рамки, wrap.
"""
from __future__ import annotations
import tkinter as tk
@ -9,8 +5,6 @@ from typing import Any, List, Optional
class CtkTooltip:
"""Показ текста при наведении на виджет."""
def __init__(
self,
widget: Any,
@ -31,6 +25,8 @@ class CtkTooltip:
widget.bind("<Destroy>", self._on_destroy, add="+")
def _schedule(self, _event: Any = None) -> None:
if self.widget is None:
return
self._cancel_after()
self._after_id = self.widget.after(self.delay_ms, self._show)
@ -89,6 +85,7 @@ class CtkTooltip:
def _on_destroy(self, _event: Any = None) -> None:
self._hide()
self.widget = None
def _is_windows() -> bool:
@ -104,11 +101,9 @@ def attach_ctk_tooltip(
delay_ms: int = 450,
wraplength: int = 320,
) -> None:
"""Повесить подсказку на виджет (CTk или tk)."""
CtkTooltip(widget, text, delay_ms=delay_ms, wraplength=wraplength)
def attach_tooltip_to_widgets(widgets: List[Any], text: str, **kwargs: Any) -> None:
"""Одна и та же подсказка на несколько виджетов (подпись + поле)."""
for w in widgets:
attach_ctk_tooltip(w, text, **kwargs)

View File

@ -1,8 +1,3 @@
"""
Общая разметка CustomTkinter для tray (Windows / Linux): настройки и первый запуск.
Логика сохранения и колбэки остаются в платформенных модулях.
"""
from __future__ import annotations
import os
@ -21,7 +16,6 @@ from ui.ctk_theme import (
)
from ui.ctk_tooltip import attach_ctk_tooltip, attach_tooltip_to_widgets
# Подсказки для формы настроек (новые пользователи)
_TIP_HOST = (
"Адрес, на котором прокси принимает подключения.\n"
"Обычно 127.0.0.1 — локальная сеть, 0.0.0.0 - все интерфейсы"
@ -30,9 +24,7 @@ _TIP_PORT = (
"Порт прокси. В Telegram Desktop в настройках прокси должен быть "
"указан тот же порт"
)
_TIP_SECRET = (
"Секретный ключ для авторизации клиентов\n"
)
_TIP_SECRET = "Секретный ключ для авторизации клиентов"
_TIP_DC = (
"Соответствие номера датацентра Telegram (DC) и IP-адреса сервера.\n"
"Каждая строка: «номер:IP», например 2:149.154.167.220. "
@ -57,14 +49,60 @@ _TIP_AUTOSTART = (
"Запускать TG WS Proxy при входе в Windows. "
"Если вы переместите программу в другую папку, автозапуск сбросится"
)
_TIP_CHECK_UPDATES = (
"При запуске проверять наличие обновлений"
)
_TIP_CHECK_UPDATES = "При запуске проверять наличие обновлений"
_TIP_SAVE = "Сохранить настройки"
_TIP_CANCEL = "Закрыть окно без сохранения изменений"
# Внутренняя ширина полей относительно ширины окна настроек (см. CONFIG_DIALOG_SIZE)
_CONFIG_FORM_INNER_WIDTH = 396
_INNER_W = 396
def _entry(ctk, parent, theme, *, var=None, width=0, height=36, radius=10, **kw):
opts = dict(
font=(theme.ui_font_family, 13), corner_radius=radius,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
)
if var is not None:
opts["textvariable"] = var
if width:
opts["width"] = width
opts["height"] = height
opts.update(kw)
return ctk.CTkEntry(parent, **opts)
def _checkbox(ctk, parent, theme, text, variable):
return ctk.CTkCheckBox(
parent, text=text, variable=variable,
font=(theme.ui_font_family, 13), text_color=theme.text_primary,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
corner_radius=6, border_width=2, border_color=theme.field_border,
)
def _label(ctk, parent, theme, text, *, size=12, bold=False, secondary=True, **kw):
weight = "bold" if bold else "normal"
return ctk.CTkLabel(
parent, text=text,
font=(theme.ui_font_family, size, weight),
text_color=theme.text_secondary if secondary else theme.text_primary,
anchor="w", **kw,
)
def _labeled_entry(ctk, parent, theme, label_text, value, *, tip="", width=0, pack_fill=False):
col = ctk.CTkFrame(parent, fg_color="transparent")
lbl = _label(ctk, col, theme, label_text)
lbl.pack(anchor="w", pady=(0, 2))
var = ctk.StringVar(value=str(value))
ent = _entry(ctk, col, theme, var=var, width=width)
if pack_fill:
ent.pack(fill="x")
else:
ent.pack(anchor="w")
if tip:
attach_tooltip_to_widgets([lbl, ent, col], tip)
return col, var
def tray_settings_scroll_and_footer(
@ -72,10 +110,6 @@ def tray_settings_scroll_and_footer(
content_parent: Any,
theme: CtkTheme,
) -> Tuple[Any, Any]:
"""
Нижняя панель под кнопки и прокручиваемая область для формы (форма не обрезает кнопки).
Возвращает (scroll_frame, footer_frame).
"""
footer = ctk.CTkFrame(content_parent, fg_color=theme.bg)
footer.pack(side="bottom", fill="x")
scroll = ctk.CTkScrollableFrame(
@ -97,22 +131,12 @@ def _config_section(
*,
bottom_spacer: int = 6,
) -> Any:
"""Заголовок секции и карточка с рамкой для группировки полей."""
wrap = ctk.CTkFrame(parent, fg_color="transparent")
wrap.pack(fill="x", pady=(0, bottom_spacer))
ctk.CTkLabel(
wrap,
text=title,
font=(theme.ui_font_family, 12, "bold"),
text_color=theme.text_primary,
anchor="w",
).pack(anchor="w", pady=(0, 2))
_label(ctk, wrap, theme, title, secondary=False, bold=True).pack(anchor="w", pady=(0, 2))
card = ctk.CTkFrame(
wrap,
fg_color=theme.field_bg,
corner_radius=10,
border_width=1,
border_color=theme.field_border,
wrap, fg_color=theme.field_bg, corner_radius=10,
border_width=1, border_color=theme.field_border,
)
card.pack(fill="x")
inner = ctk.CTkFrame(card, fg_color="transparent")
@ -143,153 +167,67 @@ def install_tray_config_form(
show_autostart: bool = False,
autostart_value: bool = False,
) -> TrayConfigFormWidgets:
"""Поля настроек прокси внутри уже созданного `frame`."""
header = ctk.CTkFrame(frame, fg_color="transparent")
header.pack(fill="x", pady=(0, 2))
ctk.CTkLabel(
header,
text="Настройки прокси",
header, text="Настройки прокси",
font=(theme.ui_font_family, 17, "bold"),
text_color=theme.text_primary,
anchor="w",
text_color=theme.text_primary, anchor="w",
).pack(side="left")
ctk.CTkLabel(
header,
text=f"v{__version__}",
header, text=f"v{__version__}",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="e",
text_color=theme.text_secondary, anchor="e",
).pack(side="right")
inner_w = _CONFIG_FORM_INNER_WIDTH
conn = _config_section(ctk, frame, theme, "Подключение MTProto")
host_row = ctk.CTkFrame(conn, fg_color="transparent")
host_row.pack(fill="x")
host_col = ctk.CTkFrame(host_row, fg_color="transparent")
host_col, host_var = _labeled_entry(
ctk, host_row, theme, "IP-адрес",
cfg.get("host", default_config["host"]),
tip=_TIP_HOST, width=160, pack_fill=True,
)
host_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
host_lbl = ctk.CTkLabel(
host_col,
text="IP-адрес",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="w",
)
host_lbl.pack(anchor="w", pady=(0, 2))
host_var = ctk.StringVar(value=cfg.get("host", default_config["host"]))
host_entry = ctk.CTkEntry(
host_col,
textvariable=host_var,
width=160,
height=36,
font=(theme.ui_font_family, 13),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
host_entry.pack(fill="x", pady=(0, 0))
attach_tooltip_to_widgets([host_lbl, host_entry, host_col], _TIP_HOST)
port_col = ctk.CTkFrame(host_row, fg_color="transparent")
port_col, port_var = _labeled_entry(
ctk, host_row, theme, "Порт",
cfg.get("port", default_config["port"]),
tip=_TIP_PORT, width=100,
)
port_col.pack(side="left")
port_lbl = ctk.CTkLabel(
port_col,
text="Порт",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="w",
)
port_lbl.pack(anchor="w", pady=(0, 2))
port_var = ctk.StringVar(value=str(cfg.get("port", default_config["port"])))
port_entry = ctk.CTkEntry(
port_col,
textvariable=port_var,
width=100,
height=36,
font=(theme.ui_font_family, 13),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
port_entry.pack(anchor="w")
attach_tooltip_to_widgets([port_lbl, port_entry, port_col], _TIP_PORT)
secret_row = ctk.CTkFrame(conn, fg_color="transparent")
secret_row.pack(fill="x")
secret_col = ctk.CTkFrame(secret_row, fg_color="transparent")
secret_col, secret_var = _labeled_entry(
ctk, secret_row, theme, "Secret",
cfg.get("secret", default_config["secret"]),
tip=_TIP_SECRET, width=160, pack_fill=True,
)
secret_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
secret_lbl = ctk.CTkLabel(
secret_col,
text="Secret",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="w",
)
secret_lbl.pack(anchor="w", pady=(0, 2))
secret_var = ctk.StringVar(value=cfg.get("secret", default_config["secret"]))
secret_entry = ctk.CTkEntry(
secret_col,
textvariable=secret_var,
width=160,
height=36,
font=(theme.ui_font_family, 13),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
secret_entry.pack(fill="x", pady=(0, 0))
attach_tooltip_to_widgets([secret_lbl, secret_entry, secret_col], _TIP_SECRET)
regen_col = ctk.CTkFrame(secret_row, fg_color="transparent")
regen_col.pack(side="left", anchor="s")
ctk.CTkLabel(
regen_col,
text="",
font=(theme.ui_font_family, 12),
).pack(pady=(0, 2))
ctk.CTkLabel(regen_col, text="", font=(theme.ui_font_family, 12)).pack(pady=(0, 2))
ctk.CTkButton(
regen_col,
text="",
width=36,
height=36,
font=(theme.ui_font_family, 18),
corner_radius=10,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
text_color="#ffffff",
border_width=1,
border_color=theme.field_border,
regen_col, text="", width=36, height=36,
font=(theme.ui_font_family, 18), corner_radius=10,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
text_color="#ffffff", border_width=1, border_color=theme.field_border,
command=lambda: secret_var.set(os.urandom(16).hex()),
).pack()
dc_inner = _config_section(ctk, frame, theme, "Датацентры Telegram (DC → IP)")
dc_lbl = ctk.CTkLabel(
dc_inner,
text="По одному правилу на строку, формат: номер:IP",
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
)
dc_lbl = _label(ctk, dc_inner, theme, "По одному правилу на строку, формат: номер:IP", size=11)
dc_lbl.pack(anchor="w", pady=(0, 4))
dc_textbox = ctk.CTkTextbox(
dc_inner,
width=inner_w,
height=88,
font=(theme.mono_font_family, 12),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
dc_inner, width=_INNER_W, height=88,
font=(theme.mono_font_family, 12), corner_radius=10,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
)
dc_textbox.pack(fill="x")
dc_textbox.insert("1.0", "\n".join(cfg.get("dc_ip", default_config["dc_ip"])))
@ -298,18 +236,7 @@ def install_tray_config_form(
log_inner = _config_section(ctk, frame, theme, "Логи и производительность")
verbose_var = ctk.BooleanVar(value=cfg.get("verbose", False))
verbose_cb = ctk.CTkCheckBox(
log_inner,
text="Подробное логирование (verbose)",
variable=verbose_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
verbose_cb = _checkbox(ctk, log_inner, theme, "Подробное логирование (verbose)", verbose_var)
verbose_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(verbose_cb, _TIP_VERBOSE)
@ -321,33 +248,17 @@ def install_tray_config_form(
("Пул WebSocket-сессий (по умолчанию 4)", "pool_size", _TIP_POOL),
("Макс. размер лога, МБ (по умолчанию 5)", "log_max_mb", _TIP_LOG_MB),
]
for lbl, key, tip in adv_rows:
col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent")
col_frame.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5))
adv_l = ctk.CTkLabel(
col_frame,
text=lbl,
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
)
for label_text, key, tip in adv_rows:
col = ctk.CTkFrame(adv_frame, fg_color="transparent")
col.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5))
adv_l = _label(ctk, col, theme, label_text, size=11)
adv_l.pack(anchor="w", pady=(0, 2))
adv_e = ctk.CTkEntry(
col_frame,
width=inner_w,
height=32,
font=(theme.ui_font_family, 13),
corner_radius=8,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
textvariable=ctk.StringVar(
value=str(cfg.get(key, default_config[key]))
),
adv_e = _entry(
ctk, col, theme, width=_INNER_W, height=32, radius=8,
textvariable=ctk.StringVar(value=str(cfg.get(key, default_config[key]))),
)
adv_e.pack(fill="x")
attach_tooltip_to_widgets([adv_l, adv_e, col_frame], tip)
attach_tooltip_to_widgets([adv_l, adv_e, col], tip)
adv_entries = list(adv_frame.winfo_children())
adv_keys = ("buf_kb", "pool_size", "log_max_mb")
@ -355,22 +266,9 @@ def install_tray_config_form(
upd_inner = _config_section(ctk, frame, theme, "Обновления")
st = get_status()
check_updates_var = ctk.BooleanVar(
value=bool(
cfg.get("check_updates", default_config.get("check_updates", True))
)
)
upd_cb = ctk.CTkCheckBox(
upd_inner,
text="Проверять обновления при запуске",
variable=check_updates_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
value=bool(cfg.get("check_updates", default_config.get("check_updates", True)))
)
upd_cb = _checkbox(ctk, upd_inner, theme, "Проверять обновления при запуске", check_updates_var)
upd_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(upd_cb, _TIP_CHECK_UPDATES)
@ -391,73 +289,38 @@ def install_tray_config_form(
else:
upd_status = "Установлена последняя известная версия с GitHub."
ctk.CTkLabel(
upd_inner,
text=upd_status,
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
justify="left",
wraplength=inner_w,
).pack(anchor="w", pady=(0, 8))
_label(ctk, upd_inner, theme, upd_status, size=11,
justify="left", wraplength=_INNER_W).pack(anchor="w", pady=(0, 8))
rel_url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
open_rel_btn = ctk.CTkButton(
upd_inner,
text="Открыть страницу релиза",
height=32,
font=(theme.ui_font_family, 13),
corner_radius=8,
fg_color=theme.field_bg,
hover_color=theme.field_border,
text_color=theme.text_primary,
border_width=1,
ctk.CTkButton(
upd_inner, text="Открыть страницу релиза", height=32,
font=(theme.ui_font_family, 13), corner_radius=8,
fg_color=theme.field_bg, hover_color=theme.field_border,
text_color=theme.text_primary, border_width=1,
border_color=theme.field_border,
command=lambda u=rel_url: webbrowser.open(u),
)
open_rel_btn.pack(anchor="w")
).pack(anchor="w")
autostart_var = None
if show_autostart:
sys_inner = _config_section(
ctk, frame, theme, "Запуск Windows", bottom_spacer=4
)
sys_inner = _config_section(ctk, frame, theme, "Запуск Windows", bottom_spacer=4)
autostart_var = ctk.BooleanVar(value=autostart_value)
as_cb = ctk.CTkCheckBox(
sys_inner,
text="Автозапуск при включении компьютера",
variable=autostart_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
as_cb = _checkbox(ctk, sys_inner, theme, "Автозапуск при включении компьютера", autostart_var)
as_cb.pack(anchor="w", pady=(0, 4))
as_hint = ctk.CTkLabel(
sys_inner,
text="Если переместить программу в другую папку, запись автозапуска может сброситься.",
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
justify="left",
wraplength=inner_w,
as_hint = _label(
ctk, sys_inner, theme,
"Если переместить программу в другую папку, запись автозапуска может сброситься.",
size=11, justify="left", wraplength=_INNER_W,
)
as_hint.pack(anchor="w")
attach_tooltip_to_widgets([as_cb, as_hint], _TIP_AUTOSTART)
return TrayConfigFormWidgets(
host_var=host_var,
port_var=port_var,
secret_var=secret_var,
dc_textbox=dc_textbox,
verbose_var=verbose_var,
adv_entries=adv_entries,
adv_keys=adv_keys,
autostart_var=autostart_var,
check_updates_var=check_updates_var,
host_var=host_var, port_var=port_var, secret_var=secret_var,
dc_textbox=dc_textbox, verbose_var=verbose_var,
adv_entries=adv_entries, adv_keys=adv_keys,
autostart_var=autostart_var, check_updates_var=check_updates_var,
)
@ -466,7 +329,6 @@ def merge_adv_from_form(
base: Dict[str, Any],
default_config: dict,
) -> None:
"""Дополняет base значениями buf_kb / pool_size / log_max_mb (in-place)."""
for i, key in enumerate(widgets.adv_keys):
col_frame = widgets.adv_entries[i]
entry = col_frame.winfo_children()[1]
@ -485,9 +347,6 @@ def validate_config_form(
*,
include_autostart: bool,
) -> Union[dict, str]:
"""
Возвращает словарь полей конфига или строку ошибки для показа пользователю.
"""
import socket as _sock
host_val = widgets.host_var.get().strip()
@ -578,9 +437,6 @@ def populate_first_run_window(
secret: str,
on_done: Callable[[bool], None],
) -> None:
"""
Содержимое окна первого запуска. on_done(open_in_telegram) по «Начать» и по закрытию окна.
"""
tg_url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}"
fpx, fpy = FIRST_RUN_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
@ -620,12 +476,8 @@ def populate_first_run_window(
corner_radius=0).pack(fill="x", pady=(0, 12))
auto_var = ctk.BooleanVar(value=True)
ctk.CTkCheckBox(frame, text="Открыть прокси в Telegram сейчас",
variable=auto_var, font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
corner_radius=6, border_width=2,
border_color=theme.field_border).pack(anchor="w", pady=(0, 16))
_checkbox(ctk, frame, theme, "Открыть прокси в Telegram сейчас",
auto_var).pack(anchor="w", pady=(0, 16))
def on_ok():
on_done(auto_var.get())

View File

@ -21,7 +21,6 @@ _TRAY_DEFAULTS_COMMON: Dict[str, Any] = {
def default_tray_config() -> Dict[str, Any]:
"""Новая копия конфига по умолчанию для текущей ОС."""
cfg = dict(_TRAY_DEFAULTS_COMMON)
cfg["secret"] = os.urandom(16).hex()

460
utils/tray_common.py Normal file
View File

@ -0,0 +1,460 @@
from __future__ import annotations
import asyncio
import json
import logging
import logging.handlers
import os
import socket as _socket
import sys
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple
import psutil
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.default_config import default_tray_config
log = logging.getLogger("tg-ws-tray")
APP_NAME = "TgWsProxy"
def _app_dir() -> Path:
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / APP_NAME
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
APP_DIR = _app_dir()
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG: Dict[str, Any] = default_tray_config()
IS_FROZEN = bool(getattr(sys, "frozen", False))
def ensure_dirs() -> None:
APP_DIR.mkdir(parents=True, exist_ok=True)
# single-instance lock
_lock_file_path: Optional[Path] = None
def _same_process(meta: dict, proc: psutil.Process, script_hint: str) -> bool:
try:
lock_ct = float(meta.get("create_time", 0.0))
if lock_ct > 0 and abs(lock_ct - proc.create_time()) > 1.0:
return False
except Exception:
return False
if IS_FROZEN:
return APP_NAME.lower() in proc.name().lower()
try:
for arg in proc.cmdline():
if script_hint in arg:
return True
except Exception:
pass
return False
def acquire_lock(script_hint: str = "") -> bool:
global _lock_file_path
ensure_dirs()
for f in list(APP_DIR.glob("*.lock")):
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
meta: dict = {}
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
pass
try:
if _same_process(meta, psutil.Process(pid), script_hint):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
lock_file.write_text(
json.dumps({"create_time": proc.create_time()}, ensure_ascii=False),
encoding="utf-8",
)
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def release_lock() -> None:
global _lock_file_path
if _lock_file_path:
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
# config
def load_config() -> dict:
ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict) -> None:
ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
# logging
_LOG_FMT_FILE = "%(asctime)s %(levelname)-5s %(name)s %(message)s"
_LOG_FMT_CONSOLE = "%(asctime)s %(levelname)-5s %(message)s"
def setup_logging(verbose: bool = False, log_max_mb: float = 5) -> None:
ensure_dirs()
level = logging.DEBUG if verbose else logging.INFO
root = logging.getLogger()
root.setLevel(level)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, int(log_max_mb * 1024 * 1024)),
backupCount=0,
encoding="utf-8",
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(_LOG_FMT_FILE, datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not IS_FROZEN:
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(level)
ch.setFormatter(logging.Formatter(_LOG_FMT_CONSOLE, datefmt="%H:%M:%S"))
root.addHandler(ch)
# icon
def make_icon_image(size: int = 64, *, color: Tuple[int, ...] = (0, 136, 204, 255)):
from PIL import Image, ImageDraw, ImageFont
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin], fill=color)
for path in _font_paths():
try:
font = ImageFont.truetype(path, size=int(size * 0.55))
break
except Exception:
continue
else:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T",
fill=(255, 255, 255, 255),
font=font,
)
return img
def _font_paths():
if sys.platform == "win32":
return ["arial.ttf"]
if sys.platform == "darwin":
return ["/System/Library/Fonts/Helvetica.ttc"]
return [
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf",
]
def load_icon():
from PIL import Image
icon_path = Path(__file__).parents[1] / "icon.ico"
if icon_path.exists():
try:
return Image.open(str(icon_path))
except Exception:
pass
return make_icon_image(64)
# proxy lifecycle
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[Tuple[asyncio.AbstractEventLoop, asyncio.Event]] = None
def _run_proxy_thread(on_port_busy: Callable[[str], None]) -> None:
global _async_stop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stop_ev = asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc) or "10048" in str(exc):
on_port_busy(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def apply_proxy_config(cfg: dict) -> bool:
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
return False
pc = tg_ws_proxy.proxy_config
pc.port = cfg.get("port", DEFAULT_CONFIG["port"])
pc.host = cfg.get("host", DEFAULT_CONFIG["host"])
pc.secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
pc.dc_redirects = dc_redirects
pc.buffer_size = max(4, cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])) * 1024
pc.pool_size = max(0, cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]))
return True
def start_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
global _proxy_thread
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
if not apply_proxy_config(cfg):
on_error("Ошибка конфигурации DC → IP.")
return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread, args=(on_error,), daemon=True, name="proxy"
)
_proxy_thread.start()
def stop_proxy() -> None:
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=5)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy(cfg, on_error)
def tg_proxy_url(cfg: dict) -> str:
host = cfg.get("host", DEFAULT_CONFIG["host"])
port = cfg.get("port", DEFAULT_CONFIG["port"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
link_host = tg_ws_proxy.get_link_host(host)
return f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
_IPV6_WARNING = (
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз."
)
def _has_ipv6() -> bool:
try:
for addr in _socket.getaddrinfo(_socket.gethostname(), None, _socket.AF_INET6):
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _socket.socket(_socket.AF_INET6, _socket.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def check_ipv6_warning(show_info: Callable[[str, str], None]) -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists() or not _has_ipv6():
return
IPV6_WARN_MARKER.touch()
threading.Thread(
target=lambda: show_info(_IPV6_WARNING, "TG WS Proxy"),
daemon=True,
).start()
# update check
def maybe_notify_update(
cfg: dict,
is_exiting: Callable[[], bool],
ask_open: Callable[[str, str], bool],
) -> None:
if not cfg.get("check_updates", True):
return
def _work():
time.sleep(1.5)
if is_exiting():
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
import webbrowser
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if ask_open(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
# ctk thread (windows / linux)
_ctk_root: Any = None
_ctk_root_ready = threading.Event()
def ensure_ctk_thread(ctk: Any) -> bool:
global _ctk_root
if ctk is None:
return False
if _ctk_root_ready.is_set():
return True
def _run():
global _ctk_root
from ui.ctk_theme import apply_ctk_appearance, install_tkinter_variable_del_guard
install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
_ctk_root = ctk.CTk()
_ctk_root.withdraw()
_ctk_root_ready.set()
_ctk_root.mainloop()
threading.Thread(target=_run, daemon=True, name="ctk-root").start()
_ctk_root_ready.wait(timeout=5.0)
return _ctk_root is not None
def ctk_run_dialog(build_fn: Callable[[threading.Event], None]) -> None:
if _ctk_root is None:
return
done = threading.Event()
def _invoke():
try:
build_fn(done)
except Exception:
log.exception("CTk dialog failed")
done.set()
_ctk_root.after(0, _invoke)
done.wait()
import gc
gc.collect()
def quit_ctk() -> None:
if _ctk_root is not None:
try:
_ctk_root.after(0, _ctk_root.quit)
except Exception:
pass
# common bootstrap
def bootstrap(cfg: dict) -> None:
save_config(cfg)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(
cfg.get("verbose", False),
log_max_mb=cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
log.info("TG WS Proxy версия %s starting", __version__)
log.info("Config: %s", cfg)
log.info("Log file: %s", LOG_FILE)

View File

@ -1,20 +1,14 @@
from __future__ import annotations
import ctypes
import ipaddress
import json
import logging
import logging.handlers
import os
import winreg
import psutil
import sys
import threading
import time
import webbrowser
import asyncio as _asyncio
import winreg
from pathlib import Path
from typing import Dict, Optional
from typing import Optional
try:
import pyperclip
@ -32,201 +26,62 @@ except ImportError:
ctk = None
try:
from PIL import Image, ImageDraw, ImageFont
from PIL import Image
except ImportError:
Image = ImageDraw = ImageFont = None
Image = None
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy.tg_ws_proxy import proxy_config
from proxy import __version__
from utils.default_config import default_tray_config
from utils.tray_common import (
APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IS_FROZEN, LOG_FILE,
acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog,
ensure_ctk_thread, ensure_dirs, load_config, load_icon, log,
maybe_notify_update, quit_ctk, release_lock, restart_proxy,
save_config, start_proxy, stop_proxy, tg_proxy_url,
)
from ui.ctk_tray_ui import (
install_tray_config_buttons,
install_tray_config_form,
populate_first_run_window,
tray_settings_scroll_and_footer,
install_tray_config_buttons, install_tray_config_form,
populate_first_run_window, tray_settings_scroll_and_footer,
validate_config_form,
)
from ui.ctk_theme import (
CONFIG_DIALOG_FRAME_PAD,
CONFIG_DIALOG_SIZE,
FIRST_RUN_SIZE,
create_ctk_toplevel,
ctk_theme_for_platform,
main_content_frame,
CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE,
create_ctk_toplevel, ctk_theme_for_platform, main_content_frame,
)
IS_FROZEN = bool(getattr(sys, "frozen", False))
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG = default_tray_config()
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_tray_icon: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
_exiting = False
_ctk_root = None
_ctk_root_ready = threading.Event()
ICON_PATH = str(Path(__file__).parent / "icon.ico")
log = logging.getLogger("tg-ws-tray")
# win32 dialogs
_user32 = ctypes.windll.user32
_user32.MessageBoxW.argtypes = [
ctypes.c_void_p,
ctypes.c_wchar_p,
ctypes.c_wchar_p,
ctypes.c_uint,
]
_user32.MessageBoxW.restype = ctypes.c_int
_u32 = ctypes.windll.user32
_u32.MessageBoxW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint]
_u32.MessageBoxW.restype = ctypes.c_int
_MB_OK_ERR = 0x10
_MB_OK_INFO = 0x40
_MB_YESNO_Q = 0x24
_IDYES = 6
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
try:
for arg in proc.cmdline():
if "windows.py" in arg:
return True
except Exception:
pass
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return (
os.path.basename(sys.executable).lower() == proc.name().lower()
)
return False
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None:
_u32.MessageBoxW(None, text, title, _MB_OK_ERR)
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_u32.MessageBoxW(None, text, title, _MB_OK_INFO)
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {
"create_time": proc.create_time(),
}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return _u32.MessageBoxW(None, text, title, _MB_YESNO_Q) == _IDYES
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
# autostart (registry)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"))
root.addHandler(ch)
def _autostart_reg_name() -> str:
return APP_NAME
_RUN_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run"
def _supports_autostart() -> bool:
@ -239,297 +94,94 @@ def _autostart_command() -> str:
def is_autostart_enabled() -> bool:
try:
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ,
) as k:
val, _ = winreg.QueryValueEx(k, _autostart_reg_name())
stored = str(val).strip()
expected = _autostart_command().strip()
return stored == expected
except FileNotFoundError:
return False
except OSError:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _RUN_KEY, 0, winreg.KEY_READ) as k:
val, _ = winreg.QueryValueEx(k, APP_NAME)
return str(val).strip() == _autostart_command().strip()
except (FileNotFoundError, OSError):
return False
def set_autostart_enabled(enabled: bool) -> None:
try:
with winreg.CreateKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
) as k:
with winreg.CreateKey(winreg.HKEY_CURRENT_USER, _RUN_KEY) as k:
if enabled:
winreg.SetValueEx(
k,
_autostart_reg_name(),
0,
winreg.REG_SZ,
_autostart_command(),
)
winreg.SetValueEx(k, APP_NAME, 0, winreg.REG_SZ, _autostart_command())
else:
try:
winreg.DeleteValue(k, _autostart_reg_name())
winreg.DeleteValue(k, APP_NAME)
except FileNotFoundError:
pass
except OSError as exc:
log.error("Failed to update autostart: %s", exc)
_show_error(
"Не удалось изменить автозапуск.\n\n"
"Попробуйте запустить приложение от имени пользователя с правами на реестр.\n\n"
f"Ошибка: {exc}"
"Попробуйте запустить приложение от имени пользователя "
f"с правами на реестр.\n\nОшибка: {exc}"
)
def _make_icon_image(size: int = 64):
if Image is None:
raise RuntimeError("Pillow is required for tray icon")
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 136, 204, 255))
try:
font = ImageFont.truetype("arial.ttf", size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
# tray callbacks
return img
def _load_icon():
icon_path = Path(__file__).parent / "icon.ico"
if icon_path.exists() and Image:
try:
return Image.open(str(icon_path))
except Exception:
pass
return _make_icon_image()
def _run_proxy_thread():
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(stop_event=stop_ev))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "10048" in str(exc) or "Address already in use" in str(exc):
_show_error("Не удалось запустить прокси:\nПорт уже используется другим приложением.\n\nЗакройте приложение, использующее этот порт, или измените порт в настройках прокси и перезапустите.")
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
proxy_config.port = port
proxy_config.host = host
proxy_config.secret = secret
proxy_config.dc_redirects = dc_redirects
proxy_config.buffer_size = max(4, buf_kb) * 1024
proxy_config.pool_size = max(0, pool_size)
log.info("Starting proxy on %s:%d ...", host, port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
daemon=True, name="proxy")
_proxy_thread.start()
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=5)
if _proxy_thread.is_alive():
log.warning(
"Proxy thread did not finish within timeout; "
"the process may still exit shortly")
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy():
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
_user32.MessageBoxW(None, text, title, 0x10)
def _show_info(text: str, title: str = "TG WS Proxy"):
_user32.MessageBoxW(None, text, title, 0x40)
def _ask_open_release_page(latest_version: str, url: str) -> bool:
"""Win32 Yes/No: открыть страницу релиза."""
MB_YESNO = 0x4
MB_ICONQUESTION = 0x20
IDYES = 6
text = (
f"Доступна новая версия: {latest_version}\n\n"
f"Открыть страницу релиза в браузере?"
)
r = _user32.MessageBoxW(
None,
text,
"TG WS Proxy — обновление",
MB_YESNO | MB_ICONQUESTION,
)
return r == IDYES
def _maybe_notify_update_async():
"""
Фоновая проверка GitHub Releases и уведомление (не блокирует трей).
"""
def _work():
time.sleep(1.5)
if _exiting:
return
if not _config.get("check_updates", True):
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if _ask_open_release_page(str(ver), url):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
def _ensure_ctk_thread() -> bool:
"""Start the persistent hidden CTk root in its own thread (once)."""
global _ctk_root
if ctk is None:
return False
if _ctk_root_ready.is_set():
return True
def _run():
global _ctk_root
from ui.ctk_theme import (
apply_ctk_appearance,
_install_tkinter_variable_del_guard,
)
_install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
_ctk_root = ctk.CTk()
_ctk_root.withdraw()
_ctk_root_ready.set()
_ctk_root.mainloop()
threading.Thread(target=_run, daemon=True, name="ctk-root").start()
_ctk_root_ready.wait(timeout=5.0)
return _ctk_root is not None
def _ctk_run_dialog(build_fn) -> None:
"""Schedule build_fn(done_event) on the CTk thread and block until done_event is set."""
if _ctk_root is None:
return
done = threading.Event()
def _invoke():
try:
build_fn(done)
except Exception:
log.exception("CTk dialog failed")
done.set()
_ctk_root.after(0, _invoke)
done.wait()
def _on_open_in_telegram(icon=None, item=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
url = f"tg://proxy?server={host}&port={port}&secret=dd{secret}"
def _on_open_in_telegram(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
log.info("Opening %s", url)
try:
result = webbrowser.open(url)
if not result:
raise RuntimeError("webbrowser.open returned False")
if not webbrowser.open(url):
raise RuntimeError
except Exception:
log.info("Browser open failed, copying to clipboard")
if pyperclip is None:
_show_error(
"Не удалось открыть Telegram автоматически.\n\n"
f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}")
f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}"
)
return
try:
pyperclip.copy(url)
_show_info(
f"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy")
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}"
)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
threading.Thread(target=restart_proxy, daemon=True).start()
def _on_restart(icon=None, item=None) -> None:
threading.Thread(
target=lambda: restart_proxy(_config, _show_error), daemon=True
).start()
def _on_edit_config(icon=None, item=None):
def _on_edit_config(icon=None, item=None) -> None:
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _edit_config_dialog():
if not _ensure_ctk_thread():
def _on_open_logs(icon=None, item=None) -> None:
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
os.startfile(str(LOG_FILE))
else:
_show_info("Файл логов ещё не создан.")
def _on_exit(icon=None, item=None) -> None:
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
quit_ctk()
threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start()
if icon:
icon.stop()
# settings dialog
def _edit_config_dialog() -> None:
if not ensure_ctk_thread(ctk):
_show_error("customtkinter не установлен.")
return
@ -538,113 +190,64 @@ def _edit_config_dialog():
if _supports_autostart() and not cfg["autostart"]:
set_autostart_enabled(False)
def _build(done: threading.Event):
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
if _supports_autostart():
h += 100
icon_path = str(Path(__file__).parent / "icon.ico")
root = create_ctk_toplevel(
ctk,
title="TG WS Proxy — Настройки",
width=w,
height=h,
theme=theme,
after_create=lambda r: r.iconbitmap(icon_path),
ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme,
after_create=lambda r: r.iconbitmap(ICON_PATH),
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk,
scroll,
theme,
cfg,
DEFAULT_CONFIG,
ctk, scroll, theme, cfg, DEFAULT_CONFIG,
show_autostart=_supports_autostart(),
autostart_value=cfg.get("autostart", False),
)
def _finish():
def _finish() -> None:
root.destroy()
done.set()
def on_save():
merged = validate_config_form(
widgets,
DEFAULT_CONFIG,
include_autostart=_supports_autostart(),
)
def on_save() -> None:
merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=_supports_autostart())
if isinstance(merged, str):
_show_error(merged)
return
new_cfg = merged
save_config(new_cfg)
_config.update(new_cfg)
log.info("Config saved: %s", new_cfg)
save_config(merged)
_config.update(merged)
log.info("Config saved: %s", merged)
if _supports_autostart():
set_autostart_enabled(bool(new_cfg.get("autostart", False)))
set_autostart_enabled(bool(merged.get("autostart", False)))
_tray_icon.menu = _build_menu()
from tkinter import messagebox
do_restart = messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root)
parent=root,
)
_finish()
if do_restart:
threading.Thread(
target=restart_proxy, daemon=True).start()
threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start()
def on_cancel():
_finish()
root.protocol("WM_DELETE_WINDOW", _finish)
install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish)
root.protocol("WM_DELETE_WINDOW", on_cancel)
install_tray_config_buttons(
ctk, footer, theme, on_save=on_save, on_cancel=on_cancel)
_ctk_run_dialog(_build)
ctk_run_dialog(_build)
def _on_open_logs(icon=None, item=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
os.startfile(str(LOG_FILE))
else:
_show_info("Файл логов ещё не создан.", "TG WS Proxy")
# first run
def _on_exit(icon=None, item=None):
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
if _ctk_root is not None:
try:
_ctk_root.after(0, _ctk_root.quit)
except Exception:
pass
def _force_exit():
time.sleep(3)
os._exit(0)
threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
if icon:
icon.stop()
def _show_first_run():
_ensure_dirs()
def _show_first_run() -> None:
ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
if not _ensure_ctk_thread():
if not ensure_ctk_thread(ctk):
FIRST_RUN_MARKER.touch()
return
@ -652,84 +255,27 @@ def _show_first_run():
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
def _build(done: threading.Event):
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
icon_path = str(Path(__file__).parent / "icon.ico")
w, h = FIRST_RUN_SIZE
root = create_ctk_toplevel(
ctk,
title="TG WS Proxy",
width=w,
height=h,
theme=theme,
after_create=lambda r: r.iconbitmap(icon_path),
ctk, title="TG WS Proxy", width=w, height=h, theme=theme,
after_create=lambda r: r.iconbitmap(ICON_PATH),
)
def on_done(open_tg: bool):
def on_done(open_tg: bool) -> None:
FIRST_RUN_MARKER.touch()
root.destroy()
done.set()
if open_tg:
_on_open_in_telegram()
populate_first_run_window(
ctk, root, theme, host=host, port=port, secret=secret,
on_done=on_done)
populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done)
_ctk_run_dialog(_build)
ctk_run_dialog(_build)
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if not ip or ip.startswith("::1"):
continue
try:
if ipaddress.IPv6Address(ip).is_link_local:
continue
except ValueError:
if ip.startswith("fe80:"):
continue
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(('::1', 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
threading.Thread(target=_show_ipv6_dialog, daemon=True).start()
def _show_ipv6_dialog():
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy")
# tray menu
def _build_menu():
if pystray is None:
@ -737,12 +283,8 @@ def _build_menu():
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
return pystray.Menu(
pystray.MenuItem(
f"Открыть в Telegram ({link_host}:{port})",
_on_open_in_telegram,
default=True),
pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Перезапустить прокси", _on_restart),
pystray.MenuItem("Настройки...", _on_edit_config),
@ -752,29 +294,17 @@ def _build_menu():
)
def run_tray():
# entry point
def run_tray() -> None:
global _tray_icon, _config
_config = load_config()
save_config(_config)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, tray app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
bootstrap(_config)
if pystray is None or Image is None or ctk is None:
log.error(
"pystray, Pillow or customtkinter not installed; "
"running in console mode")
start_proxy()
log.error("pystray, Pillow or customtkinter not installed; running in console mode")
start_proxy(_config, _show_error)
try:
while True:
time.sleep(1)
@ -782,20 +312,12 @@ def run_tray():
stop_proxy()
return
start_proxy()
_maybe_notify_update_async()
start_proxy(_config, _show_error)
maybe_notify_update(_config, lambda: _exiting, _ask_yes_no)
_show_first_run()
_check_ipv6_warning()
icon_image = _load_icon()
_tray_icon = pystray.Icon(
APP_NAME,
icon_image,
"TG WS Proxy",
menu=_build_menu())
check_ipv6_warning(_show_info)
_tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu())
log.info("Tray icon running")
_tray_icon.run()
@ -803,15 +325,14 @@ def run_tray():
log.info("Tray app exited")
def main():
if not _acquire_lock():
def main() -> None:
if not acquire_lock("windows.py"):
_show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
return
try:
run_tray()
finally:
_release_lock()
release_lock()
if __name__ == "__main__":