tg-ws-proxy/macos.py

627 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
import os
import psutil
import subprocess
import sys
import threading
import time
import webbrowser
import asyncio as _asyncio
from pathlib import Path
from typing import Dict, Optional
try:
import rumps
except ImportError:
rumps = None
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
Image = ImageDraw = ImageFont = None
try:
import pyperclip
except ImportError:
pyperclip = None
import proxy.tg_ws_proxy as tg_ws_proxy
APP_NAME = "TgWsProxy"
APP_DIR = Path.home() / "Library" / "Application Support" / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png"
DEFAULT_CONFIG = {
"port": 1080,
"host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
}
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_app: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
log = logging.getLogger("tg-ws-tray")
# Single-instance lock
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {"create_time": proc.create_time()}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# Filesystem helpers
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.FileHandler(str(LOG_FILE), encoding="utf-8")
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"))
root.addHandler(ch)
# Menubar icon
def _make_menubar_icon(size: int = 44):
if Image is None:
return None
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = size // 11
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 0, 0, 255))
try:
font = ImageFont.truetype(
"/System/Library/Fonts/Helvetica.ttc",
size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
# Generate menubar icon PNG if it does not exist.
def _ensure_menubar_icon():
if MENUBAR_ICON_PATH.exists():
return
_ensure_dirs()
img = _make_menubar_icon(44)
if img:
img.save(str(MENUBAR_ICON_PATH), "PNG")
# Native macOS dialogs
def _osascript(script: str) -> str:
r = subprocess.run(
['osascript', '-e', script],
capture_output=True, text=True)
return r.stdout.strip()
def _show_error(text: str, title: str = "TG WS Proxy"):
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon stop')
def _show_info(text: str, title: str = "TG WS Proxy"):
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon note')
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
result = _osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"Нет", "Да"}} default button "Да" with icon note')
return "Да" in result
# Proxy lifecycle
def _run_proxy_thread(port: int, dc_opt: Dict[int, str], verbose: bool,
host: str = '127.0.0.1'):
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev, host=host))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
_show_error(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите.")
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
verbose = cfg.get("verbose", False)
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
log.info("Starting proxy on %s:%d ...", host, port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True, name="proxy")
_proxy_thread.start()
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=2)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy():
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy()
# Menu callbacks
def _on_open_in_telegram(_=None):
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server=127.0.0.1&port={port}"
log.info("Opening %s", url)
try:
result = subprocess.call(['open', url])
if result != 0:
raise RuntimeError("open command failed")
except Exception:
log.info("open command failed, trying webbrowser")
try:
if not webbrowser.open(url):
raise RuntimeError("webbrowser.open returned False")
except Exception:
log.info("Browser open failed, copying to clipboard")
try:
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(['pbcopy'], input=url.encode(),
check=True)
_show_info(
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена:\n{url}")
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(_=None):
def _do_restart():
global _config
_config = load_config()
if _app:
_app.update_menu_title()
restart_proxy()
threading.Thread(target=_do_restart, daemon=True).start()
def _on_open_logs(_=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
subprocess.call(['open', str(LOG_FILE)])
else:
_show_info("Файл логов ещё не создан.")
# Show a native text input dialog. Returns None if cancelled.
def _osascript_input(prompt: str, default: str,
title: str = "TG WS Proxy") -> Optional[str]:
prompt_esc = prompt.replace('\\', '\\\\').replace('"', '\\"')
default_esc = default.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
r = subprocess.run(
['osascript', '-e',
f'display dialog "{prompt_esc}" '
f'default answer "{default_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Отмена", "OK"}} default button "OK"'],
capture_output=True, text=True)
if r.returncode != 0:
return None
for part in r.stdout.strip().split(', '):
if part.startswith('text returned:'):
return part[len('text returned:'):]
return None
def _on_edit_config(_=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
# Settings via native macOS dialogs
def _edit_config_dialog():
cfg = load_config()
# Host
host = _osascript_input(
"IP-адрес прокси:",
cfg.get("host", DEFAULT_CONFIG["host"]))
if host is None:
return
host = host.strip()
import socket as _sock
try:
_sock.inet_aton(host)
except OSError:
_show_error("Некорректный IP-адрес.")
return
# Port
port_str = _osascript_input(
"Порт прокси:",
str(cfg.get("port", DEFAULT_CONFIG["port"])))
if port_str is None:
return
try:
port = int(port_str.strip())
if not (1 <= port <= 65535):
raise ValueError
except ValueError:
_show_error("Порт должен быть числом 1-65535")
return
# DC-IP mappings
dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]))
dc_str = _osascript_input(
"DC → IP маппинги (через запятую, формат DC:IP):\n"
"Например: 2:149.154.167.220, 4:149.154.167.220",
dc_default)
if dc_str is None:
return
dc_lines = [s.strip() for s in dc_str.replace(',', '\n').splitlines()
if s.strip()]
try:
tg_ws_proxy.parse_dc_ip_list(dc_lines)
except ValueError as e:
_show_error(str(e))
return
# Verbose
verbose = _ask_yes_no("Включить подробное логирование (verbose)?")
new_cfg = {
"host": host,
"port": port,
"dc_ip": dc_lines,
"verbose": verbose,
}
save_config(new_cfg)
log.info("Config saved: %s", new_cfg)
global _config
_config = new_cfg
if _app:
_app.update_menu_title()
if _ask_yes_no("Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
restart_proxy()
# First-run & IPv6 dialogs
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
tg_url = f"tg://socks?server={host}&port={port}"
text = (
f"Прокси запущен и работает в строке меню.\n\n"
f"Как подключить Telegram Desktop:\n\n"
f"Автоматически:\n"
f" Нажмите «Открыть в Telegram» в меню\n"
f" Или ссылка: {tg_url}\n\n"
f"Вручную:\n"
f" Настройки → Продвинутые → Тип подключения → Прокси\n"
f" SOCKS5 → {host} : {port} (без логина/пароля)\n\n"
f"Открыть прокси в Telegram сейчас?"
)
FIRST_RUN_MARKER.touch()
if _ask_yes_no(text, "TG WS Proxy"):
_on_open_in_telegram()
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith('::1') and not ip.startswith('fe80::1'):
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(('::1', 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает, попробуйте отключить "
"попытку соединения по IPv6 в настройках прокси Telegram.\n\n"
"Это предупреждение будет показано только один раз.")
# rumps menubar app
_TgWsProxyAppBase = rumps.App if rumps else object
class TgWsProxyApp(_TgWsProxyAppBase):
def __init__(self):
_ensure_menubar_icon()
icon_path = (str(MENUBAR_ICON_PATH)
if MENUBAR_ICON_PATH.exists() else None)
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
self._open_tg_item = rumps.MenuItem(
f"Открыть в Telegram ({host}:{port})",
callback=_on_open_in_telegram)
self._restart_item = rumps.MenuItem(
"Перезапустить прокси",
callback=_on_restart)
self._settings_item = rumps.MenuItem(
"Настройки...",
callback=_on_edit_config)
self._logs_item = rumps.MenuItem(
"Открыть логи",
callback=_on_open_logs)
super().__init__(
"TG WS Proxy",
icon=icon_path,
template=True,
quit_button="Выход",
menu=[
self._open_tg_item,
None,
self._restart_item,
self._settings_item,
self._logs_item,
])
def update_menu_title(self):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
self._open_tg_item.title = (
f"Открыть в Telegram ({host}:{port})")
def run_menubar():
global _app, _config
_config = load_config()
save_config(_config)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(_config.get("verbose", False))
log.info("TG WS Proxy menubar app starting")
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if rumps is None or Image is None:
log.error("rumps or Pillow not installed; running in console mode")
start_proxy()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_proxy()
return
start_proxy()
_show_first_run()
_check_ipv6_warning()
_app = TgWsProxyApp()
log.info("Menubar app running")
_app.run()
stop_proxy()
log.info("Menubar app exited")
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.")
return
try:
run_menubar()
finally:
_release_lock()
if __name__ == "__main__":
main()