Compare commits

..

No commits in common. "a5641700ed892e47b904e7acfc8a2085a1f1a300" and "79840806f2d0c9f32c8b1003e4fbf62630b09f13" have entirely different histories.

31 changed files with 2295 additions and 2086 deletions

View File

@ -306,7 +306,7 @@ jobs:
Maintainer: Flowseal
Depends: libgtk-3-0, libayatana-appindicator3-1, python3-tk
Description: Telegram Desktop WebSocket Bridge Proxy
MTProto/WebSocket bridge proxy for Telegram Desktop with tray UI.
SOCKS5/WebSocket bridge proxy for Telegram Desktop with tray UI.
EOF
dpkg-deb --build --root-owner-group \

1
.gitignore vendored
View File

@ -24,7 +24,6 @@ local.properties
android/.idea/
android/build/
android/app/build/
android/app/build
android/*.jks
*.keystore
android/*.keystore.properties

View File

@ -23,7 +23,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PATH=/opt/venv/bin:$PATH \
TG_WS_PROXY_HOST=0.0.0.0 \
TG_WS_PROXY_PORT=1443 \
TG_WS_PROXY_PORT=1080 \
TG_WS_PROXY_DC_IPS="2:149.154.167.220 4:149.154.167.220"
RUN apt-get update \
@ -39,7 +39,7 @@ COPY README.md LICENSE ./
USER app
EXPOSE 1443/tcp
EXPOSE 1080/tcp
ENTRYPOINT ["/usr/bin/tini", "--", "/bin/sh", "-lc", "set -eu; args=\"--host ${TG_WS_PROXY_HOST} --port ${TG_WS_PROXY_PORT}\"; for dc in ${TG_WS_PROXY_DC_IPS}; do args=\"$args --dc-ip $dc\"; done; exec python -u proxy/tg_ws_proxy.py $args \"$@\"", "--"]
CMD []

View File

@ -12,17 +12,17 @@
# TG WS Proxy
**Локальный MTProto-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние сервера.
**Локальный SOCKS5-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние сервера.
<img width="529" height="487" alt="image" src="https://github.com/user-attachments/assets/6a4cf683-0df8-43af-86c1-0e8f08682b62" />
## Как это работает
```
Telegram Desktop → MTProto Proxy (127.0.0.1:1443) → WebSocket → Telegram DC
Telegram Desktop → SOCKS5 (127.0.0.1:1080) → TG WS Proxy → WSS → Telegram DC
```
1. Приложение поднимает MTProto прокси на `127.0.0.1:1443`
1. Приложение поднимает локальный SOCKS5-прокси на `127.0.0.1:1080`
2. Перехватывает подключения к IP-адресам Telegram
3. Извлекает DC ID из MTProto obfuscation init-пакета
4. Устанавливает WebSocket (TLS) соединение к соответствующему DC через домены Telegram
@ -38,7 +38,7 @@ Telegram Desktop → MTProto Proxy (127.0.0.1:1443) → WebSocket → Telegram D
**Меню трея:**
- **Открыть в Telegram** — автоматически настроить прокси через `tg://proxy` ссылку
- **Открыть в Telegram** — автоматически настроить прокси через `tg://socks` ссылку
- **Перезапустить прокси** — перезапуск без выхода из приложения
- **Настройки...** — GUI-редактор конфигурации (в т.ч. версия приложения, опциональная проверка обновлений с GitHub)
- **Открыть логи** — открыть файл логов
@ -69,9 +69,8 @@ makepkg -si
# При помощи AUR-helper
paru -S tg-ws-proxy-bin
# Если вы установили -cli пакет, то запуск осуществляется через systemctl, где 8888 это номер порта,
# разделитель ":" и secret, который можно сгенерировать командой: openssl rand -hex 16
sudo systemctl start tg-ws-proxy-cli@8888:3075abe65830f0325116bb0416cadf9f
# Если вы установили -cli пакет, то запуск осуществляется через systemctl, где 8888 это номер порта прокси:
sudo systemctl start tg-ws-proxy-cli@8888
```
Для остальных дистрибутивов можно использовать **`TgWsProxy_linux_amd64`** (бинарный файл для x86_64).
@ -104,7 +103,7 @@ chmod +x TgWsProxy_linux_amd64
### Консольный proxy
Для запуска только proxy без tray-интерфейса достаточно базовой установки:
Для запуска только SOCKS5/WebSocket proxy без tray-интерфейса достаточно базовой установки:
```bash
pip install -e .
@ -152,19 +151,51 @@ tg-ws-proxy [--port PORT] [--host HOST] [--dc-ip DC:IP ...] [-v]
android/app/build/outputs/apk/standard/debug/app-standard-debug.apk
```
Legacy32 debug-сборка:
```bash
./android/build-local-debug.sh assembleLegacy32Debug
```
Результат:
```text
android/app/build/outputs/apk/legacy32/debug/app-legacy32-debug.apk
```
### Android signed release APK
Для локальной release-сборки нужен keystore и переменные окружения:
```bash
export ANDROID_KEYSTORE_FILE=/path/to/tg-ws-proxy-release.keystore
export ANDROID_KEYSTORE_PASSWORD=...
export ANDROID_KEY_ALIAS=tg-ws-proxy
export ANDROID_KEY_PASSWORD=...
```
Сборка:
```bash
cd android
./build-local-debug.sh assembleStandardRelease
./build-local-debug.sh assembleLegacy32Release
```
Результат:
```text
android/app/build/outputs/apk/standard/release/app-standard-release.apk
android/app/build/outputs/apk/legacy32/release/app-legacy32-release.apk
```
**Аргументы:**
| Аргумент | По умолчанию | Описание |
|---|---|---|
| `--port` | `1443` | Порт прокси |
| `--host` | `127.0.0.1` | Хост прокси |
| `--secret` | `random` | 32 hex chars secret для авторизации клиентов |
| `--port` | `1080` | Порт SOCKS5-прокси |
| `--host` | `127.0.0.1` | Хост SOCKS5-прокси |
| `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) |
| `--buf-kb` | `256` | Размер буфера в КБ
| `--pool-size` | `4` | Количество заготовленных соединений на каждый DC
| `--log-file` | выкл. | Путь до файла, в который сохранять логи
| `--log-max-mb` | `5` | Максимальный размер файла логов в МБ (после идёт перезапись)
| `--log-backups` | `0` | Количество сохранений логов после перезаписи
| `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) |
**Примеры:**
@ -204,10 +235,10 @@ tg-ws-proxy-tray-linux = "linux:main"
1. Telegram → **Настройки****Продвинутые настройки****Тип подключения** → **Прокси**
2. Добавить прокси:
- **Тип:** MTProto
- **Сервер:** `127.0.0.1` (или переопределенный вами)
- **Порт:** `1443` (или переопределенный вами)
- **Secret:** из настроек или логов
- **Тип:** SOCKS5
- **Сервер:** `127.0.0.1`
- **Порт:** `1080`
- **Логин/Пароль:** оставить пустыми
## Настройка Telegram Android
@ -219,10 +250,10 @@ tg-ws-proxy-tray-linux = "linux:main"
1. Telegram → **Настройки****Данные и память** → **Настройки прокси**
2. Добавить прокси:
- **Тип:** MTProto
- **Тип:** SOCKS5
- **Сервер:** `127.0.0.1`
- **Порт:** `1443`
- **Secret:** из настроек приложения
- **Порт:** `1080`
- **Логин/Пароль:** оставить пустыми
Важно:
@ -240,8 +271,7 @@ Tray-приложение хранит данные в:
```json
{
"host": "127.0.0.1",
"port": 1443,
"secret": "...",
"port": 1080,
"dc_ip": [
"2:149.154.167.220",
"4:149.154.167.220"
@ -268,6 +298,18 @@ Tray-приложение хранит данные в:
- Intel macOS 10.15+
- Apple Silicon macOS 11.0+
- Linux x86_64 (требуется AppIndicator для системного трея)
Android-артефакты:
- `tg-ws-proxy-android-vX.Y.Z.apk`
- `tg-ws-proxy-android-vX.Y.Z-legacy32.apk`
Для signed Android release в GitHub Actions нужны secrets:
- `ANDROID_KEYSTORE_BASE64`
- `ANDROID_KEYSTORE_PASSWORD`
- `ANDROID_KEY_ALIAS`
- `ANDROID_KEY_PASSWORD`
## Лицензия
[MIT License](LICENSE)

View File

@ -128,7 +128,6 @@ class MainActivity : AppCompatActivity() {
private fun renderConfig(config: ProxyConfig) {
binding.hostInput.setText(config.host)
binding.portInput.setText(config.portText)
binding.secretInput.setText(config.secretText)
binding.dcIpInput.setText(config.dcIpText)
binding.logMaxMbInput.setText(config.logMaxMbText)
binding.bufferKbInput.setText(config.bufferKbText)
@ -142,7 +141,6 @@ class MainActivity : AppCompatActivity() {
return ProxyConfig(
host = binding.hostInput.text?.toString().orEmpty(),
portText = binding.portInput.text?.toString().orEmpty(),
secretText = binding.secretInput.text?.toString().orEmpty(),
dcIpText = binding.dcIpInput.text?.toString().orEmpty(),
logMaxMbText = binding.logMaxMbInput.text?.toString().orEmpty(),
bufferKbText = binding.bufferKbInput.text?.toString().orEmpty(),
@ -170,7 +168,7 @@ class MainActivity : AppCompatActivity() {
}
}.getOrElse { exc ->
ProxyUpdateStatus(
currentVersion = currentAppVersionName(),
currentVersion = "unknown",
error = exc.message ?: exc.javaClass.simpleName,
)
}

View File

@ -1,11 +1,8 @@
package org.flowseal.tgwsproxy
import java.security.SecureRandom
data class ProxyConfig(
val host: String = DEFAULT_HOST,
val portText: String = DEFAULT_PORT.toString(),
val secretText: String = DEFAULT_SECRET,
val dcIpText: String = DEFAULT_DC_IP_LINES.joinToString("\n"),
val logMaxMbText: String = formatDecimal(DEFAULT_LOG_MAX_MB),
val bufferKbText: String = DEFAULT_BUFFER_KB.toString(),
@ -25,13 +22,6 @@ data class ProxyConfig(
return ValidationResult(errorMessage = "Порт должен быть в диапазоне 1-65535.")
}
val secretValue = secretText.trim().lowercase()
if (secretValue.length != 32 || !secretValue.all { it in "0123456789abcdef" }) {
return ValidationResult(
errorMessage = "MTProto secret должен содержать ровно 32 hex-символа."
)
}
val lines = dcIpText
.lineSequence()
.map { it.trim() }
@ -85,7 +75,6 @@ data class ProxyConfig(
normalized = NormalizedProxyConfig(
host = hostValue,
port = portValue,
secret = secretValue,
dcIpList = lines,
logMaxMb = logMaxMbValue,
bufferKb = bufferKbValue,
@ -98,11 +87,10 @@ data class ProxyConfig(
companion object {
const val DEFAULT_HOST = "127.0.0.1"
const val DEFAULT_PORT = 1443
const val DEFAULT_PORT = 1080
const val DEFAULT_LOG_MAX_MB = 5.0
const val DEFAULT_BUFFER_KB = 256
const val DEFAULT_POOL_SIZE = 4
val DEFAULT_SECRET = generateSecret()
val DEFAULT_DC_IP_LINES = listOf(
"2:149.154.167.220",
"4:149.154.167.220",
@ -116,12 +104,6 @@ data class ProxyConfig(
}
}
private fun generateSecret(): String {
val bytes = ByteArray(16)
SecureRandom().nextBytes(bytes)
return bytes.joinToString(separator = "") { "%02x".format(it) }
}
private fun isIpv4Address(value: String): Boolean {
val octets = value.split(".")
if (octets.size != 4) {
@ -146,7 +128,6 @@ data class ValidationResult(
data class NormalizedProxyConfig(
val host: String,
val port: Int,
val secret: String,
val dcIpList: List<String>,
val logMaxMb: Double,
val bufferKb: Int,

View File

@ -9,7 +9,6 @@ class ProxySettingsStore(context: Context) {
return ProxyConfig(
host = preferences.getString(KEY_HOST, ProxyConfig.DEFAULT_HOST).orEmpty(),
portText = preferences.getInt(KEY_PORT, ProxyConfig.DEFAULT_PORT).toString(),
secretText = preferences.getString(KEY_SECRET, ProxyConfig.DEFAULT_SECRET).orEmpty(),
dcIpText = preferences.getString(
KEY_DC_IP_TEXT,
ProxyConfig.DEFAULT_DC_IP_LINES.joinToString("\n"),
@ -37,7 +36,6 @@ class ProxySettingsStore(context: Context) {
preferences.edit()
.putString(KEY_HOST, config.host)
.putInt(KEY_PORT, config.port)
.putString(KEY_SECRET, config.secret)
.putString(KEY_DC_IP_TEXT, config.dcIpList.joinToString("\n"))
.putFloat(KEY_LOG_MAX_MB, config.logMaxMb.toFloat())
.putInt(KEY_BUFFER_KB, config.bufferKb)
@ -51,7 +49,6 @@ class ProxySettingsStore(context: Context) {
private const val PREFS_NAME = "proxy_settings"
private const val KEY_HOST = "host"
private const val KEY_PORT = "port"
private const val KEY_SECRET = "secret"
private const val KEY_DC_IP_TEXT = "dc_ip_text"
private const val KEY_LOG_MAX_MB = "log_max_mb"
private const val KEY_BUFFER_KB = "buf_kb"

View File

@ -17,7 +17,6 @@ object PythonProxyBridge {
File(context.filesDir, "tg-ws-proxy").absolutePath,
config.host,
config.port,
config.secret,
config.dcIpList,
config.logMaxMb,
config.bufferKb,

View File

@ -8,7 +8,7 @@ import android.net.Uri
object TelegramProxyIntent {
fun open(context: Context, config: NormalizedProxyConfig): Boolean {
val uri = Uri.parse(
"tg://proxy?server=${Uri.encode(config.host)}&port=${config.port}&secret=dd${Uri.encode(config.secret)}"
"tg://socks?server=${Uri.encode(config.host)}&port=${config.port}"
)
val intent = Intent(Intent.ACTION_VIEW, uri)
.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK)

View File

@ -44,7 +44,7 @@ def _normalize_dc_ip_list(dc_ip_list: Iterable[object]) -> list[str]:
return [str(item).strip() for item in values if str(item).strip()]
def start_proxy(app_dir: str, host: str, port: int, secret: str,
def start_proxy(app_dir: str, host: str, port: int,
dc_ip_list: Iterable[object], log_max_mb: float = 5.0,
buf_kb: int = 256, pool_size: int = 4,
verbose: bool = False) -> str:
@ -70,7 +70,6 @@ def start_proxy(app_dir: str, host: str, port: int, secret: str,
config = {
"host": host,
"port": int(port),
"secret": str(secret).strip(),
"dc_ip": _normalize_dc_ip_list(dc_ip_list),
"log_max_mb": float(log_max_mb),
"buf_kb": int(buf_kb),

View File

@ -243,20 +243,6 @@
android:maxLines="1" />
</com.google.android.material.textfield.TextInputLayout>
<com.google.android.material.textfield.TextInputLayout
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:layout_marginTop="16dp"
android:hint="@string/secret_hint">
<com.google.android.material.textfield.TextInputEditText
android:id="@+id/secretInput"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:inputType="textNoSuggestions"
android:maxLines="1" />
</com.google.android.material.textfield.TextInputLayout>
<com.google.android.material.textfield.TextInputLayout
android:layout_width="match_parent"
android:layout_height="wrap_content"

View File

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<string name="app_name">TG WS Proxy</string>
<string name="subtitle">Android app for the local Telegram MTProto proxy.</string>
<string name="subtitle">Android app for the local Telegram SOCKS5 proxy.</string>
<string name="status_label">Foreground service</string>
<string name="status_starting">Starting</string>
<string name="status_running">Running</string>
@ -33,7 +33,6 @@
<string name="updates_open_release_button">Open Release Page</string>
<string name="host_hint">Proxy host</string>
<string name="port_hint">Proxy port</string>
<string name="secret_hint">MTProto secret (32 hex characters)</string>
<string name="dc_ip_hint">DC to IP mappings (one DC:IP per line)</string>
<string name="log_max_mb_hint">Max log size before rotation (MB)</string>
<string name="buffer_kb_hint">Socket buffer size (KB)</string>
@ -52,12 +51,12 @@
<string name="settings_saved">Settings saved</string>
<string name="service_start_requested">Foreground service start requested</string>
<string name="service_restart_requested">Foreground service restart requested</string>
<string name="telegram_not_found">Telegram app was not found for tg://proxy.</string>
<string name="telegram_not_found">Telegram app was not found for tg://socks.</string>
<string name="notification_title">TG WS Proxy</string>
<string name="notification_channel_name">Proxy service</string>
<string name="notification_channel_description">Keeps the Telegram proxy service alive in the foreground.</string>
<string name="notification_starting">MTProto %1$s:%2$d • starting embedded Python</string>
<string name="notification_running">MTProto %1$s:%2$d • proxy active</string>
<string name="notification_starting">SOCKS5 %1$s:%2$d • starting embedded Python</string>
<string name="notification_running">SOCKS5 %1$s:%2$d • proxy active</string>
<string name="notification_endpoint">%1$s:%2$d</string>
<string name="notification_details">DC mappings: %1$d\nTraffic: ↑ %2$s/s ↓ %3$s/s\nTransferred: ↑ %4$s ↓ %5$s</string>
<string name="notification_action_stop">Stop</string>

View File

@ -3,7 +3,6 @@
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
APP_BUILD_DIR="$ROOT_DIR/app/build"
if [[ -z "${GRADLE_USER_HOME:-}" ]]; then
if [[ -d "$HOME/.gradle" && -w "$HOME/.gradle" ]]; then
@ -41,7 +40,6 @@ GRADLE_BIN="gradle"
if [[ -x "$ROOT_DIR/gradlew" ]]; then
GRADLE_BIN="$ROOT_DIR/gradlew"
fi
GRADLE_RUN_DIR="$ROOT_DIR"
ATTEMPTS="${ATTEMPTS:-5}"
SLEEP_SECONDS="${SLEEP_SECONDS:-15}"
@ -49,34 +47,6 @@ TASK="${1:-assembleStandardDebug}"
LOCAL_CHAQUOPY_REPO="${LOCAL_CHAQUOPY_REPO:-$ROOT_DIR/.m2-chaquopy}"
CHAQUOPY_MAVEN_BASE="${CHAQUOPY_MAVEN_BASE:-https://repo.maven.apache.org/maven2}"
running_on_wsl_windows_mount() {
[[ -n "${WSL_DISTRO_NAME:-}" && "$ROOT_DIR" == /mnt/* ]]
}
prepare_wsl_build_dir() {
if ! running_on_wsl_windows_mount; then
return 0
fi
local cache_root="${XDG_CACHE_HOME:-$HOME/.cache}/tg-ws-proxy-android-build"
local project_key
project_key="$(printf '%s' "$ROOT_DIR" | sha256sum | cut -d' ' -f1)"
local linux_build_dir="$cache_root/$project_key/app-build"
mkdir -p "$cache_root"
mkdir -p "$linux_build_dir"
if [[ -L "$APP_BUILD_DIR" ]]; then
return 0
fi
if [[ -e "$APP_BUILD_DIR" ]]; then
rm -rf "$APP_BUILD_DIR"
fi
ln -s "$linux_build_dir" "$APP_BUILD_DIR"
}
task_uses_legacy32() {
[[ "$TASK" =~ [Ll]egacy32 ]]
}
@ -153,11 +123,11 @@ prefetch_chaquopy_runtime() {
cleanup_stale_build_state() {
local stale_dirs=(
"$APP_BUILD_DIR/python/env"
"$APP_BUILD_DIR/intermediates/project_dex_archive"
"$APP_BUILD_DIR/intermediates/desugar_graph"
"$APP_BUILD_DIR/tmp/kotlin-classes"
"$APP_BUILD_DIR/snapshot/kotlin"
"$ROOT_DIR/app/build/python/env"
"$ROOT_DIR/app/build/intermediates/project_dex_archive"
"$ROOT_DIR/app/build/intermediates/desugar_graph"
"$ROOT_DIR/app/build/tmp/kotlin-classes"
"$ROOT_DIR/app/build/snapshot/kotlin"
)
for stale_dir in "${stale_dirs[@]}"; do
@ -168,14 +138,10 @@ cleanup_stale_build_state() {
}
prefetch_chaquopy_runtime
prepare_wsl_build_dir
for attempt in $(seq 1 "$ATTEMPTS"); do
echo "==> Android build attempt $attempt/$ATTEMPTS ($TASK)"
if (
cd "$GRADLE_RUN_DIR"
"$GRADLE_BIN" --no-daemon --console=plain "$TASK"
); then
if "$GRADLE_BIN" --no-daemon --console=plain "$TASK"; then
exit 0
fi

BIN
icon.ico

Binary file not shown.

Before

Width:  |  Height:  |  Size: 473 B

After

Width:  |  Height:  |  Size: 22 KiB

609
linux.py
View File

@ -1,44 +1,241 @@
from __future__ import annotations
import json
import logging
import logging.handlers
import os
import subprocess
import sys
import threading
import webbrowser
import time
from pathlib import Path
from typing import Optional
import customtkinter as ctk
import psutil
import pyperclip
import pystray
from PIL import Image, ImageTk
from PIL import Image, ImageDraw, ImageFont
import proxy.tg_ws_proxy as tg_ws_proxy
from utils.tray_common import (
APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, LOG_FILE,
acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog,
ensure_ctk_thread, ensure_dirs, load_config, load_icon, log,
maybe_notify_update, quit_ctk, release_lock, restart_proxy,
save_config, start_proxy, stop_proxy, tg_proxy_url,
)
from proxy.app_runtime import ProxyAppRuntime
from proxy import __version__
from utils.default_config import default_tray_config
from ui.ctk_tray_ui import (
install_tray_config_buttons, install_tray_config_form,
populate_first_run_window, tray_settings_scroll_and_footer,
install_tray_config_buttons,
install_tray_config_form,
populate_first_run_window,
tray_settings_scroll_and_footer,
validate_config_form,
)
from ui.ctk_theme import (
CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE,
create_ctk_toplevel, ctk_theme_for_platform, main_content_frame,
CONFIG_DIALOG_FRAME_PAD,
CONFIG_DIALOG_SIZE,
FIRST_RUN_SIZE,
create_ctk_root,
ctk_theme_for_platform,
main_content_frame,
)
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG = default_tray_config()
_tray_icon: Optional[object] = None
_config: dict = {}
_exiting = False
_exiting: bool = False
_lock_file_path: Optional[Path] = None
# dialogs (tkinter messagebox)
log = logging.getLogger("tg-ws-tray")
_runtime = ProxyAppRuntime(
APP_DIR,
default_config=DEFAULT_CONFIG,
logger_name="tg-ws-tray",
on_error=lambda text: _show_error(text),
)
CONFIG_FILE = _runtime.config_file
LOG_FILE = _runtime.log_file
def _msgbox(kind: str, text: str, title: str, **kw):
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
try:
cmdline = proc.cmdline()
for arg in cmdline:
if "linux.py" in arg:
return True
except Exception:
pass
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {
"create_time": proc.create_time(),
}
lock_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def _ensure_dirs():
_runtime.ensure_dirs()
def load_config() -> dict:
return _runtime.load_config()
def save_config(cfg: dict):
_runtime.save_config(cfg)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_runtime.setup_logging(verbose, log_max_mb=log_max_mb)
def _make_icon_image(size: int = 64):
if Image is None:
raise RuntimeError("Pillow is required for tray icon")
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse(
[margin, margin, size - margin, size - margin], fill=(0, 136, 204, 255)
)
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
size=int(size * 0.55),
)
except Exception:
try:
font = ImageFont.truetype(
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf", size=int(size * 0.55)
)
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _load_icon():
icon_path = Path(__file__).parent / "icon.ico"
if icon_path.exists() and Image:
try:
return Image.open(str(icon_path))
except Exception:
pass
return _make_icon_image()
def start_proxy():
_runtime.start_proxy(_config)
def stop_proxy():
_runtime.stop_proxy()
def restart_proxy():
_runtime.restart_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showerror(title, text, parent=root)
root.destroy()
def _show_info(text: str, title: str = "TG WS Proxy"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showinfo(title, text, parent=root)
root.destroy()
def _ask_yes_no_dialog(text: str, title: str = "TG WS Proxy") -> bool:
import tkinter as _tk
from tkinter import messagebox as _mb
@ -48,189 +245,273 @@ def _msgbox(kind: str, text: str, title: str, **kw):
root.attributes("-topmost", True)
except Exception:
pass
result = getattr(_mb, kind)(title, text, parent=root, **kw)
r = _mb.askyesno(title, text, parent=root)
root.destroy()
return result
return bool(r)
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None:
_msgbox("showerror", text, title)
def _maybe_notify_update_async():
def _work():
time.sleep(1.5)
if _exiting:
return
if not _config.get("check_updates", True):
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
text = (
f"Доступна новая версия: {ver}\n\n"
f"Открыть страницу релиза в браузере?"
)
if _ask_yes_no_dialog(text, "TG WS Proxy — обновление"):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_msgbox("showinfo", text, title)
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return bool(_msgbox("askyesno", text, title))
def _apply_window_icon(root) -> None:
icon_img = load_icon()
if icon_img:
root._ctk_icon_photo = ImageTk.PhotoImage(icon_img.resize((64, 64)))
root.iconphoto(False, root._ctk_icon_photo)
# tray callbacks
def _on_open_in_telegram(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
def _on_open_in_telegram(icon=None, item=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server={host}&port={port}"
log.info("Copying %s", url)
try:
pyperclip.copy(url)
_show_info(
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy",
)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_copy_link(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
log.info("Copying link: %s", url)
try:
pyperclip.copy(url)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
threading.Thread(target=restart_proxy, daemon=True).start()
def _on_restart(icon=None, item=None) -> None:
threading.Thread(
target=lambda: restart_proxy(_config, _show_error), daemon=True
).start()
def _on_edit_config(icon=None, item=None) -> None:
def _on_edit_config(icon=None, item=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _on_open_logs(icon=None, item=None) -> None:
def _edit_config_dialog():
if ctk is None:
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
root = create_ctk_root(
ctk,
title="TG WS Proxy — Настройки",
width=w,
height=h,
theme=theme,
after_create=_apply_linux_ctk_window_icon,
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk, scroll, theme, cfg, DEFAULT_CONFIG,
show_autostart=False,
)
def on_save():
merged = validate_config_form(
widgets, DEFAULT_CONFIG, include_autostart=False)
if isinstance(merged, str):
_show_error(merged)
return
new_cfg = merged
save_config(new_cfg)
_config.update(new_cfg)
log.info("Config saved: %s", new_cfg)
_tray_icon.menu = _build_menu()
from tkinter import messagebox
if messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
):
root.destroy()
restart_proxy()
else:
root.destroy()
def on_cancel():
root.destroy()
install_tray_config_buttons(
ctk, footer, theme, on_save=on_save, on_cancel=on_cancel)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
def _on_open_logs(icon=None, item=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
env = {k: v for k, v in os.environ.items() if k not in ("VIRTUAL_ENV", "PYTHONPATH", "PYTHONHOME")}
env = os.environ.copy()
env.pop("VIRTUAL_ENV", None)
env.pop("PYTHONPATH", None)
env.pop("PYTHONHOME", None)
subprocess.Popen(
["xdg-open", str(LOG_FILE)], env=env,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL, start_new_session=True,
["xdg-open", str(LOG_FILE)],
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
start_new_session=True,
)
else:
_show_info("Файл логов ещё не создан.")
_show_info("Файл логов ещё не создан.", "TG WS Proxy")
def _on_exit(icon=None, item=None) -> None:
def _on_exit(icon=None, item=None):
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
quit_ctk()
threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start()
def _force_exit():
time.sleep(3)
os._exit(0)
threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
if icon:
icon.stop()
# settings dialog
def _edit_config_dialog() -> None:
if not ensure_ctk_thread(ctk):
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
root = create_ctk_toplevel(
ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme,
after_create=_apply_window_icon,
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(ctk, scroll, theme, cfg, DEFAULT_CONFIG, show_autostart=False)
def _finish() -> None:
root.destroy()
done.set()
def on_save() -> None:
from tkinter import messagebox
merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=False)
if isinstance(merged, str):
messagebox.showerror("TG WS Proxy — Ошибка", merged, parent=root)
return
save_config(merged)
_config.update(merged)
log.info("Config saved: %s", merged)
_tray_icon.menu = _build_menu()
do_restart = messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
)
_finish()
if do_restart:
threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start()
root.protocol("WM_DELETE_WINDOW", _finish)
install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish)
ctk_run_dialog(_build)
# first run
def _show_first_run() -> None:
ensure_dirs()
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
if not ensure_ctk_thread(ctk):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
if ctk is None:
FIRST_RUN_MARKER.touch()
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
theme = ctk_theme_for_platform()
w, h = FIRST_RUN_SIZE
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = FIRST_RUN_SIZE
root = create_ctk_toplevel(
ctk, title="TG WS Proxy", width=w, height=h, theme=theme,
after_create=_apply_window_icon,
)
root = create_ctk_root(
ctk,
title="TG WS Proxy",
width=w,
height=h,
theme=theme,
after_create=_apply_linux_ctk_window_icon,
)
def on_done(open_tg: bool) -> None:
FIRST_RUN_MARKER.touch()
root.destroy()
done.set()
if open_tg:
_on_open_in_telegram()
def on_done(open_tg: bool):
FIRST_RUN_MARKER.touch()
root.destroy()
if open_tg:
_on_open_in_telegram()
populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done)
populate_first_run_window(
ctk, root, theme, host=host, port=port, on_done=on_done)
ctk_run_dialog(_build)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
# tray menu
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
threading.Thread(target=_show_ipv6_dialog, daemon=True).start()
def _show_ipv6_dialog():
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy",
)
def _build_menu():
if pystray is None:
return None
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
return pystray.Menu(
pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True),
pystray.MenuItem("Скопировать ссылку", _on_copy_link),
pystray.MenuItem(
f"Открыть в Telegram ({host}:{port})", _on_open_in_telegram, default=True
),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Перезапустить прокси", _on_restart),
pystray.MenuItem("Настройки...", _on_edit_config),
@ -240,18 +521,21 @@ def _build_menu():
)
# entry point
def run_tray() -> None:
def run_tray():
global _tray_icon, _config
_config = load_config()
bootstrap(_config)
_config = _runtime.prepare()
_runtime.reset_log_file()
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, tray app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if pystray is None or Image is None:
log.error("pystray or Pillow not installed; running in console mode")
start_proxy(_config, _show_error)
start_proxy()
try:
while True:
time.sleep(1)
@ -259,12 +543,16 @@ def run_tray() -> None:
stop_proxy()
return
start_proxy(_config, _show_error)
maybe_notify_update(_config, lambda: _exiting, _ask_yes_no)
_show_first_run()
check_ipv6_warning(_show_info)
start_proxy()
_maybe_notify_update_async()
_show_first_run()
_check_ipv6_warning()
icon_image = _load_icon()
_tray_icon = pystray.Icon(APP_NAME, icon_image, "TG WS Proxy", menu=_build_menu())
_tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu())
log.info("Tray icon running")
_tray_icon.run()
@ -272,14 +560,15 @@ def run_tray() -> None:
log.info("Tray app exited")
def main() -> None:
if not acquire_lock("linux.py"):
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
return
try:
run_tray()
finally:
release_lock()
_release_lock()
if __name__ == "__main__":

576
macos.py
View File

@ -1,6 +1,10 @@
from __future__ import annotations
import json
import logging
import logging.handlers
import os
import psutil
import subprocess
import sys
import threading
@ -25,189 +29,241 @@ except ImportError:
pyperclip = None
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy.app_runtime import ProxyAppRuntime
from proxy import __version__
from utils.default_config import default_tray_config
from utils.tray_common import (
APP_DIR, APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IPV6_WARN_MARKER,
LOG_FILE, acquire_lock, apply_proxy_config, ensure_dirs, load_config,
log, release_lock, save_config, setup_logging, stop_proxy, tg_proxy_url,
)
APP_NAME = "TgWsProxy"
APP_DIR = Path.home() / "Library" / "Application Support" / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png"
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
DEFAULT_CONFIG = default_tray_config()
_app: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
# osascript dialogs
log = logging.getLogger("tg-ws-tray")
_runtime = ProxyAppRuntime(
APP_DIR,
default_config=DEFAULT_CONFIG,
logger_name="tg-ws-tray",
on_error=lambda text: _show_error(text),
)
CONFIG_FILE = _runtime.config_file
LOG_FILE = _runtime.log_file
def _esc(text: str) -> str:
return text.replace("\\", "\\\\").replace('"', '\\"')
# Single-instance lock
def _osascript(script: str) -> str:
r = subprocess.run(["osascript", "-e", script], capture_output=True, text=True)
return r.stdout.strip()
def _show_error(text: str, title: str = "TG WS Proxy") -> None:
_osascript(
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon stop'
)
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_osascript(
f'display dialog "{_esc(text)}" with title "{_esc(title)}" '
f'buttons {{"OK"}} default button "OK" with icon note'
)
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return _ask_yes_no_close(text, title) is True
def _ask_yes_no_close(text: str, title: str = "TG WS Proxy") -> Optional[bool]:
r = subprocess.run(
[
"osascript", "-e",
f'button returned of (display dialog "{_esc(text)}" '
f'with title "{_esc(title)}" '
f'buttons {{"Закрыть", "Нет", "Да"}} '
f'default button "Да" cancel button "Закрыть" with icon note)',
],
capture_output=True, text=True,
)
if r.returncode != 0:
return None
btn = r.stdout.strip()
if btn == "Да":
return True
if btn == "Нет":
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
return None
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _osascript_input(prompt: str, default: str, title: str = "TG WS Proxy") -> Optional[str]:
r = subprocess.run(
[
"osascript", "-e",
f'text returned of (display dialog "{_esc(prompt)}" '
f'default answer "{_esc(default)}" '
f'with title "{_esc(title)}" '
f'buttons {{"Закрыть", "OK"}} '
f'default button "OK" cancel button "Закрыть")',
],
capture_output=True, text=True,
)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
# menubar icon
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {"create_time": proc.create_time()}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# Filesystem helpers
def _ensure_dirs():
_runtime.ensure_dirs()
def load_config() -> dict:
return _runtime.load_config()
def save_config(cfg: dict):
_runtime.save_config(cfg)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_runtime.setup_logging(verbose, log_max_mb=log_max_mb)
# Menubar icon
def _make_menubar_icon(size: int = 44):
if Image is None:
return None
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = size // 11
draw.ellipse([margin, margin, size - margin, size - margin], fill=(0, 0, 0, 255))
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 0, 0, 255))
try:
font = ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", size=int(size * 0.55))
font = ImageFont.truetype(
"/System/Library/Fonts/Helvetica.ttc",
size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T", fill=(255, 255, 255, 255), font=font,
)
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _ensure_menubar_icon() -> None:
# Generate menubar icon PNG if it does not exist.
def _ensure_menubar_icon():
if MENUBAR_ICON_PATH.exists():
return
ensure_dirs()
_ensure_dirs()
img = _make_menubar_icon(44)
if img:
img.save(str(MENUBAR_ICON_PATH), "PNG")
# proxy lifecycle (macOS-local)
# Native macOS dialogs
import asyncio as _asyncio
def _escape_osascript_text(text: str) -> str:
return text.replace('\\', '\\\\').replace('"', '\\"')
def _run_proxy_thread() -> None:
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
_show_error(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def _osascript(script: str) -> str:
r = subprocess.run(
['osascript', '-e', script],
capture_output=True, text=True)
return r.stdout.strip()
def _start_proxy() -> None:
global _proxy_thread
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
if not apply_proxy_config(_config):
_show_error("Ошибка конфигурации DC → IP.")
return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(target=_run_proxy_thread, daemon=True, name="proxy")
_proxy_thread.start()
def _show_error(text: str, title: str = "TG WS Proxy"):
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon stop')
def _stop_proxy() -> None:
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=2)
_proxy_thread = None
log.info("Proxy stopped")
def _show_info(text: str, title: str = "TG WS Proxy"):
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon note')
def _restart_proxy() -> None:
log.info("Restarting proxy...")
_stop_proxy()
time.sleep(0.3)
_start_proxy()
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
result = _ask_yes_no_close(text, title)
return result is True
# menu callbacks
def _ask_yes_no_close(text: str,
title: str = "TG WS Proxy") -> Optional[bool]:
text_esc = _escape_osascript_text(text)
title_esc = _escape_osascript_text(title)
r = subprocess.run(
['osascript', '-e',
f'button returned of (display dialog "{text_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Закрыть", "Нет", "Да"}} '
f'default button "Да" cancel button "Закрыть" with icon note)'],
capture_output=True, text=True)
if r.returncode != 0:
return None
result = r.stdout.strip()
if result == "Да":
return True
if result == "Нет":
return False
return None
def _on_open_in_telegram(_=None) -> None:
url = tg_proxy_url(_config)
# Proxy lifecycle
def start_proxy():
_runtime.start_proxy(_config)
def stop_proxy():
_runtime.stop_proxy()
def restart_proxy():
_runtime.restart_proxy()
# Menu callbacks
def _on_open_in_telegram(_=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server={host}&port={port}"
log.info("Opening %s", url)
try:
result = subprocess.call(["open", url])
result = subprocess.call(['open', url])
if result != 0:
raise RuntimeError("open command failed")
except Exception:
@ -221,58 +277,67 @@ def _on_open_in_telegram(_=None) -> None:
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(["pbcopy"], input=url.encode(), check=True)
subprocess.run(['pbcopy'], input=url.encode(),
check=True)
_show_info(
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена:\n{url}"
)
f"Ссылка скопирована в буфер обмена:\n{url}")
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_copy_link(_=None) -> None:
url = tg_proxy_url(_config)
log.info("Copying link: %s", url)
try:
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(["pbcopy"], input=url.encode(), check=True)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(_=None) -> None:
def _do():
def _on_restart(_=None):
def _do_restart():
global _config
_config = load_config()
if _app:
_app.update_menu_title()
_restart_proxy()
restart_proxy()
threading.Thread(target=_do, daemon=True).start()
threading.Thread(target=_do_restart, daemon=True).start()
def _on_open_logs(_=None) -> None:
def _on_open_logs(_=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
subprocess.call(["open", str(LOG_FILE)])
subprocess.call(['open', str(LOG_FILE)])
else:
_show_info("Файл логов ещё не создан.")
# Show a native text input dialog. Returns None if cancelled.
def _osascript_input(prompt: str, default: str,
title: str = "TG WS Proxy") -> Optional[str]:
prompt_esc = _escape_osascript_text(prompt)
default_esc = _escape_osascript_text(default)
title_esc = _escape_osascript_text(title)
r = subprocess.run(
['osascript', '-e',
f'text returned of (display dialog "{prompt_esc}" '
f'default answer "{default_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Закрыть", "OK"}} '
f'default button "OK" cancel button "Закрыть")'],
capture_output=True, text=True)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
def _on_edit_config(_=None) -> None:
def _on_edit_config(_=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _check_updates_menu_title() -> str:
on = bool(_config.get("check_updates", True))
return "✓ Проверять обновления при запуске" if on else "Проверять обновления при запуске (выкл)"
return (
"✓ Проверять обновления при запуске"
if on
else "Проверять обновления при запуске (выкл)"
)
def _toggle_check_updates(_=None) -> None:
def _toggle_check_updates(_=None):
global _config
_config["check_updates"] = not bool(_config.get("check_updates", True))
save_config(_config)
@ -280,15 +345,12 @@ def _toggle_check_updates(_=None) -> None:
_app._check_updates_item.title = _check_updates_menu_title()
def _on_open_release_page(_=None) -> None:
def _on_open_release_page(_=None):
from utils.update_check import RELEASES_PAGE_URL
webbrowser.open(RELEASES_PAGE_URL)
# update check
def _maybe_notify_update_async() -> None:
def _maybe_notify_update_async():
def _work():
time.sleep(1.5)
if _exiting:
@ -304,7 +366,8 @@ def _maybe_notify_update_async() -> None:
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if _ask_yes_no(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
f"Доступна новая версия: {ver}\n\n"
f"Открыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
@ -314,16 +377,18 @@ def _maybe_notify_update_async() -> None:
threading.Thread(target=_work, daemon=True, name="update-check").start()
# settings dialog
def _edit_config_dialog() -> None:
# Settings via native macOS dialogs
def _edit_config_dialog():
cfg = load_config()
host = _osascript_input("IP-адрес прокси:", cfg.get("host", DEFAULT_CONFIG["host"]))
# Host
host = _osascript_input(
"IP-адрес прокси:",
cfg.get("host", DEFAULT_CONFIG["host"]))
if host is None:
return
host = host.strip()
import socket as _sock
try:
_sock.inet_aton(host)
@ -331,7 +396,10 @@ def _edit_config_dialog() -> None:
_show_error("Некорректный IP-адрес.")
return
port_str = _osascript_input("Порт прокси:", str(cfg.get("port", DEFAULT_CONFIG["port"])))
# Port
port_str = _osascript_input(
"Порт прокси:",
str(cfg.get("port", DEFAULT_CONFIG["port"])))
if port_str is None:
return
try:
@ -342,49 +410,42 @@ def _edit_config_dialog() -> None:
_show_error("Порт должен быть числом 1-65535")
return
secret_str = _osascript_input(
"MTProto Secret (32 hex символа):", cfg.get("secret", DEFAULT_CONFIG["secret"])
)
if secret_str is None:
return
secret_str = secret_str.strip().lower()
if len(secret_str) != 32 or not all(c in "0123456789abcdef" for c in secret_str):
_show_error("Secret должен быть строкой из 32 шестнадцатеричных символов.")
return
# DC-IP mappings
dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]))
dc_str = _osascript_input(
"DC → IP маппинги (через запятую, формат DC:IP):\n"
"Например: 2:149.154.167.220, 4:149.154.167.220",
dc_default,
)
dc_default)
if dc_str is None:
return
dc_lines = [s.strip() for s in dc_str.replace(",", "\n").splitlines() if s.strip()]
dc_lines = [s.strip() for s in dc_str.replace(',', '\n').splitlines()
if s.strip()]
try:
tg_ws_proxy.parse_dc_ip_list(dc_lines)
except ValueError as e:
_show_error(str(e))
return
# Verbose
verbose = _ask_yes_no_close("Включить подробное логирование (verbose)?")
if verbose is None:
return
# Advanced settings
adv_str = _osascript_input(
"Расширенные настройки (буфер KB, WS пул, лог MB):\n"
"Формат: buf_kb,pool_size,log_max_mb",
f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])},"
f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])},"
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}",
)
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}")
if adv_str is None:
return
adv = {}
if adv_str:
parts = [s.strip() for s in adv_str.split(",")]
keys = [("buf_kb", int), ("pool_size", int), ("log_max_mb", float)]
parts = [s.strip() for s in adv_str.split(',')]
keys = [("buf_kb", int), ("pool_size", int),
("log_max_mb", float)]
for i, (k, typ) in enumerate(keys):
if i < len(parts):
try:
@ -395,13 +456,11 @@ def _edit_config_dialog() -> None:
new_cfg = {
"host": host,
"port": port,
"secret": secret_str,
"dc_ip": dc_lines,
"verbose": verbose,
"buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])),
"pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])),
"log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])),
"check_updates": cfg.get("check_updates", True),
}
save_config(new_cfg)
log.info("Config saved: %s", new_cfg)
@ -411,23 +470,21 @@ def _edit_config_dialog() -> None:
if _app:
_app.update_menu_title()
if _ask_yes_no_close("Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
_restart_proxy()
if _ask_yes_no_close(
"Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
restart_proxy()
# first run & ipv6
# First-run & IPv6 dialogs
def _show_first_run() -> None:
ensure_dirs()
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
tg_url = tg_proxy_url(_config)
link_host = tg_ws_proxy.get_link_host(host)
tg_url = f"tg://socks?server={host}&port={port}"
text = (
f"Прокси запущен и работает в строке меню.\n\n"
@ -437,54 +494,54 @@ def _show_first_run() -> None:
f" Или ссылка: {tg_url}\n\n"
f"Вручную:\n"
f" Настройки → Продвинутые → Тип подключения → Прокси\n"
f" MTProto → {link_host} : {port} \n"
f" Secret: dd{secret} \n\n"
f" SOCKS5 → {host} : {port} (без логина/пароля)\n\n"
f"Открыть прокси в Telegram сейчас?"
)
FIRST_RUN_MARKER.touch()
if _ask_yes_no(text, "TG WS Proxy"):
_on_open_in_telegram()
def _check_ipv6_warning() -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
def _has_ipv6_enabled() -> bool:
import socket as _sock
has = False
try:
for addr in _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6):
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
has = True
break
if ip and not ip.startswith('::1') and not ip.startswith('fe80::1'):
return True
except Exception:
pass
if not has:
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
has = True
except Exception:
pass
if not has:
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(('::1', 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает, попробуйте отключить "
"попытку соединения по IPv6 в настройках прокси Telegram.\n\n"
"Это предупреждение будет показано только один раз."
)
"Это предупреждение будет показано только один раз.")
# rumps app
# rumps menubar app
_TgWsProxyAppBase = rumps.App if rumps else object
@ -492,26 +549,33 @@ _TgWsProxyAppBase = rumps.App if rumps else object
class TgWsProxyApp(_TgWsProxyAppBase):
def __init__(self):
_ensure_menubar_icon()
icon_path = str(MENUBAR_ICON_PATH) if MENUBAR_ICON_PATH.exists() else None
icon_path = (str(MENUBAR_ICON_PATH)
if MENUBAR_ICON_PATH.exists() else None)
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
self._open_tg_item = rumps.MenuItem(
f"Открыть в Telegram ({link_host}:{port})", callback=_on_open_in_telegram
)
self._copy_link_item = rumps.MenuItem("Скопировать ссылку", callback=_on_copy_link)
self._restart_item = rumps.MenuItem("Перезапустить прокси", callback=_on_restart)
self._settings_item = rumps.MenuItem("Настройки...", callback=_on_edit_config)
self._logs_item = rumps.MenuItem("Открыть логи", callback=_on_open_logs)
f"Открыть в Telegram ({host}:{port})",
callback=_on_open_in_telegram)
self._restart_item = rumps.MenuItem(
"Перезапустить прокси",
callback=_on_restart)
self._settings_item = rumps.MenuItem(
"Настройки...",
callback=_on_edit_config)
self._logs_item = rumps.MenuItem(
"Открыть логи",
callback=_on_open_logs)
self._release_page_item = rumps.MenuItem(
"Страница релиза на GitHub…", callback=_on_open_release_page
)
"Страница релиза на GitHub…",
callback=_on_open_release_page)
self._check_updates_item = rumps.MenuItem(
_check_updates_menu_title(), callback=_toggle_check_updates
)
self._version_item = rumps.MenuItem(f"Версия {__version__}", callback=lambda _: None)
_check_updates_menu_title(),
callback=_toggle_check_updates)
self._version_item = rumps.MenuItem(
f"Версия {__version__}",
callback=lambda _: None)
super().__init__(
"TG WS Proxy",
@ -520,7 +584,6 @@ class TgWsProxyApp(_TgWsProxyAppBase):
quit_button="Выход",
menu=[
self._open_tg_item,
self._copy_link_item,
None,
self._restart_item,
self._settings_item,
@ -530,51 +593,41 @@ class TgWsProxyApp(_TgWsProxyAppBase):
self._check_updates_item,
None,
self._version_item,
],
)
])
def update_menu_title(self) -> None:
def update_menu_title(self):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
self._open_tg_item.title = f"Открыть в Telegram ({link_host}:{port})"
self._open_tg_item.title = (
f"Открыть в Telegram ({host}:{port})")
# entry point
def run_menubar() -> None:
def run_menubar():
global _app, _config
_config = load_config()
save_config(_config)
_config = _runtime.prepare()
_runtime.reset_log_file()
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(
_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, menubar app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if rumps is None or Image is None:
log.error("rumps or Pillow not installed; running in console mode")
_start_proxy()
start_proxy()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
_stop_proxy()
stop_proxy()
return
_start_proxy()
start_proxy()
_maybe_notify_update_async()
_show_first_run()
_check_ipv6_warning()
@ -582,18 +635,19 @@ def run_menubar() -> None:
log.info("Menubar app running")
_app.run()
_stop_proxy()
stop_proxy()
log.info("Menubar app exited")
def main() -> None:
if not acquire_lock("macos.py"):
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.")
return
try:
run_menubar()
finally:
release_lock()
_release_lock()
if __name__ == "__main__":

View File

@ -1 +1 @@
__version__ = "1.4.0"
__version__ = "1.3.0"

View File

@ -4,7 +4,6 @@ import asyncio as _asyncio
import json
import logging
import logging.handlers
import os
import sys
import threading
import time
@ -15,9 +14,8 @@ import proxy.tg_ws_proxy as tg_ws_proxy
DEFAULT_CONFIG = {
"port": 1443,
"port": 1080,
"host": "127.0.0.1",
"secret": os.urandom(16).hex(),
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"log_max_mb": 5,
"buf_kb": 256,
@ -50,27 +48,6 @@ class ProxyAppRuntime:
self._proxy_thread = None
self._async_stop = None
def _build_core_config(self, active_cfg: dict, dc_opt: Dict[int, str]):
port = int(active_cfg.get("port", self.default_config["port"]))
host = str(active_cfg.get("host", self.default_config["host"]))
secret = str(active_cfg.get("secret") or "").strip()
if not secret:
secret = os.urandom(16).hex()
active_cfg["secret"] = secret
buf_kb = int(active_cfg.get("buf_kb", self.default_config["buf_kb"]))
pool_size = int(active_cfg.get(
"pool_size", self.default_config["pool_size"]))
return tg_ws_proxy.ProxyConfig(
port=port,
host=host,
secret=secret,
dc_redirects=dc_opt,
buffer_size=max(4, buf_kb) * 1024,
pool_size=max(0, pool_size),
)
def ensure_dirs(self):
self.app_dir.mkdir(parents=True, exist_ok=True)
@ -155,19 +132,17 @@ class ProxyAppRuntime:
self._async_stop = (loop, stop_ev)
try:
loop.run_until_complete(self.run_proxy(stop_event=stop_ev))
loop.run_until_complete(
self.run_proxy(port, dc_opt, stop_event=stop_ev, host=host))
except Exception as exc:
self.log.error("Proxy thread crashed: %s", exc)
exc_text = str(exc)
if ("10048" in exc_text or
"address already in use" in exc_text.lower()):
if ("10048" in str(exc) or
"Address already in use" in str(exc)):
self._emit_error(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите.")
else:
self._emit_error(str(exc) or exc.__class__.__name__)
finally:
loop.close()
self._async_stop = None
@ -193,9 +168,6 @@ class ProxyAppRuntime:
self._emit_error("Ошибка конфигурации:\n%s" % exc)
return False
tg_ws_proxy.proxy_config = self._build_core_config(active_cfg, dc_opt)
self.save_config(active_cfg)
self.log.info("Starting proxy on %s:%d ...", host, port)
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,7 @@ keywords = [
"proxy",
"bypass",
"websocket",
"mtproto",
"socks5",
]
classifiers = [
"Development Status :: 5 - Production/Stable",

View File

@ -1,7 +1,6 @@
import sys
import unittest
import json
import subprocess
from pathlib import Path
@ -108,8 +107,7 @@ class AndroidProxyBridgeTests(unittest.TestCase):
log_path = android_proxy_bridge.start_proxy(
"/tmp/app",
"127.0.0.1",
1443,
"0123456789abcdef0123456789abcdef",
1080,
["2:149.154.167.220"],
7.0,
512,
@ -120,7 +118,6 @@ class AndroidProxyBridgeTests(unittest.TestCase):
android_proxy_bridge.ProxyAppRuntime = original_runtime
self.assertEqual(log_path, "/tmp/proxy.log")
self.assertEqual(captured["config"]["secret"], "0123456789abcdef0123456789abcdef")
self.assertEqual(captured["config"]["log_max_mb"], 7.0)
self.assertEqual(captured["config"]["buf_kb"], 512)
self.assertEqual(captured["config"]["pool_size"], 6)
@ -227,38 +224,6 @@ class AndroidProxyBridgeTests(unittest.TestCase):
self.assertEqual(result["error"], "")
self.assertEqual(result["html_url"], "https://example.com/releases/latest")
def test_android_bridge_import_and_update_status_work_without_cryptography(self):
root = Path(__file__).resolve().parents[1]
script = f"""
import importlib.abc
import sys
from pathlib import Path
root = Path({str(root)!r})
sys.path.insert(0, str(root / "android" / "app" / "src" / "main" / "python"))
sys.path.insert(0, str(root))
class BlockCryptography(importlib.abc.MetaPathFinder):
def find_spec(self, fullname, path, target=None):
if fullname == "cryptography" or fullname.startswith("cryptography."):
raise ModuleNotFoundError("No module named 'cryptography'")
return None
sys.meta_path.insert(0, BlockCryptography())
import android_proxy_bridge
print(android_proxy_bridge.get_update_status_json(False))
"""
result = subprocess.run(
[sys.executable, "-c", script],
check=True,
capture_output=True,
text=True,
)
payload = json.loads(result.stdout.strip())
self.assertEqual(payload["current_version"], android_proxy_bridge.__version__)
self.assertEqual(payload["error"], "")
if __name__ == "__main__":
unittest.main()

View File

@ -116,51 +116,6 @@ class ProxyAppRuntimeTests(unittest.TestCase):
self.assertFalse(started)
self.assertEqual(errors, ["Ошибка конфигурации:\nbad dc mapping"])
def test_run_proxy_thread_reports_generic_runtime_error(self):
with tempfile.TemporaryDirectory() as tmpdir:
errors = []
async def fake_run_proxy(stop_event=None):
raise RuntimeError("proxy boom")
runtime = ProxyAppRuntime(
Path(tmpdir),
on_error=errors.append,
run_proxy=fake_run_proxy,
)
runtime._run_proxy_thread(1443, {2: "149.154.167.220"}, "127.0.0.1")
self.assertEqual(errors, ["proxy boom"])
def test_run_proxy_thread_reports_port_in_use_case_insensitively(self):
with tempfile.TemporaryDirectory() as tmpdir:
errors = []
async def fake_run_proxy(stop_event=None):
raise RuntimeError(
"[Errno 98] error while attempting to bind on address "
"('127.0.0.1', 1443): address already in use"
)
runtime = ProxyAppRuntime(
Path(tmpdir),
on_error=errors.append,
run_proxy=fake_run_proxy,
)
runtime._run_proxy_thread(1443, {2: "149.154.167.220"}, "127.0.0.1")
self.assertEqual(
errors,
[
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
],
)
if __name__ == "__main__":
unittest.main()

View File

@ -1,55 +1,38 @@
import hashlib
import struct
import unittest
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from proxy.crypto_backend import create_aes_ctr_transform
from proxy.tg_ws_proxy import (
PROTO_ABRIDGED_INT,
PROTO_TAG_ABRIDGED,
_MsgSplitter,
_generate_relay_init,
_try_handshake,
)
from proxy.tg_ws_proxy import _MsgSplitter, _dc_from_init, _patch_init_dc
KEY = bytes(range(32))
IV = bytes(range(16))
SECRET = bytes.fromhex("0123456789abcdef0123456789abcdef")
PROTO_TAG = 0xEFEFEFEF
def _xor(left: bytes, right: bytes) -> bytes:
return bytes(a ^ b for a, b in zip(left, right))
def _keystream(size: int, key: bytes, iv: bytes) -> bytes:
transform = Cipher(algorithms.AES(key), modes.CTR(iv)).encryptor()
return transform.update(b"\x00" * size)
def _keystream(size: int) -> bytes:
transform = create_aes_ctr_transform(KEY, IV)
return transform.update(b"\x00" * size) + transform.finalize()
def _build_client_handshake(
dc_raw: int,
proto_tag: bytes = PROTO_TAG_ABRIDGED,
secret: bytes = SECRET,
) -> bytes:
def _build_init_packet(dc_raw: int, proto: int = PROTO_TAG) -> bytes:
packet = bytearray(64)
packet[8:40] = KEY
packet[40:56] = IV
dec_key = hashlib.sha256(KEY + secret).digest()
plain_tail = proto_tag + struct.pack("<h", dc_raw) + b"\x00\x00"
packet[56:64] = _xor(plain_tail, _keystream(64, dec_key, IV)[56:64])
plain_tail = struct.pack("<Ih", proto, dc_raw) + b"\x00\x00"
packet[56:64] = _xor(plain_tail, _keystream(64)[56:64])
return bytes(packet)
def _encrypt_after_init(relay_init: bytes, plaintext: bytes) -> bytes:
transform = Cipher(
algorithms.AES(relay_init[8:40]),
modes.CTR(relay_init[40:56]),
).encryptor()
def _encrypt_after_init(init_packet: bytes, plaintext: bytes) -> bytes:
transform = create_aes_ctr_transform(init_packet[8:40], init_packet[40:56])
transform.update(b"\x00" * 64)
return transform.update(plaintext)
return transform.update(plaintext) + transform.finalize()
class CryptoBackendTests(unittest.TestCase):
@ -80,60 +63,42 @@ class CryptoBackendTests(unittest.TestCase):
create_aes_ctr_transform(KEY, IV, backend="missing")
class MtProtoHandshakeTests(unittest.TestCase):
def test_try_handshake_reads_non_media_dc(self):
handshake = _build_client_handshake(dc_raw=2)
class MtProtoInitTests(unittest.TestCase):
def test_dc_from_init_reads_non_media_dc(self):
init_packet = _build_init_packet(dc_raw=2)
result = _try_handshake(handshake, SECRET)
self.assertEqual(_dc_from_init(init_packet), (2, False))
self.assertEqual(result[:3], (2, False, PROTO_TAG_ABRIDGED))
def test_dc_from_init_reads_media_dc(self):
init_packet = _build_init_packet(dc_raw=-4)
def test_try_handshake_reads_media_dc(self):
handshake = _build_client_handshake(dc_raw=-4)
self.assertEqual(_dc_from_init(init_packet), (4, True))
result = _try_handshake(handshake, SECRET)
def test_patch_init_dc_updates_signed_dc_and_preserves_tail(self):
original = _build_init_packet(dc_raw=99) + b"tail"
self.assertEqual(result[:3], (4, True, PROTO_TAG_ABRIDGED))
patched = _patch_init_dc(original, -3)
def test_try_handshake_rejects_wrong_secret(self):
handshake = _build_client_handshake(dc_raw=2)
result = _try_handshake(
handshake,
bytes.fromhex("fedcba9876543210fedcba9876543210"),
)
self.assertIsNone(result)
def test_generate_relay_init_encodes_proto_and_signed_dc(self):
relay_init = _generate_relay_init(PROTO_TAG_ABRIDGED, -3)
decryptor = Cipher(
algorithms.AES(relay_init[8:40]),
modes.CTR(relay_init[40:56]),
).encryptor()
decrypted = decryptor.update(relay_init)
self.assertEqual(decrypted[56:60], PROTO_TAG_ABRIDGED)
self.assertEqual(struct.unpack("<h", decrypted[60:62])[0], -3)
self.assertEqual(_dc_from_init(patched[:64]), (3, True))
self.assertEqual(patched[64:], b"tail")
class MsgSplitterTests(unittest.TestCase):
def test_splitter_splits_multiple_abridged_messages(self):
relay_init = _generate_relay_init(PROTO_TAG_ABRIDGED, -2)
init_packet = _build_init_packet(dc_raw=-2)
plain_chunk = b"\x01abcd\x02EFGH1234"
encrypted_chunk = _encrypt_after_init(relay_init, plain_chunk)
encrypted_chunk = _encrypt_after_init(init_packet, plain_chunk)
parts = _MsgSplitter(relay_init, PROTO_ABRIDGED_INT).split(encrypted_chunk)
parts = _MsgSplitter(init_packet).split(encrypted_chunk)
self.assertEqual(parts, [encrypted_chunk[:5], encrypted_chunk[5:14]])
def test_splitter_leaves_single_message_intact(self):
relay_init = _generate_relay_init(PROTO_TAG_ABRIDGED, 2)
init_packet = _build_init_packet(dc_raw=2)
plain_chunk = b"\x02abcdefgh"
encrypted_chunk = _encrypt_after_init(relay_init, plain_chunk)
encrypted_chunk = _encrypt_after_init(init_packet, plain_chunk)
parts = _MsgSplitter(relay_init, PROTO_ABRIDGED_INT).split(encrypted_chunk)
parts = _MsgSplitter(init_packet).split(encrypted_chunk)
self.assertEqual(parts, [encrypted_chunk])

View File

@ -1,61 +1,128 @@
import hashlib
import struct
import asyncio
import socket
import unittest
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from unittest.mock import patch
from proxy.tg_ws_proxy import (
PROTO_TAG_ABRIDGED,
PROTO_TAG_INTERMEDIATE,
_generate_relay_init,
_try_handshake,
)
from proxy.tg_ws_proxy import _handle_client, _socks5_reply
KEY = bytes(range(32))
IV = bytes(range(16))
SECRET = bytes.fromhex("0123456789abcdef0123456789abcdef")
class _FakeTransport:
def get_extra_info(self, name):
return None
def get_write_buffer_size(self):
return 0
def _xor(left: bytes, right: bytes) -> bytes:
return bytes(a ^ b for a, b in zip(left, right))
class _FakeReader:
def __init__(self, payload: bytes):
self._payload = payload
self._offset = 0
async def readexactly(self, n: int) -> bytes:
end = self._offset + n
if end > len(self._payload):
partial = self._payload[self._offset:]
self._offset = len(self._payload)
raise asyncio.IncompleteReadError(partial, n)
chunk = self._payload[self._offset:end]
self._offset = end
return chunk
def _build_client_handshake(dc_raw: int, proto_tag: bytes) -> bytes:
packet = bytearray(64)
packet[8:40] = KEY
packet[40:56] = IV
class _FakeWriter:
def __init__(self):
self.transport = _FakeTransport()
self.writes = []
self.closed = False
self.close_calls = 0
dec_key = hashlib.sha256(KEY + SECRET).digest()
decryptor = Cipher(algorithms.AES(dec_key), modes.CTR(IV)).encryptor()
keystream = decryptor.update(b"\x00" * 64)
def get_extra_info(self, name):
if name == "peername":
return ("127.0.0.1", 50000)
return None
plain_tail = proto_tag + struct.pack("<h", dc_raw) + b"\x00\x00"
packet[56:64] = _xor(plain_tail, keystream[56:64])
return bytes(packet)
def write(self, data: bytes):
self.writes.append(data)
async def drain(self):
return None
def close(self):
self.closed = True
self.close_calls += 1
async def wait_closed(self):
return None
class MtProtoProtocolTests(unittest.TestCase):
def test_try_handshake_accepts_abridged_proto(self):
handshake = _build_client_handshake(2, PROTO_TAG_ABRIDGED)
def _ipv4_connect_request(ip: str, port: int, cmd: int = 1) -> bytes:
return bytes([0x05, cmd, 0x00, 0x01]) + socket.inet_aton(ip) + port.to_bytes(2, "big")
result = _try_handshake(handshake, SECRET)
self.assertIsNotNone(result)
self.assertEqual(result[:3], (2, False, PROTO_TAG_ABRIDGED))
def _domain_connect_request(domain: str, port: int, cmd: int = 1) -> bytes:
encoded = domain.encode("utf-8")
return (
bytes([0x05, cmd, 0x00, 0x03, len(encoded)])
+ encoded
+ port.to_bytes(2, "big")
)
def test_try_handshake_accepts_intermediate_proto(self):
handshake = _build_client_handshake(-4, PROTO_TAG_INTERMEDIATE)
result = _try_handshake(handshake, SECRET)
def _ipv6_connect_request(ip: str, port: int) -> bytes:
return (
bytes([0x05, 0x01, 0x00, 0x04])
+ socket.inet_pton(socket.AF_INET6, ip)
+ port.to_bytes(2, "big")
)
self.assertIsNotNone(result)
self.assertEqual(result[:3], (4, True, PROTO_TAG_INTERMEDIATE))
def test_generate_relay_init_produces_handshake_sized_packet(self):
relay_init = _generate_relay_init(PROTO_TAG_ABRIDGED, -2)
class Socks5ProtocolTests(unittest.IsolatedAsyncioTestCase):
async def test_rejects_non_socks5_greeting(self):
reader = _FakeReader(b"\x04\x01")
writer = _FakeWriter()
self.assertEqual(len(relay_init), 64)
self.assertEqual(relay_init[0], relay_init[0] & 0xFF)
await _handle_client(reader, writer)
self.assertEqual(writer.writes, [])
self.assertTrue(writer.closed)
async def test_rejects_unsupported_command(self):
reader = _FakeReader(b"\x05\x01\x00" + _ipv4_connect_request("1.1.1.1", 443, cmd=2))
writer = _FakeWriter()
await _handle_client(reader, writer)
self.assertEqual(writer.writes, [b"\x05\x00", _socks5_reply(0x07)])
self.assertTrue(writer.closed)
async def test_rejects_unsupported_address_type(self):
reader = _FakeReader(b"\x05\x01\x00" + b"\x05\x01\x00\x02")
writer = _FakeWriter()
await _handle_client(reader, writer)
self.assertEqual(writer.writes, [b"\x05\x00", _socks5_reply(0x08)])
self.assertTrue(writer.closed)
async def test_rejects_ipv6_destinations(self):
reader = _FakeReader(b"\x05\x01\x00" + _ipv6_connect_request("2001:db8::1", 443))
writer = _FakeWriter()
await _handle_client(reader, writer)
self.assertEqual(writer.writes, [b"\x05\x00", _socks5_reply(0x05)])
self.assertTrue(writer.closed)
async def test_passthrough_connect_failure_returns_error(self):
reader = _FakeReader(b"\x05\x01\x00" + _domain_connect_request("example.com", 443))
writer = _FakeWriter()
with patch("proxy.tg_ws_proxy.asyncio.open_connection", side_effect=OSError("boom")):
await _handle_client(reader, writer)
self.assertEqual(writer.writes, [b"\x05\x00", _socks5_reply(0x05)])
self.assertTrue(writer.closed)
if __name__ == "__main__":

View File

@ -13,11 +13,8 @@ class UpdateCheckTests(unittest.TestCase):
update_check._state.update(self._orig_state)
def test_apply_release_tag_marks_update_available(self):
version_parts = [int(part) for part in __version__.split(".")]
version_parts[-1] += 1
next_version = ".".join(str(part) for part in version_parts)
update_check._apply_release_tag(
tag=f"v{next_version}",
tag="v1.3.1",
html_url="https://example.com/release",
current_version=__version__,
)
@ -25,7 +22,7 @@ class UpdateCheckTests(unittest.TestCase):
status = update_check.get_status()
self.assertTrue(status["has_update"])
self.assertFalse(status["ahead_of_release"])
self.assertEqual(status["latest"], next_version)
self.assertEqual(status["latest"], "1.3.1")
self.assertEqual(status["html_url"], "https://example.com/release")
def test_apply_release_tag_marks_ahead_of_release(self):

View File

@ -1,3 +1,8 @@
"""
Общая светлая тема и фабрика окон CustomTkinter для tray-приложений (Windows / Linux).
Цвета и отступы задаются в одном месте правки темы не дублируются по платформам.
"""
from __future__ import annotations
import sys
@ -8,7 +13,11 @@ from typing import Any, Callable, Optional, Tuple
_tk_variable_del_guard_installed = False
def install_tkinter_variable_del_guard() -> None:
def _install_tkinter_variable_del_guard() -> None:
"""
Убирает «Exception ignored» при выходе процесса: Tcl уже разрушен, а GC ещё
вызывает Variable.__del__ (StringVar и т.д.) напр. окно CTk в фоновом потоке.
"""
global _tk_variable_del_guard_installed
if _tk_variable_del_guard_installed:
return
@ -23,24 +32,24 @@ def install_tkinter_variable_del_guard() -> None:
tkinter.Variable.__del__ = _safe_variable_del # type: ignore[assignment]
_tk_variable_del_guard_installed = True
# Размеры и отступы (единые для диалогов настроек и первого запуска)
CONFIG_DIALOG_SIZE: Tuple[int, int] = (460, 560)
CONFIG_DIALOG_FRAME_PAD: Tuple[int, int] = (20, 14)
FIRST_RUN_SIZE: Tuple[int, int] = (520, 480)
FIRST_RUN_SIZE: Tuple[int, int] = (520, 440)
FIRST_RUN_FRAME_PAD: Tuple[int, int] = (28, 24)
@dataclass(frozen=True)
class CtkTheme:
tg_blue: tuple = ("#3390ec", "#3390ec")
tg_blue_hover: tuple = ("#2b7cd4", "#2b7cd4")
bg: tuple = ("#ffffff", "#1e1e1e")
field_bg: tuple = ("#f0f2f5", "#2b2b2b")
field_border: tuple = ("#d6d9dc", "#3a3a3a")
text_primary: tuple = ("#000000", "#ffffff")
text_secondary: tuple = ("#707579", "#aaaaaa")
"""Палитра Telegram-style и семейства шрифтов для UI и моноширинного текста."""
tg_blue: str = "#3390ec"
tg_blue_hover: str = "#2b7cd4"
bg: str = "#ffffff"
field_bg: str = "#f0f2f5"
field_border: str = "#d6d9dc"
text_primary: str = "#000000"
text_secondary: str = "#707579"
ui_font_family: str = "Sans"
mono_font_family: str = "Monospace"
@ -52,16 +61,17 @@ def ctk_theme_for_platform() -> CtkTheme:
def apply_ctk_appearance(ctk: Any) -> None:
ctk.set_appearance_mode("auto")
ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue")
def center_ctk_geometry(root: Any, width: int, height: int) -> None:
sw = root.winfo_screenwidth()
sh = root.winfo_screenheight()
root.geometry(f"{width}x{height}+{(sw - width) // 2}+{(sh - height) // 2}")
def create_ctk_toplevel(
def create_ctk_root(
ctk: Any,
*,
title: str,
@ -71,27 +81,21 @@ def create_ctk_toplevel(
topmost: bool = True,
after_create: Optional[Callable[[Any], None]] = None,
) -> Any:
root = ctk.CTkToplevel()
"""
Создаёт CTk: глобальная тема, заголовок, без ресайза, по центру экрана, фон из палитры.
after_create опционально: установка иконки окна (различается по ОС).
"""
_install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
root = ctk.CTk()
root.title(title)
root.resizable(False, False)
center_ctk_geometry(root, width, height)
root.configure(fg_color=theme.bg)
if topmost:
root.attributes("-topmost", True)
root.lift()
root.focus_force()
center_ctk_geometry(root, width, height)
root.configure(fg_color=theme.bg)
if after_create:
_after_id = root.after(300, lambda: after_create(root))
_orig_destroy = root.destroy
def _safe_destroy():
try:
root.after_cancel(_after_id)
except Exception:
pass
_orig_destroy()
root.destroy = _safe_destroy
after_create(root)
return root

View File

@ -1,3 +1,7 @@
"""
Всплывающие подсказки для CustomTkinter / tk: задержка, Toplevel без рамки, wrap.
"""
from __future__ import annotations
import tkinter as tk
@ -5,6 +9,8 @@ from typing import Any, List, Optional
class CtkTooltip:
"""Показ текста при наведении на виджет."""
def __init__(
self,
widget: Any,
@ -25,8 +31,6 @@ class CtkTooltip:
widget.bind("<Destroy>", self._on_destroy, add="+")
def _schedule(self, _event: Any = None) -> None:
if self.widget is None:
return
self._cancel_after()
self._after_id = self.widget.after(self.delay_ms, self._show)
@ -85,7 +89,6 @@ class CtkTooltip:
def _on_destroy(self, _event: Any = None) -> None:
self._hide()
self.widget = None
def _is_windows() -> bool:
@ -101,9 +104,11 @@ def attach_ctk_tooltip(
delay_ms: int = 450,
wraplength: int = 320,
) -> None:
"""Повесить подсказку на виджет (CTk или tk)."""
CtkTooltip(widget, text, delay_ms=delay_ms, wraplength=wraplength)
def attach_tooltip_to_widgets(widgets: List[Any], text: str, **kwargs: Any) -> None:
"""Одна и та же подсказка на несколько виджетов (подпись + поле)."""
for w in widgets:
attach_ctk_tooltip(w, text, **kwargs)

View File

@ -1,6 +1,10 @@
"""
Общая разметка CustomTkinter для tray (Windows / Linux): настройки и первый запуск.
Логика сохранения и колбэки остаются в платформенных модулях.
"""
from __future__ import annotations
import os
import webbrowser
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
@ -16,15 +20,15 @@ from ui.ctk_theme import (
)
from ui.ctk_tooltip import attach_ctk_tooltip, attach_tooltip_to_widgets
# Подсказки для формы настроек (новые пользователи)
_TIP_HOST = (
"Адрес, на котором прокси принимает подключения.\n"
"Адрес, на котором прокси принимает SOCKS5-подключения.\n"
"Обычно 127.0.0.1 — локальная сеть, 0.0.0.0 - все интерфейсы"
)
_TIP_PORT = (
"Порт прокси. В Telegram Desktop в настройках прокси должен быть "
"Порт SOCKS5. В Telegram Desktop в настройках прокси должен быть "
"указан тот же порт"
)
_TIP_SECRET = "Секретный ключ для авторизации клиентов"
_TIP_DC = (
"Соответствие номера датацентра Telegram (DC) и IP-адреса сервера.\n"
"Каждая строка: «номер:IP», например 2:149.154.167.220. "
@ -49,60 +53,14 @@ _TIP_AUTOSTART = (
"Запускать TG WS Proxy при входе в Windows. "
"Если вы переместите программу в другую папку, автозапуск сбросится"
)
_TIP_CHECK_UPDATES = "При запуске проверять наличие обновлений"
_TIP_CHECK_UPDATES = (
"При запуске проверять наличие обновлений"
)
_TIP_SAVE = "Сохранить настройки"
_TIP_CANCEL = "Закрыть окно без сохранения изменений"
_INNER_W = 396
def _entry(ctk, parent, theme, *, var=None, width=0, height=36, radius=10, **kw):
opts = dict(
font=(theme.ui_font_family, 13), corner_radius=radius,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
)
if var is not None:
opts["textvariable"] = var
if width:
opts["width"] = width
opts["height"] = height
opts.update(kw)
return ctk.CTkEntry(parent, **opts)
def _checkbox(ctk, parent, theme, text, variable):
return ctk.CTkCheckBox(
parent, text=text, variable=variable,
font=(theme.ui_font_family, 13), text_color=theme.text_primary,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
corner_radius=6, border_width=2, border_color=theme.field_border,
)
def _label(ctk, parent, theme, text, *, size=12, bold=False, secondary=True, **kw):
weight = "bold" if bold else "normal"
return ctk.CTkLabel(
parent, text=text,
font=(theme.ui_font_family, size, weight),
text_color=theme.text_secondary if secondary else theme.text_primary,
anchor="w", **kw,
)
def _labeled_entry(ctk, parent, theme, label_text, value, *, tip="", width=0, pack_fill=False):
col = ctk.CTkFrame(parent, fg_color="transparent")
lbl = _label(ctk, col, theme, label_text)
lbl.pack(anchor="w", pady=(0, 2))
var = ctk.StringVar(value=str(value))
ent = _entry(ctk, col, theme, var=var, width=width)
if pack_fill:
ent.pack(fill="x")
else:
ent.pack(anchor="w")
if tip:
attach_tooltip_to_widgets([lbl, ent, col], tip)
return col, var
# Внутренняя ширина полей относительно ширины окна настроек (см. CONFIG_DIALOG_SIZE)
_CONFIG_FORM_INNER_WIDTH = 396
def tray_settings_scroll_and_footer(
@ -110,6 +68,10 @@ def tray_settings_scroll_and_footer(
content_parent: Any,
theme: CtkTheme,
) -> Tuple[Any, Any]:
"""
Нижняя панель под кнопки и прокручиваемая область для формы (форма не обрезает кнопки).
Возвращает (scroll_frame, footer_frame).
"""
footer = ctk.CTkFrame(content_parent, fg_color=theme.bg)
footer.pack(side="bottom", fill="x")
scroll = ctk.CTkScrollableFrame(
@ -131,12 +93,22 @@ def _config_section(
*,
bottom_spacer: int = 6,
) -> Any:
"""Заголовок секции и карточка с рамкой для группировки полей."""
wrap = ctk.CTkFrame(parent, fg_color="transparent")
wrap.pack(fill="x", pady=(0, bottom_spacer))
_label(ctk, wrap, theme, title, secondary=False, bold=True).pack(anchor="w", pady=(0, 2))
ctk.CTkLabel(
wrap,
text=title,
font=(theme.ui_font_family, 12, "bold"),
text_color=theme.text_primary,
anchor="w",
).pack(anchor="w", pady=(0, 2))
card = ctk.CTkFrame(
wrap, fg_color=theme.field_bg, corner_radius=10,
border_width=1, border_color=theme.field_border,
wrap,
fg_color=theme.field_bg,
corner_radius=10,
border_width=1,
border_color=theme.field_border,
)
card.pack(fill="x")
inner = ctk.CTkFrame(card, fg_color="transparent")
@ -148,7 +120,6 @@ def _config_section(
class TrayConfigFormWidgets:
host_var: Any
port_var: Any
secret_var: Any
dc_textbox: Any
verbose_var: Any
adv_entries: List[Any]
@ -167,67 +138,102 @@ def install_tray_config_form(
show_autostart: bool = False,
autostart_value: bool = False,
) -> TrayConfigFormWidgets:
"""Поля настроек прокси внутри уже созданного `frame`."""
header = ctk.CTkFrame(frame, fg_color="transparent")
header.pack(fill="x", pady=(0, 2))
ctk.CTkLabel(
header, text="Настройки прокси",
header,
text="Настройки прокси",
font=(theme.ui_font_family, 17, "bold"),
text_color=theme.text_primary, anchor="w",
text_color=theme.text_primary,
anchor="w",
).pack(side="left")
ctk.CTkLabel(
header, text=f"v{__version__}",
header,
text=f"v{__version__}",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary, anchor="e",
text_color=theme.text_secondary,
anchor="e",
).pack(side="right")
conn = _config_section(ctk, frame, theme, "Подключение MTProto")
inner_w = _CONFIG_FORM_INNER_WIDTH
conn = _config_section(ctk, frame, theme, "Подключение SOCKS5")
host_row = ctk.CTkFrame(conn, fg_color="transparent")
host_row.pack(fill="x")
host_col, host_var = _labeled_entry(
ctk, host_row, theme, "IP-адрес",
cfg.get("host", default_config["host"]),
tip=_TIP_HOST, width=160, pack_fill=True,
)
host_col = ctk.CTkFrame(host_row, fg_color="transparent")
host_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
port_col, port_var = _labeled_entry(
ctk, host_row, theme, "Порт",
cfg.get("port", default_config["port"]),
tip=_TIP_PORT, width=100,
host_lbl = ctk.CTkLabel(
host_col,
text="IP-адрес",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="w",
)
host_lbl.pack(anchor="w", pady=(0, 2))
host_var = ctk.StringVar(value=cfg.get("host", default_config["host"]))
host_entry = ctk.CTkEntry(
host_col,
textvariable=host_var,
width=160,
height=36,
font=(theme.ui_font_family, 13),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
host_entry.pack(fill="x", pady=(0, 0))
attach_tooltip_to_widgets([host_lbl, host_entry, host_col], _TIP_HOST)
port_col = ctk.CTkFrame(host_row, fg_color="transparent")
port_col.pack(side="left")
secret_row = ctk.CTkFrame(conn, fg_color="transparent")
secret_row.pack(fill="x")
secret_col, secret_var = _labeled_entry(
ctk, secret_row, theme, "Secret",
cfg.get("secret", default_config["secret"]),
tip=_TIP_SECRET, width=160, pack_fill=True,
port_lbl = ctk.CTkLabel(
port_col,
text="Порт",
font=(theme.ui_font_family, 12),
text_color=theme.text_secondary,
anchor="w",
)
secret_col.pack(side="left", fill="x", expand=True, padx=(0, 10))
regen_col = ctk.CTkFrame(secret_row, fg_color="transparent")
regen_col.pack(side="left", anchor="s")
ctk.CTkLabel(regen_col, text="", font=(theme.ui_font_family, 12)).pack(pady=(0, 2))
ctk.CTkButton(
regen_col, text="", width=36, height=36,
font=(theme.ui_font_family, 18), corner_radius=10,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
text_color="#ffffff", border_width=1, border_color=theme.field_border,
command=lambda: secret_var.set(os.urandom(16).hex()),
).pack()
port_lbl.pack(anchor="w", pady=(0, 2))
port_var = ctk.StringVar(value=str(cfg.get("port", default_config["port"])))
port_entry = ctk.CTkEntry(
port_col,
textvariable=port_var,
width=100,
height=36,
font=(theme.ui_font_family, 13),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
port_entry.pack(anchor="w")
attach_tooltip_to_widgets([port_lbl, port_entry, port_col], _TIP_PORT)
dc_inner = _config_section(ctk, frame, theme, "Датацентры Telegram (DC → IP)")
dc_lbl = _label(ctk, dc_inner, theme, "По одному правилу на строку, формат: номер:IP", size=11)
dc_lbl = ctk.CTkLabel(
dc_inner,
text="По одному правилу на строку, формат: номер:IP",
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
)
dc_lbl.pack(anchor="w", pady=(0, 4))
dc_textbox = ctk.CTkTextbox(
dc_inner, width=_INNER_W, height=88,
font=(theme.mono_font_family, 12), corner_radius=10,
fg_color=theme.bg, border_color=theme.field_border,
border_width=1, text_color=theme.text_primary,
dc_inner,
width=inner_w,
height=88,
font=(theme.mono_font_family, 12),
corner_radius=10,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
)
dc_textbox.pack(fill="x")
dc_textbox.insert("1.0", "\n".join(cfg.get("dc_ip", default_config["dc_ip"])))
@ -236,7 +242,18 @@ def install_tray_config_form(
log_inner = _config_section(ctk, frame, theme, "Логи и производительность")
verbose_var = ctk.BooleanVar(value=cfg.get("verbose", False))
verbose_cb = _checkbox(ctk, log_inner, theme, "Подробное логирование (verbose)", verbose_var)
verbose_cb = ctk.CTkCheckBox(
log_inner,
text="Подробное логирование (verbose)",
variable=verbose_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
verbose_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(verbose_cb, _TIP_VERBOSE)
@ -248,17 +265,33 @@ def install_tray_config_form(
("Пул WebSocket-сессий (по умолчанию 4)", "pool_size", _TIP_POOL),
("Макс. размер лога, МБ (по умолчанию 5)", "log_max_mb", _TIP_LOG_MB),
]
for label_text, key, tip in adv_rows:
col = ctk.CTkFrame(adv_frame, fg_color="transparent")
col.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5))
adv_l = _label(ctk, col, theme, label_text, size=11)
for lbl, key, tip in adv_rows:
col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent")
col_frame.pack(fill="x", pady=(0, 0 if key == "log_max_mb" else 5))
adv_l = ctk.CTkLabel(
col_frame,
text=lbl,
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
)
adv_l.pack(anchor="w", pady=(0, 2))
adv_e = _entry(
ctk, col, theme, width=_INNER_W, height=32, radius=8,
textvariable=ctk.StringVar(value=str(cfg.get(key, default_config[key]))),
adv_e = ctk.CTkEntry(
col_frame,
width=inner_w,
height=32,
font=(theme.ui_font_family, 13),
corner_radius=8,
fg_color=theme.bg,
border_color=theme.field_border,
border_width=1,
text_color=theme.text_primary,
textvariable=ctk.StringVar(
value=str(cfg.get(key, default_config[key]))
),
)
adv_e.pack(fill="x")
attach_tooltip_to_widgets([adv_l, adv_e, col], tip)
attach_tooltip_to_widgets([adv_l, adv_e, col_frame], tip)
adv_entries = list(adv_frame.winfo_children())
adv_keys = ("buf_kb", "pool_size", "log_max_mb")
@ -266,9 +299,22 @@ def install_tray_config_form(
upd_inner = _config_section(ctk, frame, theme, "Обновления")
st = get_status()
check_updates_var = ctk.BooleanVar(
value=bool(cfg.get("check_updates", default_config.get("check_updates", True)))
value=bool(
cfg.get("check_updates", default_config.get("check_updates", True))
)
)
upd_cb = ctk.CTkCheckBox(
upd_inner,
text="Проверять обновления при запуске",
variable=check_updates_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
upd_cb = _checkbox(ctk, upd_inner, theme, "Проверять обновления при запуске", check_updates_var)
upd_cb.pack(anchor="w", pady=(0, 6))
attach_ctk_tooltip(upd_cb, _TIP_CHECK_UPDATES)
@ -289,38 +335,72 @@ def install_tray_config_form(
else:
upd_status = "Установлена последняя известная версия с GitHub."
_label(ctk, upd_inner, theme, upd_status, size=11,
justify="left", wraplength=_INNER_W).pack(anchor="w", pady=(0, 8))
ctk.CTkLabel(
upd_inner,
text=upd_status,
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
justify="left",
wraplength=inner_w,
).pack(anchor="w", pady=(0, 8))
rel_url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ctk.CTkButton(
upd_inner, text="Открыть страницу релиза", height=32,
font=(theme.ui_font_family, 13), corner_radius=8,
fg_color=theme.field_bg, hover_color=theme.field_border,
text_color=theme.text_primary, border_width=1,
open_rel_btn = ctk.CTkButton(
upd_inner,
text="Открыть страницу релиза",
height=32,
font=(theme.ui_font_family, 13),
corner_radius=8,
fg_color=theme.field_bg,
hover_color=theme.field_border,
text_color=theme.text_primary,
border_width=1,
border_color=theme.field_border,
command=lambda u=rel_url: webbrowser.open(u),
).pack(anchor="w")
)
open_rel_btn.pack(anchor="w")
autostart_var = None
if show_autostart:
sys_inner = _config_section(ctk, frame, theme, "Запуск Windows", bottom_spacer=4)
sys_inner = _config_section(
ctk, frame, theme, "Запуск Windows", bottom_spacer=4
)
autostart_var = ctk.BooleanVar(value=autostart_value)
as_cb = _checkbox(ctk, sys_inner, theme, "Автозапуск при включении компьютера", autostart_var)
as_cb = ctk.CTkCheckBox(
sys_inner,
text="Автозапуск при включении компьютера",
variable=autostart_var,
font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue,
hover_color=theme.tg_blue_hover,
corner_radius=6,
border_width=2,
border_color=theme.field_border,
)
as_cb.pack(anchor="w", pady=(0, 4))
as_hint = _label(
ctk, sys_inner, theme,
"Если переместить программу в другую папку, запись автозапуска может сброситься.",
size=11, justify="left", wraplength=_INNER_W,
as_hint = ctk.CTkLabel(
sys_inner,
text="Если переместить программу в другую папку, запись автозапуска может сброситься.",
font=(theme.ui_font_family, 11),
text_color=theme.text_secondary,
anchor="w",
justify="left",
wraplength=inner_w,
)
as_hint.pack(anchor="w")
attach_tooltip_to_widgets([as_cb, as_hint], _TIP_AUTOSTART)
return TrayConfigFormWidgets(
host_var=host_var, port_var=port_var, secret_var=secret_var,
dc_textbox=dc_textbox, verbose_var=verbose_var,
adv_entries=adv_entries, adv_keys=adv_keys,
autostart_var=autostart_var, check_updates_var=check_updates_var,
host_var=host_var,
port_var=port_var,
dc_textbox=dc_textbox,
verbose_var=verbose_var,
adv_entries=adv_entries,
adv_keys=adv_keys,
autostart_var=autostart_var,
check_updates_var=check_updates_var,
)
@ -329,6 +409,7 @@ def merge_adv_from_form(
base: Dict[str, Any],
default_config: dict,
) -> None:
"""Дополняет base значениями buf_kb / pool_size / log_max_mb (in-place)."""
for i, key in enumerate(widgets.adv_keys):
col_frame = widgets.adv_entries[i]
entry = col_frame.winfo_children()[1]
@ -347,6 +428,9 @@ def validate_config_form(
*,
include_autostart: bool,
) -> Union[dict, str]:
"""
Возвращает словарь полей конфига или строку ошибки для показа пользователю.
"""
import socket as _sock
host_val = widgets.host_var.get().strip()
@ -372,18 +456,9 @@ def validate_config_form(
except ValueError as e:
return str(e)
secret_val = widgets.secret_var.get().strip()
if len(secret_val) != 32:
return "Secret должен содержать ровно 32 hex-символа (16 байт)."
try:
bytes.fromhex(secret_val)
except ValueError:
return "Secret должен состоять только из hex-символов (0-9, a-f)."
new_cfg: Dict[str, Any] = {
"host": host_val,
"port": port_val,
"secret": secret_val,
"dc_ip": lines,
"verbose": widgets.verbose_var.get(),
}
@ -442,11 +517,12 @@ def populate_first_run_window(
*,
host: str,
port: int,
secret: str,
on_done: Callable[[bool], None],
) -> None:
link_host = tg_ws_proxy.get_link_host(host)
tg_url = f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
"""
Содержимое окна первого запуска. on_done(open_in_telegram) по «Начать» и по закрытию окна.
"""
tg_url = f"tg://socks?server={host}&port={port}"
fpx, fpy = FIRST_RUN_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
@ -465,35 +541,18 @@ def populate_first_run_window(
("Как подключить Telegram Desktop:", True),
(" Автоматически:", True),
(" ПКМ по иконке в трее → «Открыть в Telegram»", False),
(f" Или скопировать ссылку, отправить её себе в TG и нажать по ней: {tg_url}", False),
(f" Или ссылка: {tg_url}", False),
("\n Вручную:", True),
(" Настройки → Продвинутые → Тип подключения → Прокси", False),
(f" MTProto → {link_host} : {port}", False),
(f" Secret: dd{secret}", False),
(f" SOCKS5 → {host} : {port} (без логина/пароля)", False),
]
textbox = ctk.CTkTextbox(
frame,
font=(theme.ui_font_family, 13),
fg_color=theme.bg,
border_width=0,
text_color=theme.text_primary,
activate_scrollbars=False,
wrap="word",
height=275,
)
textbox._textbox.tag_configure("bold", font=(theme.ui_font_family, 13, "bold"))
textbox._textbox.configure(spacing1=1, spacing3=1)
for text, bold in sections:
if text.startswith("\n"):
textbox.insert("end", "\n")
text = text[1:]
if bold:
textbox.insert("end", text + "\n", "bold")
else:
textbox.insert("end", text + "\n")
textbox.configure(state="disabled")
textbox.pack(anchor="w", fill="x")
weight = "bold" if bold else "normal"
ctk.CTkLabel(frame, text=text,
font=(theme.ui_font_family, 13, weight),
text_color=theme.text_primary,
anchor="w", justify="left").pack(anchor="w", pady=1)
ctk.CTkFrame(frame, fg_color="transparent", height=16).pack()
@ -501,8 +560,12 @@ def populate_first_run_window(
corner_radius=0).pack(fill="x", pady=(0, 12))
auto_var = ctk.BooleanVar(value=True)
_checkbox(ctk, frame, theme, "Открыть прокси в Telegram сейчас",
auto_var).pack(anchor="w", pady=(0, 16))
ctk.CTkCheckBox(frame, text="Открыть прокси в Telegram сейчас",
variable=auto_var, font=(theme.ui_font_family, 13),
text_color=theme.text_primary,
fg_color=theme.tg_blue, hover_color=theme.tg_blue_hover,
corner_radius=6, border_width=2,
border_color=theme.field_border).pack(anchor="w", pady=(0, 16))
def on_ok():
on_done(auto_var.get())

View File

@ -5,11 +5,10 @@
from __future__ import annotations
import sys
import os
from typing import Any, Dict
_TRAY_DEFAULTS_COMMON: Dict[str, Any] = {
"port": 1443,
"port": 1080,
"host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
@ -21,10 +20,8 @@ _TRAY_DEFAULTS_COMMON: Dict[str, Any] = {
def default_tray_config() -> Dict[str, Any]:
"""Новая копия конфига по умолчанию для текущей ОС."""
cfg = dict(_TRAY_DEFAULTS_COMMON)
cfg["secret"] = os.urandom(16).hex()
if sys.platform == "win32":
cfg["autostart"] = False
return cfg

View File

@ -1,460 +0,0 @@
from __future__ import annotations
import asyncio
import json
import logging
import logging.handlers
import os
import socket as _socket
import sys
import threading
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Tuple
import psutil
import proxy.tg_ws_proxy as tg_ws_proxy
from proxy import __version__
from utils.default_config import default_tray_config
log = logging.getLogger("tg-ws-tray")
APP_NAME = "TgWsProxy"
def _app_dir() -> Path:
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / APP_NAME
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
APP_DIR = _app_dir()
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done_mtproto"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG: Dict[str, Any] = default_tray_config()
IS_FROZEN = bool(getattr(sys, "frozen", False))
def ensure_dirs() -> None:
APP_DIR.mkdir(parents=True, exist_ok=True)
# single-instance lock
_lock_file_path: Optional[Path] = None
def _same_process(meta: dict, proc: psutil.Process, script_hint: str) -> bool:
try:
lock_ct = float(meta.get("create_time", 0.0))
if lock_ct > 0 and abs(lock_ct - proc.create_time()) > 1.0:
return False
except Exception:
return False
if IS_FROZEN:
return APP_NAME.lower() in proc.name().lower()
try:
for arg in proc.cmdline():
if script_hint in arg:
return True
except Exception:
pass
return False
def acquire_lock(script_hint: str = "") -> bool:
global _lock_file_path
ensure_dirs()
for f in list(APP_DIR.glob("*.lock")):
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
meta: dict = {}
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
pass
try:
if _same_process(meta, psutil.Process(pid), script_hint):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
lock_file.write_text(
json.dumps({"create_time": proc.create_time()}, ensure_ascii=False),
encoding="utf-8",
)
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def release_lock() -> None:
global _lock_file_path
if _lock_file_path:
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
# config
def load_config() -> dict:
ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict) -> None:
ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
# logging
_LOG_FMT_FILE = "%(asctime)s %(levelname)-5s %(name)s %(message)s"
_LOG_FMT_CONSOLE = "%(asctime)s %(levelname)-5s %(message)s"
def setup_logging(verbose: bool = False, log_max_mb: float = 5) -> None:
ensure_dirs()
level = logging.DEBUG if verbose else logging.INFO
root = logging.getLogger()
root.setLevel(level)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, int(log_max_mb * 1024 * 1024)),
backupCount=0,
encoding="utf-8",
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(_LOG_FMT_FILE, datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not IS_FROZEN:
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(level)
ch.setFormatter(logging.Formatter(_LOG_FMT_CONSOLE, datefmt="%H:%M:%S"))
root.addHandler(ch)
# icon
def make_icon_image(size: int = 64, *, color: Tuple[int, ...] = (0, 136, 204, 255)):
from PIL import Image, ImageDraw, ImageFont
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin], fill=color)
for path in _font_paths():
try:
font = ImageFont.truetype(path, size=int(size * 0.55))
break
except Exception:
continue
else:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
draw.text(
((size - tw) // 2 - bbox[0], (size - th) // 2 - bbox[1]),
"T",
fill=(255, 255, 255, 255),
font=font,
)
return img
def _font_paths():
if sys.platform == "win32":
return ["arial.ttf"]
if sys.platform == "darwin":
return ["/System/Library/Fonts/Helvetica.ttc"]
return [
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf",
]
def load_icon():
from PIL import Image
icon_path = Path(__file__).parents[1] / "icon.ico"
if icon_path.exists():
try:
return Image.open(str(icon_path))
except Exception:
pass
return make_icon_image(64)
# proxy lifecycle
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[Tuple[asyncio.AbstractEventLoop, asyncio.Event]] = None
def _run_proxy_thread(on_port_busy: Callable[[str], None]) -> None:
global _async_stop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
stop_ev = asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(tg_ws_proxy._run(stop_event=stop_ev))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc) or "10048" in str(exc):
on_port_busy(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def apply_proxy_config(cfg: dict) -> bool:
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
try:
dc_redirects = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
return False
pc = tg_ws_proxy.proxy_config
pc.port = cfg.get("port", DEFAULT_CONFIG["port"])
pc.host = cfg.get("host", DEFAULT_CONFIG["host"])
pc.secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
pc.dc_redirects = dc_redirects
pc.buffer_size = max(4, cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])) * 1024
pc.pool_size = max(0, cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]))
return True
def start_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
global _proxy_thread
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
if not apply_proxy_config(cfg):
on_error("Ошибка конфигурации DC → IP.")
return
pc = tg_ws_proxy.proxy_config
log.info("Starting proxy on %s:%d ...", pc.host, pc.port)
_proxy_thread = threading.Thread(
target=_run_proxy_thread, args=(on_error,), daemon=True, name="proxy"
)
_proxy_thread.start()
def stop_proxy() -> None:
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=5)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy(cfg: dict, on_error: Callable[[str], None]) -> None:
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy(cfg, on_error)
def tg_proxy_url(cfg: dict) -> str:
host = cfg.get("host", DEFAULT_CONFIG["host"])
port = cfg.get("port", DEFAULT_CONFIG["port"])
secret = cfg.get("secret", DEFAULT_CONFIG["secret"])
link_host = tg_ws_proxy.get_link_host(host)
return f"tg://proxy?server={link_host}&port={port}&secret=dd{secret}"
_IPV6_WARNING = (
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз."
)
def _has_ipv6() -> bool:
try:
for addr in _socket.getaddrinfo(_socket.gethostname(), None, _socket.AF_INET6):
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _socket.socket(_socket.AF_INET6, _socket.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def check_ipv6_warning(show_info: Callable[[str, str], None]) -> None:
ensure_dirs()
if IPV6_WARN_MARKER.exists() or not _has_ipv6():
return
IPV6_WARN_MARKER.touch()
threading.Thread(
target=lambda: show_info(_IPV6_WARNING, "TG WS Proxy"),
daemon=True,
).start()
# update check
def maybe_notify_update(
cfg: dict,
is_exiting: Callable[[], bool],
ask_open: Callable[[str, str], bool],
) -> None:
if not cfg.get("check_updates", True):
return
def _work():
time.sleep(1.5)
if is_exiting():
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
import webbrowser
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if ask_open(
f"Доступна новая версия: {ver}\n\nОткрыть страницу релиза в браузере?",
"TG WS Proxy — обновление",
):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
# ctk thread (windows / linux)
_ctk_root: Any = None
_ctk_root_ready = threading.Event()
def ensure_ctk_thread(ctk: Any) -> bool:
global _ctk_root
if ctk is None:
return False
if _ctk_root_ready.is_set():
return True
def _run():
global _ctk_root
from ui.ctk_theme import apply_ctk_appearance, install_tkinter_variable_del_guard
install_tkinter_variable_del_guard()
apply_ctk_appearance(ctk)
_ctk_root = ctk.CTk()
_ctk_root.withdraw()
_ctk_root_ready.set()
_ctk_root.mainloop()
threading.Thread(target=_run, daemon=True, name="ctk-root").start()
_ctk_root_ready.wait(timeout=5.0)
return _ctk_root is not None
def ctk_run_dialog(build_fn: Callable[[threading.Event], None]) -> None:
if _ctk_root is None:
return
done = threading.Event()
def _invoke():
try:
build_fn(done)
except Exception:
log.exception("CTk dialog failed")
done.set()
_ctk_root.after(0, _invoke)
done.wait()
import gc
gc.collect()
def quit_ctk() -> None:
if _ctk_root is not None:
try:
_ctk_root.after(0, _ctk_root.quit)
except Exception:
pass
# common bootstrap
def bootstrap(cfg: dict) -> None:
save_config(cfg)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(
cfg.get("verbose", False),
log_max_mb=cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]),
)
log.info("TG WS Proxy версия %s starting", __version__)
log.info("Config: %s", cfg)
log.info("Log file: %s", LOG_FILE)

View File

@ -1,12 +1,18 @@
from __future__ import annotations
import ctypes
import ipaddress
import json
import logging
import logging.handlers
import os
import winreg
import psutil
import sys
import threading
import time
import webbrowser
import winreg
import ipaddress
from pathlib import Path
from typing import Optional
@ -26,62 +32,169 @@ except ImportError:
ctk = None
try:
from PIL import Image
from PIL import Image, ImageDraw, ImageFont
except ImportError:
Image = None
Image = ImageDraw = ImageFont = None
import proxy.tg_ws_proxy as tg_ws_proxy
from utils.tray_common import (
APP_NAME, DEFAULT_CONFIG, FIRST_RUN_MARKER, IS_FROZEN, LOG_FILE,
acquire_lock, bootstrap, check_ipv6_warning, ctk_run_dialog,
ensure_ctk_thread, ensure_dirs, load_config, load_icon, log,
maybe_notify_update, quit_ctk, release_lock, restart_proxy,
save_config, start_proxy, stop_proxy, tg_proxy_url,
)
from proxy.app_runtime import ProxyAppRuntime
from proxy import __version__
from utils.default_config import default_tray_config
from ui.ctk_tray_ui import (
install_tray_config_buttons, install_tray_config_form,
populate_first_run_window, tray_settings_scroll_and_footer,
install_tray_config_buttons,
install_tray_config_form,
populate_first_run_window,
tray_settings_scroll_and_footer,
validate_config_form,
)
from ui.ctk_theme import (
CONFIG_DIALOG_FRAME_PAD, CONFIG_DIALOG_SIZE, FIRST_RUN_SIZE,
create_ctk_toplevel, ctk_theme_for_platform, main_content_frame,
CONFIG_DIALOG_FRAME_PAD,
CONFIG_DIALOG_SIZE,
FIRST_RUN_SIZE,
create_ctk_root,
ctk_theme_for_platform,
main_content_frame,
)
IS_FROZEN = bool(getattr(sys, "frozen", False))
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG = default_tray_config()
_tray_icon: Optional[object] = None
_config: dict = {}
_exiting = False
_exiting: bool = False
_lock_file_path: Optional[Path] = None
ICON_PATH = str(Path(__file__).parent / "icon.ico")
log = logging.getLogger("tg-ws-tray")
_runtime = ProxyAppRuntime(
APP_DIR,
default_config=DEFAULT_CONFIG,
logger_name="tg-ws-tray",
on_error=lambda text: _show_error(text),
)
CONFIG_FILE = _runtime.config_file
LOG_FILE = _runtime.log_file
# win32 dialogs
_u32 = ctypes.windll.user32
_u32.MessageBoxW.argtypes = [ctypes.c_void_p, ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint]
_u32.MessageBoxW.restype = ctypes.c_int
_MB_OK_ERR = 0x10
_MB_OK_INFO = 0x40
_MB_YESNO_Q = 0x24
_IDYES = 6
_user32 = ctypes.windll.user32
_user32.MessageBoxW.argtypes = [
ctypes.c_void_p,
ctypes.c_wchar_p,
ctypes.c_wchar_p,
ctypes.c_uint,
]
_user32.MessageBoxW.restype = ctypes.c_int
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка") -> None:
_u32.MessageBoxW(None, text, title, _MB_OK_ERR)
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
try:
for arg in proc.cmdline():
if "windows.py" in arg:
return True
except Exception:
pass
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return (
os.path.basename(sys.executable).lower() == proc.name().lower()
)
return False
def _show_info(text: str, title: str = "TG WS Proxy") -> None:
_u32.MessageBoxW(None, text, title, _MB_OK_INFO)
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
return _u32.MessageBoxW(None, text, title, _MB_YESNO_Q) == _IDYES
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {
"create_time": proc.create_time(),
}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# autostart (registry)
def _ensure_dirs():
_runtime.ensure_dirs()
_RUN_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run"
def load_config() -> dict:
return _runtime.load_config()
def save_config(cfg: dict):
_runtime.save_config(cfg)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_runtime.setup_logging(verbose, log_max_mb=log_max_mb)
def _autostart_reg_name() -> str:
return APP_NAME
def _supports_autostart() -> bool:
@ -94,213 +207,408 @@ def _autostart_command() -> str:
def is_autostart_enabled() -> bool:
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, _RUN_KEY, 0, winreg.KEY_READ) as k:
val, _ = winreg.QueryValueEx(k, APP_NAME)
return str(val).strip() == _autostart_command().strip()
except (FileNotFoundError, OSError):
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ,
) as k:
val, _ = winreg.QueryValueEx(k, _autostart_reg_name())
stored = str(val).strip()
expected = _autostart_command().strip()
return stored == expected
except FileNotFoundError:
return False
except OSError:
return False
def set_autostart_enabled(enabled: bool) -> None:
try:
with winreg.CreateKey(winreg.HKEY_CURRENT_USER, _RUN_KEY) as k:
with winreg.CreateKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
) as k:
if enabled:
winreg.SetValueEx(k, APP_NAME, 0, winreg.REG_SZ, _autostart_command())
winreg.SetValueEx(
k,
_autostart_reg_name(),
0,
winreg.REG_SZ,
_autostart_command(),
)
else:
try:
winreg.DeleteValue(k, APP_NAME)
winreg.DeleteValue(k, _autostart_reg_name())
except FileNotFoundError:
pass
except OSError as exc:
log.error("Failed to update autostart: %s", exc)
_show_error(
"Не удалось изменить автозапуск.\n\n"
"Попробуйте запустить приложение от имени пользователя "
f"с правами на реестр.\n\nОшибка: {exc}"
"Попробуйте запустить приложение от имени пользователя с правами на реестр.\n\n"
f"Ошибка: {exc}"
)
# tray callbacks
def _make_icon_image(size: int = 64):
if Image is None:
raise RuntimeError("Pillow is required for tray icon")
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
def _on_open_in_telegram(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
margin = 2
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 136, 204, 255))
try:
font = ImageFont.truetype("arial.ttf", size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _load_icon():
icon_path = Path(__file__).parent / "icon.ico"
if icon_path.exists() and Image:
try:
return Image.open(str(icon_path))
except Exception:
pass
return _make_icon_image()
def start_proxy():
_runtime.start_proxy(_config)
def stop_proxy():
_runtime.stop_proxy()
def restart_proxy():
_runtime.restart_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
_user32.MessageBoxW(None, text, title, 0x10)
def _show_info(text: str, title: str = "TG WS Proxy"):
_user32.MessageBoxW(None, text, title, 0x40)
def _ask_open_release_page(latest_version: str, url: str) -> bool:
"""Win32 Yes/No: открыть страницу релиза."""
MB_YESNO = 0x4
MB_ICONQUESTION = 0x20
IDYES = 6
text = (
f"Доступна новая версия: {latest_version}\n\n"
f"Открыть страницу релиза в браузере?"
)
r = _user32.MessageBoxW(
None,
text,
"TG WS Proxy — обновление",
MB_YESNO | MB_ICONQUESTION,
)
return r == IDYES
def _maybe_notify_update_async():
"""
Фоновая проверка GitHub Releases и уведомление (не блокирует трей).
"""
def _work():
time.sleep(1.5)
if _exiting:
return
if not _config.get("check_updates", True):
return
try:
from utils.update_check import RELEASES_PAGE_URL, get_status, run_check
run_check(__version__)
st = get_status()
if not st.get("has_update"):
return
url = (st.get("html_url") or "").strip() or RELEASES_PAGE_URL
ver = st.get("latest") or "?"
if _ask_open_release_page(str(ver), url):
webbrowser.open(url)
except Exception as exc:
log.debug("Update check failed: %s", exc)
threading.Thread(target=_work, daemon=True, name="update-check").start()
def _on_open_in_telegram(icon=None, item=None):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server={host}&port={port}"
log.info("Opening %s", url)
try:
if not webbrowser.open(url):
raise RuntimeError
result = webbrowser.open(url)
if not result:
raise RuntimeError("webbrowser.open returned False")
except Exception:
log.info("Browser open failed, copying to clipboard")
if pyperclip is None:
_show_error(
"Не удалось открыть Telegram автоматически.\n\n"
f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}"
)
f"Установите пакет pyperclip для копирования в буфер или откройте вручную:\n{url}")
return
try:
pyperclip.copy(url)
_show_info(
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}"
)
f"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy")
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_copy_link(icon=None, item=None) -> None:
url = tg_proxy_url(_config)
log.info("Copying link: %s", url)
if pyperclip is None:
_show_error(
"Установите пакет pyperclip для копирования в буфер обмена."
)
return
try:
pyperclip.copy(url)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
threading.Thread(target=restart_proxy, daemon=True).start()
def _on_restart(icon=None, item=None) -> None:
threading.Thread(
target=lambda: restart_proxy(_config, _show_error), daemon=True
).start()
def _on_edit_config(icon=None, item=None) -> None:
def _on_edit_config(icon=None, item=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _on_open_logs(icon=None, item=None) -> None:
def _edit_config_dialog():
if ctk is None:
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
cfg["autostart"] = is_autostart_enabled()
# Make sure that the autostart key is removed if autostart
# is disabled, even if the executable file is moved.
if _supports_autostart() and not cfg["autostart"]:
set_autostart_enabled(False)
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
if _supports_autostart():
h += 100
icon_path = str(Path(__file__).parent / "icon.ico")
root = create_ctk_root(
ctk,
title="TG WS Proxy — Настройки",
width=w,
height=h,
theme=theme,
after_create=lambda r: r.iconbitmap(icon_path),
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk,
scroll,
theme,
cfg,
DEFAULT_CONFIG,
show_autostart=_supports_autostart(),
autostart_value=cfg.get("autostart", False),
)
def on_save():
merged = validate_config_form(
widgets,
DEFAULT_CONFIG,
include_autostart=_supports_autostart(),
)
if isinstance(merged, str):
_show_error(merged)
return
new_cfg = merged
save_config(new_cfg)
_config.update(new_cfg)
log.info("Config saved: %s", new_cfg)
if _supports_autostart():
set_autostart_enabled(bool(new_cfg.get("autostart", False)))
_tray_icon.menu = _build_menu()
# Win32 MessageBox из того же потока, что и mainloop CTk, блокирует обработку Tcl/Tk
# и даёт зависание; tkinter.messagebox согласован с циклом окна.
from tkinter import messagebox
if messagebox.askyesno("Перезапустить?",
"Настройки сохранены.\n\n"
"Перезапустить прокси сейчас?",
parent=root):
root.destroy()
restart_proxy()
else:
root.destroy()
def on_cancel():
root.destroy()
install_tray_config_buttons(
ctk, footer, theme, on_save=on_save, on_cancel=on_cancel)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
def _on_open_logs(icon=None, item=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
os.startfile(str(LOG_FILE))
else:
_show_info("Файл логов ещё не создан.")
_show_info("Файл логов ещё не создан.", "TG WS Proxy")
def _on_exit(icon=None, item=None) -> None:
def _on_exit(icon=None, item=None):
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
quit_ctk()
threading.Thread(target=lambda: (time.sleep(3), os._exit(0)), daemon=True, name="force-exit").start()
def _force_exit():
time.sleep(3)
os._exit(0)
threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
if icon:
icon.stop()
# settings dialog
def _edit_config_dialog() -> None:
if not ensure_ctk_thread(ctk):
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
cfg["autostart"] = is_autostart_enabled()
if _supports_autostart() and not cfg["autostart"]:
set_autostart_enabled(False)
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = CONFIG_DIALOG_SIZE
if _supports_autostart():
h += 100
root = create_ctk_toplevel(
ctk, title="TG WS Proxy — Настройки", width=w, height=h, theme=theme,
after_create=lambda r: r.iconbitmap(ICON_PATH),
)
fpx, fpy = CONFIG_DIALOG_FRAME_PAD
frame = main_content_frame(ctk, root, theme, padx=fpx, pady=fpy)
scroll, footer = tray_settings_scroll_and_footer(ctk, frame, theme)
widgets = install_tray_config_form(
ctk, scroll, theme, cfg, DEFAULT_CONFIG,
show_autostart=_supports_autostart(),
autostart_value=cfg.get("autostart", False),
)
def _finish() -> None:
root.destroy()
done.set()
def on_save() -> None:
from tkinter import messagebox
merged = validate_config_form(widgets, DEFAULT_CONFIG, include_autostart=_supports_autostart())
if isinstance(merged, str):
messagebox.showerror("TG WS Proxy — Ошибка", merged, parent=root)
return
save_config(merged)
_config.update(merged)
log.info("Config saved: %s", merged)
if _supports_autostart():
set_autostart_enabled(bool(merged.get("autostart", False)))
_tray_icon.menu = _build_menu()
do_restart = messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
)
_finish()
if do_restart:
threading.Thread(target=lambda: restart_proxy(_config, _show_error), daemon=True).start()
root.protocol("WM_DELETE_WINDOW", _finish)
install_tray_config_buttons(ctk, footer, theme, on_save=on_save, on_cancel=_finish)
ctk_run_dialog(_build)
# first run
def _show_first_run() -> None:
ensure_dirs()
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
if not ensure_ctk_thread(ctk):
FIRST_RUN_MARKER.touch()
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
secret = _config.get("secret", DEFAULT_CONFIG["secret"])
def _build(done: threading.Event) -> None:
theme = ctk_theme_for_platform()
w, h = FIRST_RUN_SIZE
root = create_ctk_toplevel(
ctk, title="TG WS Proxy", width=w, height=h, theme=theme,
after_create=lambda r: r.iconbitmap(ICON_PATH),
)
if ctk is None:
FIRST_RUN_MARKER.touch()
return
def on_done(open_tg: bool) -> None:
FIRST_RUN_MARKER.touch()
root.destroy()
done.set()
if open_tg:
_on_open_in_telegram()
theme = ctk_theme_for_platform()
icon_path = str(Path(__file__).parent / "icon.ico")
w, h = FIRST_RUN_SIZE
root = create_ctk_root(
ctk,
title="TG WS Proxy",
width=w,
height=h,
theme=theme,
after_create=lambda r: r.iconbitmap(icon_path),
)
populate_first_run_window(ctk, root, theme, host=host, port=port, secret=secret, on_done=on_done)
def on_done(open_tg: bool):
FIRST_RUN_MARKER.touch()
root.destroy()
if open_tg:
_on_open_in_telegram()
ctk_run_dialog(_build)
populate_first_run_window(
ctk, root, theme, host=host, port=port, on_done=on_done)
try:
root.mainloop()
finally:
import tkinter as tk
try:
if root.winfo_exists():
root.destroy()
except tk.TclError:
pass
# tray menu
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if not ip or ip.startswith("::1"):
continue
try:
if ipaddress.IPv6Address(ip).is_link_local:
continue
except ValueError:
if ip.startswith("fe80:"):
continue
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(('::1', 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
threading.Thread(target=_show_ipv6_dialog, daemon=True).start()
def _show_ipv6_dialog():
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy")
def _build_menu():
if pystray is None:
return None
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
link_host = tg_ws_proxy.get_link_host(host)
return pystray.Menu(
pystray.MenuItem(f"Открыть в Telegram ({link_host}:{port})", _on_open_in_telegram, default=True),
pystray.MenuItem("Скопировать ссылку", _on_copy_link),
pystray.MenuItem(
f"Открыть в Telegram ({host}:{port})",
_on_open_in_telegram,
default=True),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Перезапустить прокси", _on_restart),
pystray.MenuItem("Настройки...", _on_edit_config),
@ -310,17 +618,23 @@ def _build_menu():
)
# entry point
def run_tray() -> None:
def run_tray():
global _tray_icon, _config
_config = load_config()
bootstrap(_config)
_config = _runtime.prepare()
_runtime.reset_log_file()
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy версия %s, tray app starting", __version__)
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if pystray is None or Image is None or ctk is None:
log.error("pystray, Pillow or customtkinter not installed; running in console mode")
start_proxy(_config, _show_error)
log.error(
"pystray, Pillow or customtkinter not installed; "
"running in console mode")
start_proxy()
try:
while True:
time.sleep(1)
@ -328,12 +642,20 @@ def run_tray() -> None:
stop_proxy()
return
start_proxy(_config, _show_error)
maybe_notify_update(_config, lambda: _exiting, _ask_yes_no)
_show_first_run()
check_ipv6_warning(_show_info)
start_proxy()
_maybe_notify_update_async()
_show_first_run()
_check_ipv6_warning()
icon_image = _load_icon()
_tray_icon = pystray.Icon(
APP_NAME,
icon_image,
"TG WS Proxy",
menu=_build_menu())
_tray_icon = pystray.Icon(APP_NAME, load_icon(), "TG WS Proxy", menu=_build_menu())
log.info("Tray icon running")
_tray_icon.run()
@ -341,14 +663,15 @@ def run_tray() -> None:
log.info("Tray app exited")
def main() -> None:
if not acquire_lock("windows.py"):
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
return
try:
run_tray()
finally:
release_lock()
_release_lock()
if __name__ == "__main__":