Compare commits

..

No commits in common. "main" and "v1.2.1" have entirely different histories.
main ... v1.2.1

20 changed files with 2702 additions and 2849 deletions

View File

@ -1,28 +0,0 @@
.git
.github
.gitignore
__pycache__/
*.py[cod]
*.pyo
*.egg-info/
.pytest_cache/
.mypy_cache/
.ruff_cache/
.venv/
venv/
dist/
build/
packaging/
windows.py
icon.ico
*.spec
*.spec.bak
*.manifest
*.log
.vscode/
.idea/
*.swp
*.swo
.DS_Store
Thumbs.db
Desktop.ini

View File

@ -17,20 +17,20 @@ permissions:
contents: write contents: write
jobs: jobs:
build-windows: build:
runs-on: windows-latest runs-on: windows-latest
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v4
- name: Setup Python - name: Setup Python
uses: actions/setup-python@v6 uses: actions/setup-python@v5
with: with:
python-version: "3.12" python-version: "3.12"
cache: "pip" cache: "pip"
- name: Install dependencies - name: Install dependencies
run: pip install . run: pip install ".[win10]"
- name: Install pyinstaller - name: Install pyinstaller
run: pip install "pyinstaller==6.13.0" run: pip install "pyinstaller==6.13.0"
@ -42,49 +42,47 @@ jobs:
run: mv dist/TgWsProxy.exe dist/TgWsProxy_windows.exe run: mv dist/TgWsProxy.exe dist/TgWsProxy_windows.exe
- name: Upload artifact - name: Upload artifact
uses: actions/upload-artifact@v7 uses: actions/upload-artifact@v4
with: with:
name: TgWsProxy name: TgWsProxy
path: dist/TgWsProxy_windows.exe path: |
dist/TgWsProxy_windows.exe
build-win7: build-win7:
runs-on: windows-latest runs-on: windows-latest
strategy:
matrix:
include:
- arch: x64
suffix: 64bit
- arch: x86
suffix: 32bit
steps: steps:
- uses: actions/checkout@v6 - name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-python@v6 - name: Setup Python 3.8 (last version supporting Win7)
uses: actions/setup-python@v5
with: with:
python-version: "3.8" python-version: "3.8"
architecture: ${{ matrix.arch }}
cache: "pip" cache: "pip"
- name: Install dependencies & pyinstaller - name: Install dependencies (Win7-compatible)
run: pip install . "pyinstaller==5.13.2" run: pip install ".[win7]"
- name: Build EXE with PyInstaller - name: Install pyinstaller
run: pip install "pyinstaller==5.13.2"
- name: Build EXE with PyInstaller (Win7)
run: pyinstaller packaging/windows.spec --noconfirm run: pyinstaller packaging/windows.spec --noconfirm
- name: Rename artifact - name: Rename artifact
run: mv dist/TgWsProxy.exe dist/TgWsProxy_windows_7_${{ matrix.suffix }}.exe run: mv dist/TgWsProxy.exe dist/TgWsProxy_windows_7.exe
- name: Upload artifact - name: Upload artifact
uses: actions/upload-artifact@v7 uses: actions/upload-artifact@v4
with: with:
name: TgWsProxy-win7-${{ matrix.suffix }} name: TgWsProxy-win7
path: dist/TgWsProxy_windows_7_${{ matrix.suffix }}.exe path: dist/TgWsProxy_windows_7.exe
build-macos: build-macos:
runs-on: macos-latest runs-on: macos-latest
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v4
- name: Install universal2 Python - name: Install universal2 Python
run: | run: |
@ -144,7 +142,7 @@ jobs:
-w wheelhouse/universal2 -w wheelhouse/universal2
python3.12 -m pip install --no-deps wheelhouse/universal2/*.whl python3.12 -m pip install --no-deps wheelhouse/universal2/*.whl
python3.12 -m pip install . python3.12 -m pip install ".[macos]"
python3.12 -m pip install pyinstaller==6.13.0 python3.12 -m pip install pyinstaller==6.13.0
- name: Create macOS icon from ICO - name: Create macOS icon from ICO
@ -219,7 +217,7 @@ jobs:
rm -rf "$DMG_TEMP" rm -rf "$DMG_TEMP"
- name: Upload artifact - name: Upload artifact
uses: actions/upload-artifact@v7 uses: actions/upload-artifact@v4
with: with:
name: TgWsProxy-macOS name: TgWsProxy-macOS
path: dist/TgWsProxy_macos_universal.dmg path: dist/TgWsProxy_macos_universal.dmg
@ -228,7 +226,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v6 uses: actions/checkout@v4
- name: Install system dependencies - name: Install system dependencies
run: | run: |
@ -246,7 +244,7 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: | run: |
.venv/bin/pip install --upgrade pip .venv/bin/pip install --upgrade pip
.venv/bin/pip install . .venv/bin/pip install ".[linux]"
.venv/bin/pip install "pyinstaller==6.13.0" .venv/bin/pip install "pyinstaller==6.13.0"
- name: Build binary with PyInstaller - name: Build binary with PyInstaller
@ -303,7 +301,7 @@ jobs:
Maintainer: Flowseal Maintainer: Flowseal
Depends: libgtk-3-0, libayatana-appindicator3-1, python3-tk Depends: libgtk-3-0, libayatana-appindicator3-1, python3-tk
Description: Telegram Desktop WebSocket Bridge Proxy Description: Telegram Desktop WebSocket Bridge Proxy
MTProto/WebSocket bridge proxy for Telegram Desktop with tray UI. SOCKS5/WebSocket bridge proxy for Telegram Desktop with tray UI.
EOF EOF
dpkg-deb --build --root-owner-group \ dpkg-deb --build --root-owner-group \
@ -311,7 +309,7 @@ jobs:
"dist/TgWsProxy_linux_amd64.deb" "dist/TgWsProxy_linux_amd64.deb"
- name: Upload artifact - name: Upload artifact
uses: actions/upload-artifact@v7 uses: actions/upload-artifact@v4
with: with:
name: TgWsProxy-linux name: TgWsProxy-linux
path: | path: |
@ -319,15 +317,33 @@ jobs:
dist/TgWsProxy_linux_amd64.deb dist/TgWsProxy_linux_amd64.deb
release: release:
needs: [build-windows, build-win7, build-macos, build-linux] needs: [build, build-win7, build-macos, build-linux]
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: ${{ github.event.inputs.make_release == 'true' }} if: ${{ github.event.inputs.make_release == 'true' }}
steps: steps:
- uses: actions/download-artifact@v8 - name: Download main build
uses: actions/download-artifact@v4
with: with:
pattern: TgWsProxy* name: TgWsProxy
path: dist
- name: Download Win7 build
uses: actions/download-artifact@v4
with:
name: TgWsProxy-win7
path: dist
- name: Download macOS build
uses: actions/download-artifact@v4
with:
name: TgWsProxy-macOS
path: dist
- name: Download Linux build
uses: actions/download-artifact@v4
with:
name: TgWsProxy-linux
path: dist path: dist
merge-multiple: true
- name: Create GitHub Release - name: Create GitHub Release
uses: softprops/action-gh-release@v2 uses: softprops/action-gh-release@v2
@ -338,8 +354,7 @@ jobs:
## TG WS Proxy ${{ github.event.inputs.version }} ## TG WS Proxy ${{ github.event.inputs.version }}
files: | files: |
dist/TgWsProxy_windows.exe dist/TgWsProxy_windows.exe
dist/TgWsProxy_windows_7_64bit.exe dist/TgWsProxy_windows_7.exe
dist/TgWsProxy_windows_7_32bit.exe
dist/TgWsProxy_macos_universal.dmg dist/TgWsProxy_macos_universal.dmg
dist/TgWsProxy_linux_amd64 dist/TgWsProxy_linux_amd64
dist/TgWsProxy_linux_amd64.deb dist/TgWsProxy_linux_amd64.deb

View File

@ -1,45 +0,0 @@
# syntax=docker/dockerfile:1.7
FROM python:3.12-slim AS builder
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1 \
PIP_NO_CACHE_DIR=1 \
VIRTUAL_ENV=/opt/venv
RUN apt-get update \
&& apt-get install -y --no-install-recommends build-essential cargo libffi-dev libssl-dev \
&& python -m venv "$VIRTUAL_ENV" \
&& "$VIRTUAL_ENV/bin/pip" install --upgrade pip setuptools wheel \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
RUN "$VIRTUAL_ENV/bin/pip" install cryptography==46.0.5
FROM python:3.12-slim AS runtime
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PATH=/opt/venv/bin:$PATH \
TG_WS_PROXY_HOST=0.0.0.0 \
TG_WS_PROXY_PORT=1443 \
TG_WS_PROXY_DC_IPS="2:149.154.167.220 4:149.154.167.220"
RUN apt-get update \
&& apt-get install -y --no-install-recommends tini ca-certificates \
&& rm -rf /var/lib/apt/lists/* \
&& groupadd --system app \
&& useradd --system --gid app --create-home --home-dir /home/app app
WORKDIR /app
COPY --from=builder /opt/venv /opt/venv
COPY proxy ./proxy
COPY README.md LICENSE ./
USER app
EXPOSE 1443/tcp
ENTRYPOINT ["/usr/bin/tini", "--", "/bin/sh", "-lc", "set -eu; args=\"--host ${TG_WS_PROXY_HOST} --port ${TG_WS_PROXY_PORT}\"; for dc in ${TG_WS_PROXY_DC_IPS}; do args=\"$args --dc-ip $dc\"; done; exec python -u proxy/tg_ws_proxy.py $args \"$@\"", "--"]
CMD []

View File

@ -12,17 +12,17 @@
# TG WS Proxy # TG WS Proxy
**Локальный MTProto-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние сервера. **Локальный SOCKS5-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние сервера.
<img width="529" height="487" alt="image" src="https://github.com/user-attachments/assets/6a4cf683-0df8-43af-86c1-0e8f08682b62" /> <img width="529" height="487" alt="image" src="https://github.com/user-attachments/assets/6a4cf683-0df8-43af-86c1-0e8f08682b62" />
## Как это работает ## Как это работает
``` ```
Telegram Desktop → MTProto Proxy (127.0.0.1:1443) → WebSocket → Telegram DC Telegram Desktop → SOCKS5 (127.0.0.1:1080) → TG WS Proxy → WSS → Telegram DC
``` ```
1. Приложение поднимает MTProto прокси на `127.0.0.1:1443` 1. Приложение поднимает локальный SOCKS5-прокси на `127.0.0.1:1080`
2. Перехватывает подключения к IP-адресам Telegram 2. Перехватывает подключения к IP-адресам Telegram
3. Извлекает DC ID из MTProto obfuscation init-пакета 3. Извлекает DC ID из MTProto obfuscation init-пакета
4. Устанавливает WebSocket (TLS) соединение к соответствующему DC через домены Telegram 4. Устанавливает WebSocket (TLS) соединение к соответствующему DC через домены Telegram
@ -38,14 +38,12 @@ Telegram Desktop → MTProto Proxy (127.0.0.1:1443) → WebSocket → Telegram D
**Меню трея:** **Меню трея:**
- **Открыть в Telegram** — автоматически настроить прокси через `tg://proxy` ссылку - **Открыть в Telegram** — автоматически настроить прокси через `tg://socks` ссылку
- **Перезапустить прокси** — перезапуск без выхода из приложения - **Перезапустить прокси** — перезапуск без выхода из приложения
- **Настройки...** — GUI-редактор конфигурации (в т.ч. версия приложения, опциональная проверка обновлений с GitHub) - **Настройки...** — GUI-редактор конфигурации
- **Открыть логи** — открыть файл логов - **Открыть логи** — открыть файл логов
- **Выход** — остановить прокси и закрыть приложение - **Выход** — остановить прокси и закрыть приложение
При первом запуске после старта может появиться запрос об открытии страницы релиза, если на GitHub вышла новая версия (отключается в настройках).
### macOS ### macOS
Перейдите на [страницу релизов](https://github.com/Flowseal/tg-ws-proxy/releases) и скачайте **`TgWsProxy_macos_universal.dmg`** — универсальная сборка для Apple Silicon и Intel. Перейдите на [страницу релизов](https://github.com/Flowseal/tg-ws-proxy/releases) и скачайте **`TgWsProxy_macos_universal.dmg`** — универсальная сборка для Apple Silicon и Intel.
@ -58,22 +56,6 @@ Telegram Desktop → MTProto Proxy (127.0.0.1:1443) → WebSocket → Telegram D
Для Debian/Ubuntu скачайте со [страницы релизов](https://github.com/Flowseal/tg-ws-proxy/releases) пакет **`TgWsProxy_linux_amd64.deb`**. Для Debian/Ubuntu скачайте со [страницы релизов](https://github.com/Flowseal/tg-ws-proxy/releases) пакет **`TgWsProxy_linux_amd64.deb`**.
Для Arch и Arch-Based дистрибутивов подготовлены пакеты в AUR: [tg-ws-proxy-bin](https://aur.archlinux.org/packages/tg-ws-proxy-bin), [tg-ws-proxy-git](https://aur.archlinux.org/packages/tg-ws-proxy-git), [tg-ws-proxy-cli](https://aur.archlinux.org/packages/tg-ws-proxy-cli)
```shell
# Установка без AUR-helper
git clone https://aur.archlinux.org/tg-ws-proxy-bin.git
cd tg-ws-proxy-bin
makepkg -si
# При помощи AUR-helper
paru -S tg-ws-proxy-bin
# Если вы установили -cli пакет, то запуск осуществляется через systemctl, где 8888 это номер порта,
# разделитель ":" и secret, который можно сгенерировать командой: openssl rand -hex 16
sudo systemctl start tg-ws-proxy-cli@8888:3075abe65830f0325116bb0416cadf9f
```
Для остальных дистрибутивов можно использовать **`TgWsProxy_linux_amd64`** (бинарный файл для x86_64). Для остальных дистрибутивов можно использовать **`TgWsProxy_linux_amd64`** (бинарный файл для x86_64).
```bash ```bash
@ -87,31 +69,38 @@ chmod +x TgWsProxy_linux_amd64
### Консольный proxy ### Консольный proxy
Для запуска только proxy без tray-интерфейса достаточно базовой установки: Для запуска только SOCKS5/WebSocket proxy без tray-интерфейса достаточно базовой установки:
```bash ```bash
pip install -e . pip install -e .
tg-ws-proxy tg-ws-proxy
``` ```
### Windows 7/10+ ### Windows 10+
```bash ```bash
pip install -e . pip install -e ".[win10]"
tg-ws-proxy-tray-win
```
### Windows 7
```bash
pip install -e ".[win7]"
tg-ws-proxy-tray-win tg-ws-proxy-tray-win
``` ```
### macOS ### macOS
```bash ```bash
pip install -e . pip install -e ".[macos]"
tg-ws-proxy-tray-macos tg-ws-proxy-tray-macos
``` ```
### Linux ### Linux
```bash ```bash
pip install -e . pip install -e ".[linux]"
tg-ws-proxy-tray-linux tg-ws-proxy-tray-linux
``` ```
@ -125,15 +114,9 @@ tg-ws-proxy [--port PORT] [--host HOST] [--dc-ip DC:IP ...] [-v]
| Аргумент | По умолчанию | Описание | | Аргумент | По умолчанию | Описание |
|---|---|---| |---|---|---|
| `--port` | `1443` | Порт прокси | | `--port` | `1080` | Порт SOCKS5-прокси |
| `--host` | `127.0.0.1` | Хост прокси | | `--host` | `127.0.0.1` | Хост SOCKS5-прокси |
| `--secret` | `random` | 32 hex chars secret для авторизации клиентов |
| `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) | | `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) |
| `--buf-kb` | `256` | Размер буфера в КБ
| `--pool-size` | `4` | Количество заготовленных соединений на каждый DC
| `--log-file` | выкл. | Путь до файла, в который сохранять логи
| `--log-max-mb` | `5` | Максимальный размер файла логов в МБ (после идёт перезапись)
| `--log-backups` | `0` | Количество сохранений логов после перезаписи
| `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) | | `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) |
**Примеры:** **Примеры:**
@ -173,10 +156,10 @@ tg-ws-proxy-tray-linux = "linux:main"
1. Telegram → **Настройки****Продвинутые настройки****Тип подключения** → **Прокси** 1. Telegram → **Настройки****Продвинутые настройки****Тип подключения** → **Прокси**
2. Добавить прокси: 2. Добавить прокси:
- **Тип:** MTProto - **Тип:** SOCKS5
- **Сервер:** `127.0.0.1` (или переопределенный вами) - **Сервер:** `127.0.0.1`
- **Порт:** `1443` (или переопределенный вами) - **Порт:** `1080`
- **Secret:** из настроек или логов - **Логин/Пароль:** оставить пустыми
## Конфигурация ## Конфигурация
@ -188,23 +171,15 @@ Tray-приложение хранит данные в:
```json ```json
{ {
"host": "127.0.0.1", "port": 1080,
"port": 1443,
"secret": "...",
"dc_ip": [ "dc_ip": [
"2:149.154.167.220", "2:149.154.167.220",
"4:149.154.167.220" "4:149.154.167.220"
], ],
"verbose": false, "verbose": false
"buf_kb": 256,
"pool_size": 4,
"log_max_mb": 5.0,
"check_updates": true
} }
``` ```
Ключ **`check_updates`** — при `true` при запросе к GitHub сравнивается версия с последним релизом (только уведомление и ссылка на страницу загрузки). На Windows в конфиге может быть **`autostart`** (автозапуск при входе в систему).
## Автоматическая сборка ## Автоматическая сборка
Проект содержит спецификации PyInstaller ([`packaging/windows.spec`](packaging/windows.spec), [`packaging/macos.spec`](packaging/macos.spec), [`packaging/linux.spec`](packaging/linux.spec)) и GitHub Actions workflow ([`.github/workflows/build.yml`](.github/workflows/build.yml)) для автоматической сборки. Проект содержит спецификации PyInstaller ([`packaging/windows.spec`](packaging/windows.spec), [`packaging/macos.spec`](packaging/macos.spec), [`packaging/linux.spec`](packaging/linux.spec)) и GitHub Actions workflow ([`.github/workflows/build.yml`](.github/workflows/build.yml)) для автоматической сборки.
@ -212,8 +187,7 @@ Tray-приложение хранит данные в:
Минимально поддерживаемые версии ОС для текущих бинарных сборок: Минимально поддерживаемые версии ОС для текущих бинарных сборок:
- Windows 10+ для `TgWsProxy_windows.exe` - Windows 10+ для `TgWsProxy_windows.exe`
- Windows 7 (x64) для `TgWsProxy_windows_7_64bit.exe` - Windows 7 для `TgWsProxy_windows_7.exe`
- Windows 7 (x32) для `TgWsProxy_windows_7_32bit.exe`
- Intel macOS 10.15+ - Intel macOS 10.15+
- Apple Silicon macOS 11.0+ - Apple Silicon macOS 11.0+
- Linux x86_64 (требуется AppIndicator для системного трея) - Linux x86_64 (требуется AppIndicator для системного трея)

BIN
icon.ico

Binary file not shown.

Before

Width:  |  Height:  |  Size: 473 B

After

Width:  |  Height:  |  Size: 5.7 KiB

911
linux.py

File diff suppressed because it is too large Load Diff

607
macos.py
View File

@ -1,13 +1,18 @@
from __future__ import annotations from __future__ import annotations
import json
import logging
import logging.handlers
import os import os
import psutil
import subprocess import subprocess
import sys import sys
import threading import threading
import time import time
import webbrowser import webbrowser
import asyncio as _asyncio
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Dict, Optional
try: try:
import rumps import rumps
@ -25,135 +30,242 @@ except ImportError:
pyperclip = None pyperclip = None
import proxy.tg_ws_proxy as tg_ws_proxy import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.tray_common import (
APP_DIR, APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IPV6_WARN_MARKER,
LOG_FILE, acquire_lock, apply_proxy_config, ensure_dirs, load_config,
log, release_lock, save_config, setup_logging, stop_proxy, tg_proxy_url,
)
APP_NAME = "TgWsProxy"
APP_DIR = Path.home() / "Library" / "Application Support" / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png" MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png"
DEFAULT_CONFIG = {
"port": 1080,
"host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
"log_max_mb": 5,
"buf_kb": 256,
"pool_size": 4,
}
_proxy_thread: Optional[threading.Thread] = None _proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None _async_stop: Optional[object] = None
_app: Optional[object] = None _app: Optional[object] = None
_config: dict = {} _config: dict = {}
_exiting: bool = False _exiting: bool = False
_lock_file_path: Optional[Path] = None
# osascript dialogs log = logging.getLogger("tg-ws-tray")
def _esc(text: str) -> str: # Single-instance lock
return text.replace("\\", "\\\\").replace('"', '\\"')
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
def _osascript(script: str) -> str: try:
r = subprocess.run(["osascript", "-e", script], capture_output=True, text=True) lock_ct = float(lock_meta.get("create_time", 0.0))
return r.stdout.strip() proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
def _show_error(text: str, title: str = "TG WS Proxy") -> None: except Exception:
_osascript( return False
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon stop' frozen = bool(getattr(sys, "frozen", False))
) if frozen:
return APP_NAME.lower() in proc.name().lower()
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_osascript(
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon note'
)
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return _ask_yes_no_close(text, title) is True
def _ask_yes_no_close(text: str, title: str = "TG WS Proxy") -> Optional[bool]:
r = subprocess.run(
[
"osascript", "-e",
f'button returned of (display dialog "{_esc(text)}" '
f'with title "{_esc(title)}" '
f'buttons {{"Закрыть", "Нет", "Да"}} '
f'default button "Да" cancel button "Закрыть" with icon note)',
],
capture_output=True, text=True,
)
if r.returncode != 0:
return None
btn = r.stdout.strip()
if btn == "Да":
return True
if btn == "Нет":
return False return False
return None
def _osascript_input(prompt: str, default: str, title: str = "TG WS Proxy") -> Optional[str]: def _release_lock():
r = subprocess.run( global _lock_file_path
[ if not _lock_file_path:
"osascript", "-e", return
f'text returned of (display dialog "{_esc(prompt)}" ' try:
f'default answer "{_esc(default)}" ' _lock_file_path.unlink(missing_ok=True)
f'with title "{_esc(title)}" ' except Exception:
f'buttons {{"Закрыть", "OK"}} ' pass
f'default button "OK" cancel button "Закрыть")', _lock_file_path = None
],
capture_output=True, text=True,
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {"create_time": proc.create_time()}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# Filesystem helpers
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
) )
if r.returncode != 0: fh.setLevel(logging.DEBUG)
return None fh.setFormatter(logging.Formatter(
return r.stdout.rstrip("\r\n") "%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"))
root.addHandler(ch)
# menubar icon # Menubar icon
def _make_menubar_icon(size: int = 44): def _make_menubar_icon(size: int = 44):
if Image is None: if Image is None:
return None return None
img = Image.new("RGBA", (size, size), (0, 0, 0, 0)) img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img) draw = ImageDraw.Draw(img)
margin = size // 11 margin = size // 11
draw.ellipse([margin, margin, size - margin, size - margin], fill=(0, 0, 0, 255)) draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 0, 0, 255))
try: try:
font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", size=int(size * 0.55)) font = ImageFont.truetype(
"/System/Library/Fonts/Helvetica.ttc",
size=int(size * 0.55))
except Exception: except Exception:
font = ImageFont.load_default() font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font) bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text( tx = (size - tw) // 2 - bbox[0]
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]), ty = (size - th) // 2 - bbox[1]
"T", fill=(255, 255, 255, 255), font=font, draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
)
return img return img
# Generate menubar icon PNG if it does not exist.
def _ensure_menubar_icon() -> None: def _ensure_menubar_icon():
if MENUBAR_ICON_PATH.exists(): if MENUBAR_ICON_PATH.exists():
return return
ensure_dirs() _ensure_dirs()
img = _make_menubar_icon(44) img = _make_menubar_icon(44)
if img: if img:
img.save(str(MENUBAR_ICON_PATH), "PNG") img.save(str(MENUBAR_ICON_PATH), "PNG")
# proxy lifecycle (macOS-local) # Native macOS dialogs
import asyncio as _asyncio def _osascript(script: str) -> str:
r = subprocess.run(
['osascript', '-e', script],
capture_output=True, text=True)
return r.stdout.strip()
def _run_proxy_thread() -> None: def _show_error(text: str, title: str = "TG WS Proxy"):
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon stop')
def _show_info(text: str, title: str = "TG WS Proxy"):
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon note')
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
result = _osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"Нет", "Да"}} default button "Да" with icon note')
return "Да" in result
# Proxy lifecycle
def _run_proxy_thread(port: int, dc_opt: Dict[int, str], verbose: bool,
host: str = '127.0.0.1'):
global _async_stop global _async_stop
loop = _asyncio.new_event_loop() loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop) _asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event() stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev) _async_stop = (loop, stop_ev)
try: try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev)) loop.run_until_complete(
tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev, host=host))
except Exception as exc: except Exception as exc:
log.error("Proxy thread crashed: %s", exc) log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc): if "Address already in use" in str(exc):
@ -161,28 +273,47 @@ def _run_proxy_thread() -> None:
"Не удалось запустить прокси:\n" "Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n" "Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, " "Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите." "или измените порт в настройках прокси и перезапустите.")
)
finally: finally:
loop.close() loop.close()
_async_stop = None _async_stop = None
def _start_proxy() -> None: def start_proxy():
global _proxy_thread global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive(): if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running") log.info("Proxy already running")
return return
if not apply_proxy_config(_config):
_show_error("Ошибка конфигурации DC → IP.") cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
verbose = cfg.get("verbose", False)
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port) log.info("Starting proxy on %s:%d ...", host, port)
_proxy_thread = threading.Thread(target=_run_proxy_thread, daemon=True, name="proxy")
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True, name="proxy")
_proxy_thread.start() _proxy_thread.start()
def _stop_proxy() -> None: def stop_proxy():
global _proxy_thread, _async_stop global _proxy_thread, _async_stop
if _async_stop: if _async_stop:
loop, stop_ev = _async_stop loop, stop_ev = _async_stop
@ -193,21 +324,21 @@ def _stop_proxy() -> None:
log.info("Proxy stopped") log.info("Proxy stopped")
def _restart_proxy() -> None: def restart_proxy():
log.info("Restarting proxy...") log.info("Restarting proxy...")
_stop_proxy() stop_proxy()
time.sleep(0.3) time.sleep(0.3)
_start_proxy() start_proxy()
# menu callbacks # Menu callbacks
def _on_open_in_telegram(_=None):
def _on_open_in_telegram(_=None) -> None: port = _config.get("port", DEFAULT_CONFIG["port"])
url = tg_proxy_url(_config) url = f"tg://socks?server=127.0.0.1&port={port}"
log.info("Opening %s", url) log.info("Opening %s", url)
try: try:
result = subprocess.call(["open", url]) result = subprocess.call(['open', url])
if result != 0: if result != 0:
raise RuntimeError("open command failed") raise RuntimeError("open command failed")
except Exception: except Exception:
@ -221,109 +352,68 @@ def _on_open_in_telegram(_=None) -> None:
if pyperclip: if pyperclip:
pyperclip.copy(url) pyperclip.copy(url)
else: else:
subprocess.run(["pbcopy"], input=url.encode(), check=True) subprocess.run(['pbcopy'], input=url.encode(),
check=True)
_show_info( _show_info(
"Не удалось открыть Telegram автоматически.\n\n" "Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена:\n{url}" f"Ссылка скопирована в буфер обмена:\n{url}")
)
except Exception as exc: except Exception as exc:
log.error("Clipboard copy failed: %s", exc) log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}") _show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_copy_link(_=None) -> None: def _on_restart(_=None):
url = tg_proxy_url(_config) def _do_restart():
log.info("Copying link: %s", url)
try:
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(["pbcopy"], input=url.encode(), check=True)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(_=None) -> None:
def _do():
global _config global _config
_config = load_config() _config = load_config()
if _app: if _app:
_app.update_menu_title() _app.update_menu_title()
_restart_proxy() restart_proxy()
threading.Thread(target=_do, daemon=True).start() threading.Thread(target=_do_restart, daemon=True).start()
def _on_open_logs(_=None) -> None: def _on_open_logs(_=None):
log.info("Opening log file: %s", LOG_FILE) log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists(): if LOG_FILE.exists():
subprocess.call(["open", str(LOG_FILE)]) subprocess.call(['open', str(LOG_FILE)])
else: else:
_show_info("Файл логов ещё не создан.") _show_info("Файл логов ещё не создан.")
# Show a native text input dialog. Returns None if cancelled.
def _osascript_input(prompt: str, default: str,
title: str = "TG WS Proxy") -> Optional[str]:
prompt_esc = prompt.replace('\\', '\\\\').replace('"', '\\"')
default_esc = default.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
r = subprocess.run(
['osascript', '-e',
f'text returned of (display dialog "{prompt_esc}" '
f'default answer "{default_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Отмена", "OK"}} default button "OK")'],
capture_output=True, text=True)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
def _on_edit_config(_=None) -> None:
def _on_edit_config(_=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start() threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _check_updates_menu_title() -> str: # Settings via native macOS dialogs
on = bool(_config.get("check_updates", True)) def _edit_config_dialog():
return "✓ Проверять обновления при запуске" if on else "Проверять обновления при запуске (выкл)"
def _toggle_check_updates(_=None) -> None:
global _config
_config["check_updates"] = not bool(_config.get("check_updates", True))
save_config(_config)
if _app is not None:
_app._check_updates_item.title = _check_updates_menu_title()
def _on_open_release_page(_=None) -> None:
from utils.update_check import RELEASES_PAGE_URL
webbrowser.open(RELEASES_PAGE_URL)
# update check
def _maybe_notify_update_async() -> None:
def _work():
time.sleep(1.5)
if _exiting:
return
if not _config.get("check_updates", True):
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if _ask_yes_no(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
# settings dialog
def _edit_config_dialog() -> None:
cfg = load_config() cfg = load_config()
host = _osascript_input("IP-адрес прокси:", cfg.get("host", DEFAULT_CONFIG["host"])) # Host
host = _osascript_input(
"IP-адрес прокси:",
cfg.get("host", DEFAULT_CONFIG["host"]))
if host is None: if host is None:
return return
host = host.strip() host = host.strip()
import socket as _sock import socket as _sock
try: try:
_sock.inet_aton(host) _sock.inet_aton(host)
@ -331,7 +421,10 @@ def _edit_config_dialog() -> None:
_show_error("Некорректный IP-адрес.") _show_error("Некорректный IP-адрес.")
return return
port_str = _osascript_input("Порт прокси:", str(cfg.get("port", DEFAULT_CONFIG["port"]))) # Port
port_str = _osascript_input(
"Порт прокси:",
str(cfg.get("port", DEFAULT_CONFIG["port"])))
if port_str is None: if port_str is None:
return return
try: try:
@ -342,49 +435,38 @@ def _edit_config_dialog() -> None:
_show_error("Порт должен быть числом 1-65535") _show_error("Порт должен быть числом 1-65535")
return return
secret_str = _osascript_input( # DC-IP mappings
"MTProto Secret (32 hex символа):", cfg.get("secret", DEFAULT_CONFIG["secret"])
)
if secret_str is None:
return
secret_str = secret_str.strip().lower()
if len(secret_str) != 32 or not all(c in "0123456789abcdef" for c in secret_str):
_show_error("Secret должен быть строкой из 32 шестнадцатеричных символов.")
return
dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])) dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]))
dc_str = _osascript_input( dc_str = _osascript_input(
"DC → IP маппинги (через запятую, формат DC:IP):\n" "DC → IP маппинги (через запятую, формат DC:IP):\n"
"Например: 2:149.154.167.220, 4:149.154.167.220", "Например: 2:149.154.167.220, 4:149.154.167.220",
dc_default, dc_default)
)
if dc_str is None: if dc_str is None:
return return
dc_lines = [s.strip() for s in dc_str.replace(",", "\n").splitlines() if s.strip()] dc_lines = [s.strip() for s in dc_str.replace(',', '\n').splitlines()
if s.strip()]
try: try:
tg_ws_proxy.parse_dc_ip_list(dc_lines) tg_ws_proxy.parse_dc_ip_list(dc_lines)
except ValueError as e: except ValueError as e:
_show_error(str(e)) _show_error(str(e))
return return
verbose = _ask_yes_no_close("Включить подробное логирование (verbose)?") # Verbose
if verbose is None: verbose = _ask_yes_no("Включить подробное логирование (verbose)?")
return
# Advanced settings
adv_str = _osascript_input( adv_str = _osascript_input(
"Расширенные настройки (буфер KB, WS пул, лог MB):\n" "Расширенные настройки (буфер KB, WS пул, лог MB):\n"
"Формат: buf_kb,pool_size,log_max_mb", "Формат: buf_kb,pool_size,log_max_mb",
f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])}," f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])},"
f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])}," f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])},"
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}", f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}")
)
if adv_str is None:
return
adv = {} adv = {}
if adv_str: if adv_str:
parts = [s.strip() for s in adv_str.split(",")] parts = [s.strip() for s in adv_str.split(',')]
keys = [("buf_kb", int), ("pool_size", int), ("log_max_mb", float)] keys = [("buf_kb", int), ("pool_size", int),
("log_max_mb", float)]
for i, (k, typ) in enumerate(keys): for i, (k, typ) in enumerate(keys):
if i < len(parts): if i < len(parts):
try: try:
@ -395,13 +477,11 @@ def _edit_config_dialog() -> None:
new_cfg = { new_cfg = {
"host": host, "host": host,
"port": port, "port": port,
"secret": secret_str,
"dc_ip": dc_lines, "dc_ip": dc_lines,
"verbose": verbose, "verbose": verbose,
"buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])), "buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])),
"pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])), "pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])),
"log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])), "log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])),
"check_updates": cfg.get("check_updates", True),
} }
save_config(new_cfg) save_config(new_cfg)
log.info("Config saved: %s", new_cfg) log.info("Config saved: %s", new_cfg)
@ -411,23 +491,20 @@ def _edit_config_dialog() -> None:
if _app: if _app:
_app.update_menu_title() _app.update_menu_title()
if _ask_yes_no_close("Настройки сохранены.\n\nПерезапустить прокси сейчас?"): if _ask_yes_no("Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
_restart_proxy() restart_proxy()
# first run & ipv6 # First-run & IPv6 dialogs
def _show_first_run():
def _show_first_run() -> None: _ensure_dirs()
ensure_dirs()
if FIRST_RUN_MARKER.exists(): if FIRST_RUN_MARKER.exists():
return return
host = _config.get("host", DEFAULT_CONFIG["host"]) host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"]) port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"]) tg_url = f"tg://socks?server={host}&port={port}"
tg_url = tg_proxy_url(_config)
link_host = tg_ws_proxy.get_link_host(host)
text = ( text = (
f"Прокси запущен и работает в строке меню.\n\n" f"Прокси запущен и работает в строке меню.\n\n"
@ -437,54 +514,54 @@ def _show_first_run() -> None:
f" Или ссылка: {tg_url}\n\n" f" Или ссылка: {tg_url}\n\n"
f"Вручную:\n" f"Вручную:\n"
f" Настройки → Продвинутые → Тип подключения → Прокси\n" f" Настройки → Продвинутые → Тип подключения → Прокси\n"
f" MTProto → {link_host} : {port} \n" f" SOCKS5 → {host} : {port} (без логина/пароля)\n\n"
f" Secret: dd{secret} \n\n"
f"Открыть прокси в Telegram сейчас?" f"Открыть прокси в Telegram сейчас?"
) )
FIRST_RUN_MARKER.touch() FIRST_RUN_MARKER.touch()
if _ask_yes_no(text, "TG WS Proxy"): if _ask_yes_no(text, "TG WS Proxy"):
_on_open_in_telegram() _on_open_in_telegram()
def _check_ipv6_warning() -> None: def _has_ipv6_enabled() -> bool:
ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
import socket as _sock import socket as _sock
has = False
try: try:
for addr in _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6): addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0] ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"): if ip and not ip.startswith('::1') and not ip.startswith('fe80::1'):
has = True return True
break
except Exception: except Exception:
pass pass
if not has:
try: try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM) s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0)) s.bind(('::1', 0))
s.close() s.close()
has = True return True
except Exception: except Exception:
pass return False
if not has:
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return return
IPV6_WARN_MARKER.touch() IPV6_WARN_MARKER.touch()
_show_info( _show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n" "На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, " "Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n" "что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает, попробуйте отключить " "Если прокси не работает, попробуйте отключить "
"попытку соединения по IPv6 в настройках прокси Telegram.\n\n" "попытку соединения по IPv6 в настройках прокси Telegram.\n\n"
"Это предупреждение будет показано только один раз." "Это предупреждение будет показано только один раз.")
)
# rumps app # rumps menubar app
_TgWsProxyAppBase = rumps.App if rumps else object _TgWsProxyAppBase = rumps.App if rumps else object
@ -492,26 +569,24 @@ _TgWsProxyAppBase = rumps.App if rumps else object
class TgWsProxyApp(_TgWsProxyAppBase): class TgWsProxyApp(_TgWsProxyAppBase):
def __init__(self): def __init__(self):
_ensure_menubar_icon() _ensure_menubar_icon()
icon_path = str(MENUBAR_ICON_PATH) if MENUBAR_ICON_PATH.exists() else None icon_path = (str(MENUBAR_ICON_PATH)
if MENUBAR_ICON_PATH.exists() else None)
host = _config.get("host", DEFAULT_CONFIG["host"]) host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"]) port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
self._open_tg_item = rumps.MenuItem( self._open_tg_item = rumps.MenuItem(
f"Открыть в Telegram ({link_host}:{port})", callback=_on_open_in_telegram f"Открыть в Telegram ({host}:{port})",
) callback=_on_open_in_telegram)
self._copy_link_item = rumps.MenuItem("Скопировать ссылку", callback=_on_copy_link) self._restart_item = rumps.MenuItem(
self._restart_item = rumps.MenuItem("Перезапустить прокси", callback=_on_restart) "Перезапустить прокси",
self._settings_item = rumps.MenuItem("Настройки...", callback=_on_edit_config) callback=_on_restart)
self._logs_item = rumps.MenuItem("Открыть логи", callback=_on_open_logs) self._settings_item = rumps.MenuItem(
self._release_page_item = rumps.MenuItem( "Настройки...",
"Страница релиза на GitHub…", callback=_on_open_release_page callback=_on_edit_config)
) self._logs_item = rumps.MenuItem(
self._check_updates_item = rumps.MenuItem( "Открыть логи",
_check_updates_menu_title(), callback=_toggle_check_updates callback=_on_open_logs)
)
self._version_item = rumps.MenuItem(f"Версия {__version__}", callback=lambda _: None)
super().__init__( super().__init__(
"TG WS Proxy", "TG WS Proxy",
@ -520,30 +595,20 @@ class TgWsProxyApp(_TgWsProxyAppBase):
quit_button="Выход", quit_button="Выход",
menu=[ menu=[
self._open_tg_item, self._open_tg_item,
self._copy_link_item,
None, None,
self._restart_item, self._restart_item,
self._settings_item, self._settings_item,
self._logs_item, self._logs_item,
None, ])
self._release_page_item,
self._check_updates_item,
None,
self._version_item,
],
)
def update_menu_title(self) -> None: def update_menu_title(self):
host = _config.get("host", DEFAULT_CONFIG["host"]) host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"]) port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host) self._open_tg_item.title = (
self._open_tg_item.title = f"Открыть в Telegram ({link_host}:{port})" f"Открыть в Telegram ({host}:{port})")
# entry point def run_menubar():
def run_menubar() -> None:
global _app, _config global _app, _config
_config = load_config() _config = load_config()
@ -555,26 +620,23 @@ def run_menubar() -> None:
except Exception: except Exception:
pass pass
setup_logging( setup_logging(_config.get("verbose", False),
_config.get("verbose", False), log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]), log.info("TG WS Proxy menubar app starting")
)
log.info("TG WS Proxy версия %s, menubar app starting", __version__)
log.info("Config: %s", _config) log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE) log.info("Log file: %s", LOG_FILE)
if rumps is None or Image is None: if rumps is None or Image is None:
log.error("rumps or Pillow not installed; running in console mode") log.error("rumps or Pillow not installed; running in console mode")
_start_proxy() start_proxy()
try: try:
while True: while True:
time.sleep(1) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
_stop_proxy() stop_proxy()
return return
_start_proxy() start_proxy()
_maybe_notify_update_async()
_show_first_run() _show_first_run()
_check_ipv6_warning() _check_ipv6_warning()
@ -582,18 +644,19 @@ def run_menubar() -> None:
log.info("Menubar app running") log.info("Menubar app running")
_app.run() _app.run()
_stop_proxy() stop_proxy()
log.info("Menubar app exited") log.info("Menubar app exited")
def main() -> None: def main():
if not acquire_lock("macos.py"): if not _acquire_lock():
_show_info("Приложение уже запущено.") _show_info("Приложение уже запущено.")
return return
try: try:
run_menubar() run_menubar()
finally: finally:
release_lock() _release_lock()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1 +1 @@
__version__ = "1.4.0" __version__ = "1.1.3"

File diff suppressed because it is too large Load Diff

View File

@ -18,37 +18,59 @@ authors = [
keywords = [ keywords = [
"telegram", "telegram",
"tdesktop",
"proxy", "proxy",
"bypass", "websocket"
"websocket",
"mtproto",
] ]
classifiers = [ classifiers = [
"Development Status :: 5 - Production/Stable", "Development Status :: 5 - Production/Stable",
"Environment :: Console", "Environment :: Console",
"Environment :: MacOS X :: Cocoa",
"Environment :: Win32 (MS Windows)",
"Environment :: X11 Applications :: GTK",
"Intended Audience :: Customer Service", "Intended Audience :: Customer Service",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
"Operating System :: OS Independent", "Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX :: Linux",
"Topic :: System :: Networking :: Firewalls", "Topic :: System :: Networking :: Firewalls",
] ]
dependencies = [ dependencies = [
"pyperclip==1.9.0",
"psutil==5.9.8; platform_system == 'Windows' and python_version < '3.9'",
"cryptography==41.0.7; platform_system == 'Windows' and python_version < '3.9'", "cryptography==41.0.7; platform_system == 'Windows' and python_version < '3.9'",
"Pillow==10.4.0; platform_system == 'Windows' and python_version < '3.9'",
"psutil==7.0.0; platform_system != 'Windows' or python_version >= '3.9'",
"cryptography==46.0.5; platform_system != 'Windows' or python_version >= '3.9'", "cryptography==46.0.5; platform_system != 'Windows' or python_version >= '3.9'",
"Pillow==12.1.1; (platform_system != 'Windows' or python_version >= '3.9') and platform_system != 'Darwin'", ]
"customtkinter==5.2.2; platform_system != 'Darwin'", [project.optional-dependencies]
"pystray==0.19.5; platform_system != 'Darwin'", win7 = [
"rumps==0.4.0; platform_system == 'Darwin'", "customtkinter==5.2.2",
"Pillow==12.1.0; platform_system == 'Darwin'", "Pillow==10.4.0",
"psutil==5.9.8",
"pystray==0.19.5",
"pyperclip==1.9.0",
]
win10 = [
"customtkinter==5.2.2",
"Pillow==12.1.1",
"psutil==7.0.0",
"pystray==0.19.5",
"pyperclip==1.9.0",
]
macos = [
"Pillow==12.1.0",
"psutil==7.0.0",
"pyperclip==1.9.0",
"rumps==0.4.0",
]
linux = [
"customtkinter==5.2.2",
"Pillow==12.1.1",
"psutil==7.0.0",
"pystray==0.19.5",
"pyperclip==1.9.0",
] ]
[project.scripts] [project.scripts]
@ -62,7 +84,7 @@ Source = "https://github.com/Flowseal/tg-ws-proxy"
Issues = "https://github.com/Flowseal/tg-ws-proxy/issues" Issues = "https://github.com/Flowseal/tg-ws-proxy/issues"
[tool.hatch.build.targets.wheel] [tool.hatch.build.targets.wheel]
packages = ["proxy", "ui", "utils"] packages = ["proxy"]
[tool.hatch.build.force-include] [tool.hatch.build.force-include]
"windows.py" = "windows.py" "windows.py" = "windows.py"

246
stress_test.py Normal file
View File

@ -0,0 +1,246 @@
"""
Stress-test: сравнение OLD vs NEW реализаций горячих функций прокси.
Тестируются:
1. _build_frame сборка WS-фрейма (masked binary)
2. _build_frame сборка WS-фрейма (unmasked)
3. _socks5_reply генерация SOCKS5-ответа
4. _dc_from_init XOR-часть (bytes(a^b for ) vs int.from_bytes)
5. mask key generation (os.urandom vs PRNG)
"""
import gc
import os
import random
import struct
import time
# ── Размеры данных, типичные для Telegram ──────────────────────────
SMALL = 64 # init-пакет / ack
MEDIUM = 1024 # текстовое сообщение
LARGE = 65536 # фото / голосовое
# ═══════════════════════════════════════════════════════════════════
# XOR mask (не менялся — для полноты)
# ═══════════════════════════════════════════════════════════════════
def xor_mask(data: bytes, mask: bytes) -> bytes:
if not data:
return data
n = len(data)
mask_rep = (mask * (n // 4 + 1))[:n]
return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big')
# ═══════════════════════════════════════════════════════════════════
# _build_frame
# ═══════════════════════════════════════════════════════════════════
def build_frame_old(opcode: int, data: bytes, mask: bool = False) -> bytes:
"""Старая: bytearray + append/extend + os.urandom."""
header = bytearray()
header.append(0x80 | opcode)
length = len(data)
mask_bit = 0x80 if mask else 0x00
if length < 126:
header.append(mask_bit | length)
elif length < 65536:
header.append(mask_bit | 126)
header.extend(struct.pack('>H', length))
else:
header.append(mask_bit | 127)
header.extend(struct.pack('>Q', length))
if mask:
mask_key = os.urandom(4)
header.extend(mask_key)
return bytes(header) + xor_mask(data, mask_key)
return bytes(header) + data
# ── Новая: pre-compiled struct + PRNG ──────────────────────────────
_st_BB = struct.Struct('>BB')
_st_BBH = struct.Struct('>BBH')
_st_BBQ = struct.Struct('>BBQ')
_st_BB4s = struct.Struct('>BB4s')
_st_BBH4s = struct.Struct('>BBH4s')
_st_BBQ4s = struct.Struct('>BBQ4s')
_mask_rng = random.Random(int.from_bytes(os.urandom(16), 'big'))
_mask_pack = struct.Struct('>I').pack
def _random_mask_key() -> bytes:
return _mask_pack(_mask_rng.getrandbits(32))
def build_frame_new(opcode: int, data: bytes, mask: bool = False) -> bytes:
"""Новая: struct.pack + PRNG mask."""
length = len(data)
fb = 0x80 | opcode
if not mask:
if length < 126:
return _st_BB.pack(fb, length) + data
if length < 65536:
return _st_BBH.pack(fb, 126, length) + data
return _st_BBQ.pack(fb, 127, length) + data
mask_key = _random_mask_key()
masked = xor_mask(data, mask_key)
if length < 126:
return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked
if length < 65536:
return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked
return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked
# ═══════════════════════════════════════════════════════════════════
# _socks5_reply
# ═══════════════════════════════════════════════════════════════════
def socks5_reply_old(status):
return bytes([0x05, status, 0x00, 0x01]) + b'\x00' * 6
_SOCKS5_REPLIES = {s: bytes([0x05, s, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
for s in (0x00, 0x05, 0x07, 0x08)}
def socks5_reply_new(status):
return _SOCKS5_REPLIES[status]
# ═══════════════════════════════════════════════════════════════════
# dc_from_init XOR (8 байт keystream ^ data)
# ═══════════════════════════════════════════════════════════════════
def dc_xor_old(data8: bytes, ks8: bytes) -> bytes:
"""Старая: генераторное выражение."""
return bytes(a ^ b for a, b in zip(data8, ks8))
def dc_xor_new(data8: bytes, ks8: bytes) -> bytes:
"""Новая: int.from_bytes."""
return (int.from_bytes(data8, 'big') ^ int.from_bytes(ks8, 'big')).to_bytes(8, 'big')
# ═══════════════════════════════════════════════════════════════════
# mask key: os.urandom(4) vs PRNG
# ═══════════════════════════════════════════════════════════════════
def mask_key_old() -> bytes:
return os.urandom(4)
def mask_key_new() -> bytes:
return _random_mask_key()
# ═══════════════════════════════════════════════════════════════════
# Бенчмарк
# ═══════════════════════════════════════════════════════════════════
def bench(func, args_list: list, iters: int) -> float:
gc.collect()
for i in range(min(100, iters)):
func(*args_list[i % len(args_list)])
start = time.perf_counter()
for i in range(iters):
func(*args_list[i % len(args_list)])
elapsed = time.perf_counter() - start
return elapsed / iters * 1_000_000 # мкс
def compare(name: str, old_fn, new_fn, args_list: list, iters: int):
t_old = bench(old_fn, args_list, iters)
t_new = bench(new_fn, args_list, iters)
speedup = t_old / t_new if t_new > 0 else float('inf')
marker = '' if speedup >= 1.0 else '⚠️'
print(f" {name:.<42s} OLD {t_old:8.3f} мкс | NEW {t_new:8.3f} мкс | {speedup:5.2f}x {marker}")
# ═══════════════════════════════════════════════════════════════════
def main():
print("=" * 74)
print(" Stress Test: OLD vs NEW (горячие функции tg_ws_proxy)")
print("=" * 74)
N = 500_000
# # ── 1. _build_frame masked ────────────────────────────────────
# print(f"\n── _build_frame masked ({N:,} итераций) ──")
# for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]:
# data_list = [(0x2, os.urandom(size), True) for _ in range(1000)]
# compare(f"build_frame masked {label}",
# build_frame_old, build_frame_new, data_list, N)
# # ── 2. _build_frame unmasked ──────────────────────────────────
# print(f"\n── _build_frame unmasked ({N:,} итераций) ──")
# for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]:
# data_list = [(0x2, os.urandom(size), False) for _ in range(1000)]
# compare(f"build_frame unmasked {label}",
# build_frame_old, build_frame_new, data_list, N)
# # ── 3. mask key generation ────────────────────────────────────
# print(f"\n── mask key: os.urandom(4) vs PRNG ({N:,} итераций) ──")
# compare("mask_key", mask_key_old, mask_key_new, [()] * 100, N)
# # ── 4. _socks5_reply ─────────────────────────────────────────
N2 = 2_000_000
# print(f"\n── _socks5_reply ({N2:,} итераций) ──")
# compare("socks5_reply", socks5_reply_old, socks5_reply_new,
# [(s,) for s in (0x00, 0x05, 0x07, 0x08)], N2)
# # ── 5. dc_from_init XOR (8 bytes) ────────────────────────────
# print(f"\n── dc_xor 8B: generator vs int.from_bytes ({N2:,} итераций) ──")
# compare("dc_xor_8B", dc_xor_old, dc_xor_new,
# [(os.urandom(8), os.urandom(8)) for _ in range(1000)], N2)
# ── 6. _read_frame struct.unpack vs pre-compiled ─────────────
print(f"\n── struct unpack read-path ({N2:,} итераций) ──")
_st_H_pre = struct.Struct('>H')
_st_Q_pre = struct.Struct('>Q')
h_bufs = [(os.urandom(2),) for _ in range(1000)]
q_bufs = [(os.urandom(8),) for _ in range(1000)]
compare("unpack >H",
lambda b: struct.unpack('>H', b),
lambda b: _st_H_pre.unpack(b),
h_bufs, N2)
compare("unpack >Q",
lambda b: struct.unpack('>Q', b),
lambda b: _st_Q_pre.unpack(b),
q_bufs, N2)
# ── 7. dc_from_init: 2x unpack vs 1x merged ─────────────────
print(f"\n── dc_from_init unpack: 2 calls vs 1 merged ({N2:,} итераций) ──")
_st_Ih = struct.Struct('<Ih')
plains = [(os.urandom(8),) for _ in range(1000)]
def dc_unpack_old(p):
return struct.unpack('<I', p[0:4])[0], struct.unpack('<h', p[4:6])[0]
def dc_unpack_new(p):
return _st_Ih.unpack(p[:6])
compare("dc_unpack", dc_unpack_old, dc_unpack_new, plains, N2)
# ── 8. bytes() copy vs direct slice ──────────────────────────
print(f"\n── bytes(slice) vs direct slice ({N2:,} итераций) ──")
raw_data = [(os.urandom(64),) for _ in range(1000)]
def slice_copy(d):
return bytes(d[8:40]), bytes(d[40:56])
def slice_direct(d):
return d[8:40], d[40:56]
compare("bytes(slice) vs slice", slice_copy, slice_direct, raw_data, N2)
# ── 9. MsgSplitter unpack_from: struct vs pre-compiled ───────
print(f"\n── unpack_from <I: struct vs pre-compiled ({N2:,} итераций) ──")
_st_I_le = struct.Struct('<I')
splitter_bufs = [(os.urandom(64), 1) for _ in range(1000)]
compare("unpack_from <I",
lambda b, p: struct.unpack_from('<I', b, p),
lambda b, p: _st_I_le.unpack_from(b, p),
splitter_bufs, N2)
print("\n" + "=" * 74)
print(" Готово!")
print("=" * 74)
if __name__ == "__main__":
main()

View File

@ -1,4 +0,0 @@
"""
Интерфейс tray (CustomTkinter): тема, диалоги настроек, подсказки.
Ядро прокси пакет `proxy`.
"""

View File

@ -1,108 +0,0 @@
from __future__ import annotations
import sys
import tkinter
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple
_tk_variable_del_guard_installed = False
def install_tkinter_variable_del_guard() -> None:
global _tk_variable_del_guard_installed
if _tk_variable_del_guard_installed:
return
_orig = tkinter.Variable.__del__
def _safe_variable_del(self: Any, _orig: Any = _orig) -> None:
try:
_orig(self)
except (RuntimeError, tkinter.TclError):
pass
tkinter.Variable.__del__ = _safe_variable_del # type: ignore[assignment]
_tk_variable_del_guard_installed = True
CONFIG_DIALOG_SIZE: Tuple[int, int] = (460, 560)
CONFIG_DIALOG_FRAME_PAD: Tuple[int, int] = (20, 14)
FIRST_RUN_SIZE: Tuple[int, int] = (520, 480)
FIRST_RUN_FRAME_PAD: Tuple[int, int] = (28, 24)
@dataclass(frozen=True)
class CtkTheme:
tg_blue: tuple = ("#3390ec", "#3390ec")
tg_blue_hover: tuple = ("#2b7cd4", "#2b7cd4")
bg: tuple = ("#ffffff", "#1e1e1e")
field_bg: tuple = ("#f0f2f5", "#2b2b2b")
field_border: tuple = ("#d6d9dc", "#3a3a3a")
text_primary: tuple = ("#000000", "#ffffff")
text_secondary: tuple = ("#707579", "#aaaaaa")
ui_font_family: str = "Sans"
mono_font_family: str = "Monospace"
def ctk_theme_for_platform() -> CtkTheme:
if sys.platform == "win32":
return CtkTheme(ui_font_family="Segoe UI", mono_font_family="Consolas")
return CtkTheme()
def apply_ctk_appearance(ctk: Any) -> None:
ctk.set_appearance_mode("auto")
ctk.set_default_color_theme("blue")
def center_ctk_geometry(root: Any, width: int, height: int) -> None:
sw = root.winfo_screenwidth()
sh = root.winfo_screenheight()
root.geometry(f"{width}x{height}+{(sw - width) // 2}+{(sh - height) // 2}")
def create_ctk_toplevel(
ctk: Any,
*,
title: str,
width: int,
height: int,
theme: CtkTheme,
topmost: bool = True,
after_create: Optional[Callable[[Any], None]] = None,
) -> Any:
root = ctk.CTkToplevel()
root.title(title)
root.resizable(False, False)
center_ctk_geometry(root, width, height)
root.configure(fg_color=theme.bg)
if topmost:
root.attributes("-topmost", True)
root.lift()
root.focus_force()
if after_create:
_after_id = root.after(300, lambda: after_create(root))
_orig_destroy = root.destroy
def _safe_destroy():
try:
root.after_cancel(_after_id)
except Exception:
pass
_orig_destroy()
root.destroy = _safe_destroy
return root
def main_content_frame(
ctk: Any,
root: Any,
theme: CtkTheme,
*,
padx: int,
pady: int,
) -> Any:
frame = ctk.CTkFrame(root, fg_color=theme.bg, corner_radius=0)
frame.pack(fill="both", expand=True, padx=padx, pady=pady)
return frame

View File

@ -1,109 +0,0 @@
from __future__ import annotations
import tkinter as tk
from typing import Any, List, Optional
class CtkTooltip:
def __init__(
self,
widget: Any,
text: str,
*,
delay_ms: int = 450,
wraplength: int = 320,
) -> None:
self.widget = widget
self.text = text
self.delay_ms = delay_ms
self.wraplength = wraplength
self._after_id: Optional[str] = None
self._tip: Optional[tk.Toplevel] = None
widget.bind("<Enter>", self._schedule, add="+")
widget.bind("<Leave>", self._hide, add="+")
widget.bind("<Button>", self._hide, add="+")
widget.bind("<Destroy>", self._on_destroy, add="+")
def _schedule(self, _event: Any = None) -> None:
if self.widget is None:
return
self._cancel_after()
self._after_id = self.widget.after(self.delay_ms, self._show)
def _cancel_after(self) -> None:
if self._after_id is not None:
try:
self.widget.after_cancel(self._after_id)
except Exception:
pass
self._after_id = None
def _show(self) -> None:
self._after_id = None
if self._tip is not None:
return
try:
if not self.widget.winfo_exists():
return
except Exception:
return
tw = tk.Toplevel(self.widget.winfo_toplevel())
tw.wm_overrideredirect(True)
try:
tw.wm_attributes("-topmost", True)
except Exception:
pass
tw.configure(bg="#2b2b2b")
lbl = tk.Label(
tw,
text=self.text,
justify="left",
wraplength=self.wraplength,
background="#2b2b2b",
foreground="#f0f0f0",
relief="flat",
borderwidth=0,
padx=10,
pady=8,
font=("Segoe UI", 10) if _is_windows() else None,
)
lbl.pack()
x = self.widget.winfo_rootx() + 12
y = self.widget.winfo_rooty() + self.widget.winfo_height() + 4
tw.wm_geometry(f"+{x}+{y}")
self._tip = tw
def _hide(self, _event: Any = None) -> None:
self._cancel_after()
if self._tip is not None:
try:
self._tip.destroy()
except Exception:
pass
self._tip = None
def _on_destroy(self, _event: Any = None) -> None:
self._hide()
self.widget = None
def _is_windows() -> bool:
import sys
return sys.platform == "win32"
def attach_ctk_tooltip(
widget: Any,
text: str,
*,
delay_ms: int = 450,
wraplength: int = 320,
) -> None:
CtkTooltip(widget, text, delay_ms=delay_ms, wraplength=wraplength)
def attach_tooltip_to_widgets(widgets: List[Any], text: str, **kwargs: Any) -> None:
for w in widgets:
attach_ctk_tooltip(w, text, **kwargs)

View File

@ -1,516 +0,0 @@
from __future__ import annotations
import os
import webbrowser
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.update_check import RELEASES_PAGE_URL, get_status
from ui.ctk_theme import (
FIRST_RUN_FRAME_PAD,
CtkTheme,
main_content_frame,
)
from ui.ctk_tooltip import attach_ctk_tooltip, attach_tooltip_to_widgets
_TIP_HOST = (
"Адрес, на котором прокси принимает подключения.\n"
"Обычно 127.0.0.1 — локальная сеть, 0.0.0.0 - все интерфейсы"
)
_TIP_PORT = (
"Порт прокси. В Telegram Desktop в настройках прокси должен быть "
"указан тот же порт"
)
_TIP_SECRET = "Секретный ключ для авторизации клиентов"
_TIP_DC = (
"Соответствие номера датацентра Telegram (DC) и IP-адреса сервера.\n"
"Каждая строка: «номер:IP», например 2:149.154.167.220. "
"Прокси по этим правилам направляет трафик к нужным серверам Telegram"
)
_TIP_VERBOSE = (
"Если включено, в файл логов пишется больше подробностей — "
"необходимо при поиске неполадок"
)
_TIP_BUF_KB = (
"Размер буфера приёма/передачи в килобайтах.\n"
"Больше значение — больше выделение памяти на сокет"
)
_TIP_POOL = (
"Сколько параллельных WebSocket-сессий к одному датацентру можно держать.\n"
"Увеличение может помочь при высокой нагрузке"
)
_TIP_LOG_MB = (
"Максимальный размер файла лога; при достижении лимита файл перезаписывается"
)
_TIP_AUTOSTART = (
"Запускать TG WS Proxy при входе в Windows. "
"Если вы переместите программу в другую папку, автозапуск сбросится"
)
_TIP_CHECK_UPDATES = "При запуске проверять наличие обновлений"
_TIP_SAVE = "Сохранить настройки"
_TIP_CANCEL = "Закрыть окно без сохранения изменений"
_INNER_W = 396
def _entry(ctk, parent, theme, *, var=None, width=0, height=36, radius=10, **kw):
opts = dict(
font=(theme.ui_font_family, 13), corner_radius=radius,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
)
if var is not None:
opts["textvariable"] = var
if width:
opts["width"] = width
opts["height"] = height
opts.update(kw)
return ctk.CTkEntry(parent, **opts)
def _checkbox(ctk, parent, theme, text, variable):
return ctk.CTkCheckBox(
parent, text=text, variable=variable,
font=(theme.ui_font_family, 13), text_color=theme.text_primary,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
corner_radius=6, border_width=2, border_color=theme.field_border,
)
def _label(ctk, parent, theme, text, *, size=12, bold=False, secondary=True, **kw):
weight = "bold" if bold else "normal"
return ctk.CTkLabel(
parent, text=text,
font=(theme.ui_font_family, size, weight),
text_color=theme.text_secondary if secondary else theme.text_primary,
anchor="w", **kw,
)
def _labeled_entry(ctk, parent, theme, label_text, value, *, tip="", width=0, pack_fill=False):
col = ctk.CTkFrame(parent, fg_color="transparent")
lbl = _label(ctk, col, theme, label_text)
lbl.pack(anchor="w", pady=(0, 2))
var = ctk.StringVar(value=str(value))
ent = _entry(ctk, col, theme, var=var, width=width)
if pack_fill:
ent.pack(fill="x")
else:
ent.pack(anchor="w")
if tip:
attach_tooltip_to_widgets([lbl, ent, col], tip)
return col, var
def tray_settings_scroll_and_footer(
ctk: Any,
content_parent: Any,
theme: CtkTheme,
) -> Tuple[Any, Any]:
footer = ctk.CTkFrame(content_parent, fg_color=theme.bg)
footer.pack(side="bottom", fill="x")
scroll = ctk.CTkScrollableFrame(
content_parent,
fg_color=theme.bg,
corner_radius=0,
scrollbar_button_color=theme.field_border,
scrollbar_button_hover_color=theme.text_secondary,
)
scroll.pack(fill="both", expand=True)
return scroll, footer
def _config_section(
ctk: Any,
parent: Any,
theme: CtkTheme,
title: str,
*,
bottom_spacer: int = 6,
) -> Any:
wrap = ctk.CTkFrame(parent, fg_color="transparent")
wrap.pack(fill="x", pady=(0, bottom_spacer))
_label(ctk, wrap, theme, title, secondary=False, bold=True).pack(anchor="w", pady=(0, 2))
card = ctk.CTkFrame(
wrap, fg_color=theme.field_bg, corner_radius=10,
border_width=1, border_color=theme.field_border,
)
card.pack(fill="x")
inner = ctk.CTkFrame(card, fg_color="transparent")
inner.pack(fill="x", padx=10, pady=8)
return inner
@dataclass
class TrayConfigFormWidgets:
host_var: Any
port_var: Any
secret_var: Any
dc_textbox: Any
verbose_var: Any
adv_entries: List[Any]
adv_keys: Tuple[str, ...]
autostart_var: Optional[Any]
check_updates_var: Optional[Any]
def install_tray_config_form(
ctk: Any,
frame: Any,
theme: CtkTheme,
cfg: dict,
default_config: dict,
*,
show_autostart: bool = False,
autostart_value: bool = False,
) -> TrayConfigFormWidgets:
header = ctk.CTkFrame(frame, fg_color="transparent")
header.pack(fill="x", pady=(0, 2))
ctk.CTkLabel(
header, text="Настройки прокси",
font=(theme.ui_font_family, 17, "bold"),
text_color=theme.text_primary, anchor="w",
).pack(side="left")
ctk.CTkLabel(
header, text=f"v{__version__}",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary, anchor="e",
).pack(side="right")
conn = _config_section(ctk, frame, theme, "Подключение MTProto")
host_row = ctk.CTkFrame(conn, fg_color="transparent")
host_row.pack(fill="x")
host_col, host_var = _labeled_entry(
ctk, host_row, theme, "IP-адрес",
cfg.get("host", default_config["host"]),
tip=_TIP_HOST, width=160, pack_fill=True,
)
host_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
port_col, port_var = _labeled_entry(
ctk, host_row, theme, "Порт",
cfg.get("port", default_config["port"]),
tip=_TIP_PORT, width=100,
)
port_col.pack(side="left")
secret_row = ctk.CTkFrame(conn, fg_color="transparent")
secret_row.pack(fill="x")
secret_col, secret_var = _labeled_entry(
ctk, secret_row, theme, "Secret",
cfg.get("secret", default_config["secret"]),
tip=_TIP_SECRET, width=160, pack_fill=True,
)
secret_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
regen_col = ctk.CTkFrame(secret_row, fg_color="transparent")
regen_col.pack(side="left", anchor="s")
ctk.CTkLabel(regen_col, text="", font=(theme.ui_font_family, 12)).pack(pady=(0, 2))
ctk.CTkButton(
regen_col, text="", width=36, height=36,
font=(theme.ui_font_family, 18), corner_radius=10,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
text_color="#ffffff", border_width=1, border_color=theme.field_border,
command=lambda: secret_var.set(os.urandom(16).hex()),
).pack()
dc_inner = _config_section(ctk, frame, theme, "Датацентры Telegram (DC → IP)")
dc_lbl = _label(ctk, dc_inner, theme, "По одному правилу на строку, формат: номер:IP", size=11)
dc_lbl.pack(anchor="w", pady=(0, 4))
dc_textbox = ctk.CTkTextbox(
dc_inner, width=_INNER_W, height=88,
font=(theme.mono_font_family, 12), corner_radius=10,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
)
dc_textbox.pack(fill="x")
dc_textbox.insert("1.0", "\n".join(cfg.get("dc_ip", default_config["dc_ip"])))
attach_tooltip_to_widgets([dc_lbl, dc_textbox], _TIP_DC)
log_inner = _config_section(ctk, frame, theme, "Логи и производительность")
verbose_var = ctk.BooleanVar(value=cfg.get("verbose", False))
verbose_cb = _checkbox(ctk, log_inner, theme, "Подробное логирование (verbose)", verbose_var)
verbose_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(verbose_cb, _TIP_VERBOSE)
adv_frame = ctk.CTkFrame(log_inner, fg_color="transparent")
adv_frame.pack(fill="x")
adv_rows = [
("Буфер, КБ (по умолчанию 256)", "buf_kb", _TIP_BUF_KB),
("Пул WebSocket-сессий (по умолчанию 4)", "pool_size", _TIP_POOL),
("Макс. размер лога, МБ (по умолчанию 5)", "log_max_mb", _TIP_LOG_MB),
]
for label_text, key, tip in adv_rows:
col = ctk.CTkFrame(adv_frame, fg_color="transparent")
col.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5))
adv_l = _label(ctk, col, theme, label_text, size=11)
adv_l.pack(anchor="w", pady=(0, 2))
adv_e = _entry(
ctk, col, theme, width=_INNER_W, height=32, radius=8,
textvariable=ctk.StringVar(value=str(cfg.get(key, default_config[key]))),
)
adv_e.pack(fill="x")
attach_tooltip_to_widgets([adv_l, adv_e, col], tip)
adv_entries = list(adv_frame.winfo_children())
adv_keys = ("buf_kb", "pool_size", "log_max_mb")
upd_inner = _config_section(ctk, frame, theme, "Обновления")
st = get_status()
check_updates_var = ctk.BooleanVar(
value=bool(cfg.get("check_updates", default_config.get("check_updates", True)))
)
upd_cb = _checkbox(ctk, upd_inner, theme, "Проверять обновления при запуске", check_updates_var)
upd_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(upd_cb, _TIP_CHECK_UPDATES)
if st.get("error"):
upd_status = "Не удалось связаться с GitHub. Проверьте сеть."
elif not st.get("checked"):
upd_status = "Статус появится после фоновой проверки при запуске."
elif st.get("has_update") and st.get("latest"):
upd_status = (
f"На GitHub доступна версия {st['latest']} "
f"(у вас {__version__})."
)
elif st.get("ahead_of_release") and st.get("latest"):
upd_status = (
f"У вас {__version__} — новее последнего релиза на GitHub "
f"({st['latest']})."
)
else:
upd_status = "Установлена последняя известная версия с GitHub."
_label(ctk, upd_inner, theme, upd_status, size=11,
justify="left", wraplength=_INNER_W).pack(anchor="w", pady=(0, 8))
rel_url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ctk.CTkButton(
upd_inner, text="Открыть страницу релиза", height=32,
font=(theme.ui_font_family, 13), corner_radius=8,
fg_color=theme.field_bg, hover_color=theme.field_border,
text_color=theme.text_primary, border_width=1,
border_color=theme.field_border,
command=lambda u=rel_url: webbrowser.open(u),
).pack(anchor="w")
autostart_var = None
if show_autostart:
sys_inner = _config_section(ctk, frame, theme, "Запуск Windows", bottom_spacer=4)
autostart_var = ctk.BooleanVar(value=autostart_value)
as_cb = _checkbox(ctk, sys_inner, theme, "Автозапуск при включении компьютера", autostart_var)
as_cb.pack(anchor="w", pady=(0, 4))
as_hint = _label(
ctk, sys_inner, theme,
"Если переместить программу в другую папку, запись автозапуска может сброситься.",
size=11, justify="left", wraplength=_INNER_W,
)
as_hint.pack(anchor="w")
attach_tooltip_to_widgets([as_cb, as_hint], _TIP_AUTOSTART)
return TrayConfigFormWidgets(
host_var=host_var, port_var=port_var, secret_var=secret_var,
dc_textbox=dc_textbox, verbose_var=verbose_var,
adv_entries=adv_entries, adv_keys=adv_keys,
autostart_var=autostart_var, check_updates_var=check_updates_var,
)
def merge_adv_from_form(
widgets: TrayConfigFormWidgets,
base: Dict[str, Any],
default_config: dict,
) -> None:
for i, key in enumerate(widgets.adv_keys):
col_frame = widgets.adv_entries[i]
entry = col_frame.winfo_children()[1]
try:
val = float(entry.get().strip())
if key in ("buf_kb", "pool_size"):
val = int(val)
base[key] = val
except ValueError:
base[key] = default_config[key]
def validate_config_form(
widgets: TrayConfigFormWidgets,
default_config: dict,
*,
include_autostart: bool,
) -> Union[dict, str]:
import socket as _sock
host_val = widgets.host_var.get().strip()
try:
_sock.inet_aton(host_val)
except OSError:
return "Некорректный IP-адрес."
try:
port_val = int(widgets.port_var.get().strip())
if not (1 <= port_val <= 65535):
raise ValueError
except ValueError:
return "Порт должен быть числом 1-65535"
lines = [
l.strip()
for l in widgets.dc_textbox.get("1.0", "end").strip().splitlines()
if l.strip()
]
try:
tg_ws_proxy.parse_dc_ip_list(lines)
except ValueError as e:
return str(e)
secret_val = widgets.secret_var.get().strip()
if len(secret_val) != 32:
return "Secret должен содержать ровно 32 hex-символа (16 байт)."
try:
bytes.fromhex(secret_val)
except ValueError:
return "Secret должен состоять только из hex-символов (0-9, a-f)."
new_cfg: Dict[str, Any] = {
"host": host_val,
"port": port_val,
"secret": secret_val,
"dc_ip": lines,
"verbose": widgets.verbose_var.get(),
}
if include_autostart:
new_cfg["autostart"] = (
widgets.autostart_var.get()
if widgets.autostart_var is not None
else False
)
merge_adv_from_form(widgets, new_cfg, default_config)
if widgets.check_updates_var is not None:
new_cfg["check_updates"] = bool(widgets.check_updates_var.get())
return new_cfg
def install_tray_config_buttons(
ctk: Any,
frame: Any,
theme: CtkTheme,
*,
on_save: Callable[[], None],
on_cancel: Callable[[], None],
) -> None:
ctk.CTkFrame(
frame,
fg_color=theme.field_border,
height=1,
corner_radius=0,
).pack(fill="x", pady=(4, 10))
btn_frame = ctk.CTkFrame(frame, fg_color="transparent")
btn_frame.pack(fill="x", pady=(0, 0))
save_btn = ctk.CTkButton(
btn_frame, text="Сохранить", height=38,
font=(theme.ui_font_family, 14, "bold"), corner_radius=10,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
text_color="#ffffff",
command=on_save)
save_btn.pack(side="left", fill="x", expand=True, padx=(0, 8))
attach_ctk_tooltip(save_btn, _TIP_SAVE)
cancel_btn = ctk.CTkButton(
btn_frame, text="Отмена", height=38,
font=(theme.ui_font_family, 14), corner_radius=10,
fg_color=theme.field_bg, hover_color=theme.field_border,
text_color=theme.text_primary, border_width=1,
border_color=theme.field_border,
command=on_cancel)
cancel_btn.pack(side="right", fill="x", expand=True)
attach_ctk_tooltip(cancel_btn, _TIP_CANCEL)
def populate_first_run_window(
ctk: Any,
root: Any,
theme: CtkTheme,
*,
host: str,
port: int,
secret: str,
on_done: Callable[[bool], None],
) -> None:
link_host = tg_ws_proxy.get_link_host(host)
tg_url = f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
fpx, fpy = FIRST_RUN_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
title_frame = ctk.CTkFrame(frame, fg_color="transparent")
title_frame.pack(anchor="w", pady=(0, 16), fill="x")
accent_bar = ctk.CTkFrame(title_frame, fg_color=theme.tg_blue,
width=4, height=32, corner_radius=2)
accent_bar.pack(side="left", padx=(0, 12))
ctk.CTkLabel(title_frame, text="Прокси запущен и работает в системном трее",
font=(theme.ui_font_family, 17, "bold"),
text_color=theme.text_primary).pack(side="left")
sections = [
("Как подключить Telegram Desktop:", True),
(" Автоматически:", True),
(" ПКМ по иконке в трее → «Открыть в Telegram»", False),
(f" Или скопировать ссылку, отправить её себе в TG и нажать по ней: {tg_url}", False),
("\n Вручную:", True),
(" Настройки → Продвинутые → Тип подключения → Прокси", False),
(f" MTProto → {link_host} : {port}", False),
(f" Secret: dd{secret}", False),
]
textbox = ctk.CTkTextbox(
frame,
font=(theme.ui_font_family, 13),
fg_color=theme.bg,
border_width=0,
text_color=theme.text_primary,
activate_scrollbars=False,
wrap="word",
height=275,
)
textbox._textbox.tag_configure("bold", font=(theme.ui_font_family, 13, "bold"))
textbox._textbox.configure(spacing1=1, spacing3=1)
for text, bold in sections:
if text.startswith("\n"):
textbox.insert("end", "\n")
text = text[1:]
if bold:
textbox.insert("end", text + "\n", "bold")
else:
textbox.insert("end", text + "\n")
textbox.configure(state="disabled")
textbox.pack(anchor="w", fill="x")
ctk.CTkFrame(frame, fg_color="transparent", height=16).pack()
ctk.CTkFrame(frame, fg_color=theme.field_border, height=1,
corner_radius=0).pack(fill="x", pady=(0, 12))
auto_var = ctk.BooleanVar(value=True)
_checkbox(ctk, frame, theme, "Открыть прокси в Telegram сейчас",
auto_var).pack(anchor="w", pady=(0, 16))
def on_ok():
on_done(auto_var.get())
ctk.CTkButton(frame, text="Начать", width=180, height=42,
font=(theme.ui_font_family, 15, "bold"), corner_radius=10,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
text_color="#ffffff",
command=on_ok).pack(pady=(0, 0))
root.protocol("WM_DELETE_WINDOW", on_ok)

View File

@ -1,5 +0,0 @@
"""Вспомогательные утилиты (проверка релизов и т.п.)."""
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
__all__ = ["RELEASES_PAGE_URL", "get_status", "run_check"]

View File

@ -1,30 +0,0 @@
"""
Общие значения по умолчанию для tray-приложений (Windows / Linux / macOS).
Единственное отличие по платформе ключ autostart только на Windows.
"""
from __future__ import annotations
import sys
import os
from typing import Any, Dict
_TRAY_DEFAULTS_COMMON: Dict[str, Any] = {
"port": 1443,
"host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
"check_updates": True,
"log_max_mb": 5,
"buf_kb": 256,
"pool_size": 4,
}
def default_tray_config() -> Dict[str, Any]:
cfg = dict(_TRAY_DEFAULTS_COMMON)
cfg["secret"] = os.urandom(16).hex()
if sys.platform == "win32":
cfg["autostart"] = False
return cfg

View File

@ -1,460 +0,0 @@
from __future__ import annotations
import asyncio
import json
import logging
import logging.handlers
import os
import socket as _socket
import sys
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple
import psutil
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.default_config import default_tray_config
log = logging.getLogger("tg-ws-tray")
APP_NAME = "TgWsProxy"
def _app_dir() -> Path:
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / APP_NAME
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
APP_DIR = _app_dir()
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done_mtproto"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG: Dict[str, Any] = default_tray_config()
IS_FROZEN = bool(getattr(sys, "frozen", False))
def ensure_dirs() -> None:
APP_DIR.mkdir(parents=True, exist_ok=True)
# single-instance lock
_lock_file_path: Optional[Path] = None
def _same_process(meta: dict, proc: psutil.Process, script_hint: str) -> bool:
try:
lock_ct = float(meta.get("create_time", 0.0))
if lock_ct > 0 and abs(lock_ct - proc.create_time()) > 1.0:
return False
except Exception:
return False
if IS_FROZEN:
return APP_NAME.lower() in proc.name().lower()
try:
for arg in proc.cmdline():
if script_hint in arg:
return True
except Exception:
pass
return False
def acquire_lock(script_hint: str = "") -> bool:
global _lock_file_path
ensure_dirs()
for f in list(APP_DIR.glob("*.lock")):
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
meta: dict = {}
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
pass
try:
if _same_process(meta, psutil.Process(pid), script_hint):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
lock_file.write_text(
json.dumps({"create_time": proc.create_time()}, ensure_ascii=False),
encoding="utf-8",
)
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def release_lock() -> None:
global _lock_file_path
if _lock_file_path:
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
# config
def load_config() -> dict:
ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict) -> None:
ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
# logging
_LOG_FMT_FILE = "%(asctime)s %(levelname)-5s %(name)s %(message)s"
_LOG_FMT_CONSOLE = "%(asctime)s %(levelname)-5s %(message)s"
def setup_logging(verbose: bool = False, log_max_mb: float = 5) -> None:
ensure_dirs()
level = logging.DEBUG if verbose else logging.INFO
root = logging.getLogger()
root.setLevel(level)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, int(log_max_mb * 1024 * 1024)),
backupCount=0,
encoding="utf-8",
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(_LOG_FMT_FILE, datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not IS_FROZEN:
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(level)
ch.setFormatter(logging.Formatter(_LOG_FMT_CONSOLE, datefmt="%H:%M:%S"))
root.addHandler(ch)
# icon
def make_icon_image(size: int = 64, *, color: Tuple[int, ...] = (0, 136, 204, 255)):
from PIL import Image, ImageDraw, ImageFont
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin], fill=color)
for path in _font_paths():
try:
font = ImageFont.truetype(path, size=int(size * 0.55))
break
except Exception:
continue
else:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T",
fill=(255, 255, 255, 255),
font=font,
)
return img
def _font_paths():
if sys.platform == "win32":
return ["arial.ttf"]
if sys.platform == "darwin":
return ["/System/Library/Fonts/Helvetica.ttc"]
return [
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf",
]
def load_icon():
from PIL import Image
icon_path = Path(__file__).parents[1] / "icon.ico"
if icon_path.exists():
try:
return Image.open(str(icon_path))
except Exception:
pass
return make_icon_image(64)
# proxy lifecycle
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[Tuple[asyncio.AbstractEventLoop, asyncio.Event]] = None
def _run_proxy_thread(on_port_busy: Callable[[str], None]) -> None:
global _async_stop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stop_ev = asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc) or "10048" in str(exc):
on_port_busy(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def apply_proxy_config(cfg: dict) -> bool:
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
return False
pc = tg_ws_proxy.proxy_config
pc.port = cfg.get("port", DEFAULT_CONFIG["port"])
pc.host = cfg.get("host", DEFAULT_CONFIG["host"])
pc.secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
pc.dc_redirects = dc_redirects
pc.buffer_size = max(4, cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])) * 1024
pc.pool_size = max(0, cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]))
return True
def start_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
global _proxy_thread
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
if not apply_proxy_config(cfg):
on_error("Ошибка конфигурации DC → IP.")
return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread, args=(on_error,), daemon=True, name="proxy"
)
_proxy_thread.start()
def stop_proxy() -> None:
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=5)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy(cfg, on_error)
def tg_proxy_url(cfg: dict) -> str:
host = cfg.get("host", DEFAULT_CONFIG["host"])
port = cfg.get("port", DEFAULT_CONFIG["port"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
link_host = tg_ws_proxy.get_link_host(host)
return f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
_IPV6_WARNING = (
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз."
)
def _has_ipv6() -> bool:
try:
for addr in _socket.getaddrinfo(_socket.gethostname(), None, _socket.AF_INET6):
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _socket.socket(_socket.AF_INET6, _socket.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def check_ipv6_warning(show_info: Callable[[str, str], None]) -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists() or not _has_ipv6():
return
IPV6_WARN_MARKER.touch()
threading.Thread(
target=lambda: show_info(_IPV6_WARNING, "TG WS Proxy"),
daemon=True,
).start()
# update check
def maybe_notify_update(
cfg: dict,
is_exiting: Callable[[], bool],
ask_open: Callable[[str, str], bool],
) -> None:
if not cfg.get("check_updates", True):
return
def _work():
time.sleep(1.5)
if is_exiting():
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
import webbrowser
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if ask_open(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
# ctk thread (windows / linux)
_ctk_root: Any = None
_ctk_root_ready = threading.Event()
def ensure_ctk_thread(ctk: Any) -> bool:
global _ctk_root
if ctk is None:
return False
if _ctk_root_ready.is_set():
return True
def _run():
global _ctk_root
from ui.ctk_theme import apply_ctk_appearance, install_tkinter_variable_del_guard
install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
_ctk_root = ctk.CTk()
_ctk_root.withdraw()
_ctk_root_ready.set()
_ctk_root.mainloop()
threading.Thread(target=_run, daemon=True, name="ctk-root").start()
_ctk_root_ready.wait(timeout=5.0)
return _ctk_root is not None
def ctk_run_dialog(build_fn: Callable[[threading.Event], None]) -> None:
if _ctk_root is None:
return
done = threading.Event()
def _invoke():
try:
build_fn(done)
except Exception:
log.exception("CTk dialog failed")
done.set()
_ctk_root.after(0, _invoke)
done.wait()
import gc
gc.collect()
def quit_ctk() -> None:
if _ctk_root is not None:
try:
_ctk_root.after(0, _ctk_root.quit)
except Exception:
pass
# common bootstrap
def bootstrap(cfg: dict) -> None:
save_config(cfg)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(
cfg.get("verbose", False),
log_max_mb=cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
log.info("TG WS Proxy версия %s starting", __version__)
log.info("Config: %s", cfg)
log.info("Log file: %s", LOG_FILE)

View File

@ -1,223 +0,0 @@
"""
Минимальная проверка новой версии через GitHub Releases API (без сторонних зависимостей).
Ограничение частоты запросов: не чаще одного раза в час на машину (кэш в каталоге
данных приложения). Поддерживается If-None-Match (ETag) для ответа 304.
"""
from __future__ import annotations
import json
import os
import sys
import time
from itertools import zip_longest
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
REPO = "Flowseal/tg-ws-proxy"
RELEASES_LATEST_API = f"https://api.github.com/repos/{REPO}/releases/latest"
RELEASES_PAGE_URL = f"https://github.com/{REPO}/releases/latest"
# Не чаще одного полного запроса к API в час (без учёта 304 с тем же ETag).
_MIN_FETCH_INTERVAL_SEC = 3600.0
_state: Dict[str, Any] = {
"checked": False,
"has_update": False,
"ahead_of_release": False,
"latest": None,
"html_url": None,
"error": None,
}
def _cache_file() -> Optional[Path]:
try:
if sys.platform == "win32":
root = Path(os.environ.get("APPDATA", str(Path.home()))) / "TgWsProxy"
elif sys.platform == "darwin":
root = Path.home() / "Library/Application Support/TgWsProxy"
else:
xdg = os.environ.get("XDG_CONFIG_HOME")
root = (Path(xdg).expanduser() if xdg else Path.home() / ".config") / "TgWsProxy"
root.mkdir(parents=True, exist_ok=True)
return root / ".update_check_cache.json"
except OSError:
return None
def _load_cache(path: Optional[Path]) -> Dict[str, Any]:
if not path or not path.is_file():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {}
def _save_cache(path: Optional[Path], data: Dict[str, Any]) -> None:
if not path:
return
try:
path.write_text(json.dumps(data), encoding="utf-8")
except OSError:
pass
def _parse_version_tuple(s: str) -> tuple:
s = (s or "").strip().lstrip("vV")
if not s:
return (0,)
parts = []
for seg in s.split("."):
digits = "".join(c for c in seg if c.isdigit())
if digits:
try:
parts.append(int(digits))
except ValueError:
parts.append(0)
else:
parts.append(0)
return tuple(parts) if parts else (0,)
def _version_gt(a: str, b: str) -> bool:
"""True, если версия a новее b (простое сравнение по сегментам)."""
ta = _parse_version_tuple(a)
tb = _parse_version_tuple(b)
for x, y in zip_longest(ta, tb, fillvalue=0):
if x > y:
return True
if x < y:
return False
return False
def _apply_release_tag(
tag: str, html_url: str, current_version: str,
) -> None:
global _state
if not tag:
_state["has_update"] = False
_state["ahead_of_release"] = False
_state["latest"] = None
_state["html_url"] = html_url.strip() or RELEASES_PAGE_URL
return
latest_clean = tag.lstrip("vV")
cur = (current_version or "").strip().lstrip("vV")
_state["latest"] = latest_clean
_state["html_url"] = html_url.strip() or RELEASES_PAGE_URL
_state["has_update"] = _version_gt(latest_clean, cur)
_state["ahead_of_release"] = bool(latest_clean) and _version_gt(
cur, latest_clean
)
def fetch_latest_release(
timeout: float = 12.0,
etag: Optional[str] = None,
) -> Tuple[Optional[dict], Optional[str], int]:
"""
GET releases/latest. Возвращает (data или None при 304, etag или None, HTTP-код).
"""
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "tg-ws-proxy-update-check",
}
if etag:
headers["If-None-Match"] = etag
req = Request(
RELEASES_LATEST_API,
headers=headers,
method="GET",
)
try:
with urlopen(req, timeout=timeout) as resp:
code = getattr(resp, "status", None) or resp.getcode()
new_etag = resp.headers.get("ETag")
raw = resp.read().decode("utf-8", errors="replace")
return json.loads(raw), new_etag, int(code)
except HTTPError as e:
if e.code == 304:
hdrs = e.headers
new_etag = hdrs.get("ETag") if hdrs else None
return None, new_etag or etag, 304
raise
def run_check(current_version: str) -> None:
"""Запрашивает последний релиз и обновляет внутреннее состояние."""
global _state
_state["checked"] = True
_state["error"] = None
cache_path = _cache_file()
cache = _load_cache(cache_path)
now = time.time()
last_attempt = float(cache.get("last_attempt_at") or 0)
if last_attempt and (now - last_attempt) < _MIN_FETCH_INTERVAL_SEC:
tag = (cache.get("tag_name") or "").strip()
if tag:
_apply_release_tag(tag, cache.get("html_url") or "", current_version)
return
err = cache.get("last_error")
_state["error"] = (
err if err else "Проверка обновлений отложена (интервал между запросами)."
)
_state["has_update"] = False
_state["ahead_of_release"] = False
_state["latest"] = None
_state["html_url"] = RELEASES_PAGE_URL
return
etag = (cache.get("etag") or "").strip() or None
try:
data, new_etag, code = fetch_latest_release(etag=etag)
cache["last_attempt_at"] = now
if code == 304:
tag = (cache.get("tag_name") or "").strip()
url = (cache.get("html_url") or "").strip() or RELEASES_PAGE_URL
_apply_release_tag(tag, url, current_version)
if new_etag:
cache["etag"] = new_etag
_save_cache(cache_path, cache)
return
assert data is not None
tag = (data.get("tag_name") or "").strip()
html_url = (data.get("html_url") or "").strip() or RELEASES_PAGE_URL
if not tag:
_state["has_update"] = False
_state["ahead_of_release"] = False
_state["latest"] = None
_state["html_url"] = html_url
else:
_apply_release_tag(tag, html_url, current_version)
if new_etag:
cache["etag"] = new_etag
cache["tag_name"] = tag
cache["html_url"] = html_url
cache.pop("last_error", None)
_save_cache(cache_path, cache)
except (HTTPError, URLError, OSError, TimeoutError, ValueError, json.JSONDecodeError) as e:
cache["last_attempt_at"] = now
msg = str(e)
if isinstance(e, HTTPError) and e.code == 403:
msg = (
"GitHub API вернул 403 (лимит или доступ). Повторите позже."
)
cache["last_error"] = msg
_save_cache(cache_path, cache)
_state["error"] = msg
_state["has_update"] = False
_state["ahead_of_release"] = False
_state["latest"] = None
_state["html_url"] = RELEASES_PAGE_URL
def get_status() -> Dict[str, Any]:
"""Снимок состояния после run_check (для подписей в настройках)."""
return dict(_state)

File diff suppressed because it is too large Load Diff