Compare commits

..

No commits in common. "main" and "v1.3.0" have entirely different histories.
main ... v1.3.0

16 changed files with 2398 additions and 1881 deletions

View File

@ -303,7 +303,7 @@ jobs:
Maintainer: Flowseal
Depends: libgtk-3-0, libayatana-appindicator3-1, python3-tk
Description: Telegram Desktop WebSocket Bridge Proxy
MTProto/WebSocket bridge proxy for Telegram Desktop with tray UI.
SOCKS5/WebSocket bridge proxy for Telegram Desktop with tray UI.
EOF
dpkg-deb --build --root-owner-group \

View File

@ -23,7 +23,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PATH=/opt/venv/bin:$PATH \
TG_WS_PROXY_HOST=0.0.0.0 \
TG_WS_PROXY_PORT=1443 \
TG_WS_PROXY_PORT=1080 \
TG_WS_PROXY_DC_IPS="2:149.154.167.220 4:149.154.167.220"
RUN apt-get update \
@ -39,7 +39,7 @@ COPY README.md LICENSE ./
USER app
EXPOSE 1443/tcp
EXPOSE 1080/tcp
ENTRYPOINT ["/usr/bin/tini", "--", "/bin/sh", "-lc", "set -eu; args=\"--host ${TG_WS_PROXY_HOST} --port ${TG_WS_PROXY_PORT}\"; for dc in ${TG_WS_PROXY_DC_IPS}; do args=\"$args --dc-ip $dc\"; done; exec python -u proxy/tg_ws_proxy.py $args \"$@\"", "--"]
CMD []

View File

@ -12,17 +12,17 @@
# TG WS Proxy
**Локальный MTProto-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние сервера.
**Локальный SOCKS5-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние сервера.
<img width="529" height="487" alt="image" src="https://github.com/user-attachments/assets/6a4cf683-0df8-43af-86c1-0e8f08682b62" />
## Как это работает
```
Telegram Desktop → MTProto Proxy (127.0.0.1:1443) → WebSocket → Telegram DC
Telegram Desktop → SOCKS5 (127.0.0.1:1080) → TG WS Proxy → WSS → Telegram DC
```
1. Приложение поднимает MTProto прокси на `127.0.0.1:1443`
1. Приложение поднимает локальный SOCKS5-прокси на `127.0.0.1:1080`
2. Перехватывает подключения к IP-адресам Telegram
3. Извлекает DC ID из MTProto obfuscation init-пакета
4. Устанавливает WebSocket (TLS) соединение к соответствующему DC через домены Telegram
@ -38,7 +38,7 @@ Telegram Desktop → MTProto Proxy (127.0.0.1:1443) → WebSocket → Telegram D
**Меню трея:**
- **Открыть в Telegram** — автоматически настроить прокси через `tg://proxy` ссылку
- **Открыть в Telegram** — автоматически настроить прокси через `tg://socks` ссылку
- **Перезапустить прокси** — перезапуск без выхода из приложения
- **Настройки...** — GUI-редактор конфигурации (в т.ч. версия приложения, опциональная проверка обновлений с GitHub)
- **Открыть логи** — открыть файл логов
@ -69,9 +69,8 @@ makepkg -si
# При помощи AUR-helper
paru -S tg-ws-proxy-bin
# Если вы установили -cli пакет, то запуск осуществляется через systemctl, где 8888 это номер порта,
# разделитель ":" и secret, который можно сгенерировать командой: openssl rand -hex 16
sudo systemctl start tg-ws-proxy-cli@8888:3075abe65830f0325116bb0416cadf9f
# Если вы установили -cli пакет, то запуск осуществляется через systemctl, где 8888 это номер порта прокси:
sudo systemctl start tg-ws-proxy-cli@8888
```
Для остальных дистрибутивов можно использовать **`TgWsProxy_linux_amd64`** (бинарный файл для x86_64).
@ -87,7 +86,7 @@ chmod +x TgWsProxy_linux_amd64
### Консольный proxy
Для запуска только proxy без tray-интерфейса достаточно базовой установки:
Для запуска только SOCKS5/WebSocket proxy без tray-интерфейса достаточно базовой установки:
```bash
pip install -e .
@ -125,15 +124,9 @@ tg-ws-proxy [--port PORT] [--host HOST] [--dc-ip DC:IP ...] [-v]
| Аргумент | По умолчанию | Описание |
|---|---|---|
| `--port` | `1443` | Порт прокси |
| `--host` | `127.0.0.1` | Хост прокси |
| `--secret` | `random` | 32 hex chars secret для авторизации клиентов |
| `--port` | `1080` | Порт SOCKS5-прокси |
| `--host` | `127.0.0.1` | Хост SOCKS5-прокси |
| `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) |
| `--buf-kb` | `256` | Размер буфера в КБ
| `--pool-size` | `4` | Количество заготовленных соединений на каждый DC
| `--log-file` | выкл. | Путь до файла, в который сохранять логи
| `--log-max-mb` | `5` | Максимальный размер файла логов в МБ (после идёт перезапись)
| `--log-backups` | `0` | Количество сохранений логов после перезаписи
| `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) |
**Примеры:**
@ -173,10 +166,10 @@ tg-ws-proxy-tray-linux = "linux:main"
1. Telegram → **Настройки****Продвинутые настройки****Тип подключения** → **Прокси**
2. Добавить прокси:
- **Тип:** MTProto
- **Сервер:** `127.0.0.1` (или переопределенный вами)
- **Порт:** `1443` (или переопределенный вами)
- **Secret:** из настроек или логов
- **Тип:** SOCKS5
- **Сервер:** `127.0.0.1`
- **Порт:** `1080`
- **Логин/Пароль:** оставить пустыми
## Конфигурация
@ -189,8 +182,7 @@ Tray-приложение хранит данные в:
```json
{
"host": "127.0.0.1",
"port": 1443,
"secret": "...",
"port": 1080,
"dc_ip": [
"2:149.154.167.220",
"4:149.154.167.220"

BIN
icon.ico

Binary file not shown.

Before

Width:  |  Height:  |  Size: 473 B

After

Width:  |  Height:  |  Size: 22 KiB

710
linux.py
View File

@ -1,44 +1,350 @@
from __future__ import annotations
import asyncio as _asyncio
import json
import logging
import logging.handlers
import os
import subprocess
import sys
import threading
import webbrowser
import time
from typing import Optional
from pathlib import Path
from typing import Dict, Optional
import customtkinter as ctk
import psutil
import pyperclip
import pystray
from PIL import Image, ImageTk
from PIL import Image, ImageDraw, ImageFont
import proxy.tg_ws_proxy as tg_ws_proxy
from utils.tray_common import (
APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, LOG_FILE,
acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog,
ensure_ctk_thread, ensure_dirs, load_config, load_icon, log,
maybe_notify_update, quit_ctk, release_lock, restart_proxy,
save_config, start_proxy, stop_proxy, tg_proxy_url,
)
from proxy import __version__
from utils.default_config import default_tray_config
from ui.ctk_tray_ui import (
install_tray_config_buttons, install_tray_config_form,
populate_first_run_window, tray_settings_scroll_and_footer,
install_tray_config_buttons,
install_tray_config_form,
populate_first_run_window,
tray_settings_scroll_and_footer,
validate_config_form,
)
from ui.ctk_theme import (
CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE,
create_ctk_toplevel, ctk_theme_for_platform, main_content_frame,
CONFIG_DIALOG_FRAME_PAD,
CONFIG_DIALOG_SIZE,
FIRST_RUN_SIZE,
create_ctk_root,
ctk_theme_for_platform,
main_content_frame,
)
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG = default_tray_config()
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_tray_icon: Optional[object] = None
_config: dict = {}
_exiting = False
_exiting: bool = False
_lock_file_path: Optional[Path] = None
# dialogs (tkinter messagebox)
log = logging.getLogger("tg-ws-tray")
def _msgbox(kind: str, text: str, title: str, **kw):
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
try:
cmdline = proc.cmdline()
for arg in cmdline:
if "linux.py" in arg:
return True
except Exception:
pass
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {
"create_time": proc.create_time(),
}
lock_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s", datefmt="%H:%M:%S"
)
)
root.addHandler(ch)
def _make_icon_image(size: int = 64):
if Image is None:
raise RuntimeError("Pillow is required for tray icon")
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse(
[margin, margin, size - margin, size - margin], fill=(0, 136, 204, 255)
)
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
size=int(size * 0.55),
)
except Exception:
try:
font = ImageFont.truetype(
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf", size=int(size * 0.55)
)
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _load_icon():
icon_path = Path(__file__).parent / "icon.ico"
if icon_path.exists() and Image:
try:
return Image.open(str(icon_path))
except Exception:
pass
return _make_icon_image()
def _apply_linux_ctk_window_icon(root) -> None:
"""PhotoImage храним на root — иначе GC может убрать картинку до закрытия окна."""
icon_img = _load_icon()
if icon_img:
from PIL import ImageTk
root._ctk_icon_photo = ImageTk.PhotoImage(icon_img.resize((64, 64)))
root.iconphoto(False, root._ctk_icon_photo)
def _run_proxy_thread(
port: int, dc_opt: Dict[int, str], verbose: bool, host: str = "127.0.0.1"
):
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev, host=host)
)
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
_show_error(
"Не удалось запустить прокси:\nПорт уже используется другим приложением.\n\nЗакройте приложение, использующее этот порт, или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
verbose = cfg.get("verbose", False)
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
log.info("Starting proxy on %s:%d ...", host, port)
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True,
name="proxy",
)
_proxy_thread.start()
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=2)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy():
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showerror(title, text, parent=root)
root.destroy()
def _show_info(text: str, title: str = "TG WS Proxy"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showinfo(title, text, parent=root)
root.destroy()
def _ask_yes_no_dialog(text: str, title: str = "TG WS Proxy") -> bool:
import tkinter as _tk
from tkinter import messagebox as _mb
@ -48,189 +354,273 @@ def _msgbox(kind: str, text: str, title: str, **kw):
root.attributes("-topmost", True)
except Exception:
pass
result = getattr(_mb, kind)(title, text, parent=root, **kw)
r = _mb.askyesno(title, text, parent=root)
root.destroy()
return result
return bool(r)
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None:
_msgbox("showerror", text, title)
def _maybe_notify_update_async():
def _work():
time.sleep(1.5)
if _exiting:
return
if not _config.get("check_updates", True):
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
text = (
f"Доступна новая версия: {ver}\n\n"
f"Открыть страницу релиза в браузере?"
)
if _ask_yes_no_dialog(text, "TG WS Proxy — обновление"):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_msgbox("showinfo", text, title)
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return bool(_msgbox("askyesno", text, title))
def _apply_window_icon(root) -> None:
icon_img = load_icon()
if icon_img:
root._ctk_icon_photo = ImageTk.PhotoImage(icon_img.resize((64, 64)))
root.iconphoto(False, root._ctk_icon_photo)
# tray callbacks
def _on_open_in_telegram(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
def _on_open_in_telegram(icon=None, item=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server={host}&port={port}"
log.info("Copying %s", url)
try:
pyperclip.copy(url)
_show_info(
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy",
)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_copy_link(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
log.info("Copying link: %s", url)
try:
pyperclip.copy(url)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
threading.Thread(target=restart_proxy, daemon=True).start()
def _on_restart(icon=None, item=None) -> None:
threading.Thread(
target=lambda: restart_proxy(_config, _show_error), daemon=True
).start()
def _on_edit_config(icon=None, item=None) -> None:
def _on_edit_config(icon=None, item=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _on_open_logs(icon=None, item=None) -> None:
def _edit_config_dialog():
if ctk is None:
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
root = create_ctk_root(
ctk,
title="TG WS Proxy — Настройки",
width=w,
height=h,
theme=theme,
after_create=_apply_linux_ctk_window_icon,
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk, scroll, theme, cfg, DEFAULT_CONFIG,
show_autostart=False,
)
def on_save():
merged = validate_config_form(
widgets, DEFAULT_CONFIG, include_autostart=False)
if isinstance(merged, str):
_show_error(merged)
return
new_cfg = merged
save_config(new_cfg)
_config.update(new_cfg)
log.info("Config saved: %s", new_cfg)
_tray_icon.menu = _build_menu()
from tkinter import messagebox
if messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
):
root.destroy()
restart_proxy()
else:
root.destroy()
def on_cancel():
root.destroy()
install_tray_config_buttons(
ctk, footer, theme, on_save=on_save, on_cancel=on_cancel)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
def _on_open_logs(icon=None, item=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
env = {k: v for k, v in os.environ.items() if k not in ("VIRTUAL_ENV", "PYTHONPATH", "PYTHONHOME")}
env = os.environ.copy()
env.pop("VIRTUAL_ENV", None)
env.pop("PYTHONPATH", None)
env.pop("PYTHONHOME", None)
subprocess.Popen(
["xdg-open", str(LOG_FILE)], env=env,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL, start_new_session=True,
["xdg-open", str(LOG_FILE)],
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
start_new_session=True,
)
else:
_show_info("Файл логов ещё не создан.")
_show_info("Файл логов ещё не создан.", "TG WS Proxy")
def _on_exit(icon=None, item=None) -> None:
def _on_exit(icon=None, item=None):
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
quit_ctk()
threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start()
def _force_exit():
time.sleep(3)
os._exit(0)
threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
if icon:
icon.stop()
# settings dialog
def _edit_config_dialog() -> None:
if not ensure_ctk_thread(ctk):
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
root = create_ctk_toplevel(
ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme,
after_create=_apply_window_icon,
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(ctk, scroll, theme, cfg, DEFAULT_CONFIG, show_autostart=False)
def _finish() -> None:
root.destroy()
done.set()
def on_save() -> None:
from tkinter import messagebox
merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=False)
if isinstance(merged, str):
messagebox.showerror("TG WS Proxy — Ошибка", merged, parent=root)
return
save_config(merged)
_config.update(merged)
log.info("Config saved: %s", merged)
_tray_icon.menu = _build_menu()
do_restart = messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
)
_finish()
if do_restart:
threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start()
root.protocol("WM_DELETE_WINDOW", _finish)
install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish)
ctk_run_dialog(_build)
# first run
def _show_first_run() -> None:
ensure_dirs()
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
if not ensure_ctk_thread(ctk):
FIRST_RUN_MARKER.touch()
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
def _build(done: threading.Event) -> None:
if ctk is None:
FIRST_RUN_MARKER.touch()
return
theme = ctk_theme_for_platform()
w, h = FIRST_RUN_SIZE
root = create_ctk_toplevel(
ctk, title="TG WS Proxy", width=w, height=h, theme=theme,
after_create=_apply_window_icon,
root = create_ctk_root(
ctk,
title="TG WS Proxy",
width=w,
height=h,
theme=theme,
after_create=_apply_linux_ctk_window_icon,
)
def on_done(open_tg: bool) -> None:
def on_done(open_tg: bool):
FIRST_RUN_MARKER.touch()
root.destroy()
done.set()
if open_tg:
_on_open_in_telegram()
populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done)
populate_first_run_window(
ctk, root, theme, host=host, port=port, on_done=on_done)
ctk_run_dialog(_build)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
# tray menu
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
threading.Thread(target=_show_ipv6_dialog, daemon=True).start()
def _show_ipv6_dialog():
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy",
)
def _build_menu():
if pystray is None:
return None
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
return pystray.Menu(
pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True),
pystray.MenuItem("Скопировать ссылку", _on_copy_link),
pystray.MenuItem(
f"Открыть в Telegram ({host}:{port})", _on_open_in_telegram, default=True
),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Перезапустить прокси", _on_restart),
pystray.MenuItem("Настройки...", _on_edit_config),
@ -240,18 +630,27 @@ def _build_menu():
)
# entry point
def run_tray() -> None:
def run_tray():
global _tray_icon, _config
_config = load_config()
bootstrap(_config)
save_config(_config)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, tray app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if pystray is None or Image is None:
log.error("pystray or Pillow not installed; running in console mode")
start_proxy(_config, _show_error)
start_proxy()
try:
while True:
time.sleep(1)
@ -259,12 +658,16 @@ def run_tray() -> None:
stop_proxy()
return
start_proxy(_config, _show_error)
maybe_notify_update(_config, lambda: _exiting, _ask_yes_no)
_show_first_run()
check_ipv6_warning(_show_info)
start_proxy()
_maybe_notify_update_async()
_show_first_run()
_check_ipv6_warning()
icon_image = _load_icon()
_tray_icon = pystray.Icon(APP_NAME, icon_image, "TG WS Proxy", menu=_build_menu())
_tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu())
log.info("Tray icon running")
_tray_icon.run()
@ -272,14 +675,15 @@ def run_tray() -> None:
log.info("Tray app exited")
def main() -> None:
if not acquire_lock("linux.py"):
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
return
try:
run_tray()
finally:
release_lock()
_release_lock()
if __name__ == "__main__":

588
macos.py
View File

@ -1,13 +1,18 @@
from __future__ import annotations
import json
import logging
import logging.handlers
import os
import psutil
import subprocess
import sys
import threading
import time
import webbrowser
import asyncio as _asyncio
from pathlib import Path
from typing import Optional
from typing import Dict, Optional
try:
import rumps
@ -26,134 +31,257 @@ except ImportError:
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.default_config import default_tray_config
from utils.tray_common import (
APP_DIR, APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IPV6_WARN_MARKER,
LOG_FILE, acquire_lock, apply_proxy_config, ensure_dirs, load_config,
log, release_lock, save_config, setup_logging, stop_proxy, tg_proxy_url,
)
APP_NAME = "TgWsProxy"
APP_DIR = Path.home() / "Library" / "Application Support" / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png"
DEFAULT_CONFIG = default_tray_config()
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_app: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
# osascript dialogs
log = logging.getLogger("tg-ws-tray")
def _esc(text: str) -> str:
return text.replace("\\", "\\\\").replace('"', '\\"')
# Single-instance lock
def _osascript(script: str) -> str:
r = subprocess.run(["osascript", "-e", script], capture_output=True, text=True)
return r.stdout.strip()
def _show_error(text: str, title: str = "TG WS Proxy") -> None:
_osascript(
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon stop'
)
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_osascript(
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon note'
)
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return _ask_yes_no_close(text, title) is True
def _ask_yes_no_close(text: str, title: str = "TG WS Proxy") -> Optional[bool]:
r = subprocess.run(
[
"osascript", "-e",
f'button returned of (display dialog "{_esc(text)}" '
f'with title "{_esc(title)}" '
f'buttons {{"Закрыть", "Нет", "Да"}} '
f'default button "Да" cancel button "Закрыть" with icon note)',
],
capture_output=True, text=True,
)
if r.returncode != 0:
return None
btn = r.stdout.strip()
if btn == "Да":
return True
if btn == "Нет":
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
return None
def _osascript_input(prompt: str, default: str, title: str = "TG WS Proxy") -> Optional[str]:
r = subprocess.run(
[
"osascript", "-e",
f'text returned of (display dialog "{_esc(prompt)}" '
f'default answer "{_esc(default)}" '
f'with title "{_esc(title)}" '
f'buttons {{"Закрыть", "OK"}} '
f'default button "OK" cancel button "Закрыть")',
],
capture_output=True, text=True,
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {"create_time": proc.create_time()}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# Filesystem helpers
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"))
root.addHandler(ch)
# menubar icon
# Menubar icon
def _make_menubar_icon(size: int = 44):
if Image is None:
return None
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = size // 11
draw.ellipse([margin, margin, size - margin, size - margin], fill=(0, 0, 0, 255))
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 0, 0, 255))
try:
font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", size=int(size * 0.55))
font = ImageFont.truetype(
"/System/Library/Fonts/Helvetica.ttc",
size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T", fill=(255, 255, 255, 255), font=font,
)
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _ensure_menubar_icon() -> None:
# Generate menubar icon PNG if it does not exist.
def _ensure_menubar_icon():
if MENUBAR_ICON_PATH.exists():
return
ensure_dirs()
_ensure_dirs()
img = _make_menubar_icon(44)
if img:
img.save(str(MENUBAR_ICON_PATH), "PNG")
# proxy lifecycle (macOS-local)
# Native macOS dialogs
import asyncio as _asyncio
def _escape_osascript_text(text: str) -> str:
return text.replace('\\', '\\\\').replace('"', '\\"')
def _run_proxy_thread() -> None:
def _osascript(script: str) -> str:
r = subprocess.run(
['osascript', '-e', script],
capture_output=True, text=True)
return r.stdout.strip()
def _show_error(text: str, title: str = "TG WS Proxy"):
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon stop')
def _show_info(text: str, title: str = "TG WS Proxy"):
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon note')
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
result = _ask_yes_no_close(text, title)
return result is True
def _ask_yes_no_close(text: str,
title: str = "TG WS Proxy") -> Optional[bool]:
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
r = subprocess.run(
['osascript', '-e',
f'button returned of (display dialog "{text_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Закрыть", "Нет", "Да"}} '
f'default button "Да" cancel button "Закрыть" with icon note)'],
capture_output=True, text=True)
if r.returncode != 0:
return None
result = r.stdout.strip()
if result == "Да":
return True
if result == "Нет":
return False
return None
# Proxy lifecycle
def _run_proxy_thread(port: int, dc_opt: Dict[int, str], verbose: bool,
host: str = '127.0.0.1'):
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev))
loop.run_until_complete(
tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev, host=host))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
@ -161,28 +289,47 @@ def _run_proxy_thread() -> None:
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
)
"или измените порт в настройках прокси и перезапустите.")
finally:
loop.close()
_async_stop = None
def _start_proxy() -> None:
global _proxy_thread
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
if not apply_proxy_config(_config):
_show_error("Ошибка конфигурации DC → IP.")
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
verbose = cfg.get("verbose", False)
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(target=_run_proxy_thread, daemon=True, name="proxy")
log.info("Starting proxy on %s:%d ...", host, port)
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True, name="proxy")
_proxy_thread.start()
def _stop_proxy() -> None:
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
@ -193,21 +340,22 @@ def _stop_proxy() -> None:
log.info("Proxy stopped")
def _restart_proxy() -> None:
def restart_proxy():
log.info("Restarting proxy...")
_stop_proxy()
stop_proxy()
time.sleep(0.3)
_start_proxy()
start_proxy()
# menu callbacks
# Menu callbacks
def _on_open_in_telegram(_=None) -> None:
url = tg_proxy_url(_config)
def _on_open_in_telegram(_=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server={host}&port={port}"
log.info("Opening %s", url)
try:
result = subprocess.call(["open", url])
result = subprocess.call(['open', url])
if result != 0:
raise RuntimeError("open command failed")
except Exception:
@ -221,58 +369,67 @@ def _on_open_in_telegram(_=None) -> None:
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(["pbcopy"], input=url.encode(), check=True)
subprocess.run(['pbcopy'], input=url.encode(),
check=True)
_show_info(
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена:\n{url}"
)
f"Ссылка скопирована в буфер обмена:\n{url}")
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_copy_link(_=None) -> None:
url = tg_proxy_url(_config)
log.info("Copying link: %s", url)
try:
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(["pbcopy"], input=url.encode(), check=True)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(_=None) -> None:
def _do():
def _on_restart(_=None):
def _do_restart():
global _config
_config = load_config()
if _app:
_app.update_menu_title()
_restart_proxy()
restart_proxy()
threading.Thread(target=_do, daemon=True).start()
threading.Thread(target=_do_restart, daemon=True).start()
def _on_open_logs(_=None) -> None:
def _on_open_logs(_=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
subprocess.call(["open", str(LOG_FILE)])
subprocess.call(['open', str(LOG_FILE)])
else:
_show_info("Файл логов ещё не создан.")
# Show a native text input dialog. Returns None if cancelled.
def _osascript_input(prompt: str, default: str,
title: str = "TG WS Proxy") -> Optional[str]:
prompt_esc = _escape_osascript_text(prompt)
default_esc = _escape_osascript_text(default)
title_esc = _escape_osascript_text(title)
r = subprocess.run(
['osascript', '-e',
f'text returned of (display dialog "{prompt_esc}" '
f'default answer "{default_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Закрыть", "OK"}} '
f'default button "OK" cancel button "Закрыть")'],
capture_output=True, text=True)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
def _on_edit_config(_=None) -> None:
def _on_edit_config(_=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _check_updates_menu_title() -> str:
on = bool(_config.get("check_updates", True))
return "✓ Проверять обновления при запуске" if on else "Проверять обновления при запуске (выкл)"
return (
"✓ Проверять обновления при запуске"
if on
else "Проверять обновления при запуске (выкл)"
)
def _toggle_check_updates(_=None) -> None:
def _toggle_check_updates(_=None):
global _config
_config["check_updates"] = not bool(_config.get("check_updates", True))
save_config(_config)
@ -280,15 +437,12 @@ def _toggle_check_updates(_=None) -> None:
_app._check_updates_item.title = _check_updates_menu_title()
def _on_open_release_page(_=None) -> None:
def _on_open_release_page(_=None):
from utils.update_check import RELEASES_PAGE_URL
webbrowser.open(RELEASES_PAGE_URL)
# update check
def _maybe_notify_update_async() -> None:
def _maybe_notify_update_async():
def _work():
time.sleep(1.5)
if _exiting:
@ -304,7 +458,8 @@ def _maybe_notify_update_async() -> None:
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if _ask_yes_no(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
f"Доступна новая версия: {ver}\n\n"
f"Открыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
@ -314,16 +469,18 @@ def _maybe_notify_update_async() -> None:
threading.Thread(target=_work, daemon=True, name="update-check").start()
# settings dialog
def _edit_config_dialog() -> None:
# Settings via native macOS dialogs
def _edit_config_dialog():
cfg = load_config()
host = _osascript_input("IP-адрес прокси:", cfg.get("host", DEFAULT_CONFIG["host"]))
# Host
host = _osascript_input(
"IP-адрес прокси:",
cfg.get("host", DEFAULT_CONFIG["host"]))
if host is None:
return
host = host.strip()
import socket as _sock
try:
_sock.inet_aton(host)
@ -331,7 +488,10 @@ def _edit_config_dialog() -> None:
_show_error("Некорректный IP-адрес.")
return
port_str = _osascript_input("Порт прокси:", str(cfg.get("port", DEFAULT_CONFIG["port"])))
# Port
port_str = _osascript_input(
"Порт прокси:",
str(cfg.get("port", DEFAULT_CONFIG["port"])))
if port_str is None:
return
try:
@ -342,49 +502,42 @@ def _edit_config_dialog() -> None:
_show_error("Порт должен быть числом 1-65535")
return
secret_str = _osascript_input(
"MTProto Secret (32 hex символа):", cfg.get("secret", DEFAULT_CONFIG["secret"])
)
if secret_str is None:
return
secret_str = secret_str.strip().lower()
if len(secret_str) != 32 or not all(c in "0123456789abcdef" for c in secret_str):
_show_error("Secret должен быть строкой из 32 шестнадцатеричных символов.")
return
# DC-IP mappings
dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]))
dc_str = _osascript_input(
"DC → IP маппинги (через запятую, формат DC:IP):\n"
"Например: 2:149.154.167.220, 4:149.154.167.220",
dc_default,
)
dc_default)
if dc_str is None:
return
dc_lines = [s.strip() for s in dc_str.replace(",", "\n").splitlines() if s.strip()]
dc_lines = [s.strip() for s in dc_str.replace(',', '\n').splitlines()
if s.strip()]
try:
tg_ws_proxy.parse_dc_ip_list(dc_lines)
except ValueError as e:
_show_error(str(e))
return
# Verbose
verbose = _ask_yes_no_close("Включить подробное логирование (verbose)?")
if verbose is None:
return
# Advanced settings
adv_str = _osascript_input(
"Расширенные настройки (буфер KB, WS пул, лог MB):\n"
"Формат: buf_kb,pool_size,log_max_mb",
f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])},"
f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])},"
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}",
)
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}")
if adv_str is None:
return
adv = {}
if adv_str:
parts = [s.strip() for s in adv_str.split(",")]
keys = [("buf_kb", int), ("pool_size", int), ("log_max_mb", float)]
parts = [s.strip() for s in adv_str.split(',')]
keys = [("buf_kb", int), ("pool_size", int),
("log_max_mb", float)]
for i, (k, typ) in enumerate(keys):
if i < len(parts):
try:
@ -395,13 +548,11 @@ def _edit_config_dialog() -> None:
new_cfg = {
"host": host,
"port": port,
"secret": secret_str,
"dc_ip": dc_lines,
"verbose": verbose,
"buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])),
"pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])),
"log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])),
"check_updates": cfg.get("check_updates", True),
}
save_config(new_cfg)
log.info("Config saved: %s", new_cfg)
@ -411,23 +562,21 @@ def _edit_config_dialog() -> None:
if _app:
_app.update_menu_title()
if _ask_yes_no_close("Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
_restart_proxy()
if _ask_yes_no_close(
"Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
restart_proxy()
# first run & ipv6
# First-run & IPv6 dialogs
def _show_first_run() -> None:
ensure_dirs()
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
tg_url = tg_proxy_url(_config)
link_host = tg_ws_proxy.get_link_host(host)
tg_url = f"tg://socks?server={host}&port={port}"
text = (
f"Прокси запущен и работает в строке меню.\n\n"
@ -437,54 +586,54 @@ def _show_first_run() -> None:
f" Или ссылка: {tg_url}\n\n"
f"Вручную:\n"
f" Настройки → Продвинутые → Тип подключения → Прокси\n"
f" MTProto → {link_host} : {port} \n"
f" Secret: dd{secret} \n\n"
f" SOCKS5 → {host} : {port} (без логина/пароля)\n\n"
f"Открыть прокси в Telegram сейчас?"
)
FIRST_RUN_MARKER.touch()
if _ask_yes_no(text, "TG WS Proxy"):
_on_open_in_telegram()
def _check_ipv6_warning() -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
def _has_ipv6_enabled() -> bool:
import socket as _sock
has = False
try:
for addr in _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6):
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
has = True
break
if ip and not ip.startswith('::1') and not ip.startswith('fe80::1'):
return True
except Exception:
pass
if not has:
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0))
s.bind(('::1', 0))
s.close()
has = True
return True
except Exception:
pass
if not has:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает, попробуйте отключить "
"попытку соединения по IPv6 в настройках прокси Telegram.\n\n"
"Это предупреждение будет показано только один раз."
)
"Это предупреждение будет показано только один раз.")
# rumps app
# rumps menubar app
_TgWsProxyAppBase = rumps.App if rumps else object
@ -492,26 +641,33 @@ _TgWsProxyAppBase = rumps.App if rumps else object
class TgWsProxyApp(_TgWsProxyAppBase):
def __init__(self):
_ensure_menubar_icon()
icon_path = str(MENUBAR_ICON_PATH) if MENUBAR_ICON_PATH.exists() else None
icon_path = (str(MENUBAR_ICON_PATH)
if MENUBAR_ICON_PATH.exists() else None)
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
self._open_tg_item = rumps.MenuItem(
f"Открыть в Telegram ({link_host}:{port})", callback=_on_open_in_telegram
)
self._copy_link_item = rumps.MenuItem("Скопировать ссылку", callback=_on_copy_link)
self._restart_item = rumps.MenuItem("Перезапустить прокси", callback=_on_restart)
self._settings_item = rumps.MenuItem("Настройки...", callback=_on_edit_config)
self._logs_item = rumps.MenuItem("Открыть логи", callback=_on_open_logs)
f"Открыть в Telegram ({host}:{port})",
callback=_on_open_in_telegram)
self._restart_item = rumps.MenuItem(
"Перезапустить прокси",
callback=_on_restart)
self._settings_item = rumps.MenuItem(
"Настройки...",
callback=_on_edit_config)
self._logs_item = rumps.MenuItem(
"Открыть логи",
callback=_on_open_logs)
self._release_page_item = rumps.MenuItem(
"Страница релиза на GitHub…", callback=_on_open_release_page
)
"Страница релиза на GitHub…",
callback=_on_open_release_page)
self._check_updates_item = rumps.MenuItem(
_check_updates_menu_title(), callback=_toggle_check_updates
)
self._version_item = rumps.MenuItem(f"Версия {__version__}", callback=lambda _: None)
_check_updates_menu_title(),
callback=_toggle_check_updates)
self._version_item = rumps.MenuItem(
f"Версия {__version__}",
callback=lambda _: None)
super().__init__(
"TG WS Proxy",
@ -520,7 +676,6 @@ class TgWsProxyApp(_TgWsProxyAppBase):
quit_button="Выход",
menu=[
self._open_tg_item,
self._copy_link_item,
None,
self._restart_item,
self._settings_item,
@ -530,20 +685,16 @@ class TgWsProxyApp(_TgWsProxyAppBase):
self._check_updates_item,
None,
self._version_item,
],
)
])
def update_menu_title(self) -> None:
def update_menu_title(self):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
self._open_tg_item.title = f"Открыть в Telegram ({link_host}:{port})"
self._open_tg_item.title = (
f"Открыть в Telegram ({host}:{port})")
# entry point
def run_menubar() -> None:
def run_menubar():
global _app, _config
_config = load_config()
@ -555,26 +706,26 @@ def run_menubar() -> None:
except Exception:
pass
setup_logging(
_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, menubar app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if rumps is None or Image is None:
log.error("rumps or Pillow not installed; running in console mode")
_start_proxy()
start_proxy()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
_stop_proxy()
stop_proxy()
return
_start_proxy()
start_proxy()
_maybe_notify_update_async()
_show_first_run()
_check_ipv6_warning()
@ -582,18 +733,19 @@ def run_menubar() -> None:
log.info("Menubar app running")
_app.run()
_stop_proxy()
stop_proxy()
log.info("Menubar app exited")
def main() -> None:
if not acquire_lock("macos.py"):
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.")
return
try:
run_menubar()
finally:
release_lock()
_release_lock()
if __name__ == "__main__":

View File

@ -1 +1 @@
__version__ = "1.4.0"
__version__ = "1.3.0"

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,7 @@ keywords = [
"proxy",
"bypass",
"websocket",
"mtproto",
"socks5",
]
classifiers = [
"Development Status :: 5 - Production/Stable",

View File

@ -1,3 +1,8 @@
"""
Общая светлая тема и фабрика окон CustomTkinter для tray-приложений (Windows / Linux).
Цвета и отступы задаются в одном месте правки темы не дублируются по платформам.
"""
from __future__ import annotations
import sys
@ -8,7 +13,11 @@ from typing import Any, Callable, Optional, Tuple
_tk_variable_del_guard_installed = False
def install_tkinter_variable_del_guard() -> None:
def _install_tkinter_variable_del_guard() -> None:
"""
Убирает «Exception ignored» при выходе процесса: Tcl уже разрушен, а GC ещё
вызывает Variable.__del__ (StringVar и т.д.) напр. окно CTk в фоновом потоке.
"""
global _tk_variable_del_guard_installed
if _tk_variable_del_guard_installed:
return
@ -23,24 +32,24 @@ def install_tkinter_variable_del_guard() -> None:
tkinter.Variable.__del__ = _safe_variable_del # type: ignore[assignment]
_tk_variable_del_guard_installed = True
# Размеры и отступы (единые для диалогов настроек и первого запуска)
CONFIG_DIALOG_SIZE: Tuple[int, int] = (460, 560)
CONFIG_DIALOG_FRAME_PAD: Tuple[int, int] = (20, 14)
FIRST_RUN_SIZE: Tuple[int, int] = (520, 480)
FIRST_RUN_SIZE: Tuple[int, int] = (520, 440)
FIRST_RUN_FRAME_PAD: Tuple[int, int] = (28, 24)
@dataclass(frozen=True)
class CtkTheme:
tg_blue: tuple = ("#3390ec", "#3390ec")
tg_blue_hover: tuple = ("#2b7cd4", "#2b7cd4")
bg: tuple = ("#ffffff", "#1e1e1e")
field_bg: tuple = ("#f0f2f5", "#2b2b2b")
field_border: tuple = ("#d6d9dc", "#3a3a3a")
text_primary: tuple = ("#000000", "#ffffff")
text_secondary: tuple = ("#707579", "#aaaaaa")
"""Палитра Telegram-style и семейства шрифтов для UI и моноширинного текста."""
tg_blue: str = "#3390ec"
tg_blue_hover: str = "#2b7cd4"
bg: str = "#ffffff"
field_bg: str = "#f0f2f5"
field_border: str = "#d6d9dc"
text_primary: str = "#000000"
text_secondary: str = "#707579"
ui_font_family: str = "Sans"
mono_font_family: str = "Monospace"
@ -52,16 +61,17 @@ def ctk_theme_for_platform() -> CtkTheme:
def apply_ctk_appearance(ctk: Any) -> None:
ctk.set_appearance_mode("auto")
ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue")
def center_ctk_geometry(root: Any, width: int, height: int) -> None:
sw = root.winfo_screenwidth()
sh = root.winfo_screenheight()
root.geometry(f"{width}x{height}+{(sw - width) // 2}+{(sh - height) // 2}")
def create_ctk_toplevel(
def create_ctk_root(
ctk: Any,
*,
title: str,
@ -71,27 +81,21 @@ def create_ctk_toplevel(
topmost: bool = True,
after_create: Optional[Callable[[Any], None]] = None,
) -> Any:
root = ctk.CTkToplevel()
"""
Создаёт CTk: глобальная тема, заголовок, без ресайза, по центру экрана, фон из палитры.
after_create опционально: установка иконки окна (различается по ОС).
"""
_install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
root = ctk.CTk()
root.title(title)
root.resizable(False, False)
center_ctk_geometry(root, width, height)
root.configure(fg_color=theme.bg)
if topmost:
root.attributes("-topmost", True)
root.lift()
root.focus_force()
center_ctk_geometry(root, width, height)
root.configure(fg_color=theme.bg)
if after_create:
_after_id = root.after(300, lambda: after_create(root))
_orig_destroy = root.destroy
def _safe_destroy():
try:
root.after_cancel(_after_id)
except Exception:
pass
_orig_destroy()
root.destroy = _safe_destroy
after_create(root)
return root

View File

@ -1,3 +1,7 @@
"""
Всплывающие подсказки для CustomTkinter / tk: задержка, Toplevel без рамки, wrap.
"""
from __future__ import annotations
import tkinter as tk
@ -5,6 +9,8 @@ from typing import Any, List, Optional
class CtkTooltip:
"""Показ текста при наведении на виджет."""
def __init__(
self,
widget: Any,
@ -25,8 +31,6 @@ class CtkTooltip:
widget.bind("<Destroy>", self._on_destroy, add="+")
def _schedule(self, _event: Any = None) -> None:
if self.widget is None:
return
self._cancel_after()
self._after_id = self.widget.after(self.delay_ms, self._show)
@ -85,7 +89,6 @@ class CtkTooltip:
def _on_destroy(self, _event: Any = None) -> None:
self._hide()
self.widget = None
def _is_windows() -> bool:
@ -101,9 +104,11 @@ def attach_ctk_tooltip(
delay_ms: int = 450,
wraplength: int = 320,
) -> None:
"""Повесить подсказку на виджет (CTk или tk)."""
CtkTooltip(widget, text, delay_ms=delay_ms, wraplength=wraplength)
def attach_tooltip_to_widgets(widgets: List[Any], text: str, **kwargs: Any) -> None:
"""Одна и та же подсказка на несколько виджетов (подпись + поле)."""
for w in widgets:
attach_ctk_tooltip(w, text, **kwargs)

View File

@ -1,6 +1,10 @@
"""
Общая разметка CustomTkinter для tray (Windows / Linux): настройки и первый запуск.
Логика сохранения и колбэки остаются в платформенных модулях.
"""
from __future__ import annotations
import os
import webbrowser
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
@ -16,15 +20,15 @@ from ui.ctk_theme import (
)
from ui.ctk_tooltip import attach_ctk_tooltip, attach_tooltip_to_widgets
# Подсказки для формы настроек (новые пользователи)
_TIP_HOST = (
"Адрес, на котором прокси принимает подключения.\n"
"Адрес, на котором прокси принимает SOCKS5-подключения.\n"
"Обычно 127.0.0.1 — локальная сеть, 0.0.0.0 - все интерфейсы"
)
_TIP_PORT = (
"Порт прокси. В Telegram Desktop в настройках прокси должен быть "
"Порт SOCKS5. В Telegram Desktop в настройках прокси должен быть "
"указан тот же порт"
)
_TIP_SECRET = "Секретный ключ для авторизации клиентов"
_TIP_DC = (
"Соответствие номера датацентра Telegram (DC) и IP-адреса сервера.\n"
"Каждая строка: «номер:IP», например 2:149.154.167.220. "
@ -47,62 +51,16 @@ _TIP_LOG_MB = (
)
_TIP_AUTOSTART = (
"Запускать TG WS Proxy при входе в Windows. "
"Если вы переместите программу в другую папку, автозапуск сбросится"
"Если вы переместите программу в другую папку, автозапуска сбросится"
)
_TIP_CHECK_UPDATES = (
"При запуске проверять наличие обновлений"
)
_TIP_CHECK_UPDATES = "При запуске проверять наличие обновлений"
_TIP_SAVE = "Сохранить настройки"
_TIP_CANCEL = "Закрыть окно без сохранения изменений"
_INNER_W = 396
def _entry(ctk, parent, theme, *, var=None, width=0, height=36, radius=10, **kw):
opts = dict(
font=(theme.ui_font_family, 13), corner_radius=radius,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
)
if var is not None:
opts["textvariable"] = var
if width:
opts["width"] = width
opts["height"] = height
opts.update(kw)
return ctk.CTkEntry(parent, **opts)
def _checkbox(ctk, parent, theme, text, variable):
return ctk.CTkCheckBox(
parent, text=text, variable=variable,
font=(theme.ui_font_family, 13), text_color=theme.text_primary,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
corner_radius=6, border_width=2, border_color=theme.field_border,
)
def _label(ctk, parent, theme, text, *, size=12, bold=False, secondary=True, **kw):
weight = "bold" if bold else "normal"
return ctk.CTkLabel(
parent, text=text,
font=(theme.ui_font_family, size, weight),
text_color=theme.text_secondary if secondary else theme.text_primary,
anchor="w", **kw,
)
def _labeled_entry(ctk, parent, theme, label_text, value, *, tip="", width=0, pack_fill=False):
col = ctk.CTkFrame(parent, fg_color="transparent")
lbl = _label(ctk, col, theme, label_text)
lbl.pack(anchor="w", pady=(0, 2))
var = ctk.StringVar(value=str(value))
ent = _entry(ctk, col, theme, var=var, width=width)
if pack_fill:
ent.pack(fill="x")
else:
ent.pack(anchor="w")
if tip:
attach_tooltip_to_widgets([lbl, ent, col], tip)
return col, var
# Внутренняя ширина полей относительно ширины окна настроек (см. CONFIG_DIALOG_SIZE)
_CONFIG_FORM_INNER_WIDTH = 396
def tray_settings_scroll_and_footer(
@ -110,6 +68,10 @@ def tray_settings_scroll_and_footer(
content_parent: Any,
theme: CtkTheme,
) -> Tuple[Any, Any]:
"""
Нижняя панель под кнопки и прокручиваемая область для формы (форма не обрезает кнопки).
Возвращает (scroll_frame, footer_frame).
"""
footer = ctk.CTkFrame(content_parent, fg_color=theme.bg)
footer.pack(side="bottom", fill="x")
scroll = ctk.CTkScrollableFrame(
@ -131,12 +93,22 @@ def _config_section(
*,
bottom_spacer: int = 6,
) -> Any:
"""Заголовок секции и карточка с рамкой для группировки полей."""
wrap = ctk.CTkFrame(parent, fg_color="transparent")
wrap.pack(fill="x", pady=(0, bottom_spacer))
_label(ctk, wrap, theme, title, secondary=False, bold=True).pack(anchor="w", pady=(0, 2))
ctk.CTkLabel(
wrap,
text=title,
font=(theme.ui_font_family, 12, "bold"),
text_color=theme.text_primary,
anchor="w",
).pack(anchor="w", pady=(0, 2))
card = ctk.CTkFrame(
wrap, fg_color=theme.field_bg, corner_radius=10,
border_width=1, border_color=theme.field_border,
wrap,
fg_color=theme.field_bg,
corner_radius=10,
border_width=1,
border_color=theme.field_border,
)
card.pack(fill="x")
inner = ctk.CTkFrame(card, fg_color="transparent")
@ -148,7 +120,6 @@ def _config_section(
class TrayConfigFormWidgets:
host_var: Any
port_var: Any
secret_var: Any
dc_textbox: Any
verbose_var: Any
adv_entries: List[Any]
@ -167,67 +138,102 @@ def install_tray_config_form(
show_autostart: bool = False,
autostart_value: bool = False,
) -> TrayConfigFormWidgets:
"""Поля настроек прокси внутри уже созданного `frame`."""
header = ctk.CTkFrame(frame, fg_color="transparent")
header.pack(fill="x", pady=(0, 2))
ctk.CTkLabel(
header, text="Настройки прокси",
header,
text="Настройки прокси",
font=(theme.ui_font_family, 17, "bold"),
text_color=theme.text_primary, anchor="w",
text_color=theme.text_primary,
anchor="w",
).pack(side="left")
ctk.CTkLabel(
header, text=f"v{__version__}",
header,
text=f"v{__version__}",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary, anchor="e",
text_color=theme.text_secondary,
anchor="e",
).pack(side="right")
conn = _config_section(ctk, frame, theme, "Подключение MTProto")
inner_w = _CONFIG_FORM_INNER_WIDTH
conn = _config_section(ctk, frame, theme, "Подключение SOCKS5")
host_row = ctk.CTkFrame(conn, fg_color="transparent")
host_row.pack(fill="x")
host_col, host_var = _labeled_entry(
ctk, host_row, theme, "IP-адрес",
cfg.get("host", default_config["host"]),
tip=_TIP_HOST, width=160, pack_fill=True,
)
host_col = ctk.CTkFrame(host_row, fg_color="transparent")
host_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
port_col, port_var = _labeled_entry(
ctk, host_row, theme, "Порт",
cfg.get("port", default_config["port"]),
tip=_TIP_PORT, width=100,
host_lbl = ctk.CTkLabel(
host_col,
text="IP-адрес",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="w",
)
host_lbl.pack(anchor="w", pady=(0, 2))
host_var = ctk.StringVar(value=cfg.get("host", default_config["host"]))
host_entry = ctk.CTkEntry(
host_col,
textvariable=host_var,
width=160,
height=36,
font=(theme.ui_font_family, 13),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
host_entry.pack(fill="x", pady=(0, 0))
attach_tooltip_to_widgets([host_lbl, host_entry, host_col], _TIP_HOST)
port_col = ctk.CTkFrame(host_row, fg_color="transparent")
port_col.pack(side="left")
secret_row = ctk.CTkFrame(conn, fg_color="transparent")
secret_row.pack(fill="x")
secret_col, secret_var = _labeled_entry(
ctk, secret_row, theme, "Secret",
cfg.get("secret", default_config["secret"]),
tip=_TIP_SECRET, width=160, pack_fill=True,
port_lbl = ctk.CTkLabel(
port_col,
text="Порт",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="w",
)
secret_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
regen_col = ctk.CTkFrame(secret_row, fg_color="transparent")
regen_col.pack(side="left", anchor="s")
ctk.CTkLabel(regen_col, text="", font=(theme.ui_font_family, 12)).pack(pady=(0, 2))
ctk.CTkButton(
regen_col, text="", width=36, height=36,
font=(theme.ui_font_family, 18), corner_radius=10,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
text_color="#ffffff", border_width=1, border_color=theme.field_border,
command=lambda: secret_var.set(os.urandom(16).hex()),
).pack()
port_lbl.pack(anchor="w", pady=(0, 2))
port_var = ctk.StringVar(value=str(cfg.get("port", default_config["port"])))
port_entry = ctk.CTkEntry(
port_col,
textvariable=port_var,
width=100,
height=36,
font=(theme.ui_font_family, 13),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
port_entry.pack(anchor="w")
attach_tooltip_to_widgets([port_lbl, port_entry, port_col], _TIP_PORT)
dc_inner = _config_section(ctk, frame, theme, "Датацентры Telegram (DC → IP)")
dc_lbl = _label(ctk, dc_inner, theme, "По одному правилу на строку, формат: номер:IP", size=11)
dc_lbl = ctk.CTkLabel(
dc_inner,
text="По одному правилу на строку, формат: номер:IP",
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
)
dc_lbl.pack(anchor="w", pady=(0, 4))
dc_textbox = ctk.CTkTextbox(
dc_inner, width=_INNER_W, height=88,
font=(theme.mono_font_family, 12), corner_radius=10,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
dc_inner,
width=inner_w,
height=88,
font=(theme.mono_font_family, 12),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
dc_textbox.pack(fill="x")
dc_textbox.insert("1.0", "\n".join(cfg.get("dc_ip", default_config["dc_ip"])))
@ -236,7 +242,18 @@ def install_tray_config_form(
log_inner = _config_section(ctk, frame, theme, "Логи и производительность")
verbose_var = ctk.BooleanVar(value=cfg.get("verbose", False))
verbose_cb = _checkbox(ctk, log_inner, theme, "Подробное логирование (verbose)", verbose_var)
verbose_cb = ctk.CTkCheckBox(
log_inner,
text="Подробное логирование (verbose)",
variable=verbose_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
verbose_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(verbose_cb, _TIP_VERBOSE)
@ -248,17 +265,33 @@ def install_tray_config_form(
("Пул WebSocket-сессий (по умолчанию 4)", "pool_size", _TIP_POOL),
("Макс. размер лога, МБ (по умолчанию 5)", "log_max_mb", _TIP_LOG_MB),
]
for label_text, key, tip in adv_rows:
col = ctk.CTkFrame(adv_frame, fg_color="transparent")
col.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5))
adv_l = _label(ctk, col, theme, label_text, size=11)
for lbl, key, tip in adv_rows:
col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent")
col_frame.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5))
adv_l = ctk.CTkLabel(
col_frame,
text=lbl,
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
)
adv_l.pack(anchor="w", pady=(0, 2))
adv_e = _entry(
ctk, col, theme, width=_INNER_W, height=32, radius=8,
textvariable=ctk.StringVar(value=str(cfg.get(key, default_config[key]))),
adv_e = ctk.CTkEntry(
col_frame,
width=inner_w,
height=32,
font=(theme.ui_font_family, 13),
corner_radius=8,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
textvariable=ctk.StringVar(
value=str(cfg.get(key, default_config[key]))
),
)
adv_e.pack(fill="x")
attach_tooltip_to_widgets([adv_l, adv_e, col], tip)
attach_tooltip_to_widgets([adv_l, adv_e, col_frame], tip)
adv_entries = list(adv_frame.winfo_children())
adv_keys = ("buf_kb", "pool_size", "log_max_mb")
@ -266,9 +299,22 @@ def install_tray_config_form(
upd_inner = _config_section(ctk, frame, theme, "Обновления")
st = get_status()
check_updates_var = ctk.BooleanVar(
value=bool(cfg.get("check_updates", default_config.get("check_updates", True)))
value=bool(
cfg.get("check_updates", default_config.get("check_updates", True))
)
)
upd_cb = ctk.CTkCheckBox(
upd_inner,
text="Проверять обновления при запуске",
variable=check_updates_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
upd_cb = _checkbox(ctk, upd_inner, theme, "Проверять обновления при запуске", check_updates_var)
upd_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(upd_cb, _TIP_CHECK_UPDATES)
@ -289,38 +335,72 @@ def install_tray_config_form(
else:
upd_status = "Установлена последняя известная версия с GitHub."
_label(ctk, upd_inner, theme, upd_status, size=11,
justify="left", wraplength=_INNER_W).pack(anchor="w", pady=(0, 8))
ctk.CTkLabel(
upd_inner,
text=upd_status,
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
justify="left",
wraplength=inner_w,
).pack(anchor="w", pady=(0, 8))
rel_url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ctk.CTkButton(
upd_inner, text="Открыть страницу релиза", height=32,
font=(theme.ui_font_family, 13), corner_radius=8,
fg_color=theme.field_bg, hover_color=theme.field_border,
text_color=theme.text_primary, border_width=1,
open_rel_btn = ctk.CTkButton(
upd_inner,
text="Открыть страницу релиза",
height=32,
font=(theme.ui_font_family, 13),
corner_radius=8,
fg_color=theme.field_bg,
hover_color=theme.field_border,
text_color=theme.text_primary,
border_width=1,
border_color=theme.field_border,
command=lambda u=rel_url: webbrowser.open(u),
).pack(anchor="w")
)
open_rel_btn.pack(anchor="w")
autostart_var = None
if show_autostart:
sys_inner = _config_section(ctk, frame, theme, "Запуск Windows", bottom_spacer=4)
sys_inner = _config_section(
ctk, frame, theme, "Запуск Windows", bottom_spacer=4
)
autostart_var = ctk.BooleanVar(value=autostart_value)
as_cb = _checkbox(ctk, sys_inner, theme, "Автозапуск при включении компьютера", autostart_var)
as_cb = ctk.CTkCheckBox(
sys_inner,
text="Автозапуск при включении компьютера",
variable=autostart_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
as_cb.pack(anchor="w", pady=(0, 4))
as_hint = _label(
ctk, sys_inner, theme,
"Если переместить программу в другую папку, запись автозапуска может сброситься.",
size=11, justify="left", wraplength=_INNER_W,
as_hint = ctk.CTkLabel(
sys_inner,
text="Если переместить программу в другую папку, запись автозапуска может сброситься.",
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
justify="left",
wraplength=inner_w,
)
as_hint.pack(anchor="w")
attach_tooltip_to_widgets([as_cb, as_hint], _TIP_AUTOSTART)
return TrayConfigFormWidgets(
host_var=host_var, port_var=port_var, secret_var=secret_var,
dc_textbox=dc_textbox, verbose_var=verbose_var,
adv_entries=adv_entries, adv_keys=adv_keys,
autostart_var=autostart_var, check_updates_var=check_updates_var,
host_var=host_var,
port_var=port_var,
dc_textbox=dc_textbox,
verbose_var=verbose_var,
adv_entries=adv_entries,
adv_keys=adv_keys,
autostart_var=autostart_var,
check_updates_var=check_updates_var,
)
@ -329,6 +409,7 @@ def merge_adv_from_form(
base: Dict[str, Any],
default_config: dict,
) -> None:
"""Дополняет base значениями buf_kb / pool_size / log_max_mb (in-place)."""
for i, key in enumerate(widgets.adv_keys):
col_frame = widgets.adv_entries[i]
entry = col_frame.winfo_children()[1]
@ -347,6 +428,9 @@ def validate_config_form(
*,
include_autostart: bool,
) -> Union[dict, str]:
"""
Возвращает словарь полей конфига или строку ошибки для показа пользователю.
"""
import socket as _sock
host_val = widgets.host_var.get().strip()
@ -372,18 +456,9 @@ def validate_config_form(
except ValueError as e:
return str(e)
secret_val = widgets.secret_var.get().strip()
if len(secret_val) != 32:
return "Secret должен содержать ровно 32 hex-символа (16 байт)."
try:
bytes.fromhex(secret_val)
except ValueError:
return "Secret должен состоять только из hex-символов (0-9, a-f)."
new_cfg: Dict[str, Any] = {
"host": host_val,
"port": port_val,
"secret": secret_val,
"dc_ip": lines,
"verbose": widgets.verbose_var.get(),
}
@ -442,11 +517,12 @@ def populate_first_run_window(
*,
host: str,
port: int,
secret: str,
on_done: Callable[[bool], None],
) -> None:
link_host = tg_ws_proxy.get_link_host(host)
tg_url = f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
"""
Содержимое окна первого запуска. on_done(open_in_telegram) по «Начать» и по закрытию окна.
"""
tg_url = f"tg://socks?server={host}&port={port}"
fpx, fpy = FIRST_RUN_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
@ -465,35 +541,18 @@ def populate_first_run_window(
("Как подключить Telegram Desktop:", True),
(" Автоматически:", True),
(" ПКМ по иконке в трее → «Открыть в Telegram»", False),
(f" Или скопировать ссылку, отправить её себе в TG и нажать по ней: {tg_url}", False),
(f" Или ссылка: {tg_url}", False),
("\n Вручную:", True),
(" Настройки → Продвинутые → Тип подключения → Прокси", False),
(f" MTProto → {link_host} : {port}", False),
(f" Secret: dd{secret}", False),
(f" SOCKS5 → {host} : {port} (без логина/пароля)", False),
]
textbox = ctk.CTkTextbox(
frame,
font=(theme.ui_font_family, 13),
fg_color=theme.bg,
border_width=0,
text_color=theme.text_primary,
activate_scrollbars=False,
wrap="word",
height=275,
)
textbox._textbox.tag_configure("bold", font=(theme.ui_font_family, 13, "bold"))
textbox._textbox.configure(spacing1=1, spacing3=1)
for text, bold in sections:
if text.startswith("\n"):
textbox.insert("end", "\n")
text = text[1:]
if bold:
textbox.insert("end", text + "\n", "bold")
else:
textbox.insert("end", text + "\n")
textbox.configure(state="disabled")
textbox.pack(anchor="w", fill="x")
weight = "bold" if bold else "normal"
ctk.CTkLabel(frame, text=text,
font=(theme.ui_font_family, 13, weight),
text_color=theme.text_primary,
anchor="w", justify="left").pack(anchor="w", pady=1)
ctk.CTkFrame(frame, fg_color="transparent", height=16).pack()
@ -501,8 +560,12 @@ def populate_first_run_window(
corner_radius=0).pack(fill="x", pady=(0, 12))
auto_var = ctk.BooleanVar(value=True)
_checkbox(ctk, frame, theme, "Открыть прокси в Telegram сейчас",
auto_var).pack(anchor="w", pady=(0, 16))
ctk.CTkCheckBox(frame, text="Открыть прокси в Telegram сейчас",
variable=auto_var, font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
corner_radius=6, border_width=2,
border_color=theme.field_border).pack(anchor="w", pady=(0, 16))
def on_ok():
on_done(auto_var.get())

View File

@ -5,11 +5,10 @@
from __future__ import annotations
import sys
import os
from typing import Any, Dict
_TRAY_DEFAULTS_COMMON: Dict[str, Any] = {
"port": 1443,
"port": 1080,
"host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
@ -21,10 +20,8 @@ _TRAY_DEFAULTS_COMMON: Dict[str, Any] = {
def default_tray_config() -> Dict[str, Any]:
"""Новая копия конфига по умолчанию для текущей ОС."""
cfg = dict(_TRAY_DEFAULTS_COMMON)
cfg["secret"] = os.urandom(16).hex()
if sys.platform == "win32":
cfg["autostart"] = False
return cfg

View File

@ -1,460 +0,0 @@
from __future__ import annotations
import asyncio
import json
import logging
import logging.handlers
import os
import socket as _socket
import sys
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple
import psutil
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.default_config import default_tray_config
log = logging.getLogger("tg-ws-tray")
APP_NAME = "TgWsProxy"
def _app_dir() -> Path:
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / APP_NAME
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
APP_DIR = _app_dir()
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done_mtproto"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG: Dict[str, Any] = default_tray_config()
IS_FROZEN = bool(getattr(sys, "frozen", False))
def ensure_dirs() -> None:
APP_DIR.mkdir(parents=True, exist_ok=True)
# single-instance lock
_lock_file_path: Optional[Path] = None
def _same_process(meta: dict, proc: psutil.Process, script_hint: str) -> bool:
try:
lock_ct = float(meta.get("create_time", 0.0))
if lock_ct > 0 and abs(lock_ct - proc.create_time()) > 1.0:
return False
except Exception:
return False
if IS_FROZEN:
return APP_NAME.lower() in proc.name().lower()
try:
for arg in proc.cmdline():
if script_hint in arg:
return True
except Exception:
pass
return False
def acquire_lock(script_hint: str = "") -> bool:
global _lock_file_path
ensure_dirs()
for f in list(APP_DIR.glob("*.lock")):
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
meta: dict = {}
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
pass
try:
if _same_process(meta, psutil.Process(pid), script_hint):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
lock_file.write_text(
json.dumps({"create_time": proc.create_time()}, ensure_ascii=False),
encoding="utf-8",
)
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def release_lock() -> None:
global _lock_file_path
if _lock_file_path:
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
# config
def load_config() -> dict:
ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict) -> None:
ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
# logging
_LOG_FMT_FILE = "%(asctime)s %(levelname)-5s %(name)s %(message)s"
_LOG_FMT_CONSOLE = "%(asctime)s %(levelname)-5s %(message)s"
def setup_logging(verbose: bool = False, log_max_mb: float = 5) -> None:
ensure_dirs()
level = logging.DEBUG if verbose else logging.INFO
root = logging.getLogger()
root.setLevel(level)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, int(log_max_mb * 1024 * 1024)),
backupCount=0,
encoding="utf-8",
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(_LOG_FMT_FILE, datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not IS_FROZEN:
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(level)
ch.setFormatter(logging.Formatter(_LOG_FMT_CONSOLE, datefmt="%H:%M:%S"))
root.addHandler(ch)
# icon
def make_icon_image(size: int = 64, *, color: Tuple[int, ...] = (0, 136, 204, 255)):
from PIL import Image, ImageDraw, ImageFont
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin], fill=color)
for path in _font_paths():
try:
font = ImageFont.truetype(path, size=int(size * 0.55))
break
except Exception:
continue
else:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T",
fill=(255, 255, 255, 255),
font=font,
)
return img
def _font_paths():
if sys.platform == "win32":
return ["arial.ttf"]
if sys.platform == "darwin":
return ["/System/Library/Fonts/Helvetica.ttc"]
return [
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf",
]
def load_icon():
from PIL import Image
icon_path = Path(__file__).parents[1] / "icon.ico"
if icon_path.exists():
try:
return Image.open(str(icon_path))
except Exception:
pass
return make_icon_image(64)
# proxy lifecycle
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[Tuple[asyncio.AbstractEventLoop, asyncio.Event]] = None
def _run_proxy_thread(on_port_busy: Callable[[str], None]) -> None:
global _async_stop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stop_ev = asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc) or "10048" in str(exc):
on_port_busy(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def apply_proxy_config(cfg: dict) -> bool:
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
return False
pc = tg_ws_proxy.proxy_config
pc.port = cfg.get("port", DEFAULT_CONFIG["port"])
pc.host = cfg.get("host", DEFAULT_CONFIG["host"])
pc.secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
pc.dc_redirects = dc_redirects
pc.buffer_size = max(4, cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])) * 1024
pc.pool_size = max(0, cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]))
return True
def start_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
global _proxy_thread
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
if not apply_proxy_config(cfg):
on_error("Ошибка конфигурации DC → IP.")
return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread, args=(on_error,), daemon=True, name="proxy"
)
_proxy_thread.start()
def stop_proxy() -> None:
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=5)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy(cfg, on_error)
def tg_proxy_url(cfg: dict) -> str:
host = cfg.get("host", DEFAULT_CONFIG["host"])
port = cfg.get("port", DEFAULT_CONFIG["port"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
link_host = tg_ws_proxy.get_link_host(host)
return f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
_IPV6_WARNING = (
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз."
)
def _has_ipv6() -> bool:
try:
for addr in _socket.getaddrinfo(_socket.gethostname(), None, _socket.AF_INET6):
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _socket.socket(_socket.AF_INET6, _socket.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def check_ipv6_warning(show_info: Callable[[str, str], None]) -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists() or not _has_ipv6():
return
IPV6_WARN_MARKER.touch()
threading.Thread(
target=lambda: show_info(_IPV6_WARNING, "TG WS Proxy"),
daemon=True,
).start()
# update check
def maybe_notify_update(
cfg: dict,
is_exiting: Callable[[], bool],
ask_open: Callable[[str, str], bool],
) -> None:
if not cfg.get("check_updates", True):
return
def _work():
time.sleep(1.5)
if is_exiting():
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
import webbrowser
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if ask_open(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
# ctk thread (windows / linux)
_ctk_root: Any = None
_ctk_root_ready = threading.Event()
def ensure_ctk_thread(ctk: Any) -> bool:
global _ctk_root
if ctk is None:
return False
if _ctk_root_ready.is_set():
return True
def _run():
global _ctk_root
from ui.ctk_theme import apply_ctk_appearance, install_tkinter_variable_del_guard
install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
_ctk_root = ctk.CTk()
_ctk_root.withdraw()
_ctk_root_ready.set()
_ctk_root.mainloop()
threading.Thread(target=_run, daemon=True, name="ctk-root").start()
_ctk_root_ready.wait(timeout=5.0)
return _ctk_root is not None
def ctk_run_dialog(build_fn: Callable[[threading.Event], None]) -> None:
if _ctk_root is None:
return
done = threading.Event()
def _invoke():
try:
build_fn(done)
except Exception:
log.exception("CTk dialog failed")
done.set()
_ctk_root.after(0, _invoke)
done.wait()
import gc
gc.collect()
def quit_ctk() -> None:
if _ctk_root is not None:
try:
_ctk_root.after(0, _ctk_root.quit)
except Exception:
pass
# common bootstrap
def bootstrap(cfg: dict) -> None:
save_config(cfg)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(
cfg.get("verbose", False),
log_max_mb=cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
log.info("TG WS Proxy версия %s starting", __version__)
log.info("Config: %s", cfg)
log.info("Log file: %s", LOG_FILE)

View File

@ -1,18 +1,11 @@
"""
Минимальная проверка новой версии через GitHub Releases API (без сторонних зависимостей).
Ограничение частоты запросов: не чаще одного раза в час на машину (кэш в каталоге
данных приложения). Поддерживается If-None-Match (ETag) для ответа 304.
"""
from __future__ import annotations
import json
import os
import sys
import time
from itertools import zip_longest
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, Optional
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
@ -20,9 +13,6 @@ REPO = "Flowseal/tg-ws-proxy"
RELEASES_LATEST_API = f"https://api.github.com/repos/{REPO}/releases/latest"
RELEASES_PAGE_URL = f"https://github.com/{REPO}/releases/latest"
# Не чаще одного полного запроса к API в час (без учёта 304 с тем же ETag).
_MIN_FETCH_INTERVAL_SEC = 3600.0
_state: Dict[str, Any] = {
"checked": False,
"has_update": False,
@ -33,39 +23,6 @@ _state: Dict[str, Any] = {
}
def _cache_file() -> Optional[Path]:
try:
if sys.platform == "win32":
root = Path(os.environ.get("APPDATA", str(Path.home()))) / "TgWsProxy"
elif sys.platform == "darwin":
root = Path.home() / "Library/Application Support/TgWsProxy"
else:
xdg = os.environ.get("XDG_CONFIG_HOME")
root = (Path(xdg).expanduser() if xdg else Path.home() / ".config") / "TgWsProxy"
root.mkdir(parents=True, exist_ok=True)
return root / ".update_check_cache.json"
except OSError:
return None
def _load_cache(path: Optional[Path]) -> Dict[str, Any]:
if not path or not path.is_file():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {}
def _save_cache(path: Optional[Path], data: Dict[str, Any]) -> None:
if not path:
return
try:
path.write_text(json.dumps(data), encoding="utf-8")
except OSError:
pass
def _parse_version_tuple(s: str) -> tuple:
s = (s or "").strip().lstrip("vV")
if not s:
@ -95,56 +52,18 @@ def _version_gt(a: str, b: str) -> bool:
return False
def _apply_release_tag(
tag: str, html_url: str, current_version: str,
) -> None:
global _state
if not tag:
_state["has_update"] = False
_state["ahead_of_release"] = False
_state["latest"] = None
_state["html_url"] = html_url.strip() or RELEASES_PAGE_URL
return
latest_clean = tag.lstrip("vV")
cur = (current_version or "").strip().lstrip("vV")
_state["latest"] = latest_clean
_state["html_url"] = html_url.strip() or RELEASES_PAGE_URL
_state["has_update"] = _version_gt(latest_clean, cur)
_state["ahead_of_release"] = bool(latest_clean) and _version_gt(
cur, latest_clean
)
def fetch_latest_release(
timeout: float = 12.0,
etag: Optional[str] = None,
) -> Tuple[Optional[dict], Optional[str], int]:
"""
GET releases/latest. Возвращает (data или None при 304, etag или None, HTTP-код).
"""
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "tg-ws-proxy-update-check",
}
if etag:
headers["If-None-Match"] = etag
def fetch_latest_release(timeout: float = 12.0) -> Optional[dict]:
req = Request(
RELEASES_LATEST_API,
headers=headers,
headers={
"Accept": "application/vnd.github+json",
"User-Agent": "tg-ws-proxy-update-check",
},
method="GET",
)
try:
with urlopen(req, timeout=timeout) as resp:
code = getattr(resp, "status", None) or resp.getcode()
new_etag = resp.headers.get("ETag")
raw = resp.read().decode("utf-8", errors="replace")
return json.loads(raw), new_etag, int(code)
except HTTPError as e:
if e.code == 304:
hdrs = e.headers
new_etag = hdrs.get("ETag") if hdrs else None
return None, new_etag or etag, 304
raise
return json.loads(raw)
def run_check(current_version: str) -> None:
@ -152,41 +71,8 @@ def run_check(current_version: str) -> None:
global _state
_state["checked"] = True
_state["error"] = None
cache_path = _cache_file()
cache = _load_cache(cache_path)
now = time.time()
last_attempt = float(cache.get("last_attempt_at") or 0)
if last_attempt and (now - last_attempt) < _MIN_FETCH_INTERVAL_SEC:
tag = (cache.get("tag_name") or "").strip()
if tag:
_apply_release_tag(tag, cache.get("html_url") or "", current_version)
return
err = cache.get("last_error")
_state["error"] = (
err if err else "Проверка обновлений отложена (интервал между запросами)."
)
_state["has_update"] = False
_state["ahead_of_release"] = False
_state["latest"] = None
_state["html_url"] = RELEASES_PAGE_URL
return
etag = (cache.get("etag") or "").strip() or None
try:
data, new_etag, code = fetch_latest_release(etag=etag)
cache["last_attempt_at"] = now
if code == 304:
tag = (cache.get("tag_name") or "").strip()
url = (cache.get("html_url") or "").strip() or RELEASES_PAGE_URL
_apply_release_tag(tag, url, current_version)
if new_etag:
cache["etag"] = new_etag
_save_cache(cache_path, cache)
return
assert data is not None
data = fetch_latest_release()
tag = (data.get("tag_name") or "").strip()
html_url = (data.get("html_url") or "").strip() or RELEASES_PAGE_URL
if not tag:
@ -194,24 +80,17 @@ def run_check(current_version: str) -> None:
_state["ahead_of_release"] = False
_state["latest"] = None
_state["html_url"] = html_url
else:
_apply_release_tag(tag, html_url, current_version)
if new_etag:
cache["etag"] = new_etag
cache["tag_name"] = tag
cache["html_url"] = html_url
cache.pop("last_error", None)
_save_cache(cache_path, cache)
except (HTTPError, URLError, OSError, TimeoutError, ValueError, json.JSONDecodeError) as e:
cache["last_attempt_at"] = now
msg = str(e)
if isinstance(e, HTTPError) and e.code == 403:
msg = (
"GitHub API вернул 403 (лимит или доступ). Повторите позже."
return
latest_clean = tag.lstrip("vV")
cur = (current_version or "").strip().lstrip("vV")
_state["latest"] = latest_clean
_state["html_url"] = html_url
_state["has_update"] = _version_gt(latest_clean, cur)
_state["ahead_of_release"] = bool(latest_clean) and _version_gt(
cur, latest_clean
)
cache["last_error"] = msg
_save_cache(cache_path, cache)
_state["error"] = msg
except (HTTPError, URLError, OSError, TimeoutError, ValueError, json.JSONDecodeError) as e:
_state["error"] = str(e)
_state["has_update"] = False
_state["ahead_of_release"] = False
_state["latest"] = None

View File

@ -1,14 +1,20 @@
from __future__ import annotations
import ctypes
import ipaddress
import json
import logging
import logging.handlers
import os
import winreg
import psutil
import sys
import threading
import time
import webbrowser
import winreg
import asyncio as _asyncio
from pathlib import Path
from typing import Optional
from typing import Dict, Optional
try:
import pyperclip
@ -26,62 +32,196 @@ except ImportError:
ctk = None
try:
from PIL import Image
from PIL import Image, ImageDraw, ImageFont
except ImportError:
Image = None
Image = ImageDraw = ImageFont = None
import proxy.tg_ws_proxy as tg_ws_proxy
from utils.tray_common import (
APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IS_FROZEN, LOG_FILE,
acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog,
ensure_ctk_thread, ensure_dirs, load_config, load_icon, log,
maybe_notify_update, quit_ctk, release_lock, restart_proxy,
save_config, start_proxy, stop_proxy, tg_proxy_url,
)
from proxy import __version__
from utils.default_config import default_tray_config
from ui.ctk_tray_ui import (
install_tray_config_buttons, install_tray_config_form,
populate_first_run_window, tray_settings_scroll_and_footer,
install_tray_config_buttons,
install_tray_config_form,
populate_first_run_window,
tray_settings_scroll_and_footer,
validate_config_form,
)
from ui.ctk_theme import (
CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE,
create_ctk_toplevel, ctk_theme_for_platform, main_content_frame,
CONFIG_DIALOG_FRAME_PAD,
CONFIG_DIALOG_SIZE,
FIRST_RUN_SIZE,
create_ctk_root,
ctk_theme_for_platform,
main_content_frame,
)
IS_FROZEN = bool(getattr(sys, "frozen", False))
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG = default_tray_config()
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_tray_icon: Optional[object] = None
_config: dict = {}
_exiting = False
_exiting: bool = False
_lock_file_path: Optional[Path] = None
ICON_PATH = str(Path(__file__).parent / "icon.ico")
log = logging.getLogger("tg-ws-tray")
# win32 dialogs
_u32 = ctypes.windll.user32
_u32.MessageBoxW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint]
_u32.MessageBoxW.restype = ctypes.c_int
_MB_OK_ERR = 0x10
_MB_OK_INFO = 0x40
_MB_YESNO_Q = 0x24
_IDYES = 6
_user32 = ctypes.windll.user32
_user32.MessageBoxW.argtypes = [
ctypes.c_void_p,
ctypes.c_wchar_p,
ctypes.c_wchar_p,
ctypes.c_uint,
]
_user32.MessageBoxW.restype = ctypes.c_int
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None:
_u32.MessageBoxW(None, text, title, _MB_OK_ERR)
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
try:
for arg in proc.cmdline():
if "windows.py" in arg:
return True
except Exception:
pass
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return (
os.path.basename(sys.executable).lower() == proc.name().lower()
)
return False
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_u32.MessageBoxW(None, text, title, _MB_OK_INFO)
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return _u32.MessageBoxW(None, text, title, _MB_YESNO_Q) == _IDYES
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {
"create_time": proc.create_time(),
}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# autostart (registry)
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
_RUN_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run"
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"))
root.addHandler(ch)
def _autostart_reg_name() -> str:
return APP_NAME
def _supports_autostart() -> bool:
@ -94,213 +234,472 @@ def _autostart_command() -> str:
def is_autostart_enabled() -> bool:
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _RUN_KEY, 0, winreg.KEY_READ) as k:
val, _ = winreg.QueryValueEx(k, APP_NAME)
return str(val).strip() == _autostart_command().strip()
except (FileNotFoundError, OSError):
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ,
) as k:
val, _ = winreg.QueryValueEx(k, _autostart_reg_name())
stored = str(val).strip()
expected = _autostart_command().strip()
return stored == expected
except FileNotFoundError:
return False
except OSError:
return False
def set_autostart_enabled(enabled: bool) -> None:
try:
with winreg.CreateKey(winreg.HKEY_CURRENT_USER, _RUN_KEY) as k:
with winreg.CreateKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
) as k:
if enabled:
winreg.SetValueEx(k, APP_NAME, 0, winreg.REG_SZ, _autostart_command())
winreg.SetValueEx(
k,
_autostart_reg_name(),
0,
winreg.REG_SZ,
_autostart_command(),
)
else:
try:
winreg.DeleteValue(k, APP_NAME)
winreg.DeleteValue(k, _autostart_reg_name())
except FileNotFoundError:
pass
except OSError as exc:
log.error("Failed to update autostart: %s", exc)
_show_error(
"Не удалось изменить автозапуск.\n\n"
"Попробуйте запустить приложение от имени пользователя "
f"с правами на реестр.\n\nОшибка: {exc}"
"Попробуйте запустить приложение от имени пользователя с правами на реестр.\n\n"
f"Ошибка: {exc}"
)
# tray callbacks
def _make_icon_image(size: int = 64):
if Image is None:
raise RuntimeError("Pillow is required for tray icon")
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
def _on_open_in_telegram(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 136, 204, 255))
try:
font = ImageFont.truetype("arial.ttf", size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _load_icon():
icon_path = Path(__file__).parent / "icon.ico"
if icon_path.exists() and Image:
try:
return Image.open(str(icon_path))
except Exception:
pass
return _make_icon_image()
def _run_proxy_thread(port: int, dc_opt: Dict[int, str], verbose: bool,
host: str = '127.0.0.1'):
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev, host=host))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "10048" in str(exc) or "Address already in use" in str(exc):
_show_error("Не удалось запустить прокси:\nПорт уже используется другим приложением.\n\nЗакройте приложение, использующее этот порт, или измените порт в настройках прокси и перезапустите.")
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
verbose = cfg.get("verbose", False)
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
log.info("Starting proxy on %s:%d ...", host, port)
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True, name="proxy")
_proxy_thread.start()
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=5)
if _proxy_thread.is_alive():
log.warning(
"Proxy thread did not finish within timeout; "
"the process may still exit shortly")
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy():
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
_user32.MessageBoxW(None, text, title, 0x10)
def _show_info(text: str, title: str = "TG WS Proxy"):
_user32.MessageBoxW(None, text, title, 0x40)
def _ask_open_release_page(latest_version: str, url: str) -> bool:
"""Win32 Yes/No: открыть страницу релиза."""
MB_YESNO = 0x4
MB_ICONQUESTION = 0x20
IDYES = 6
text = (
f"Доступна новая версия: {latest_version}\n\n"
f"Открыть страницу релиза в браузере?"
)
r = _user32.MessageBoxW(
None,
text,
"TG WS Proxy — обновление",
MB_YESNO | MB_ICONQUESTION,
)
return r == IDYES
def _maybe_notify_update_async():
"""
Фоновая проверка GitHub Releases и уведомление (не блокирует трей).
"""
def _work():
time.sleep(1.5)
if _exiting:
return
if not _config.get("check_updates", True):
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if _ask_open_release_page(str(ver), url):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
def _on_open_in_telegram(icon=None, item=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server={host}&port={port}"
log.info("Opening %s", url)
try:
if not webbrowser.open(url):
raise RuntimeError
result = webbrowser.open(url)
if not result:
raise RuntimeError("webbrowser.open returned False")
except Exception:
log.info("Browser open failed, copying to clipboard")
if pyperclip is None:
_show_error(
"Не удалось открыть Telegram автоматически.\n\n"
f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}"
)
f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}")
return
try:
pyperclip.copy(url)
_show_info(
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}"
)
f"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy")
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_copy_link(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
log.info("Copying link: %s", url)
if pyperclip is None:
_show_error(
"Установите пакет pyperclip для копирования в буфер обмена."
)
return
try:
pyperclip.copy(url)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
threading.Thread(target=restart_proxy, daemon=True).start()
def _on_restart(icon=None, item=None) -> None:
threading.Thread(
target=lambda: restart_proxy(_config, _show_error), daemon=True
).start()
def _on_edit_config(icon=None, item=None) -> None:
def _on_edit_config(icon=None, item=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _on_open_logs(icon=None, item=None) -> None:
def _edit_config_dialog():
if ctk is None:
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
cfg["autostart"] = is_autostart_enabled()
# Make sure that the autostart key is removed if autostart
# is disabled, even if the executable file is moved.
if _supports_autostart() and not cfg["autostart"]:
set_autostart_enabled(False)
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
if _supports_autostart():
h += 100
icon_path = str(Path(__file__).parent / "icon.ico")
root = create_ctk_root(
ctk,
title="TG WS Proxy — Настройки",
width=w,
height=h,
theme=theme,
after_create=lambda r: r.iconbitmap(icon_path),
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk,
scroll,
theme,
cfg,
DEFAULT_CONFIG,
show_autostart=_supports_autostart(),
autostart_value=cfg.get("autostart", False),
)
def on_save():
merged = validate_config_form(
widgets,
DEFAULT_CONFIG,
include_autostart=_supports_autostart(),
)
if isinstance(merged, str):
_show_error(merged)
return
new_cfg = merged
save_config(new_cfg)
_config.update(new_cfg)
log.info("Config saved: %s", new_cfg)
if _supports_autostart():
set_autostart_enabled(bool(new_cfg.get("autostart", False)))
_tray_icon.menu = _build_menu()
# Win32 MessageBox из того же потока, что и mainloop CTk, блокирует обработку Tcl/Tk
# и даёт зависание; tkinter.messagebox согласован с циклом окна.
from tkinter import messagebox
if messagebox.askyesno("Перезапустить?",
"Настройки сохранены.\n\n"
"Перезапустить прокси сейчас?",
parent=root):
root.destroy()
restart_proxy()
else:
root.destroy()
def on_cancel():
root.destroy()
install_tray_config_buttons(
ctk, footer, theme, on_save=on_save, on_cancel=on_cancel)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
def _on_open_logs(icon=None, item=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
os.startfile(str(LOG_FILE))
else:
_show_info("Файл логов ещё не создан.")
_show_info("Файл логов ещё не создан.", "TG WS Proxy")
def _on_exit(icon=None, item=None) -> None:
def _on_exit(icon=None, item=None):
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
quit_ctk()
threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start()
def _force_exit():
time.sleep(3)
os._exit(0)
threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
if icon:
icon.stop()
# settings dialog
def _edit_config_dialog() -> None:
if not ensure_ctk_thread(ctk):
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
cfg["autostart"] = is_autostart_enabled()
if _supports_autostart() and not cfg["autostart"]:
set_autostart_enabled(False)
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
if _supports_autostart():
h += 100
root = create_ctk_toplevel(
ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme,
after_create=lambda r: r.iconbitmap(ICON_PATH),
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk, scroll, theme, cfg, DEFAULT_CONFIG,
show_autostart=_supports_autostart(),
autostart_value=cfg.get("autostart", False),
)
def _finish() -> None:
root.destroy()
done.set()
def on_save() -> None:
from tkinter import messagebox
merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=_supports_autostart())
if isinstance(merged, str):
messagebox.showerror("TG WS Proxy — Ошибка", merged, parent=root)
return
save_config(merged)
_config.update(merged)
log.info("Config saved: %s", merged)
if _supports_autostart():
set_autostart_enabled(bool(merged.get("autostart", False)))
_tray_icon.menu = _build_menu()
do_restart = messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
)
_finish()
if do_restart:
threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start()
root.protocol("WM_DELETE_WINDOW", _finish)
install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish)
ctk_run_dialog(_build)
# first run
def _show_first_run() -> None:
ensure_dirs()
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
if not ensure_ctk_thread(ctk):
FIRST_RUN_MARKER.touch()
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
def _build(done: threading.Event) -> None:
if ctk is None:
FIRST_RUN_MARKER.touch()
return
theme = ctk_theme_for_platform()
icon_path = str(Path(__file__).parent / "icon.ico")
w, h = FIRST_RUN_SIZE
root = create_ctk_toplevel(
ctk, title="TG WS Proxy", width=w, height=h, theme=theme,
after_create=lambda r: r.iconbitmap(ICON_PATH),
root = create_ctk_root(
ctk,
title="TG WS Proxy",
width=w,
height=h,
theme=theme,
after_create=lambda r: r.iconbitmap(icon_path),
)
def on_done(open_tg: bool) -> None:
def on_done(open_tg: bool):
FIRST_RUN_MARKER.touch()
root.destroy()
done.set()
if open_tg:
_on_open_in_telegram()
populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done)
populate_first_run_window(
ctk, root, theme, host=host, port=port, on_done=on_done)
ctk_run_dialog(_build)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
# tray menu
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if not ip or ip.startswith("::1"):
continue
try:
if ipaddress.IPv6Address(ip).is_link_local:
continue
except ValueError:
if ip.startswith("fe80:"):
continue
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(('::1', 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
threading.Thread(target=_show_ipv6_dialog, daemon=True).start()
def _show_ipv6_dialog():
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy")
def _build_menu():
if pystray is None:
return None
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
return pystray.Menu(
pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True),
pystray.MenuItem("Скопировать ссылку", _on_copy_link),
pystray.MenuItem(
f"Открыть в Telegram ({host}:{port})",
_on_open_in_telegram,
default=True),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Перезапустить прокси", _on_restart),
pystray.MenuItem("Настройки...", _on_edit_config),
@ -310,17 +709,29 @@ def _build_menu():
)
# entry point
def run_tray() -> None:
def run_tray():
global _tray_icon, _config
_config = load_config()
bootstrap(_config)
save_config(_config)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, tray app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if pystray is None or Image is None or ctk is None:
log.error("pystray, Pillow or customtkinter not installed; running in console mode")
start_proxy(_config, _show_error)
log.error(
"pystray, Pillow or customtkinter not installed; "
"running in console mode")
start_proxy()
try:
while True:
time.sleep(1)
@ -328,12 +739,20 @@ def run_tray() -> None:
stop_proxy()
return
start_proxy(_config, _show_error)
maybe_notify_update(_config, lambda: _exiting, _ask_yes_no)
_show_first_run()
check_ipv6_warning(_show_info)
start_proxy()
_maybe_notify_update_async()
_show_first_run()
_check_ipv6_warning()
icon_image = _load_icon()
_tray_icon = pystray.Icon(
APP_NAME,
icon_image,
"TG WS Proxy",
menu=_build_menu())
_tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu())
log.info("Tray icon running")
_tray_icon.run()
@ -341,14 +760,15 @@ def run_tray() -> None:
log.info("Tray app exited")
def main() -> None:
if not acquire_lock("windows.py"):
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
return
try:
run_tray()
finally:
release_lock()
_release_lock()
if __name__ == "__main__":