tg-ws-proxy/windows.py

574 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import ctypes
import json
import logging
import os
import psutil
import sys
import threading
import time
import webbrowser
import pystray
import pyperclip
import asyncio as _asyncio
import customtkinter as ctk
from pathlib import Path
from typing import Dict, Optional
from PIL import Image, ImageDraw, ImageFont
import proxy.tg_ws_proxy as tg_ws_proxy
# Application identity and per-user storage paths. Everything lives in a
# folder named after the app under %APPDATA%, or under the home directory when
# the APPDATA environment variable is not set.
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
# Presence of this file suppresses the first-run welcome window.
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
# Fallback values for any key missing from config.json.
# "dc_ip" entries use the "DC:IP" text form shown in the settings dialog.
DEFAULT_CONFIG = {
"port": 1080,
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
}
# Mutable module state shared between the tray callbacks and the proxy thread.
# Thread currently running the proxy, or None when it is stopped.
_proxy_thread: Optional[threading.Thread] = None
# (event_loop, stop_event) tuple published by the proxy thread while it runs.
_async_stop: Optional[object] = None
# The pystray icon object once the tray has been created.
_tray_icon: Optional[object] = None
# Active configuration; replaced by load_config() in run_tray().
_config: dict = {}
# Set after the first exit request; a second request terminates immediately.
_exiting: bool = False
log = logging.getLogger("tg-ws-tray")
def _acquire_lock() -> bool:
    """Claim the single-instance lock for this process.

    Each instance marks itself with a ``<pid>.lock`` file in ``APP_DIR``.
    Existing lock files are inspected: if one names another process that
    psutil can still query, the lock is considered held. Every other lock
    file (unparsable name, dead PID, or a leftover carrying our own PID) is
    treated as stale and removed.

    Returns:
        True when this process now owns the lock, False when another live
        process already holds one.
    """
    _ensure_dirs()
    own_pid = os.getpid()

    # Materialise the listing first: entries are deleted while we walk it.
    for lock_path in list(APP_DIR.glob("*.lock")):
        try:
            owner_pid = int(lock_path.stem)
        except ValueError:
            owner_pid = None  # not a PID-named lock; removed below

        # A leftover file with our own PID cannot belong to another instance
        # (the PID was recycled for us), so it must never block start-up.
        if owner_pid is not None and owner_pid != own_pid:
            try:
                if psutil.pid_exists(owner_pid):
                    psutil.Process(owner_pid).status()
                    return False
            except (psutil.Error, OSError, OverflowError, ValueError) as exc:
                # The process vanished or cannot be inspected: treat as stale.
                log.debug("Lock %s looks stale: %s", lock_path, exc)

        try:
            lock_path.unlink(missing_ok=True)
        except OSError as exc:
            # Failing to tidy up a stale file must not prevent start-up.
            log.warning("Could not remove stale lock %s: %s", lock_path, exc)

    (APP_DIR / f"{own_pid}.lock").touch()
    return True
def _ensure_dirs():
    """Create the application data directory (and parents) if it is missing."""
    os.makedirs(APP_DIR, exist_ok=True)
def load_config() -> dict:
    """Load the configuration from ``CONFIG_FILE``.

    Keys missing from the file are filled in from ``DEFAULT_CONFIG``. When
    the file does not exist the defaults are returned silently; when it
    cannot be read, is not valid JSON, or does not contain a JSON object, a
    warning is logged and the defaults are returned.

    Returns:
        A dict that never shares mutable values with ``DEFAULT_CONFIG``.
    """
    import copy

    _ensure_dirs()
    # Deep copy so callers mutating e.g. the "dc_ip" list cannot alter the
    # module-level defaults.
    defaults = copy.deepcopy(DEFAULT_CONFIG)

    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except FileNotFoundError:
        return defaults
    except (OSError, ValueError) as exc:
        # ValueError covers both JSON syntax errors and bad UTF-8.
        log.warning("Failed to load config: %s", exc)
        return defaults

    if not isinstance(data, dict):
        log.warning("Failed to load config: %s",
                    "top-level JSON value is not an object")
        return defaults

    for key, value in defaults.items():
        data.setdefault(key, value)
    return data
def save_config(cfg: dict):
    """Write *cfg* to ``CONFIG_FILE`` as indented UTF-8 JSON.

    The data is written to a temporary sibling file first and then moved
    over the real file, so an interrupted write cannot leave a truncated
    config behind.

    Raises:
        OSError: if the file cannot be written or replaced.
        TypeError: if *cfg* contains values that are not JSON-serialisable.
    """
    _ensure_dirs()
    tmp_path = CONFIG_FILE.with_name(CONFIG_FILE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(cfg, fh, indent=2, ensure_ascii=False)
        os.replace(tmp_path, CONFIG_FILE)
    finally:
        # After a successful replace the temp file is already gone; this only
        # cleans up when something above failed.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
def setup_logging(verbose: bool = False):
    """Attach log handlers to the root logger.

    A UTF-8 file handler writing to ``LOG_FILE`` is always added. A stdout
    handler is added as well unless ``sys.frozen`` is set.

    Args:
        verbose: use DEBUG instead of INFO for the root logger and for the
            stdout handler. The file handler's own level is always DEBUG.
    """
    _ensure_dirs()
    level = logging.DEBUG if verbose else logging.INFO

    root_logger = logging.getLogger()
    root_logger.setLevel(level)

    file_format = logging.Formatter(
        "%(asctime)s %(levelname)-5s %(name)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    file_handler = logging.FileHandler(str(LOG_FILE), encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(file_format)
    root_logger.addHandler(file_handler)

    if getattr(sys, "frozen", False):
        return

    console_format = logging.Formatter(
        "%(asctime)s %(levelname)-5s %(message)s",
        datefmt="%H:%M:%S",
    )
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(level)
    console_handler.setFormatter(console_format)
    root_logger.addHandler(console_handler)
def _make_icon_image(size: int = 64):
    """Draw the fallback tray icon: a filled circle with a white letter "T".

    Args:
        size: width and height of the square RGBA image, in pixels.

    Returns:
        The rendered PIL image.

    Raises:
        RuntimeError: if Pillow's ``Image`` module is unavailable.
    """
    if Image is None:
        raise RuntimeError("Pillow is required for tray icon")

    canvas = Image.new("RGBA", (size, size), (0, 0, 0, 0))
    pen = ImageDraw.Draw(canvas)

    inset = 2
    far_edge = size - inset
    pen.ellipse([inset, inset, far_edge, far_edge], fill=(0, 136, 204, 255))

    try:
        letter_font = ImageFont.truetype("arial.ttf", size=int(size * 0.55))
    except Exception:
        letter_font = ImageFont.load_default()

    # Centre the glyph using its real bounding box, compensating for the
    # box's own offset from the drawing origin.
    left, top, right, bottom = pen.textbbox((0, 0), "T", font=letter_font)[:4]
    text_w = right - left
    text_h = bottom - top
    origin = ((size - text_w) // 2 - left, (size - text_h) // 2 - top)
    pen.text(origin, "T", fill=(255, 255, 255, 255), font=letter_font)
    return canvas
def _load_icon():
    """Return the tray icon image.

    Uses ``icon.ico`` located next to this file when it exists and can be
    opened; otherwise falls back to the generated icon.
    """
    bundled = Path(__file__).parent / "icon.ico"
    loaded = None
    if bundled.exists() and Image:
        try:
            loaded = Image.open(str(bundled))
        except Exception:
            loaded = None
    if loaded is not None:
        return loaded
    return _make_icon_image()
def _run_proxy_thread(port: int, dc_opt: Dict[int, str], verbose: bool):
    """Thread entry point: run the proxy on a private asyncio event loop.

    While the proxy runs, the ``(loop, stop_event)`` pair is published in the
    module-level ``_async_stop`` so that ``stop_proxy`` can request shutdown
    from another thread.

    Args:
        port: passed through to ``tg_ws_proxy._run``.
        dc_opt: passed through to ``tg_ws_proxy._run``.
        verbose: accepted for signature compatibility; not used here.
    """
    global _async_stop
    loop = _asyncio.new_event_loop()
    # Install the loop before creating the Event, as the original code did.
    _asyncio.set_event_loop(loop)
    stop_ev = _asyncio.Event()
    _async_stop = (loop, stop_ev)
    try:
        loop.run_until_complete(
            tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev))
    except Exception as exc:
        # exc_info keeps the traceback in the log file for diagnosis.
        log.error("Proxy thread crashed: %s", exc, exc_info=True)
        reason = str(exc)
        # 10048 is the error number Windows reports when the address is taken.
        if "10048" in reason or "Address already in use" in reason:
            _show_error(
                "Не удалось запустить прокси:\n"
                "Порт уже используется другим приложением.\n\n"
                "Закройте приложение, использующее этот порт, или измените "
                "порт в настройках прокси и перезапустите.")
    finally:
        # Withdraw the handle before closing the loop so another thread is
        # not handed a loop that is already closed.
        _async_stop = None
        loop.close()
def start_proxy():
    """Start the proxy in a daemon thread unless one is already alive.

    Reads ``port``, ``dc_ip`` and ``verbose`` from the active configuration.
    If the ``dc_ip`` list is rejected by ``tg_ws_proxy.parse_dc_ip_list`` the
    error is logged, shown to the user, and nothing is started.
    """
    global _proxy_thread, _config

    already_running = _proxy_thread and _proxy_thread.is_alive()
    if already_running:
        log.info("Proxy already running")
        return

    settings = _config
    port = settings.get("port", DEFAULT_CONFIG["port"])
    raw_mappings = settings.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
    verbose = settings.get("verbose", False)

    try:
        dc_opt = tg_ws_proxy.parse_dc_ip_list(raw_mappings)
    except ValueError as err:
        log.error("Bad config dc_ip: %s", err)
        _show_error(f"Ошибка конфигурации:\n{err}")
        return

    log.info("Starting proxy on port %d ...", port)
    worker = threading.Thread(
        name="proxy",
        target=_run_proxy_thread,
        args=(port, dc_opt, verbose),
        daemon=True,
    )
    _proxy_thread = worker
    worker.start()
def stop_proxy():
    """Ask the proxy thread to stop and wait briefly for it to finish.

    The stop event is set via the proxy's own event loop, then the thread is
    joined for at most two seconds. The thread reference is dropped either
    way; a warning is logged if the thread was still alive after the wait.
    """
    global _proxy_thread

    # Snapshot the handle once: the proxy thread resets the global to None
    # when it finishes, possibly between a check and a later read.
    handle = _async_stop
    if handle:
        loop, stop_ev = handle
        try:
            loop.call_soon_threadsafe(stop_ev.set)
        except RuntimeError as exc:
            # Raised when the loop has already been closed, i.e. the proxy
            # is shutting down on its own.
            log.debug("Stop signal not delivered: %s", exc)

    worker = _proxy_thread
    if worker:
        worker.join(timeout=2)
        if worker.is_alive():
            log.warning("Proxy thread did not stop within 2 seconds")

    _proxy_thread = None
    log.info("Proxy stopped")
def restart_proxy():
    """Stop the proxy, pause for 0.3 seconds, then start it again."""
    pause_seconds = 0.3
    log.info("Restarting proxy...")
    stop_proxy()
    time.sleep(pause_seconds)
    start_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
    """Show a Win32 message box with the error icon.

    Args:
        text: message body.
        title: window caption.
    """
    icon_flag = 0x10  # MB_ICONERROR
    user32 = ctypes.windll.user32
    user32.MessageBoxW(0, text, title, icon_flag)
def _show_info(text: str, title: str = "TG WS Proxy"):
    """Show a Win32 message box with the information icon.

    Args:
        text: message body.
        title: window caption.
    """
    icon_flag = 0x40  # MB_ICONINFORMATION
    user32 = ctypes.windll.user32
    user32.MessageBoxW(0, text, title, icon_flag)
def _on_open_in_telegram(icon=None, item=None):
    """Tray action: hand the ``tg://socks`` link for the local proxy to the OS.

    If ``webbrowser.open`` raises or reports failure, the link is copied to
    the clipboard instead and the user is told so; if that also fails an
    error box is shown.

    Args:
        icon: unused; accepted for the tray callback signature.
        item: unused; accepted for the tray callback signature.
    """
    port = _config.get("port", DEFAULT_CONFIG["port"])
    url = f"tg://socks?server=127.0.0.1&port={port}"
    log.info("Opening %s", url)

    try:
        opened = bool(webbrowser.open(url))
    except Exception:
        opened = False
    if opened:
        return

    log.info("Browser open failed, copying to clipboard")
    try:
        pyperclip.copy(url)
        notice = (
            "Не удалось открыть Telegram автоматически.\n\n"
            "Ссылка скопирована в буфер обмена, отправьте её в телеграмм и нажмите по ней ЛКМ:\n"
            + url
        )
        _show_info(notice, "TG WS Proxy")
    except Exception as exc:
        log.error("Clipboard copy failed: %s", exc)
        _show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
    """Tray action: restart the proxy on a background daemon thread."""
    worker = threading.Thread(target=restart_proxy, daemon=True)
    worker.start()
def _on_edit_config(icon=None, item=None):
    """Tray action: open the settings dialog on a background daemon thread."""
    worker = threading.Thread(target=_edit_config_dialog, daemon=True)
    worker.start()
def _edit_config_dialog():
    """Show the settings window and block until it is closed.

    Lets the user edit the proxy port, the DC→IP mappings and the verbose
    flag. On "save" the input is validated, written with ``save_config``,
    merged into the in-memory configuration, and the user is asked whether
    to restart the proxy immediately. Invalid input or a failed write shows
    an error box and leaves the window open.
    """
    if ctk is None:
        _show_error("customtkinter не установлен.")
        return

    cfg = dict(_config)

    ctk.set_appearance_mode("light")
    ctk.set_default_color_theme("blue")

    root = ctk.CTk()
    root.title("TG WS Proxy — Настройки")
    root.resizable(False, False)
    root.attributes("-topmost", True)

    TG_BLUE = "#3390ec"
    TG_BLUE_HOVER = "#2b7cd4"
    BG = "#ffffff"
    FIELD_BG = "#f0f2f5"
    FIELD_BORDER = "#d6d9dc"
    TEXT_PRIMARY = "#000000"
    TEXT_SECONDARY = "#707579"
    FONT_FAMILY = "Segoe UI"

    # Centre the fixed-size window on the screen.
    w, h = 420, 400
    sw = root.winfo_screenwidth()
    sh = root.winfo_screenheight()
    root.geometry(f"{w}x{h}+{(sw-w)//2}+{(sh-h)//2}")
    root.configure(fg_color=BG)

    frame = ctk.CTkFrame(root, fg_color=BG, corner_radius=0)
    frame.pack(fill="both", expand=True, padx=24, pady=20)

    # Port
    ctk.CTkLabel(frame, text="Порт прокси",
                 font=(FONT_FAMILY, 13), text_color=TEXT_PRIMARY,
                 anchor="w").pack(anchor="w", pady=(0, 4))
    port_var = ctk.StringVar(value=str(cfg.get("port", 1080)))
    port_entry = ctk.CTkEntry(frame, textvariable=port_var, width=120, height=36,
                              font=(FONT_FAMILY, 13), corner_radius=10,
                              fg_color=FIELD_BG, border_color=FIELD_BORDER,
                              border_width=1, text_color=TEXT_PRIMARY)
    port_entry.pack(anchor="w", pady=(0, 12))

    # DC-IP mappings
    ctk.CTkLabel(frame, text="DC → IP маппинги (по одному на строку, формат DC:IP)",
                 font=(FONT_FAMILY, 13), text_color=TEXT_PRIMARY,
                 anchor="w").pack(anchor="w", pady=(0, 4))
    dc_textbox = ctk.CTkTextbox(frame, width=370, height=120,
                                font=("Consolas", 12), corner_radius=10,
                                fg_color=FIELD_BG, border_color=FIELD_BORDER,
                                border_width=1, text_color=TEXT_PRIMARY)
    dc_textbox.pack(anchor="w", pady=(0, 12))
    dc_textbox.insert("1.0", "\n".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])))

    # Verbose
    verbose_var = ctk.BooleanVar(value=cfg.get("verbose", False))
    ctk.CTkCheckBox(frame, text="Подробное логирование (verbose)",
                    variable=verbose_var, font=(FONT_FAMILY, 13),
                    text_color=TEXT_PRIMARY,
                    fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER,
                    corner_radius=6, border_width=2,
                    border_color=FIELD_BORDER).pack(anchor="w", pady=(0, 8))

    # Info label
    ctk.CTkLabel(frame, text="Изменения вступят в силу после перезапуска прокси.",
                 font=(FONT_FAMILY, 11), text_color=TEXT_SECONDARY,
                 anchor="w").pack(anchor="w", pady=(0, 16))

    def on_save():
        # Validate the port first; nothing is persisted on invalid input.
        try:
            port_val = int(port_var.get().strip())
            if not (1 <= port_val <= 65535):
                raise ValueError
        except ValueError:
            _show_error("Порт должен быть числом 1-65535")
            return

        raw_text = dc_textbox.get("1.0", "end").strip()
        lines = [line.strip() for line in raw_text.splitlines() if line.strip()]
        try:
            # Called only for validation; the parsed result is discarded.
            tg_ws_proxy.parse_dc_ip_list(lines)
        except ValueError as e:
            _show_error(str(e))
            return

        new_cfg = {
            "port": port_val,
            "dc_ip": lines,
            "verbose": verbose_var.get(),
        }
        try:
            save_config(new_cfg)
        except OSError as exc:
            # Keep the dialog open and the in-memory config untouched so the
            # user can retry instead of silently losing the change.
            log.error("Failed to save config: %s", exc)
            _show_error(f"Не удалось сохранить настройки:\n{exc}")
            return
        _config.update(new_cfg)
        log.info("Config saved: %s", new_cfg)

        from tkinter import messagebox
        restart_now = messagebox.askyesno("Перезапустить?",
                                          "Настройки сохранены.\n\n"
                                          "Перезапустить прокси сейчас?",
                                          parent=root)
        root.destroy()
        if restart_now:
            restart_proxy()

    def on_cancel():
        root.destroy()

    btn_frame = ctk.CTkFrame(frame, fg_color="transparent")
    btn_frame.pack(fill="x")
    ctk.CTkButton(btn_frame, text="Сохранить", width=140, height=38,
                  font=(FONT_FAMILY, 14, "bold"), corner_radius=10,
                  fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER,
                  text_color="#ffffff",
                  command=on_save).pack(side="left", padx=(0, 10))
    ctk.CTkButton(btn_frame, text="Отмена", width=140, height=38,
                  font=(FONT_FAMILY, 14), corner_radius=10,
                  fg_color=FIELD_BG, hover_color=FIELD_BORDER,
                  text_color=TEXT_PRIMARY, border_width=1,
                  border_color=FIELD_BORDER,
                  command=on_cancel).pack(side="left")

    root.mainloop()
def _on_open_logs(icon=None, item=None):
    """Tray action: open the log file with its associated application.

    Shows an info box when the log file does not exist yet, and an error box
    when the operating system refuses to open it.

    Args:
        icon: unused; accepted for the tray callback signature.
        item: unused; accepted for the tray callback signature.
    """
    log.info("Opening log file: %s", LOG_FILE)
    if not LOG_FILE.exists():
        _show_info("Файл логов ещё не создан.", "TG WS Proxy")
        return
    try:
        os.startfile(str(LOG_FILE))
    except OSError as exc:
        # E.g. no application is associated with the file type, or the file
        # disappeared after the existence check.
        log.error("Failed to open log file: %s", exc)
        _show_error(f"Не удалось открыть файл логов:\n{exc}")
def _on_exit(icon=None, item=None):
    """Tray action: shut the application down.

    The first call stops the tray icon and arms a watchdog thread that
    hard-exits the process after three seconds. A second call while the
    first is still in progress terminates the process immediately.

    Args:
        icon: tray icon to stop, if given.
        item: unused; accepted for the tray callback signature.
    """
    global _exiting
    if _exiting:
        # os._exit never returns, so nothing may follow it in this branch.
        os._exit(0)
    _exiting = True
    log.info("User requested exit")

    def _force_exit():
        time.sleep(3)
        os._exit(0)

    threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
    if icon:
        icon.stop()
def _show_first_run():
    """Show the one-time welcome window with connection instructions.

    Does nothing when ``FIRST_RUN_MARKER`` already exists. When customtkinter
    is unavailable the marker is created and no window is shown. Otherwise
    the window blocks until dismissed; dismissing it (button or window close)
    creates the marker and, if the checkbox is ticked, opens the proxy link
    in Telegram.
    """
    _ensure_dirs()
    if FIRST_RUN_MARKER.exists():
        return

    port = _config.get("port", DEFAULT_CONFIG["port"])
    tg_url = f"tg://socks?server=127.0.0.1&port={port}"

    if ctk is None:
        FIRST_RUN_MARKER.touch()
        return

    ctk.set_appearance_mode("light")
    ctk.set_default_color_theme("blue")

    TG_BLUE = "#3390ec"
    TG_BLUE_HOVER = "#2b7cd4"
    BG = "#ffffff"
    FIELD_BORDER = "#d6d9dc"
    TEXT_PRIMARY = "#000000"
    FONT_FAMILY = "Segoe UI"

    root = ctk.CTk()
    root.title("TG WS Proxy")
    root.resizable(False, False)
    root.attributes("-topmost", True)

    # Centre the fixed-size window on the screen.
    w, h = 520, 440
    sw = root.winfo_screenwidth()
    sh = root.winfo_screenheight()
    root.geometry(f"{w}x{h}+{(sw-w)//2}+{(sh-h)//2}")
    root.configure(fg_color=BG)

    frame = ctk.CTkFrame(root, fg_color=BG, corner_radius=0)
    frame.pack(fill="both", expand=True, padx=28, pady=24)

    title_frame = ctk.CTkFrame(frame, fg_color="transparent")
    title_frame.pack(anchor="w", pady=(0, 16), fill="x")

    # Blue accent bar
    accent_bar = ctk.CTkFrame(title_frame, fg_color=TG_BLUE,
                              width=4, height=32, corner_radius=2)
    accent_bar.pack(side="left", padx=(0, 12))

    ctk.CTkLabel(title_frame, text="Прокси запущен и работает в системном трее",
                 font=(FONT_FAMILY, 17, "bold"),
                 text_color=TEXT_PRIMARY).pack(side="left")

    # Info sections: (text, bold) pairs rendered as one label each.
    sections = [
        ("Как подключить Telegram Desktop:", True),
        (" Автоматически:", True),
        (" ПКМ по иконке в трее → «Открыть в Telegram»", False),
        (f" Или ссылка: {tg_url}", False),
        ("\n Вручную:", True),
        (" Настройки → Продвинутые → Тип подключения → Прокси", False),
        (f" SOCKS5 → 127.0.0.1 : {port} (без логина/пароля)", False),
    ]
    for text, bold in sections:
        weight = "bold" if bold else "normal"
        ctk.CTkLabel(frame, text=text,
                     font=(FONT_FAMILY, 13, weight),
                     text_color=TEXT_PRIMARY,
                     anchor="w", justify="left").pack(anchor="w", pady=1)

    # Spacer
    ctk.CTkFrame(frame, fg_color="transparent", height=16).pack()

    # Separator
    ctk.CTkFrame(frame, fg_color=FIELD_BORDER, height=1,
                 corner_radius=0).pack(fill="x", pady=(0, 12))

    # Checkbox
    auto_var = ctk.BooleanVar(value=True)
    ctk.CTkCheckBox(frame, text="Открыть прокси в Telegram сейчас",
                    variable=auto_var, font=(FONT_FAMILY, 13),
                    text_color=TEXT_PRIMARY,
                    fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER,
                    corner_radius=6, border_width=2,
                    border_color=FIELD_BORDER).pack(anchor="w", pady=(0, 16))

    def on_ok():
        try:
            FIRST_RUN_MARKER.touch()
        except OSError as exc:
            # A failed marker write must not leave the window undismissable;
            # the welcome screen will simply appear again next time.
            log.warning("Could not write first-run marker: %s", exc)
        open_tg = auto_var.get()
        root.destroy()
        if open_tg:
            _on_open_in_telegram()

    ctk.CTkButton(frame, text="Начать", width=180, height=42,
                  font=(FONT_FAMILY, 15, "bold"), corner_radius=10,
                  fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER,
                  text_color="#ffffff",
                  command=on_ok).pack(pady=(0, 0))

    # Closing the window is treated the same as pressing the button.
    root.protocol("WM_DELETE_WINDOW", on_ok)
    root.mainloop()
def _build_menu():
    """Build the tray context menu.

    Returns:
        A ``pystray.Menu``, or None when pystray is unavailable. The port in
        the first entry's label is read from the configuration at build time.
    """
    if pystray is None:
        return None

    port = _config.get("port", DEFAULT_CONFIG["port"])
    open_item = pystray.MenuItem(
        f"Открыть в Telegram (:{port})",
        _on_open_in_telegram,
        default=True,
    )
    entries = (
        open_item,
        pystray.Menu.SEPARATOR,
        pystray.MenuItem("Перезапустить прокси", _on_restart),
        pystray.MenuItem("Настройки...", _on_edit_config),
        pystray.MenuItem("Открыть логи", _on_open_logs),
        pystray.Menu.SEPARATOR,
        pystray.MenuItem("Выход", _on_exit),
    )
    return pystray.Menu(*entries)
def run_tray():
    """Load the configuration, start the proxy and run the tray icon.

    The configuration is loaded and immediately written back, the previous
    log file is removed (best effort) and logging is configured. If pystray
    or Pillow is missing, the proxy runs in console mode until interrupted
    with Ctrl+C. Otherwise the first-run window is shown if needed and the
    tray icon loop runs until it is stopped, after which the proxy is
    stopped too.
    """
    global _tray_icon, _config

    _config = load_config()
    save_config(_config)

    # Start every session with a fresh log file; failure to delete is ignored.
    if LOG_FILE.exists():
        try:
            LOG_FILE.unlink()
        except Exception:
            pass

    setup_logging(_config.get("verbose", False))
    log.info("TG WS Proxy tray app starting")
    log.info("Config: %s", _config)
    log.info("Log file: %s", LOG_FILE)

    tray_available = pystray is not None and Image is not None
    if tray_available:
        start_proxy()
        _show_first_run()

        _tray_icon = pystray.Icon(
            APP_NAME,
            _load_icon(),
            "TG WS Proxy",
            menu=_build_menu(),
        )
        log.info("Tray icon running")
        _tray_icon.run()

        stop_proxy()
        log.info("Tray app exited")
        return

    # Console fallback: keep the process alive until Ctrl+C.
    log.error("pystray or Pillow not installed; running in console mode")
    start_proxy()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        stop_proxy()
def main():
    """Program entry point: enforce a single instance, then run the tray app.

    When another instance already holds the lock, an info box is shown and
    the function returns. Otherwise the tray app runs, and this process's
    ``<pid>.lock`` file is removed when it returns or raises.
    """
    if not _acquire_lock():
        _show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
        return
    try:
        run_tray()
    finally:
        # Drop our own lock so it does not linger as a stale file that a
        # later process with a recycled PID could be mistaken for.
        try:
            (APP_DIR / f"{os.getpid()}.lock").unlink(missing_ok=True)
        except OSError as exc:
            log.warning("Could not remove lock file: %s", exc)


if __name__ == "__main__":
    main()