36 Commits

Author SHA1 Message Date
Flowseal
afb7c5f56d revert keepalive mechanism 2026-03-22 08:00:14 +03:00
Flowseal
18a1bced83 logrotate #366; configurable pool and buffer sizes 2026-03-22 02:54:03 +03:00
Flowseal
ed85e2a284 keepalive for stale mitigation 2026-03-21 09:26:34 +03:00
Flowseal
c1452c23da Optimizations 2026-03-20 22:57:15 +03:00
Flowseal
6a80ca85e3 Optimizations 2026-03-19 22:07:47 +03:00
Flowseal
4ae7cb92f7 autostart fixes 2026-03-19 12:26:31 +03:00
HonoLite
7eeb447a76 add windows autostart (#171) 2026-03-19 11:27:59 +03:00
Flowseal
5d839c1112 fix for default dc options 2026-03-19 11:09:07 +03:00
Flowseal
0dc2a9cac6 built files rename 2026-03-19 07:43:42 +03:00
Flowseal
7943c539b6 .deb build test 2026-03-19 07:28:46 +03:00
Flowseal
5e53a8a470 unused import 2026-03-19 07:03:11 +03:00
pitoni
692157b0f5 Linux binary, github actions (#282) 2026-03-19 06:55:55 +03:00
Flowseal
26542558c6 dc fail logic rewrite for independent usability 2026-03-19 06:23:58 +03:00
Flowseal
e6ee4e6159 Hardcoded dc override for 203 2026-03-19 05:53:14 +03:00
Flowseal
96383057c6 dc203 for possible overriding 2026-03-19 05:42:40 +03:00
Flowseal
646468680c Speed improvements 2026-03-19 02:36:17 +03:00
Flowseal
51aca9009f removed req files 2026-03-18 22:03:57 +03:00
Flowseal
6b9ddda7f0 readme simplify 2026-03-18 21:58:35 +03:00
Flowseal
54c6f3881b pyproject fixes; macos support 2026-03-18 21:54:58 +03:00
delewer
99b5c722e1 build: migrate deps to pyproject.toml (#201) 2026-03-18 21:33:12 +03:00
kek.of
9924440c48 Update macos.py (#272) 2026-03-18 20:27:16 +03:00
Flowseal
7572258a28 MacOS build simplify, readme update 2026-03-18 19:22:46 +03:00
Flowseal
d2190cfec6 cffi universal2 fix 2026-03-18 18:15:06 +03:00
Flowseal
053ec3e00f Universal2 macos test 2026-03-18 18:11:07 +03:00
Flowseal
55affaf78f macos dialog fix; macos merge logs 2026-03-18 17:49:24 +03:00
Илья
533420b516 MacOS support (#225) 2026-03-18 17:33:38 +03:00
hir-lol
473078593a Merge pull request #244 from hir-lol/main 2026-03-18 01:40:09 +03:00
Flowseal
46011c0ff5 Github optional release on build 2026-03-17 22:18:21 +03:00
Flowseal
8219b9f144 pyinstaller changed to previous version for false detect prevention 2026-03-17 22:15:04 +03:00
Flowseal
cf3e3b2aec typos 2026-03-16 04:09:39 +03:00
unknown
3fdce27fbb Media chunking fix; Removed high number dc detection 2026-03-16 04:04:54 +03:00
Flowseal
1433c2e881 typo in readme 2026-03-15 15:55:23 +03:00
Flowseal
f774777539 Merge pull request #141 from nullptr-deref/main 2026-03-15 15:50:57 +03:00
Rostislav Tolushkin
b6cb5aa76f general typos fix 2026-03-15 15:29:19 +03:00
Flowseal
7574357db9 update readme 2026-03-15 14:17:07 +03:00
Flowseal
2571847a9e issue template 2026-03-15 14:10:04 +03:00
15 changed files with 2732 additions and 160 deletions

20
.github/ISSUE_TEMPLATE/bug_report.yml vendored Normal file
View File

@@ -0,0 +1,20 @@
name: 🐛 Проблема
title: '[Проблема] '
description: Сообщить о проблеме
labels: ['type: проблема', 'status: нуждается в сортировке']
body:
- type: textarea
id: description
attributes:
label: Опишите вашу проблему
description: Чётко опишите проблему с которой вы столкнулись
placeholder: Описание проблемы
validations:
required: true
- type: textarea
id: additions
attributes:
label: Дополнительные детали
description: Если у вас проблемы с работой прокси, то приложите файл логов в момент возникновения проблемы.

View File

@@ -3,9 +3,14 @@ name: Build & Release
on: on:
workflow_dispatch: workflow_dispatch:
inputs: inputs:
make_release:
description: 'Create Github Release?'
type: boolean
required: true
default: false
version: version:
description: "Release version tag (e.g. v1.0.0)" description: "Release version tag (e.g. v1.0.0)"
required: true required: false
default: "v1.0.0" default: "v1.0.0"
permissions: permissions:
@@ -25,20 +30,23 @@ jobs:
cache: "pip" cache: "pip"
- name: Install dependencies - name: Install dependencies
run: pip install -r requirements.txt run: pip install ".[win10]"
- name: Install pyinstaller - name: Install pyinstaller
run: pip install pyinstaller run: pip install "pyinstaller==6.13.0"
- name: Build EXE with PyInstaller - name: Build EXE with PyInstaller
run: pyinstaller packaging/windows.spec --noconfirm run: pyinstaller packaging/windows.spec --noconfirm
- name: Rename artifact
run: mv dist/TgWsProxy.exe dist/TgWsProxy_windows.exe
- name: Upload artifact - name: Upload artifact
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: TgWsProxy name: TgWsProxy
path: | path: |
dist/TgWsProxy.exe dist/TgWsProxy_windows.exe
build-win7: build-win7:
runs-on: windows-latest runs-on: windows-latest
@@ -53,7 +61,7 @@ jobs:
cache: "pip" cache: "pip"
- name: Install dependencies (Win7-compatible) - name: Install dependencies (Win7-compatible)
run: pip install -r requirements-win7.txt run: pip install ".[win7]"
- name: Install pyinstaller - name: Install pyinstaller
run: pip install "pyinstaller==5.13.2" run: pip install "pyinstaller==5.13.2"
@@ -62,17 +70,256 @@ jobs:
run: pyinstaller packaging/windows.spec --noconfirm run: pyinstaller packaging/windows.spec --noconfirm
- name: Rename artifact - name: Rename artifact
run: mv dist/TgWsProxy.exe dist/TgWsProxy-win7.exe run: mv dist/TgWsProxy.exe dist/TgWsProxy_windows_7.exe
- name: Upload artifact - name: Upload artifact
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: TgWsProxy-win7 name: TgWsProxy-win7
path: dist/TgWsProxy-win7.exe path: dist/TgWsProxy_windows_7.exe
build-macos:
runs-on: macos-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install universal2 Python
run: |
set -euo pipefail
curl -LO https://www.python.org/ftp/python/3.12.10/python-3.12.10-macos11.pkg
sudo installer -pkg python-3.12.10-macos11.pkg -target /
echo "/Library/Frameworks/Python.framework/Versions/3.12/bin" >> "$GITHUB_PATH"
- name: Install dependencies
run: |
set -euo pipefail
python3.12 -m pip install --upgrade pip setuptools wheel
python3.12 -m pip install delocate==0.13.0
mkdir -p wheelhouse/arm64 wheelhouse/x86_64 wheelhouse/universal2
python3.12 -m pip download \
--only-binary=:all: \
--platform macosx_11_0_arm64 \
--python-version 3.12 \
--implementation cp \
-d wheelhouse/arm64 \
'cffi>=2.0.0' \
Pillow==12.1.0 \
psutil==7.0.0
python3.12 -m pip download \
--only-binary=:all: \
--platform macosx_10_13_x86_64 \
--python-version 3.12 \
--implementation cp \
-d wheelhouse/x86_64 \
'cffi>=2.0.0' \
Pillow==12.1.0
python3.12 -m pip download \
--only-binary=:all: \
--platform macosx_10_9_x86_64 \
--python-version 3.12 \
--implementation cp \
-d wheelhouse/x86_64 \
psutil==7.0.0
delocate-merge \
wheelhouse/arm64/cffi-*.whl \
wheelhouse/x86_64/cffi-*.whl \
-w wheelhouse/universal2
delocate-merge \
wheelhouse/arm64/pillow-12.1.0-*.whl \
wheelhouse/x86_64/pillow-12.1.0-*.whl \
-w wheelhouse/universal2
delocate-merge \
wheelhouse/arm64/psutil-7.0.0-*.whl \
wheelhouse/x86_64/psutil-7.0.0-*.whl \
-w wheelhouse/universal2
python3.12 -m pip install --no-deps wheelhouse/universal2/*.whl
python3.12 -m pip install ".[macos]"
python3.12 -m pip install pyinstaller==6.13.0
- name: Create macOS icon from ICO
run: |
set -euo pipefail
python3.12 - <<'PY'
from PIL import Image
image = Image.open('icon.ico')
image = image.resize((1024, 1024), Image.LANCZOS)
image.save('icon_1024.png', 'PNG')
PY
mkdir -p icon.iconset
sips -z 16 16 icon_1024.png --out icon.iconset/icon_16x16.png
sips -z 32 32 icon_1024.png --out icon.iconset/icon_16x16@2x.png
sips -z 32 32 icon_1024.png --out icon.iconset/icon_32x32.png
sips -z 64 64 icon_1024.png --out icon.iconset/icon_32x32@2x.png
sips -z 128 128 icon_1024.png --out icon.iconset/icon_128x128.png
sips -z 256 256 icon_1024.png --out icon.iconset/icon_128x128@2x.png
sips -z 256 256 icon_1024.png --out icon.iconset/icon_256x256.png
sips -z 512 512 icon_1024.png --out icon.iconset/icon_256x256@2x.png
sips -z 512 512 icon_1024.png --out icon.iconset/icon_512x512.png
sips -z 1024 1024 icon_1024.png --out icon.iconset/icon_512x512@2x.png
iconutil -c icns icon.iconset -o icon.icns
rm -rf icon.iconset icon_1024.png
- name: Build app with PyInstaller
run: python3.12 -m PyInstaller packaging/macos.spec --noconfirm
- name: Validate universal2 app bundle
run: |
set -euo pipefail
found=0
while IFS= read -r -d '' file; do
if file "$file" | grep -q "Mach-O"; then
found=1
archs="$(lipo -archs "$file" 2>/dev/null || true)"
case "$archs" in
*arm64*x86_64*|*x86_64*arm64*) ;;
*)
echo "Missing universal2 slices in $file: ${archs:-unknown}" >&2
exit 1
;;
esac
fi
done < <(find "dist/TG WS Proxy.app" -type f -print0)
if [ "$found" -eq 0 ]; then
echo "No Mach-O files found in app bundle" >&2
exit 1
fi
- name: Create DMG
run: |
set -euo pipefail
APP_NAME="TG WS Proxy"
DMG_TEMP="dist/dmg_temp"
rm -rf "$DMG_TEMP"
mkdir -p "$DMG_TEMP"
cp -R "dist/${APP_NAME}.app" "$DMG_TEMP/"
ln -s /Applications "$DMG_TEMP/Applications"
hdiutil create \
-volname "$APP_NAME" \
-srcfolder "$DMG_TEMP" \
-ov \
-format UDZO \
"dist/TgWsProxy_macos_universal.dmg"
rm -rf "$DMG_TEMP"
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: TgWsProxy-macOS
path: dist/TgWsProxy_macos_universal.dmg
build-linux:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y \
python3-venv \
python3-dev \
python3-gi \
gir1.2-ayatanaappindicator3-0.1 \
python3-tk
- name: Create venv with system site-packages
run: python3 -m venv --system-site-packages .venv
- name: Install dependencies
run: |
.venv/bin/pip install --upgrade pip
.venv/bin/pip install ".[linux]"
.venv/bin/pip install "pyinstaller==6.13.0"
- name: Build binary with PyInstaller
run: .venv/bin/pyinstaller packaging/linux.spec --noconfirm
- name: Rename binary artifact
run: mv dist/TgWsProxy dist/TgWsProxy_linux_amd64
- name: Create .deb package
run: |
set -euo pipefail
VERSION="${{ github.event.inputs.version }}"
VERSION="${VERSION#v}"
PKG_ROOT="pkg"
rm -rf "$PKG_ROOT"
mkdir -p \
"$PKG_ROOT/DEBIAN" \
"$PKG_ROOT/usr/bin" \
"$PKG_ROOT/usr/share/applications" \
"$PKG_ROOT/usr/share/icons/hicolor/256x256/apps"
install -m 755 dist/TgWsProxy_linux_amd64 "$PKG_ROOT/usr/bin/tg-ws-proxy"
.venv/bin/python - <<PY
from PIL import Image
Image.open("icon.ico").save(
"${PKG_ROOT}/usr/share/icons/hicolor/256x256/apps/tg-ws-proxy.png",
"PNG",
)
PY
cat > "$PKG_ROOT/usr/share/applications/tg-ws-proxy.desktop" <<EOF
[Desktop Entry]
Type=Application
Name=TG WS Proxy
GenericName=Telegram Proxy
Comment=Telegram Desktop WebSocket Bridge Proxy
Exec=tg-ws-proxy
Icon=tg-ws-proxy
Terminal=false
Categories=Network;
StartupNotify=true
Keywords=telegram;proxy;websocket;
EOF
cat > "$PKG_ROOT/DEBIAN/control" <<EOF
Package: tg-ws-proxy
Version: ${VERSION}
Section: net
Priority: optional
Architecture: amd64
Maintainer: Flowseal
Depends: libgtk-3-0, libayatana-appindicator3-1, python3-tk
Description: Telegram Desktop WebSocket Bridge Proxy
SOCKS5/WebSocket bridge proxy for Telegram Desktop with tray UI.
EOF
dpkg-deb --build --root-owner-group \
"$PKG_ROOT" \
"dist/TgWsProxy_linux_amd64.deb"
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: TgWsProxy-linux
path: |
dist/TgWsProxy_linux_amd64
dist/TgWsProxy_linux_amd64.deb
release: release:
needs: [build, build-win7] needs: [build, build-win7, build-macos, build-linux]
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: ${{ github.event.inputs.make_release == 'true' }}
steps: steps:
- name: Download main build - name: Download main build
uses: actions/download-artifact@v4 uses: actions/download-artifact@v4
@@ -86,6 +333,18 @@ jobs:
name: TgWsProxy-win7 name: TgWsProxy-win7
path: dist path: dist
- name: Download macOS build
uses: actions/download-artifact@v4
with:
name: TgWsProxy-macOS
path: dist
- name: Download Linux build
uses: actions/download-artifact@v4
with:
name: TgWsProxy-linux
path: dist
- name: Create GitHub Release - name: Create GitHub Release
uses: softprops/action-gh-release@v2 uses: softprops/action-gh-release@v2
with: with:
@@ -94,9 +353,12 @@ jobs:
body: | body: |
## TG WS Proxy ${{ github.event.inputs.version }} ## TG WS Proxy ${{ github.event.inputs.version }}
files: | files: |
dist/TgWsProxy.exe dist/TgWsProxy_windows.exe
dist/TgWsProxy-win7.exe dist/TgWsProxy_windows_7.exe
dist/TgWsProxy_macos_universal.dmg
dist/TgWsProxy_linux_amd64
dist/TgWsProxy_linux_amd64.deb
draft: false draft: false
prerelease: false prerelease: false
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

1
.gitignore vendored
View File

@@ -27,3 +27,4 @@ scan_ips.py
scan.txt scan.txt
AyuGramDesktop-dev/ AyuGramDesktop-dev/
tweb-master/ tweb-master/
/icon.icns

122
README.md
View File

@@ -1,53 +1,113 @@
> [!CAUTION]
>
> ### Реакция антивирусов
>
> Windows Defender часто ошибочно помечает приложение как **Wacatac**.
> Если вы не можете скачать из-за блокировки, то:
>
> 1) Попробуйте скачать версию win7 (она ничем не отличается в плане функционала)
> 2) Отключите антивирус на время скачивания, добавьте файл в исключения и включите обратно
>
> **Всегда проверяйте, что скачиваете из интернета, тем более из непроверенных источников. Всегда лучше смотреть на детекты широко известных антивирусов на VirusTotal**
# TG WS Proxy # TG WS Proxy
Локальный SOCKS5-прокси для Telegram Desktop, который перенаправляет трафик через WebSocket-соединения к указанным серверам, помогая частично ускорить работу Telegram. **Локальный SOCKS5-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние сервера.
**Ожидаемый результат аналогичен прокидыванию hosts для Web Telegram**: ускорение загрузки и скачивания файлов, загрузки сообщений и части медиа.
<img width="529" height="487" alt="image" src="https://github.com/user-attachments/assets/6a4cf683-0df8-43af-86c1-0e8f08682b62" /> <img width="529" height="487" alt="image" src="https://github.com/user-attachments/assets/6a4cf683-0df8-43af-86c1-0e8f08682b62" />
## Как это работает ## Как это работает
``` ```
Telegram Desktop → SOCKS5 (127.0.0.1:1080) → TG WS Proxy → WSS (kws*.web.telegram.org) → Telegram DC Telegram Desktop → SOCKS5 (127.0.0.1:1080) → TG WS Proxy → WSS → Telegram DC
``` ```
1. Приложение поднимает локальный SOCKS5-прокси на `127.0.0.1:1080` 1. Приложение поднимает локальный SOCKS5-прокси на `127.0.0.1:1080`
2. Перехватывает подключения к IP-адресам Telegram 2. Перехватывает подключения к IP-адресам Telegram
3. Извлекает DC ID из MTProto obfuscation init-пакета 3. Извлекает DC ID из MTProto obfuscation init-пакета
4. Устанавливает WebSocket (TLS) соединение к соответствующему DC через домены `kws{N}.web.telegram.org` 4. Устанавливает WebSocket (TLS) соединение к соответствующему DC через домены Telegram
5. Если WS недоступен (302 redirect) — автоматически переключается на прямое TCP-соединение 5. Если WS недоступен (302 redirect) — автоматически переключается на прямое TCP-соединение
## 🚀 Быстрый старт ## 🚀 Быстрый старт
### Windows ### Windows
Перейдите на [страницу релизов](https://github.com/Flowseal/tg-ws-proxy/releases) и скачайте **`TgWsProxy.exe`**. Он собирается автоматически через [Github Actions](https://github.com/Flowseal/tg-ws-proxy/actions) из открытого исходного кода.
Перейдите на [страницу релизов](https://github.com/Flowseal/tg-ws-proxy/releases) и скачайте **`TgWsProxy_windows.exe`**. Он собирается автоматически через [Github Actions](https://github.com/Flowseal/tg-ws-proxy/actions) из открытого исходного кода.
При первом запуске откроется окно с инструкцией по подключению Telegram Desktop. Приложение сворачивается в системный трей. При первом запуске откроется окно с инструкцией по подключению Telegram Desktop. Приложение сворачивается в системный трей.
**Меню трея:** **Меню трея:**
- **Открыть в Telegram** — автоматически настроить прокси через `tg://socks` ссылку - **Открыть в Telegram** — автоматически настроить прокси через `tg://socks` ссылку
- **Перезапустить прокси** — перезапуск без выхода из приложения - **Перезапустить прокси** — перезапуск без выхода из приложения
- **Настройки...** — GUI-редактор конфигурации - **Настройки...** — GUI-редактор конфигурации
- **Открыть логи** — открыть файл логов - **Открыть логи** — открыть файл логов
- **Выход** — остановить прокси и закрыть приложение - **Выход** — остановить прокси и закрыть приложение
### macOS
Перейдите на [страницу релизов](https://github.com/Flowseal/tg-ws-proxy/releases) и скачайте **`TgWsProxy_macos_universal.dmg`** — универсальная сборка для Apple Silicon и Intel.
1. Открыть образ
2. Перенести **TG WS Proxy.app** в папку **Applications**
3. При первом запуске macOS может попросить подтвердить открытие: **Системные настройки → Конфиденциальность и безопасность → Всё равно открыть**
### Linux
Для Debian/Ubuntu скачайте со [страницы релизов](https://github.com/Flowseal/tg-ws-proxy/releases) пакет **`TgWsProxy_linux_amd64.deb`**.
Для остальных дистрибутивов можно использовать **`TgWsProxy_linux_amd64`** (бинарный файл для x86_64).
```bash
chmod +x TgWsProxy_linux_amd64
./TgWsProxy_linux_amd64
```
При первом запуске откроется окно с инструкцией. Приложение работает в системном трее (требуется AppIndicator).
## Установка из исходников ## Установка из исходников
### Консольный proxy
Для запуска только SOCKS5/WebSocket proxy без tray-интерфейса достаточно базовой установки:
```bash ```bash
pip install -r requirements.txt pip install -e .
tg-ws-proxy
``` ```
### Windows (Tray-приложение) ### Windows 10+
```bash ```bash
python windows.py pip install -e ".[win10]"
tg-ws-proxy-tray-win
``` ```
### Консольный режим ### Windows 7
```bash ```bash
python proxy/tg_ws_proxy.py [--port PORT] [--dc-ip DC:IP ...] [-v] pip install -e ".[win7]"
tg-ws-proxy-tray-win
```
### macOS
```bash
pip install -e ".[macos]"
tg-ws-proxy-tray-macos
```
### Linux
```bash
pip install -e ".[linux]"
tg-ws-proxy-tray-linux
```
### Консольный режим из исходников
```bash
tg-ws-proxy [--port PORT] [--host HOST] [--dc-ip DC:IP ...] [-v]
``` ```
**Аргументы:** **Аргументы:**
@@ -55,6 +115,7 @@ python proxy/tg_ws_proxy.py [--port PORT] [--dc-ip DC:IP ...] [-v]
| Аргумент | По умолчанию | Описание | | Аргумент | По умолчанию | Описание |
|---|---|---| |---|---|---|
| `--port` | `1080` | Порт SOCKS5-прокси | | `--port` | `1080` | Порт SOCKS5-прокси |
| `--host` | `127.0.0.1` | Хост SOCKS5-прокси |
| `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) | | `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) |
| `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) | | `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) |
@@ -62,13 +123,27 @@ python proxy/tg_ws_proxy.py [--port PORT] [--dc-ip DC:IP ...] [-v]
```bash ```bash
# Стандартный запуск # Стандартный запуск
python proxy/tg_ws_proxy.py tg-ws-proxy
# Другой порт и дополнительные DC # Другой порт и дополнительные DC
python proxy/tg_ws_proxy.py --port 9050 --dc-ip 1:149.154.175.205 --dc-ip 2:149.154.167.220 tg-ws-proxy --port 9050 --dc-ip 1:149.154.175.205 --dc-ip 2:149.154.167.220
# С подробным логированием # С подробным логированием
python proxy/tg_ws_proxy.py -v tg-ws-proxy -v
```
## CLI-скрипты (pyproject.toml)
CLI команды объявляются в `pyproject.toml` в секции `[project.scripts]` и должны указывать на `module:function`.
Пример:
```toml
[project.scripts]
tg-ws-proxy = "proxy.tg_ws_proxy:main"
tg-ws-proxy-tray-win = "windows:main"
tg-ws-proxy-tray-macos = "macos:main"
tg-ws-proxy-tray-linux = "linux:main"
``` ```
## Настройка Telegram Desktop ## Настройка Telegram Desktop
@@ -88,7 +163,11 @@ python proxy/tg_ws_proxy.py -v
## Конфигурация ## Конфигурация
Tray-приложение хранит данные в `%APPDATA%/TgWsProxy`: Tray-приложение хранит данные в:
- **Windows:** `%APPDATA%/TgWsProxy`
- **macOS:** `~/Library/Application Support/TgWsProxy`
- **Linux:** `~/.config/TgWsProxy` (или `$XDG_CONFIG_HOME/TgWsProxy`)
```json ```json
{ {
@@ -103,12 +182,15 @@ Tray-приложение хранит данные в `%APPDATA%/TgWsProxy`:
## Автоматическая сборка ## Автоматическая сборка
Проект содержит спецификацию PyInstaller ([`windows.spec`](packaging/windows.spec)) и GitHub Actions workflow ([`.github/workflows/build.yml`](.github/workflows/build.yml)) для автоматической сборки. Проект содержит спецификации PyInstaller ([`packaging/windows.spec`](packaging/windows.spec), [`packaging/macos.spec`](packaging/macos.spec), [`packaging/linux.spec`](packaging/linux.spec)) и GitHub Actions workflow ([`.github/workflows/build.yml`](.github/workflows/build.yml)) для автоматической сборки.
```bash Минимально поддерживаемые версии ОС для текущих бинарных сборок:
pip install pyinstaller
pyinstaller packaging/windows.spec - Windows 10+ для `TgWsProxy_windows.exe`
``` - Windows 7 для `TgWsProxy_windows_7.exe`
- Intel macOS 10.15+
- Apple Silicon macOS 11.0+
- Linux x86_64 (требуется AppIndicator для системного трея)
## Лицензия ## Лицензия

871
linux.py Normal file
View File

@@ -0,0 +1,871 @@
from __future__ import annotations
import asyncio as _asyncio
import json
import logging
import logging.handlers
import os
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Dict, Optional
import customtkinter as ctk
import psutil
import pyperclip
import pystray
from PIL import Image, ImageDraw, ImageFont
import proxy.tg_ws_proxy as tg_ws_proxy
APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
DEFAULT_CONFIG = {
"port": 1080,
"host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
"log_max_mb": 5,
"buf_kb": 256,
"pool_size": 4,
}
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_tray_icon: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
log = logging.getLogger("tg-ws-tray")
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
try:
cmdline = proc.cmdline()
for arg in cmdline:
if "linux.py" in arg:
return True
except Exception:
pass
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {
"create_time": proc.create_time(),
}
lock_file.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s", datefmt="%H:%M:%S"
)
)
root.addHandler(ch)
def _make_icon_image(size: int = 64):
if Image is None:
raise RuntimeError("Pillow is required for tray icon")
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = 2
draw.ellipse(
[margin, margin, size - margin, size - margin], fill=(0, 136, 204, 255)
)
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
size=int(size * 0.55),
)
except Exception:
try:
font = ImageFont.truetype(
"/usr/share/fonts/TTF/DejaVuSans-Bold.ttf", size=int(size * 0.55)
)
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
def _load_icon():
icon_path = Path(__file__).parent / "icon.ico"
if icon_path.exists() and Image:
try:
return Image.open(str(icon_path))
except Exception:
pass
return _make_icon_image()
def _run_proxy_thread(
port: int, dc_opt: Dict[int, str], verbose: bool, host: str = "127.0.0.1"
):
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev, host=host)
)
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
_show_error(
"Не удалось запустить прокси:\nПорт уже используется другим приложением.\n\nЗакройте приложение, использующее этот порт, или измените порт в настройках прокси и перезапустите."
)
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
verbose = cfg.get("verbose", False)
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
log.info("Starting proxy on %s:%d ...", host, port)
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True,
name="proxy",
)
_proxy_thread.start()
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=2)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy():
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy()
def _show_error(text: str, title: str = "TG WS Proxy — Ошибка"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showerror(title, text, parent=root)
root.destroy()
def _show_info(text: str, title: str = "TG WS Proxy"):
import tkinter as _tk
from tkinter import messagebox as _mb
root = _tk.Tk()
root.withdraw()
_mb.showinfo(title, text, parent=root)
root.destroy()
def _on_open_in_telegram(icon=None, item=None):
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server=127.0.0.1&port={port}"
log.info("Copying %s", url)
try:
pyperclip.copy(url)
_show_info(
f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy",
)
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(icon=None, item=None):
threading.Thread(target=restart_proxy, daemon=True).start()
def _on_edit_config(icon=None, item=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
def _edit_config_dialog():
if ctk is None:
_show_error("customtkinter не установлен.")
return
cfg = dict(_config)
ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue")
root = ctk.CTk()
root.title("TG WS Proxy — Настройки")
root.resizable(False, False)
root.attributes("-topmost", True)
icon_img = _load_icon()
if icon_img:
from PIL import ImageTk
_photo = ImageTk.PhotoImage(icon_img.resize((64, 64)))
root.iconphoto(False, _photo)
TG_BLUE = "#3390ec"
TG_BLUE_HOVER = "#2b7cd4"
BG = "#ffffff"
FIELD_BG = "#f0f2f5"
FIELD_BORDER = "#d6d9dc"
TEXT_PRIMARY = "#000000"
TEXT_SECONDARY = "#707579"
FONT_FAMILY = "Sans"
w, h = 420, 540
sw = root.winfo_screenwidth()
sh = root.winfo_screenheight()
root.geometry(f"{w}x{h}+{(sw - w) // 2}+{(sh - h) // 2}")
root.configure(fg_color=BG)
frame = ctk.CTkFrame(root, fg_color=BG, corner_radius=0)
frame.pack(fill="both", expand=True, padx=24, pady=20)
# Host
ctk.CTkLabel(
frame,
text="IP-адрес прокси",
font=(FONT_FAMILY, 13),
text_color=TEXT_PRIMARY,
anchor="w",
).pack(anchor="w", pady=(0, 4))
host_var = ctk.StringVar(value=cfg.get("host", "127.0.0.1"))
host_entry = ctk.CTkEntry(
frame,
textvariable=host_var,
width=200,
height=36,
font=(FONT_FAMILY, 13),
corner_radius=10,
fg_color=FIELD_BG,
border_color=FIELD_BORDER,
border_width=1,
text_color=TEXT_PRIMARY,
)
host_entry.pack(anchor="w", pady=(0, 12))
# Port
ctk.CTkLabel(
frame,
text="Порт прокси",
font=(FONT_FAMILY, 13),
text_color=TEXT_PRIMARY,
anchor="w",
).pack(anchor="w", pady=(0, 4))
port_var = ctk.StringVar(value=str(cfg.get("port", 1080)))
port_entry = ctk.CTkEntry(
frame,
textvariable=port_var,
width=120,
height=36,
font=(FONT_FAMILY, 13),
corner_radius=10,
fg_color=FIELD_BG,
border_color=FIELD_BORDER,
border_width=1,
text_color=TEXT_PRIMARY,
)
port_entry.pack(anchor="w", pady=(0, 12))
# DC-IP mappings
ctk.CTkLabel(
frame,
text="DC → IP маппинги (по одному на строку, формат DC:IP)",
font=(FONT_FAMILY, 13),
text_color=TEXT_PRIMARY,
anchor="w",
).pack(anchor="w", pady=(0, 4))
dc_textbox = ctk.CTkTextbox(
frame,
width=370,
height=120,
font=("Monospace", 12),
corner_radius=10,
fg_color=FIELD_BG,
border_color=FIELD_BORDER,
border_width=1,
text_color=TEXT_PRIMARY,
)
dc_textbox.pack(anchor="w", pady=(0, 12))
dc_textbox.insert("1.0", "\n".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])))
# Verbose
verbose_var = ctk.BooleanVar(value=cfg.get("verbose", False))
ctk.CTkCheckBox(
frame,
text="Подробное логирование (verbose)",
variable=verbose_var,
font=(FONT_FAMILY, 13),
text_color=TEXT_PRIMARY,
fg_color=TG_BLUE,
hover_color=TG_BLUE_HOVER,
corner_radius=6,
border_width=2,
border_color=FIELD_BORDER,
).pack(anchor="w", pady=(0, 8))
# Advanced: buf_kb, pool_size, log_max_mb
adv_frame = ctk.CTkFrame(frame, fg_color="transparent")
adv_frame.pack(anchor="w", fill="x", pady=(4, 8))
for col, (lbl, key, w_) in enumerate([
("Буфер (KB, 256 default)", "buf_kb", 120),
("WS пулов (4 default)", "pool_size", 120),
("Log size (MB, 5 def)", "log_max_mb", 120),
]):
col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent")
col_frame.pack(side="left", padx=(0, 10))
ctk.CTkLabel(col_frame, text=lbl, font=(FONT_FAMILY, 11),
text_color=TEXT_SECONDARY, anchor="w").pack(anchor="w")
ctk.CTkEntry(col_frame, width=w_, height=30, font=(FONT_FAMILY, 12),
corner_radius=8, fg_color=FIELD_BG,
border_color=FIELD_BORDER, border_width=1,
text_color=TEXT_PRIMARY,
textvariable=ctk.StringVar(
value=str(cfg.get(key, DEFAULT_CONFIG[key]))
)).pack(anchor="w")
_adv_entries = list(adv_frame.winfo_children())
_adv_keys = ["buf_kb", "pool_size", "log_max_mb"]
def on_save():
import socket as _sock
host_val = host_var.get().strip()
try:
_sock.inet_aton(host_val)
except OSError:
_show_error("Некорректный IP-адрес.")
return
try:
port_val = int(port_var.get().strip())
if not (1 <= port_val <= 65535):
raise ValueError
except ValueError:
_show_error("Порт должен быть числом 1-65535")
return
lines = [
l.strip()
for l in dc_textbox.get("1.0", "end").strip().splitlines()
if l.strip()
]
try:
tg_ws_proxy.parse_dc_ip_list(lines)
except ValueError as e:
_show_error(str(e))
return
new_cfg = {
"host": host_val,
"port": port_val,
"dc_ip": lines,
"verbose": verbose_var.get(),
}
for i, key in enumerate(_adv_keys):
col_frame = _adv_entries[i]
entry = col_frame.winfo_children()[1]
try:
val = float(entry.get().strip())
if key in ("buf_kb", "pool_size"):
val = int(val)
new_cfg[key] = val
except ValueError:
new_cfg[key] = DEFAULT_CONFIG[key]
save_config(new_cfg)
_config.update(new_cfg)
log.info("Config saved: %s", new_cfg)
_tray_icon.menu = _build_menu()
from tkinter import messagebox
if messagebox.askyesno(
"Перезапустить?",
"Настройки сохранены.\n\nПерезапустить прокси сейчас?",
parent=root,
):
root.destroy()
restart_proxy()
else:
root.destroy()
def on_cancel():
root.destroy()
btn_frame = ctk.CTkFrame(frame, fg_color="transparent")
btn_frame.pack(fill="x", pady=(20, 0))
ctk.CTkButton(btn_frame, text="Сохранить", height=38,
font=(FONT_FAMILY, 14, "bold"), corner_radius=10,
fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER,
text_color="#ffffff",
command=on_save).pack(side="left", fill="x", expand=True, padx=(0, 8))
ctk.CTkButton(btn_frame, text="Отмена", height=38,
font=(FONT_FAMILY, 14), corner_radius=10,
fg_color=FIELD_BG, hover_color=FIELD_BORDER,
text_color=TEXT_PRIMARY, border_width=1,
border_color=FIELD_BORDER,
command=on_cancel).pack(side="right", fill="x", expand=True)
root.mainloop()
def _on_open_logs(icon=None, item=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
env = os.environ.copy()
env.pop("VIRTUAL_ENV", None)
env.pop("PYTHONPATH", None)
env.pop("PYTHONHOME", None)
subprocess.Popen(
["xdg-open", str(LOG_FILE)],
env=env,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
start_new_session=True,
)
else:
_show_info("Файл логов ещё не создан.", "TG WS Proxy")
def _on_exit(icon=None, item=None):
global _exiting
if _exiting:
os._exit(0)
return
_exiting = True
log.info("User requested exit")
def _force_exit():
time.sleep(3)
os._exit(0)
threading.Thread(target=_force_exit, daemon=True, name="force-exit").start()
if icon:
icon.stop()
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
tg_url = f"tg://socks?server={host}&port={port}"
if ctk is None:
FIRST_RUN_MARKER.touch()
return
ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue")
TG_BLUE = "#3390ec"
TG_BLUE_HOVER = "#2b7cd4"
BG = "#ffffff"
FIELD_BG = "#f0f2f5"
FIELD_BORDER = "#d6d9dc"
TEXT_PRIMARY = "#000000"
TEXT_SECONDARY = "#707579"
FONT_FAMILY = "Sans"
root = ctk.CTk()
root.title("TG WS Proxy")
root.resizable(False, False)
root.attributes("-topmost", True)
icon_img = _load_icon()
if icon_img:
from PIL import ImageTk
_photo = ImageTk.PhotoImage(icon_img.resize((64, 64)))
root.iconphoto(False, _photo)
w, h = 520, 440
sw = root.winfo_screenwidth()
sh = root.winfo_screenheight()
root.geometry(f"{w}x{h}+{(sw - w) // 2}+{(sh - h) // 2}")
root.configure(fg_color=BG)
frame = ctk.CTkFrame(root, fg_color=BG, corner_radius=0)
frame.pack(fill="both", expand=True, padx=28, pady=24)
title_frame = ctk.CTkFrame(frame, fg_color="transparent")
title_frame.pack(anchor="w", pady=(0, 16), fill="x")
# Blue accent bar
accent_bar = ctk.CTkFrame(
title_frame, fg_color=TG_BLUE, width=4, height=32, corner_radius=2
)
accent_bar.pack(side="left", padx=(0, 12))
ctk.CTkLabel(
title_frame,
text="Прокси запущен и работает в системном трее",
font=(FONT_FAMILY, 17, "bold"),
text_color=TEXT_PRIMARY,
).pack(side="left")
# Info sections
sections = [
("Как подключить Telegram Desktop:", True),
(" Автоматически:", True),
(f" ПКМ по иконке в трее → «Открыть в Telegram»", False),
(f" Или ссылка: {tg_url}", False),
("\n Вручную:", True),
(" Настройки → Продвинутые → Тип подключения → Прокси", False),
(f" SOCKS5 → {host} : {port} (без логина/пароля)", False),
]
for text, bold in sections:
weight = "bold" if bold else "normal"
ctk.CTkLabel(
frame,
text=text,
font=(FONT_FAMILY, 13, weight),
text_color=TEXT_PRIMARY,
anchor="w",
justify="left",
).pack(anchor="w", pady=1)
# Spacer
ctk.CTkFrame(frame, fg_color="transparent", height=16).pack()
# Separator
ctk.CTkFrame(frame, fg_color=FIELD_BORDER, height=1, corner_radius=0).pack(
fill="x", pady=(0, 12)
)
# Checkbox
auto_var = ctk.BooleanVar(value=True)
ctk.CTkCheckBox(
frame,
text="Открыть прокси в Telegram сейчас",
variable=auto_var,
font=(FONT_FAMILY, 13),
text_color=TEXT_PRIMARY,
fg_color=TG_BLUE,
hover_color=TG_BLUE_HOVER,
corner_radius=6,
border_width=2,
border_color=FIELD_BORDER,
).pack(anchor="w", pady=(0, 16))
def on_ok():
FIRST_RUN_MARKER.touch()
open_tg = auto_var.get()
root.destroy()
if open_tg:
_on_open_in_telegram()
ctk.CTkButton(
frame,
text="Начать",
width=180,
height=42,
font=(FONT_FAMILY, 15, "bold"),
corner_radius=10,
fg_color=TG_BLUE,
hover_color=TG_BLUE_HOVER,
text_color="#ffffff",
command=on_ok,
).pack(pady=(0, 0))
root.protocol("WM_DELETE_WINDOW", on_ok)
root.mainloop()
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith("::1") and not ip.startswith("fe80::1"):
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(("::1", 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
threading.Thread(target=_show_ipv6_dialog, daemon=True).start()
def _show_ipv6_dialog():
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах присутствуют ошибки, "
"связанные с попытками подключения по IPv6 - "
"попробуйте отключить в настройках прокси Telegram попытку соединения "
"по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy",
)
def _build_menu():
if pystray is None:
return None
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
return pystray.Menu(
pystray.MenuItem(
f"Открыть в Telegram ({host}:{port})", _on_open_in_telegram, default=True
),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Перезапустить прокси", _on_restart),
pystray.MenuItem("Настройки...", _on_edit_config),
pystray.MenuItem("Открыть логи", _on_open_logs),
pystray.Menu.SEPARATOR,
pystray.MenuItem("Выход", _on_exit),
)
def run_tray():
global _tray_icon, _config
_config = load_config()
save_config(_config)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy tray app starting")
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if pystray is None or Image is None:
log.error("pystray or Pillow not installed; running in console mode")
start_proxy()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_proxy()
return
start_proxy()
_show_first_run()
_check_ipv6_warning()
icon_image = _load_icon()
_tray_icon = pystray.Icon(APP_NAME, icon_image, "TG WS Proxy", menu=_build_menu())
log.info("Tray icon running")
_tray_icon.run()
stop_proxy()
log.info("Tray app exited")
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.", os.path.basename(sys.argv[0]))
return
try:
run_tray()
finally:
_release_lock()
if __name__ == "__main__":
main()

663
macos.py Normal file
View File

@@ -0,0 +1,663 @@
from __future__ import annotations
import json
import logging
import logging.handlers
import os
import psutil
import subprocess
import sys
import threading
import time
import webbrowser
import asyncio as _asyncio
from pathlib import Path
from typing import Dict, Optional
try:
import rumps
except ImportError:
rumps = None
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
Image = ImageDraw = ImageFont = None
try:
import pyperclip
except ImportError:
pyperclip = None
import proxy.tg_ws_proxy as tg_ws_proxy
APP_NAME = "TgWsProxy"
APP_DIR = Path.home() / "Library" / "Application Support" / APP_NAME
CONFIG_FILE = APP_DIR / "config.json"
LOG_FILE = APP_DIR / "proxy.log"
FIRST_RUN_MARKER = APP_DIR / ".first_run_done"
IPV6_WARN_MARKER = APP_DIR / ".ipv6_warned"
MENUBAR_ICON_PATH = APP_DIR / "menubar_icon.png"
DEFAULT_CONFIG = {
"port": 1080,
"host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False,
"log_max_mb": 5,
"buf_kb": 256,
"pool_size": 4,
}
_proxy_thread: Optional[threading.Thread] = None
_async_stop: Optional[object] = None
_app: Optional[object] = None
_config: dict = {}
_exiting: bool = False
_lock_file_path: Optional[Path] = None
log = logging.getLogger("tg-ws-tray")
# Single-instance lock
def _same_process(lock_meta: dict, proc: psutil.Process) -> bool:
try:
lock_ct = float(lock_meta.get("create_time", 0.0))
proc_ct = float(proc.create_time())
if lock_ct > 0 and abs(lock_ct - proc_ct) > 1.0:
return False
except Exception:
return False
frozen = bool(getattr(sys, "frozen", False))
if frozen:
return APP_NAME.lower() in proc.name().lower()
return False
def _release_lock():
global _lock_file_path
if not _lock_file_path:
return
try:
_lock_file_path.unlink(missing_ok=True)
except Exception:
pass
_lock_file_path = None
def _acquire_lock() -> bool:
global _lock_file_path
_ensure_dirs()
lock_files = list(APP_DIR.glob("*.lock"))
for f in lock_files:
pid = None
meta: dict = {}
try:
pid = int(f.stem)
except Exception:
f.unlink(missing_ok=True)
continue
try:
raw = f.read_text(encoding="utf-8").strip()
if raw:
meta = json.loads(raw)
except Exception:
meta = {}
try:
proc = psutil.Process(pid)
if _same_process(meta, proc):
return False
except Exception:
pass
f.unlink(missing_ok=True)
lock_file = APP_DIR / f"{os.getpid()}.lock"
try:
proc = psutil.Process(os.getpid())
payload = {"create_time": proc.create_time()}
lock_file.write_text(json.dumps(payload, ensure_ascii=False),
encoding="utf-8")
except Exception:
lock_file.touch()
_lock_file_path = lock_file
return True
# Filesystem helpers
def _ensure_dirs():
APP_DIR.mkdir(parents=True, exist_ok=True)
def load_config() -> dict:
_ensure_dirs()
if CONFIG_FILE.exists():
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
for k, v in DEFAULT_CONFIG.items():
data.setdefault(k, v)
return data
except Exception as exc:
log.warning("Failed to load config: %s", exc)
return dict(DEFAULT_CONFIG)
def save_config(cfg: dict):
_ensure_dirs()
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs()
root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"))
root.addHandler(fh)
if not getattr(sys, "frozen", False):
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
ch.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(message)s",
datefmt="%H:%M:%S"))
root.addHandler(ch)
# Menubar icon
def _make_menubar_icon(size: int = 44):
if Image is None:
return None
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
margin = size // 11
draw.ellipse([margin, margin, size - margin, size - margin],
fill=(0, 0, 0, 255))
try:
font = ImageFont.truetype(
"/System/Library/Fonts/Helvetica.ttc",
size=int(size * 0.55))
except Exception:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), "T", font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
tx = (size - tw) // 2 - bbox[0]
ty = (size - th) // 2 - bbox[1]
draw.text((tx, ty), "T", fill=(255, 255, 255, 255), font=font)
return img
# Generate menubar icon PNG if it does not exist.
def _ensure_menubar_icon():
if MENUBAR_ICON_PATH.exists():
return
_ensure_dirs()
img = _make_menubar_icon(44)
if img:
img.save(str(MENUBAR_ICON_PATH), "PNG")
# Native macOS dialogs
def _osascript(script: str) -> str:
r = subprocess.run(
['osascript', '-e', script],
capture_output=True, text=True)
return r.stdout.strip()
def _show_error(text: str, title: str = "TG WS Proxy"):
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon stop')
def _show_info(text: str, title: str = "TG WS Proxy"):
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
_osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"OK"}} default button "OK" with icon note')
def _ask_yes_no(text: str, title: str = "TG WS Proxy") -> bool:
text_esc = text.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
result = _osascript(
f'display dialog "{text_esc}" with title "{title_esc}" '
f'buttons {{"Нет", "Да"}} default button "Да" with icon note')
return "Да" in result
# Proxy lifecycle
def _run_proxy_thread(port: int, dc_opt: Dict[int, str], verbose: bool,
host: str = '127.0.0.1'):
global _async_stop
loop = _asyncio.new_event_loop()
_asyncio.set_event_loop(loop)
stop_ev = _asyncio.Event()
_async_stop = (loop, stop_ev)
try:
loop.run_until_complete(
tg_ws_proxy._run(port, dc_opt, stop_event=stop_ev, host=host))
except Exception as exc:
log.error("Proxy thread crashed: %s", exc)
if "Address already in use" in str(exc):
_show_error(
"Не удалось запустить прокси:\n"
"Порт уже используется другим приложением.\n\n"
"Закройте приложение, использующее этот порт, "
"или измените порт в настройках прокси и перезапустите.")
finally:
loop.close()
_async_stop = None
def start_proxy():
global _proxy_thread, _config
if _proxy_thread and _proxy_thread.is_alive():
log.info("Proxy already running")
return
cfg = _config
port = cfg.get("port", DEFAULT_CONFIG["port"])
host = cfg.get("host", DEFAULT_CONFIG["host"])
dc_ip_list = cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"])
verbose = cfg.get("verbose", False)
try:
dc_opt = tg_ws_proxy.parse_dc_ip_list(dc_ip_list)
except ValueError as e:
log.error("Bad config dc_ip: %s", e)
_show_error(f"Ошибка конфигурации:\n{e}")
return
log.info("Starting proxy on %s:%d ...", host, port)
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
_proxy_thread = threading.Thread(
target=_run_proxy_thread,
args=(port, dc_opt, verbose, host),
daemon=True, name="proxy")
_proxy_thread.start()
def stop_proxy():
global _proxy_thread, _async_stop
if _async_stop:
loop, stop_ev = _async_stop
loop.call_soon_threadsafe(stop_ev.set)
if _proxy_thread:
_proxy_thread.join(timeout=2)
_proxy_thread = None
log.info("Proxy stopped")
def restart_proxy():
log.info("Restarting proxy...")
stop_proxy()
time.sleep(0.3)
start_proxy()
# Menu callbacks
def _on_open_in_telegram(_=None):
port = _config.get("port", DEFAULT_CONFIG["port"])
url = f"tg://socks?server=127.0.0.1&port={port}"
log.info("Opening %s", url)
try:
result = subprocess.call(['open', url])
if result != 0:
raise RuntimeError("open command failed")
except Exception:
log.info("open command failed, trying webbrowser")
try:
if not webbrowser.open(url):
raise RuntimeError("webbrowser.open returned False")
except Exception:
log.info("Browser open failed, copying to clipboard")
try:
if pyperclip:
pyperclip.copy(url)
else:
subprocess.run(['pbcopy'], input=url.encode(),
check=True)
_show_info(
"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена:\n{url}")
except Exception as exc:
log.error("Clipboard copy failed: %s", exc)
_show_error(f"Не удалось скопировать ссылку:\n{exc}")
def _on_restart(_=None):
def _do_restart():
global _config
_config = load_config()
if _app:
_app.update_menu_title()
restart_proxy()
threading.Thread(target=_do_restart, daemon=True).start()
def _on_open_logs(_=None):
log.info("Opening log file: %s", LOG_FILE)
if LOG_FILE.exists():
subprocess.call(['open', str(LOG_FILE)])
else:
_show_info("Файл логов ещё не создан.")
# Show a native text input dialog. Returns None if cancelled.
def _osascript_input(prompt: str, default: str,
title: str = "TG WS Proxy") -> Optional[str]:
prompt_esc = prompt.replace('\\', '\\\\').replace('"', '\\"')
default_esc = default.replace('\\', '\\\\').replace('"', '\\"')
title_esc = title.replace('\\', '\\\\').replace('"', '\\"')
r = subprocess.run(
['osascript', '-e',
f'text returned of (display dialog "{prompt_esc}" '
f'default answer "{default_esc}" '
f'with title "{title_esc}" '
f'buttons {{"Отмена", "OK"}} default button "OK")'],
capture_output=True, text=True)
if r.returncode != 0:
return None
return r.stdout.rstrip("\r\n")
def _on_edit_config(_=None):
threading.Thread(target=_edit_config_dialog, daemon=True).start()
# Settings via native macOS dialogs
def _edit_config_dialog():
cfg = load_config()
# Host
host = _osascript_input(
"IP-адрес прокси:",
cfg.get("host", DEFAULT_CONFIG["host"]))
if host is None:
return
host = host.strip()
import socket as _sock
try:
_sock.inet_aton(host)
except OSError:
_show_error("Некорректный IP-адрес.")
return
# Port
port_str = _osascript_input(
"Порт прокси:",
str(cfg.get("port", DEFAULT_CONFIG["port"])))
if port_str is None:
return
try:
port = int(port_str.strip())
if not (1 <= port <= 65535):
raise ValueError
except ValueError:
_show_error("Порт должен быть числом 1-65535")
return
# DC-IP mappings
dc_default = ", ".join(cfg.get("dc_ip", DEFAULT_CONFIG["dc_ip"]))
dc_str = _osascript_input(
"DC → IP маппинги (через запятую, формат DC:IP):\n"
"Например: 2:149.154.167.220, 4:149.154.167.220",
dc_default)
if dc_str is None:
return
dc_lines = [s.strip() for s in dc_str.replace(',', '\n').splitlines()
if s.strip()]
try:
tg_ws_proxy.parse_dc_ip_list(dc_lines)
except ValueError as e:
_show_error(str(e))
return
# Verbose
verbose = _ask_yes_no("Включить подробное логирование (verbose)?")
# Advanced settings
adv_str = _osascript_input(
"Расширенные настройки (буфер KB, WS пул, лог MB):\n"
"Формат: buf_kb,pool_size,log_max_mb",
f"{cfg.get('buf_kb', DEFAULT_CONFIG['buf_kb'])},"
f"{cfg.get('pool_size', DEFAULT_CONFIG['pool_size'])},"
f"{cfg.get('log_max_mb', DEFAULT_CONFIG['log_max_mb'])}")
adv = {}
if adv_str:
parts = [s.strip() for s in adv_str.split(',')]
keys = [("buf_kb", int), ("pool_size", int),
("log_max_mb", float)]
for i, (k, typ) in enumerate(keys):
if i < len(parts):
try:
adv[k] = typ(parts[i])
except ValueError:
pass
new_cfg = {
"host": host,
"port": port,
"dc_ip": dc_lines,
"verbose": verbose,
"buf_kb": adv.get("buf_kb", cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])),
"pool_size": adv.get("pool_size", cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])),
"log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])),
}
save_config(new_cfg)
log.info("Config saved: %s", new_cfg)
global _config
_config = new_cfg
if _app:
_app.update_menu_title()
if _ask_yes_no("Настройки сохранены.\n\nПерезапустить прокси сейчас?"):
restart_proxy()
# First-run & IPv6 dialogs
def _show_first_run():
_ensure_dirs()
if FIRST_RUN_MARKER.exists():
return
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
tg_url = f"tg://socks?server={host}&port={port}"
text = (
f"Прокси запущен и работает в строке меню.\n\n"
f"Как подключить Telegram Desktop:\n\n"
f"Автоматически:\n"
f" Нажмите «Открыть в Telegram» в меню\n"
f" Или ссылка: {tg_url}\n\n"
f"Вручную:\n"
f" Настройки → Продвинутые → Тип подключения → Прокси\n"
f" SOCKS5 → {host} : {port} (без логина/пароля)\n\n"
f"Открыть прокси в Telegram сейчас?"
)
FIRST_RUN_MARKER.touch()
if _ask_yes_no(text, "TG WS Proxy"):
_on_open_in_telegram()
def _has_ipv6_enabled() -> bool:
import socket as _sock
try:
addrs = _sock.getaddrinfo(_sock.gethostname(), None, _sock.AF_INET6)
for addr in addrs:
ip = addr[4][0]
if ip and not ip.startswith('::1') and not ip.startswith('fe80::1'):
return True
except Exception:
pass
try:
s = _sock.socket(_sock.AF_INET6, _sock.SOCK_STREAM)
s.bind(('::1', 0))
s.close()
return True
except Exception:
return False
def _check_ipv6_warning():
_ensure_dirs()
if IPV6_WARN_MARKER.exists():
return
if not _has_ipv6_enabled():
return
IPV6_WARN_MARKER.touch()
_show_info(
"На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает, попробуйте отключить "
"попытку соединения по IPv6 в настройках прокси Telegram.\n\n"
"Это предупреждение будет показано только один раз.")
# rumps menubar app
_TgWsProxyAppBase = rumps.App if rumps else object
class TgWsProxyApp(_TgWsProxyAppBase):
def __init__(self):
_ensure_menubar_icon()
icon_path = (str(MENUBAR_ICON_PATH)
if MENUBAR_ICON_PATH.exists() else None)
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
self._open_tg_item = rumps.MenuItem(
f"Открыть в Telegram ({host}:{port})",
callback=_on_open_in_telegram)
self._restart_item = rumps.MenuItem(
"Перезапустить прокси",
callback=_on_restart)
self._settings_item = rumps.MenuItem(
"Настройки...",
callback=_on_edit_config)
self._logs_item = rumps.MenuItem(
"Открыть логи",
callback=_on_open_logs)
super().__init__(
"TG WS Proxy",
icon=icon_path,
template=False,
quit_button="Выход",
menu=[
self._open_tg_item,
None,
self._restart_item,
self._settings_item,
self._logs_item,
])
def update_menu_title(self):
host = _config.get("host", DEFAULT_CONFIG["host"])
port = _config.get("port", DEFAULT_CONFIG["port"])
self._open_tg_item.title = (
f"Открыть в Telegram ({host}:{port})")
def run_menubar():
global _app, _config
_config = load_config()
save_config(_config)
if LOG_FILE.exists():
try:
LOG_FILE.unlink()
except Exception:
pass
setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy menubar app starting")
log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE)
if rumps is None or Image is None:
log.error("rumps or Pillow not installed; running in console mode")
start_proxy()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop_proxy()
return
start_proxy()
_show_first_run()
_check_ipv6_warning()
_app = TgWsProxyApp()
log.info("Menubar app running")
_app.run()
stop_proxy()
log.info("Menubar app exited")
def main():
if not _acquire_lock():
_show_info("Приложение уже запущено.")
return
try:
run_menubar()
finally:
_release_lock()
if __name__ == "__main__":
main()

80
packaging/linux.spec Normal file
View File

@@ -0,0 +1,80 @@
# -*- mode: python ; coding: utf-8 -*-
import sys
import os
import glob
from PyInstaller.utils.hooks import collect_submodules, collect_data_files
block_cipher = None
# customtkinter ships JSON themes + assets that must be bundled
import customtkinter
ctk_path = os.path.dirname(customtkinter.__file__)
# Collect gi (PyGObject) submodules and data so pystray._appindicator works
gi_hiddenimports = collect_submodules('gi')
gi_datas = collect_data_files('gi')
# Collect GObject typelib files from the system
typelib_dirs = glob.glob('/usr/lib/*/girepository-1.0')
typelib_datas = []
for d in typelib_dirs:
typelib_datas.append((d, 'gi_typelibs'))
a = Analysis(
[os.path.join(os.path.dirname(SPEC), os.pardir, 'linux.py')],
pathex=[],
binaries=[],
datas=[(ctk_path, 'customtkinter/')] + gi_datas + typelib_datas,
hiddenimports=[
'pystray._appindicator',
'PIL._tkinter_finder',
'customtkinter',
'cryptography.hazmat.primitives.ciphers',
'cryptography.hazmat.primitives.ciphers.algorithms',
'cryptography.hazmat.primitives.ciphers.modes',
'cryptography.hazmat.backends.openssl',
'gi',
'_gi',
'gi.repository.GLib',
'gi.repository.GObject',
'gi.repository.Gtk',
'gi.repository.Gdk',
'gi.repository.AyatanaAppIndicator3',
] + gi_hiddenimports,
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
cipher=block_cipher,
)
icon_path = os.path.join(os.path.dirname(SPEC), os.pardir, 'icon.ico')
if os.path.exists(icon_path):
a.datas += [('icon.ico', icon_path, 'DATA')]
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='TgWsProxy',
debug=False,
bootloader_ignore_signals=False,
strip=True,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=False,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

83
packaging/macos.spec Normal file
View File

@@ -0,0 +1,83 @@
# -*- mode: python ; coding: utf-8 -*-
import sys
import os
block_cipher = None
a = Analysis(
[os.path.join(os.path.dirname(SPEC), os.pardir, 'macos.py')],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
'rumps',
'objc',
'Foundation',
'AppKit',
'PyObjCTools',
'PyObjCTools.AppHelper',
'cryptography.hazmat.primitives.ciphers',
'cryptography.hazmat.primitives.ciphers.algorithms',
'cryptography.hazmat.primitives.ciphers.modes',
'cryptography.hazmat.backends.openssl',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
cipher=block_cipher,
)
icon_path = os.path.join(os.path.dirname(SPEC), os.pardir, 'icon.icns')
if not os.path.exists(icon_path):
icon_path = None
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
[],
exclude_binaries=True,
name='TgWsProxy',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=False,
console=False,
argv_emulation=False,
target_arch='universal2',
codesign_identity=None,
entitlements_file=None,
)
coll = COLLECT(
exe,
a.binaries,
a.zipfiles,
a.datas,
strip=False,
upx=False,
upx_exclude=[],
name='TgWsProxy',
)
app = BUNDLE(
coll,
name='TG WS Proxy.app',
icon=icon_path,
bundle_identifier='com.tgwsproxy.app',
info_plist={
'CFBundleName': 'TG WS Proxy',
'CFBundleDisplayName': 'TG WS Proxy',
'CFBundleShortVersionString': '1.0.0',
'CFBundleVersion': '1.0.0',
'LSMinimumSystemVersion': '10.15',
'LSUIElement': True,
'NSHighResolutionCapable': True,
'NSAppleEventsUsageDescription':
'TG WS Proxy needs to display dialogs.',
},
)

1
proxy/__init__.py Normal file
View File

@@ -0,0 +1 @@
__version__ = "1.1.3"

View File

@@ -4,6 +4,7 @@ import argparse
import asyncio import asyncio
import base64 import base64
import logging import logging
import logging.handlers
import os import os
import socket as _socket import socket as _socket
import ssl import ssl
@@ -18,8 +19,8 @@ DEFAULT_PORT = 1080
log = logging.getLogger('tg-ws-proxy') log = logging.getLogger('tg-ws-proxy')
_TCP_NODELAY = True _TCP_NODELAY = True
_RECV_BUF = 131072 _RECV_BUF = 256 * 1024
_SEND_BUF = 131072 _SEND_BUF = 256 * 1024
_WS_POOL_SIZE = 4 _WS_POOL_SIZE = 4
_WS_POOL_MAX_AGE = 120.0 _WS_POOL_MAX_AGE = 120.0
@@ -68,6 +69,11 @@ _IP_TO_DC: Dict[str, Tuple[int, bool]] = {
'91.105.192.100': (203, False), '91.105.192.100': (203, False),
} }
# This case might work but not actually sure
_DC_OVERRIDES: Dict[int, int] = {
203: 2
}
_dc_opt: Dict[int, Optional[str]] = {} _dc_opt: Dict[int, Optional[str]] = {}
# DCs where WS is known to fail (302 redirect) # DCs where WS is known to fail (302 redirect)
@@ -77,7 +83,10 @@ _ws_blacklist: Set[Tuple[int, bool]] = set()
# Rate-limit re-attempts per (dc, is_media) # Rate-limit re-attempts per (dc, is_media)
_dc_fail_until: Dict[Tuple[int, bool], float] = {} _dc_fail_until: Dict[Tuple[int, bool], float] = {}
_DC_FAIL_COOLDOWN = 60.0 # seconds _DC_FAIL_COOLDOWN = 30.0 # seconds to keep reduced WS timeout after failure
_WS_FAIL_TIMEOUT = 2.0 # quick-retry timeout after a recent WS failure
_ZERO_64 = b'\x00' * 64
_ssl_ctx = ssl.create_default_context() _ssl_ctx = ssl.create_default_context()
@@ -123,6 +132,21 @@ def _xor_mask(data: bytes, mask: bytes) -> bytes:
return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big') return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big')
# Pre-compiled struct formats
_st_BB = struct.Struct('>BB')
_st_BBH = struct.Struct('>BBH')
_st_BBQ = struct.Struct('>BBQ')
_st_BB4s = struct.Struct('>BB4s')
_st_BBH4s = struct.Struct('>BBH4s')
_st_BBQ4s = struct.Struct('>BBQ4s')
_st_H = struct.Struct('>H')
_st_Q = struct.Struct('>Q')
_st_I_net = struct.Struct('!I')
_st_Ih = struct.Struct('<Ih')
_st_I_le = struct.Struct('<I')
_VALID_PROTOS = frozenset((0xEFEFEFEF, 0xEEEEEEEE, 0xDDDDDDDD))
class RawWebSocket: class RawWebSocket:
""" """
Lightweight WebSocket client over asyncio reader/writer streams. Lightweight WebSocket client over asyncio reader/writer streams.
@@ -131,6 +155,7 @@ class RawWebSocket:
proxy), performs the HTTP Upgrade handshake, and provides send/recv proxy), performs the HTTP Upgrade handshake, and provides send/recv
for binary frames with proper masking, ping/pong, and close handling. for binary frames with proper masking, ping/pong, and close handling.
""" """
__slots__ = ('reader', 'writer', '_closed')
OP_CONTINUATION = 0x0 OP_CONTINUATION = 0x0
OP_TEXT = 0x1 OP_TEXT = 0x1
@@ -296,40 +321,37 @@ class RawWebSocket:
@staticmethod @staticmethod
def _build_frame(opcode: int, data: bytes, def _build_frame(opcode: int, data: bytes,
mask: bool = False) -> bytes: mask: bool = False) -> bytes:
header = bytearray()
header.append(0x80 | opcode) # FIN=1 + opcode
length = len(data) length = len(data)
mask_bit = 0x80 if mask else 0x00 fb = 0x80 | opcode
if not mask:
if length < 126:
return _st_BB.pack(fb, length) + data
if length < 65536:
return _st_BBH.pack(fb, 126, length) + data
return _st_BBQ.pack(fb, 127, length) + data
mask_key = os.urandom(4)
masked = _xor_mask(data, mask_key)
if length < 126: if length < 126:
header.append(mask_bit | length) return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked
elif length < 65536: if length < 65536:
header.append(mask_bit | 126) return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked
header.extend(struct.pack('>H', length)) return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked
else:
header.append(mask_bit | 127)
header.extend(struct.pack('>Q', length))
if mask:
mask_key = os.urandom(4)
header.extend(mask_key)
return bytes(header) + _xor_mask(data, mask_key)
return bytes(header) + data
async def _read_frame(self) -> Tuple[int, bytes]: async def _read_frame(self) -> Tuple[int, bytes]:
hdr = await self.reader.readexactly(2) hdr = await self.reader.readexactly(2)
opcode = hdr[0] & 0x0F opcode = hdr[0] & 0x0F
is_masked = bool(hdr[1] & 0x80)
length = hdr[1] & 0x7F length = hdr[1] & 0x7F
if length == 126: if length == 126:
length = struct.unpack('>H', length = _st_H.unpack(
await self.reader.readexactly(2))[0] await self.reader.readexactly(2))[0]
elif length == 127: elif length == 127:
length = struct.unpack('>Q', length = _st_Q.unpack(
await self.reader.readexactly(8))[0] await self.reader.readexactly(8))[0]
if is_masked: if hdr[1] & 0x80:
mask_key = await self.reader.readexactly(4) mask_key = await self.reader.readexactly(4)
payload = await self.reader.readexactly(length) payload = await self.reader.readexactly(length)
return opcode, _xor_mask(payload, mask_key) return opcode, _xor_mask(payload, mask_key)
@@ -348,7 +370,7 @@ def _human_bytes(n: int) -> str:
def _is_telegram_ip(ip: str) -> bool: def _is_telegram_ip(ip: str) -> bool:
try: try:
n = struct.unpack('!I', _socket.inet_aton(ip))[0] n = _st_I_net.unpack(_socket.inet_aton(ip))[0]
return any(lo <= n <= hi for lo, hi in _TG_RANGES) return any(lo <= n <= hi for lo, hi in _TG_RANGES)
except OSError: except OSError:
return False return False
@@ -365,19 +387,16 @@ def _dc_from_init(data: bytes) -> Tuple[Optional[int], bool]:
Returns (dc_id, is_media). Returns (dc_id, is_media).
""" """
try: try:
key = bytes(data[8:40]) cipher = Cipher(algorithms.AES(data[8:40]), modes.CTR(data[40:56]))
iv = bytes(data[40:56])
cipher = Cipher(algorithms.AES(key), modes.CTR(iv))
encryptor = cipher.encryptor() encryptor = cipher.encryptor()
keystream = encryptor.update(b'\x00' * 64) + encryptor.finalize() keystream = encryptor.update(_ZERO_64)
plain = bytes(a ^ b for a, b in zip(data[56:64], keystream[56:64])) plain = (int.from_bytes(data[56:64], 'big') ^ int.from_bytes(keystream[56:64], 'big')).to_bytes(8, 'big')
proto = struct.unpack('<I', plain[0:4])[0] proto, dc_raw = _st_Ih.unpack(plain[:6])
dc_raw = struct.unpack('<h', plain[4:6])[0]
log.debug("dc_from_init: proto=0x%08X dc_raw=%d plain=%s", log.debug("dc_from_init: proto=0x%08X dc_raw=%d plain=%s",
proto, dc_raw, plain.hex()) proto, dc_raw, plain.hex())
if proto in (0xEFEFEFEF, 0xEEEEEEEE, 0xDDDDDDDD): if proto in _VALID_PROTOS:
dc = abs(dc_raw) dc = abs(dc_raw)
if 1 <= dc <= 1000: if 1 <= dc <= 5 or dc == 203:
return dc, (dc_raw < 0) return dc, (dc_raw < 0)
except Exception as exc: except Exception as exc:
log.debug("DC extraction failed: %s", exc) log.debug("DC extraction failed: %s", exc)
@@ -396,11 +415,9 @@ def _patch_init_dc(data: bytes, dc: int) -> bytes:
new_dc = struct.pack('<h', dc) new_dc = struct.pack('<h', dc)
try: try:
key_raw = bytes(data[8:40]) cipher = Cipher(algorithms.AES(data[8:40]), modes.CTR(data[40:56]))
iv = bytes(data[40:56])
cipher = Cipher(algorithms.AES(key_raw), modes.CTR(iv))
enc = cipher.encryptor() enc = cipher.encryptor()
ks = enc.update(b'\x00' * 64) + enc.finalize() ks = enc.update(_ZERO_64)
patched = bytearray(data[:64]) patched = bytearray(data[:64])
patched[60] = ks[60] ^ new_dc[0] patched[60] = ks[60] ^ new_dc[0]
patched[61] = ks[61] ^ new_dc[1] patched[61] = ks[61] ^ new_dc[1]
@@ -424,30 +441,30 @@ class _MsgSplitter:
""" """
def __init__(self, init_data: bytes): def __init__(self, init_data: bytes):
key_raw = bytes(init_data[8:40]) cipher = Cipher(algorithms.AES(init_data[8:40]),
iv = bytes(init_data[40:56]) modes.CTR(init_data[40:56]))
cipher = Cipher(algorithms.AES(key_raw), modes.CTR(iv))
self._dec = cipher.encryptor() self._dec = cipher.encryptor()
self._dec.update(b'\x00' * 64) # skip init packet self._dec.update(_ZERO_64) # skip init packet
def split(self, chunk: bytes) -> List[bytes]: def split(self, chunk: bytes) -> List[bytes]:
"""Decrypt to find message boundaries, return split ciphertext.""" """Decrypt to find message boundaries, return split ciphertext."""
plain = self._dec.update(chunk) plain = self._dec.update(chunk)
boundaries = [] boundaries = []
pos = 0 pos = 0
while pos < len(plain): plain_len = len(plain)
while pos < plain_len:
first = plain[pos] first = plain[pos]
if first == 0x7f: if first == 0x7f:
if pos + 4 > len(plain): if pos + 4 > plain_len:
break break
msg_len = ( msg_len = (
struct.unpack_from('<I', plain, pos + 1)[0] & 0xFFFFFF _st_I_le.unpack_from(plain, pos + 1)[0] & 0xFFFFFF
) * 4 ) * 4
pos += 4 pos += 4
else: else:
msg_len = first * 4 msg_len = first * 4
pos += 1 pos += 1
if msg_len == 0 or pos + msg_len > len(plain): if msg_len == 0 or pos + msg_len > plain_len:
break break
pos += msg_len pos += msg_len
boundaries.append(pos) boundaries.append(pos)
@@ -464,16 +481,10 @@ class _MsgSplitter:
def _ws_domains(dc: int, is_media) -> List[str]: def _ws_domains(dc: int, is_media) -> List[str]:
""" dc = _DC_OVERRIDES.get(dc, dc)
Return domain names to try for WebSocket connection to a DC.
DC 1-5: kws{N}[-1].web.telegram.org
DC >5: kws{N}[-1].telegram.org
"""
base = 'telegram.org' if dc > 5 else 'web.telegram.org'
if is_media is None or is_media: if is_media is None or is_media:
return [f'kws{dc}-1.{base}', f'kws{dc}.{base}'] return [f'kws{dc}-1.web.telegram.org', f'kws{dc}.web.telegram.org']
return [f'kws{dc}.{base}', f'kws{dc}-1.{base}'] return [f'kws{dc}.web.telegram.org', f'kws{dc}-1.web.telegram.org']
class Stats: class Stats:
@@ -614,11 +625,12 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label,
nonlocal up_bytes, up_packets nonlocal up_bytes, up_packets
try: try:
while True: while True:
chunk = await reader.read(131072) chunk = await reader.read(65536)
if not chunk: if not chunk:
break break
_stats.bytes_up += len(chunk) n = len(chunk)
up_bytes += len(chunk) _stats.bytes_up += n
up_bytes += n
up_packets += 1 up_packets += 1
if splitter: if splitter:
parts = splitter.split(chunk) parts = splitter.split(chunk)
@@ -640,14 +652,12 @@ async def _bridge_ws(reader, writer, ws: RawWebSocket, label,
data = await ws.recv() data = await ws.recv()
if data is None: if data is None:
break break
_stats.bytes_down += len(data) n = len(data)
down_bytes += len(data) _stats.bytes_down += n
down_bytes += n
down_packets += 1 down_packets += 1
writer.write(data) writer.write(data)
# drain only when kernel buffer is filling up await writer.drain()
buf = writer.transport.get_write_buffer_size()
if buf > _SEND_BUF:
await writer.drain()
except (asyncio.CancelledError, ConnectionError, OSError): except (asyncio.CancelledError, ConnectionError, OSError):
return return
except Exception as e: except Exception as e:
@@ -687,26 +697,27 @@ async def _bridge_tcp(reader, writer, remote_reader, remote_writer,
label, dc=None, dst=None, port=None, label, dc=None, dst=None, port=None,
is_media=False): is_media=False):
"""Bidirectional TCP <-> TCP forwarding (for fallback).""" """Bidirectional TCP <-> TCP forwarding (for fallback)."""
async def forward(src, dst_w, tag): async def forward(src, dst_w, is_up):
try: try:
while True: while True:
data = await src.read(65536) data = await src.read(65536)
if not data: if not data:
break break
if 'up' in tag: n = len(data)
_stats.bytes_up += len(data) if is_up:
_stats.bytes_up += n
else: else:
_stats.bytes_down += len(data) _stats.bytes_down += n
dst_w.write(data) dst_w.write(data)
await dst_w.drain() await dst_w.drain()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
except Exception as e: except Exception as e:
log.debug("[%s] %s ended: %s", label, tag, e) log.debug("[%s] forward ended: %s", label, e)
tasks = [ tasks = [
asyncio.create_task(forward(reader, remote_writer, 'up')), asyncio.create_task(forward(reader, remote_writer, True)),
asyncio.create_task(forward(remote_reader, writer, 'down')), asyncio.create_task(forward(remote_reader, writer, False)),
] ]
try: try:
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
@@ -747,8 +758,12 @@ async def _pipe(r, w):
pass pass
_SOCKS5_REPLIES = {s: bytes([0x05, s, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
for s in (0x00, 0x05, 0x07, 0x08)}
def _socks5_reply(status): def _socks5_reply(status):
return bytes([0x05, status, 0x00, 0x01]) + b'\x00' * 6 return _SOCKS5_REPLIES[status]
async def _tcp_fallback(reader, writer, dst, port, init, label, async def _tcp_fallback(reader, writer, dst, port, init, label,
@@ -816,13 +831,13 @@ async def _handle_client(reader, writer):
writer.close() writer.close()
return return
port = struct.unpack('!H', await reader.readexactly(2))[0] port = _st_H.unpack(await reader.readexactly(2))[0]
if ':' in dst: if ':' in dst:
log.error( log.error(
"[%s] IPv6 address detected: %s:%d" "[%s] IPv6 address detected: %s:%d"
"IPv6 doesn't supported " "IPv6 addresses are not supported; "
"Disable IPv6 to continue using the proxy.", "disable IPv6 to continue using the proxy.",
label, dst, port) label, dst, port)
writer.write(_socks5_reply(0x05)) writer.write(_socks5_reply(0x05))
await writer.drain() await writer.drain()
@@ -911,20 +926,10 @@ async def _handle_client(reader, writer):
label, dc, media_tag) label, dc, media_tag)
return return
# -- Cooldown check --
fail_until = _dc_fail_until.get(dc_key, 0)
if now < fail_until:
remaining = fail_until - now
log.debug("[%s] DC%d%s WS cooldown (%.0fs) -> TCP",
label, dc, media_tag, remaining)
ok = await _tcp_fallback(reader, writer, dst, port, init,
label, dc=dc, is_media=is_media)
if ok:
log.info("[%s] DC%d%s TCP fallback closed",
label, dc, media_tag)
return
# -- Try WebSocket via direct connection -- # -- Try WebSocket via direct connection --
fail_until = _dc_fail_until.get(dc_key, 0)
ws_timeout = _WS_FAIL_TIMEOUT if now < fail_until else 10.0
domains = _ws_domains(dc, is_media) domains = _ws_domains(dc, is_media)
target = _dc_opt[dc] target = _dc_opt[dc]
ws = None ws = None
@@ -942,7 +947,7 @@ async def _handle_client(reader, writer):
label, dc, media_tag, dst, port, url, target) label, dc, media_tag, dst, port, url, target)
try: try:
ws = await RawWebSocket.connect(target, domain, ws = await RawWebSocket.connect(target, domain,
timeout=10) timeout=ws_timeout)
all_redirects = False all_redirects = False
break break
except WsHandshakeError as exc: except WsHandshakeError as exc:
@@ -1127,24 +1132,56 @@ def main():
ap.add_argument('--host', type=str, default='127.0.0.1', ap.add_argument('--host', type=str, default='127.0.0.1',
help='Listen host (default 127.0.0.1)') help='Listen host (default 127.0.0.1)')
ap.add_argument('--dc-ip', metavar='DC:IP', action='append', ap.add_argument('--dc-ip', metavar='DC:IP', action='append',
default=['2:149.154.167.220', '4:149.154.167.220'], default=[],
help='Target IP for a DC, e.g. --dc-ip 1:149.154.175.205' help='Target IP for a DC, e.g. --dc-ip 1:149.154.175.205'
' --dc-ip 2:149.154.167.220') ' --dc-ip 2:149.154.167.220')
ap.add_argument('-v', '--verbose', action='store_true', ap.add_argument('-v', '--verbose', action='store_true',
help='Debug logging') help='Debug logging')
ap.add_argument('--log-file', type=str, default=None, metavar='PATH',
help='Log to file with rotation (default: stderr only)')
ap.add_argument('--log-max-mb', type=float, default=5, metavar='MB',
help='Max log file size in MB before rotation (default 5)')
ap.add_argument('--log-backups', type=int, default=0, metavar='N',
help='Number of rotated log files to keep (default 0)')
ap.add_argument('--buf-kb', type=int, default=256, metavar='KB',
help='Socket send/recv buffer size in KB (default 256)')
ap.add_argument('--pool-size', type=int, default=4, metavar='N',
help='WS connection pool size per DC (default 4, min 0)')
args = ap.parse_args() args = ap.parse_args()
if not args.dc_ip:
args.dc_ip = ['2:149.154.167.220', '4:149.154.167.220']
try: try:
dc_opt = parse_dc_ip_list(args.dc_ip) dc_opt = parse_dc_ip_list(args.dc_ip)
except ValueError as e: except ValueError as e:
log.error(str(e)) log.error(str(e))
sys.exit(1) sys.exit(1)
logging.basicConfig( log_level = logging.DEBUG if args.verbose else logging.INFO
level=logging.DEBUG if args.verbose else logging.INFO, log_fmt = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s',
format='%(asctime)s %(levelname)-5s %(message)s', datefmt='%H:%M:%S')
datefmt='%H:%M:%S', root = logging.getLogger()
) root.setLevel(log_level)
console = logging.StreamHandler()
console.setFormatter(log_fmt)
root.addHandler(console)
if args.log_file:
fh = logging.handlers.RotatingFileHandler(
args.log_file,
maxBytes=max(32 * 1024, args.log_max_mb * 1024 * 1024),
backupCount=max(0, args.log_backups),
encoding='utf-8',
)
fh.setFormatter(log_fmt)
root.addHandler(fh)
global _RECV_BUF, _SEND_BUF, _WS_POOL_SIZE
_RECV_BUF = max(4, args.buf_kb) * 1024
_SEND_BUF = _RECV_BUF
_WS_POOL_SIZE = max(0, args.pool_size)
try: try:
asyncio.run(_run(args.port, dc_opt, host=args.host)) asyncio.run(_run(args.port, dc_opt, host=args.host))

95
pyproject.toml Normal file
View File

@@ -0,0 +1,95 @@
[build-system]
requires = ["hatchling>=1.25.0"]
build-backend = "hatchling.build"
[project]
name = "tg-ws-proxy"
dynamic=["version"]
description = "Telegram Desktop WebSocket Bridge Proxy"
readme = "README.md"
requires-python = ">=3.8"
license = { name = "MIT", file = "LICENSE" }
authors = [
{ name = "Flowseal" }
]
keywords = [
"telegram",
"proxy",
"websocket"
]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Environment :: MacOS X :: Cocoa",
"Environment :: Win32 (MS Windows)",
"Environment :: X11 Applications :: GTK",
"Intended Audience :: Customer Service",
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX :: Linux",
"Topic :: System :: Networking :: Firewalls",
]
dependencies = [
"cryptography==41.0.7; platform_system == 'Windows' and python_version < '3.9'",
"cryptography==46.0.5; platform_system != 'Windows' or python_version >= '3.9'",
]
[project.optional-dependencies]
win7 = [
"customtkinter==5.2.2",
"Pillow==10.4.0",
"psutil==5.9.8",
"pystray==0.19.5",
"pyperclip==1.9.0",
]
win10 = [
"customtkinter==5.2.2",
"Pillow==12.1.1",
"psutil==7.0.0",
"pystray==0.19.5",
"pyperclip==1.9.0",
]
macos = [
"Pillow==12.1.0",
"psutil==7.0.0",
"pyperclip==1.9.0",
"rumps==0.4.0",
]
linux = [
"customtkinter==5.2.2",
"Pillow==12.1.1",
"psutil==7.0.0",
"pystray==0.19.5",
"pyperclip==1.9.0",
]
[project.scripts]
tg-ws-proxy = "proxy.tg_ws_proxy:main"
tg-ws-proxy-tray-win = "windows:main"
tg-ws-proxy-tray-macos = "macos:main"
tg-ws-proxy-tray-linux = "linux:main"
[project.urls]
Source = "https://github.com/Flowseal/tg-ws-proxy"
Issues = "https://github.com/Flowseal/tg-ws-proxy/issues"
[tool.hatch.build.targets.wheel]
packages = ["proxy"]
[tool.hatch.build.force-include]
"windows.py" = "windows.py"
"macos.py" = "macos.py"
"linux.py" = "linux.py"
[tool.hatch.version]
path = "proxy/__init__.py"

View File

@@ -1,6 +0,0 @@
cryptography==41.0.7
customtkinter==5.2.2
Pillow==10.4.0
psutil==5.9.8
pystray==0.19.5
pyperclip==1.9.0

View File

@@ -1,6 +0,0 @@
cryptography==46.0.5
customtkinter==5.2.2
Pillow==12.1.1
psutil==7.0.0
pystray==0.19.5
pyperclip==1.9.0

246
stress_test.py Normal file
View File

@@ -0,0 +1,246 @@
"""
Stress-test: сравнение OLD vs NEW реализаций горячих функций прокси.
Тестируются:
1. _build_frame — сборка WS-фрейма (masked binary)
2. _build_frame — сборка WS-фрейма (unmasked)
3. _socks5_reply — генерация SOCKS5-ответа
4. _dc_from_init XOR-часть (bytes(a^b for …) vs int.from_bytes)
5. mask key generation (os.urandom vs PRNG)
"""
import gc
import os
import random
import struct
import time
# ── Размеры данных, типичные для Telegram ──────────────────────────
SMALL = 64 # init-пакет / ack
MEDIUM = 1024 # текстовое сообщение
LARGE = 65536 # фото / голосовое
# ═══════════════════════════════════════════════════════════════════
# XOR mask (не менялся — для полноты)
# ═══════════════════════════════════════════════════════════════════
def xor_mask(data: bytes, mask: bytes) -> bytes:
if not data:
return data
n = len(data)
mask_rep = (mask * (n // 4 + 1))[:n]
return (int.from_bytes(data, 'big') ^ int.from_bytes(mask_rep, 'big')).to_bytes(n, 'big')
# ═══════════════════════════════════════════════════════════════════
# _build_frame
# ═══════════════════════════════════════════════════════════════════
def build_frame_old(opcode: int, data: bytes, mask: bool = False) -> bytes:
"""Старая: bytearray + append/extend + os.urandom."""
header = bytearray()
header.append(0x80 | opcode)
length = len(data)
mask_bit = 0x80 if mask else 0x00
if length < 126:
header.append(mask_bit | length)
elif length < 65536:
header.append(mask_bit | 126)
header.extend(struct.pack('>H', length))
else:
header.append(mask_bit | 127)
header.extend(struct.pack('>Q', length))
if mask:
mask_key = os.urandom(4)
header.extend(mask_key)
return bytes(header) + xor_mask(data, mask_key)
return bytes(header) + data
# ── Новая: pre-compiled struct + PRNG ──────────────────────────────
_st_BB = struct.Struct('>BB')
_st_BBH = struct.Struct('>BBH')
_st_BBQ = struct.Struct('>BBQ')
_st_BB4s = struct.Struct('>BB4s')
_st_BBH4s = struct.Struct('>BBH4s')
_st_BBQ4s = struct.Struct('>BBQ4s')
_mask_rng = random.Random(int.from_bytes(os.urandom(16), 'big'))
_mask_pack = struct.Struct('>I').pack
def _random_mask_key() -> bytes:
return _mask_pack(_mask_rng.getrandbits(32))
def build_frame_new(opcode: int, data: bytes, mask: bool = False) -> bytes:
"""Новая: struct.pack + PRNG mask."""
length = len(data)
fb = 0x80 | opcode
if not mask:
if length < 126:
return _st_BB.pack(fb, length) + data
if length < 65536:
return _st_BBH.pack(fb, 126, length) + data
return _st_BBQ.pack(fb, 127, length) + data
mask_key = _random_mask_key()
masked = xor_mask(data, mask_key)
if length < 126:
return _st_BB4s.pack(fb, 0x80 | length, mask_key) + masked
if length < 65536:
return _st_BBH4s.pack(fb, 0x80 | 126, length, mask_key) + masked
return _st_BBQ4s.pack(fb, 0x80 | 127, length, mask_key) + masked
# ═══════════════════════════════════════════════════════════════════
# _socks5_reply
# ═══════════════════════════════════════════════════════════════════
def socks5_reply_old(status):
return bytes([0x05, status, 0x00, 0x01]) + b'\x00' * 6
_SOCKS5_REPLIES = {s: bytes([0x05, s, 0x00, 0x01, 0, 0, 0, 0, 0, 0])
for s in (0x00, 0x05, 0x07, 0x08)}
def socks5_reply_new(status):
return _SOCKS5_REPLIES[status]
# ═══════════════════════════════════════════════════════════════════
# dc_from_init XOR (8 байт keystream ^ data)
# ═══════════════════════════════════════════════════════════════════
def dc_xor_old(data8: bytes, ks8: bytes) -> bytes:
"""Старая: генераторное выражение."""
return bytes(a ^ b for a, b in zip(data8, ks8))
def dc_xor_new(data8: bytes, ks8: bytes) -> bytes:
"""Новая: int.from_bytes."""
return (int.from_bytes(data8, 'big') ^ int.from_bytes(ks8, 'big')).to_bytes(8, 'big')
# ═══════════════════════════════════════════════════════════════════
# mask key: os.urandom(4) vs PRNG
# ═══════════════════════════════════════════════════════════════════
def mask_key_old() -> bytes:
return os.urandom(4)
def mask_key_new() -> bytes:
return _random_mask_key()
# ═══════════════════════════════════════════════════════════════════
# Бенчмарк
# ═══════════════════════════════════════════════════════════════════
def bench(func, args_list: list, iters: int) -> float:
gc.collect()
for i in range(min(100, iters)):
func(*args_list[i % len(args_list)])
start = time.perf_counter()
for i in range(iters):
func(*args_list[i % len(args_list)])
elapsed = time.perf_counter() - start
return elapsed / iters * 1_000_000 # мкс
def compare(name: str, old_fn, new_fn, args_list: list, iters: int):
t_old = bench(old_fn, args_list, iters)
t_new = bench(new_fn, args_list, iters)
speedup = t_old / t_new if t_new > 0 else float('inf')
marker = '' if speedup >= 1.0 else '⚠️'
print(f" {name:.<42s} OLD {t_old:8.3f} мкс | NEW {t_new:8.3f} мкс | {speedup:5.2f}x {marker}")
# ═══════════════════════════════════════════════════════════════════
def main():
print("=" * 74)
print(" Stress Test: OLD vs NEW (горячие функции tg_ws_proxy)")
print("=" * 74)
N = 500_000
# # ── 1. _build_frame masked ────────────────────────────────────
# print(f"\n── _build_frame masked ({N:,} итераций) ──")
# for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]:
# data_list = [(0x2, os.urandom(size), True) for _ in range(1000)]
# compare(f"build_frame masked {label}",
# build_frame_old, build_frame_new, data_list, N)
# # ── 2. _build_frame unmasked ──────────────────────────────────
# print(f"\n── _build_frame unmasked ({N:,} итераций) ──")
# for size, label in [(SMALL, "64B"), (MEDIUM, "1KB"), (LARGE, "64KB")]:
# data_list = [(0x2, os.urandom(size), False) for _ in range(1000)]
# compare(f"build_frame unmasked {label}",
# build_frame_old, build_frame_new, data_list, N)
# # ── 3. mask key generation ────────────────────────────────────
# print(f"\n── mask key: os.urandom(4) vs PRNG ({N:,} итераций) ──")
# compare("mask_key", mask_key_old, mask_key_new, [()] * 100, N)
# # ── 4. _socks5_reply ─────────────────────────────────────────
N2 = 2_000_000
# print(f"\n── _socks5_reply ({N2:,} итераций) ──")
# compare("socks5_reply", socks5_reply_old, socks5_reply_new,
# [(s,) for s in (0x00, 0x05, 0x07, 0x08)], N2)
# # ── 5. dc_from_init XOR (8 bytes) ────────────────────────────
# print(f"\n── dc_xor 8B: generator vs int.from_bytes ({N2:,} итераций) ──")
# compare("dc_xor_8B", dc_xor_old, dc_xor_new,
# [(os.urandom(8), os.urandom(8)) for _ in range(1000)], N2)
# ── 6. _read_frame struct.unpack vs pre-compiled ─────────────
print(f"\n── struct unpack read-path ({N2:,} итераций) ──")
_st_H_pre = struct.Struct('>H')
_st_Q_pre = struct.Struct('>Q')
h_bufs = [(os.urandom(2),) for _ in range(1000)]
q_bufs = [(os.urandom(8),) for _ in range(1000)]
compare("unpack >H",
lambda b: struct.unpack('>H', b),
lambda b: _st_H_pre.unpack(b),
h_bufs, N2)
compare("unpack >Q",
lambda b: struct.unpack('>Q', b),
lambda b: _st_Q_pre.unpack(b),
q_bufs, N2)
# ── 7. dc_from_init: 2x unpack vs 1x merged ─────────────────
print(f"\n── dc_from_init unpack: 2 calls vs 1 merged ({N2:,} итераций) ──")
_st_Ih = struct.Struct('<Ih')
plains = [(os.urandom(8),) for _ in range(1000)]
def dc_unpack_old(p):
return struct.unpack('<I', p[0:4])[0], struct.unpack('<h', p[4:6])[0]
def dc_unpack_new(p):
return _st_Ih.unpack(p[:6])
compare("dc_unpack", dc_unpack_old, dc_unpack_new, plains, N2)
# ── 8. bytes() copy vs direct slice ──────────────────────────
print(f"\n── bytes(slice) vs direct slice ({N2:,} итераций) ──")
raw_data = [(os.urandom(64),) for _ in range(1000)]
def slice_copy(d):
return bytes(d[8:40]), bytes(d[40:56])
def slice_direct(d):
return d[8:40], d[40:56]
compare("bytes(slice) vs slice", slice_copy, slice_direct, raw_data, N2)
# ── 9. MsgSplitter unpack_from: struct vs pre-compiled ───────
print(f"\n── unpack_from <I: struct vs pre-compiled ({N2:,} итераций) ──")
_st_I_le = struct.Struct('<I')
splitter_bufs = [(os.urandom(64), 1) for _ in range(1000)]
compare("unpack_from <I",
lambda b, p: struct.unpack_from('<I', b, p),
lambda b, p: _st_I_le.unpack_from(b, p),
splitter_bufs, N2)
print("\n" + "=" * 74)
print(" Готово!")
print("=" * 74)
if __name__ == "__main__":
main()

View File

@@ -3,23 +3,28 @@ from __future__ import annotations
import ctypes import ctypes
import json import json
import logging import logging
import logging.handlers
import os import os
import winreg
import psutil import psutil
import sys import sys
import threading import threading
import time import time
import webbrowser import webbrowser
import pystray
import pyperclip import pyperclip
import asyncio as _asyncio import asyncio as _asyncio
import customtkinter as ctk
from pathlib import Path from pathlib import Path
from typing import Dict, Optional from typing import Dict, Optional
import pystray
import customtkinter as ctk
from PIL import Image, ImageDraw, ImageFont from PIL import Image, ImageDraw, ImageFont
import proxy.tg_ws_proxy as tg_ws_proxy import proxy.tg_ws_proxy as tg_ws_proxy
IS_FROZEN = bool(getattr(sys, "frozen", False))
APP_NAME = "TgWsProxy" APP_NAME = "TgWsProxy"
APP_DIR = Path(os.environ.get("APPDATA", Path.home())) / APP_NAME APP_DIR = Path(os.environ.get("APPDATA", Path.home())) / APP_NAME
CONFIG_FILE = APP_DIR / "config.json" CONFIG_FILE = APP_DIR / "config.json"
@@ -33,6 +38,10 @@ DEFAULT_CONFIG = {
"host": "127.0.0.1", "host": "127.0.0.1",
"dc_ip": ["2:149.154.167.220", "4:149.154.167.220"], "dc_ip": ["2:149.154.167.220", "4:149.154.167.220"],
"verbose": False, "verbose": False,
"autostart": False,
"log_max_mb": 5,
"buf_kb": 256,
"pool_size": 4,
} }
@@ -143,12 +152,17 @@ def save_config(cfg: dict):
json.dump(cfg, f, indent=2, ensure_ascii=False) json.dump(cfg, f, indent=2, ensure_ascii=False)
def setup_logging(verbose: bool = False): def setup_logging(verbose: bool = False, log_max_mb: float = 5):
_ensure_dirs() _ensure_dirs()
root = logging.getLogger() root = logging.getLogger()
root.setLevel(logging.DEBUG if verbose else logging.INFO) root.setLevel(logging.DEBUG if verbose else logging.INFO)
fh = logging.FileHandler(str(LOG_FILE), encoding="utf-8") fh = logging.handlers.RotatingFileHandler(
str(LOG_FILE),
maxBytes=max(32 * 1024, log_max_mb * 1024 * 1024),
backupCount=0,
encoding='utf-8',
)
fh.setLevel(logging.DEBUG) fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter( fh.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-5s %(name)s %(message)s", "%(asctime)s %(levelname)-5s %(name)s %(message)s",
@@ -164,6 +178,64 @@ def setup_logging(verbose: bool = False):
root.addHandler(ch) root.addHandler(ch)
def _autostart_reg_name() -> str:
return APP_NAME
def _supports_autostart() -> bool:
return IS_FROZEN
def _autostart_command() -> str:
return f'"{sys.executable}"'
def is_autostart_enabled() -> bool:
try:
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
0,
winreg.KEY_READ,
) as k:
val, _ = winreg.QueryValueEx(k, _autostart_reg_name())
stored = str(val).strip()
expected = _autostart_command().strip()
return stored == expected
except FileNotFoundError:
return False
except OSError:
return False
def set_autostart_enabled(enabled: bool) -> None:
try:
with winreg.CreateKey(
winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Run",
) as k:
if enabled:
winreg.SetValueEx(
k,
_autostart_reg_name(),
0,
winreg.REG_SZ,
_autostart_command(),
)
else:
try:
winreg.DeleteValue(k, _autostart_reg_name())
except FileNotFoundError:
pass
except OSError as exc:
log.error("Failed to update autostart: %s", exc)
_show_error(
"Не удалось изменить автозапуск.\n\n"
"Попробуйте запустить приложение от имени пользователя с правами на реестр.\n\n"
f"Ошибка: {exc}"
)
def _make_icon_image(size: int = 64): def _make_icon_image(size: int = 64):
if Image is None: if Image is None:
raise RuntimeError("Pillow is required for tray icon") raise RuntimeError("Pillow is required for tray icon")
@@ -238,6 +310,13 @@ def start_proxy():
return return
log.info("Starting proxy on %s:%d ...", host, port) log.info("Starting proxy on %s:%d ...", host, port)
buf_kb = cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])
pool_size = cfg.get("pool_size", DEFAULT_CONFIG["pool_size"])
tg_ws_proxy._RECV_BUF = max(4, buf_kb) * 1024
tg_ws_proxy._SEND_BUF = tg_ws_proxy._RECV_BUF
tg_ws_proxy._WS_POOL_SIZE = max(0, pool_size)
_proxy_thread = threading.Thread( _proxy_thread = threading.Thread(
target=_run_proxy_thread, target=_run_proxy_thread,
args=(port, dc_opt, verbose, host), args=(port, dc_opt, verbose, host),
@@ -285,7 +364,7 @@ def _on_open_in_telegram(icon=None, item=None):
pyperclip.copy(url) pyperclip.copy(url)
_show_info( _show_info(
f"Не удалось открыть Telegram автоматически.\n\n" f"Не удалось открыть Telegram автоматически.\n\n"
f"Ссылка скопирована в буфер обмена, отправьте её в телеграмм и нажмите по ней ЛКМ:\n{url}", f"Ссылка скопирована в буфер обмена, отправьте её в Telegram и нажмите по ней ЛКМ:\n{url}",
"TG WS Proxy") "TG WS Proxy")
except Exception as exc: except Exception as exc:
log.error("Clipboard copy failed: %s", exc) log.error("Clipboard copy failed: %s", exc)
@@ -306,6 +385,12 @@ def _edit_config_dialog():
return return
cfg = dict(_config) cfg = dict(_config)
cfg["autostart"] = is_autostart_enabled()
# Make sure that the autostart key is removed if autostart
# is disabled, even if the executable file is moved.
if _supports_autostart() and not cfg["autostart"]:
set_autostart_enabled(False)
ctk.set_appearance_mode("light") ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue") ctk.set_default_color_theme("blue")
@@ -314,6 +399,8 @@ def _edit_config_dialog():
root.title("TG WS Proxy — Настройки") root.title("TG WS Proxy — Настройки")
root.resizable(False, False) root.resizable(False, False)
root.attributes("-topmost", True) root.attributes("-topmost", True)
icon_path = str(Path(__file__).parent / "icon.ico")
root.iconbitmap(icon_path)
TG_BLUE = "#3390ec" TG_BLUE = "#3390ec"
TG_BLUE_HOVER = "#2b7cd4" TG_BLUE_HOVER = "#2b7cd4"
@@ -324,7 +411,11 @@ def _edit_config_dialog():
TEXT_SECONDARY = "#707579" TEXT_SECONDARY = "#707579"
FONT_FAMILY = "Segoe UI" FONT_FAMILY = "Segoe UI"
w, h = 420, 480 w, h = 420, 540
if _supports_autostart():
h += 70
sw = root.winfo_screenwidth() sw = root.winfo_screenwidth()
sh = root.winfo_screenheight() sh = root.winfo_screenheight()
root.geometry(f"{w}x{h}+{(sw-w)//2}+{(sh-h)//2}") root.geometry(f"{w}x{h}+{(sw-w)//2}+{(sh-h)//2}")
@@ -375,10 +466,42 @@ def _edit_config_dialog():
corner_radius=6, border_width=2, corner_radius=6, border_width=2,
border_color=FIELD_BORDER).pack(anchor="w", pady=(0, 8)) border_color=FIELD_BORDER).pack(anchor="w", pady=(0, 8))
# Info label # Advanced: buf_kb, pool_size, log_max_mb
ctk.CTkLabel(frame, text="Изменения вступят в силу после перезапуска прокси.", adv_frame = ctk.CTkFrame(frame, fg_color="transparent")
font=(FONT_FAMILY, 11), text_color=TEXT_SECONDARY, adv_frame.pack(anchor="w", fill="x", pady=(4, 8))
anchor="w").pack(anchor="w", pady=(0, 16))
for col, (lbl, key, w_) in enumerate([
("Буфер (KB, 256 default)", "buf_kb", 120),
("WS пулов (4 default)", "pool_size", 120),
("Log size (MB, 5 def)", "log_max_mb", 120),
]):
col_frame = ctk.CTkFrame(adv_frame, fg_color="transparent")
col_frame.pack(side="left", padx=(0, 10))
ctk.CTkLabel(col_frame, text=lbl, font=(FONT_FAMILY, 11),
text_color=TEXT_SECONDARY, anchor="w").pack(anchor="w")
ctk.CTkEntry(col_frame, width=w_, height=30, font=(FONT_FAMILY, 12),
corner_radius=8, fg_color=FIELD_BG,
border_color=FIELD_BORDER, border_width=1,
text_color=TEXT_PRIMARY,
textvariable=ctk.StringVar(
value=str(cfg.get(key, DEFAULT_CONFIG[key]))
)).pack(anchor="w")
_adv_entries = list(adv_frame.winfo_children())
_adv_keys = ["buf_kb", "pool_size", "log_max_mb"]
autostart_var = None
if _supports_autostart():
autostart_var = ctk.BooleanVar(value=cfg["autostart"])
ctk.CTkCheckBox(frame, text="Автозапуск при включении Windows",
variable=autostart_var, font=(FONT_FAMILY, 13),
text_color=TEXT_PRIMARY,
fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER,
corner_radius=6, border_width=2,
border_color=FIELD_BORDER).pack(anchor="w", pady=(0, 8))
ctk.CTkLabel(frame, text="При перемещении файла или открытии из другой папки\nавтозапуск будет сброшен",
font=(FONT_FAMILY, 13), text_color=TEXT_SECONDARY,
anchor="w", justify="left").pack(anchor="w", pady=(0, 8))
def on_save(): def on_save():
import socket as _sock import socket as _sock
@@ -410,11 +533,26 @@ def _edit_config_dialog():
"port": port_val, "port": port_val,
"dc_ip": lines, "dc_ip": lines,
"verbose": verbose_var.get(), "verbose": verbose_var.get(),
"autostart": (autostart_var.get() if autostart_var is not None else False),
} }
for i, key in enumerate(_adv_keys):
col_frame = _adv_entries[i]
entry = col_frame.winfo_children()[1]
try:
val = float(entry.get().strip())
if key in ("buf_kb", "pool_size"):
val = int(val)
new_cfg[key] = val
except ValueError:
new_cfg[key] = DEFAULT_CONFIG[key]
save_config(new_cfg) save_config(new_cfg)
_config.update(new_cfg) _config.update(new_cfg)
log.info("Config saved: %s", new_cfg) log.info("Config saved: %s", new_cfg)
if _supports_autostart():
set_autostart_enabled(bool(new_cfg.get("autostart", False)))
_tray_icon.menu = _build_menu() _tray_icon.menu = _build_menu()
from tkinter import messagebox from tkinter import messagebox
@@ -431,18 +569,18 @@ def _edit_config_dialog():
root.destroy() root.destroy()
btn_frame = ctk.CTkFrame(frame, fg_color="transparent") btn_frame = ctk.CTkFrame(frame, fg_color="transparent")
btn_frame.pack(fill="x") btn_frame.pack(fill="x", pady=(20, 0))
ctk.CTkButton(btn_frame, text="Сохранить", width=140, height=38, ctk.CTkButton(btn_frame, text="Сохранить", height=38,
font=(FONT_FAMILY, 14, "bold"), corner_radius=10, font=(FONT_FAMILY, 14, "bold"), corner_radius=10,
fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER, fg_color=TG_BLUE, hover_color=TG_BLUE_HOVER,
text_color="#ffffff", text_color="#ffffff",
command=on_save).pack(side="left", padx=(0, 10)) command=on_save).pack(side="left", fill="x", expand=True, padx=(0, 8))
ctk.CTkButton(btn_frame, text="Отмена", width=140, height=38, ctk.CTkButton(btn_frame, text="Отмена", height=38,
font=(FONT_FAMILY, 14), corner_radius=10, font=(FONT_FAMILY, 14), corner_radius=10,
fg_color=FIELD_BG, hover_color=FIELD_BORDER, fg_color=FIELD_BG, hover_color=FIELD_BORDER,
text_color=TEXT_PRIMARY, border_width=1, text_color=TEXT_PRIMARY, border_width=1,
border_color=FIELD_BORDER, border_color=FIELD_BORDER,
command=on_cancel).pack(side="left") command=on_cancel).pack(side="right", fill="x", expand=True)
root.mainloop() root.mainloop()
@@ -502,6 +640,8 @@ def _show_first_run():
root.title("TG WS Proxy") root.title("TG WS Proxy")
root.resizable(False, False) root.resizable(False, False)
root.attributes("-topmost", True) root.attributes("-topmost", True)
icon_path = str(Path(__file__).parent / "icon.ico")
root.iconbitmap(icon_path)
w, h = 520, 440 w, h = 520, 440
sw = root.winfo_screenwidth() sw = root.winfo_screenwidth()
@@ -608,13 +748,15 @@ def _check_ipv6_warning():
def _show_ipv6_dialog(): def _show_ipv6_dialog():
_show_info( _show_info(
"На вашем компьютере включён IPv6.\n\n" "На вашем компьютере включена поддержка подключения по IPv6.\n\n"
"Telegram может пытаться подключаться через IPv6, " "Telegram может пытаться подключаться через IPv6, "
"что не поддерживается и может привести к ошибкам.\n\n" "что не поддерживается и может привести к ошибкам.\n\n"
"Если прокси не работает или в логах видны ошибки связанные с IPv6, " "Если прокси не работает или в логах присутствуют ошибки, "
"то проверьте в настройках Telegram, что рядом с настройкой прокси не включён " "связанные с попытками подключения по IPv6 - "
"пунукт про IPv6. Если это не поможет, то выключите IPv6 в системе\n\n" "попробуйте отключить в настройках прокси Telegram попытку соединения "
"Это предупрждение будет показано только один раз.", "по IPv6. Если данная мера не помогает, попробуйте отключить IPv6 "
"в системе.\n\n"
"Это предупреждение будет показано только один раз.",
"TG WS Proxy") "TG WS Proxy")
@@ -649,7 +791,8 @@ def run_tray():
except Exception: except Exception:
pass pass
setup_logging(_config.get("verbose", False)) setup_logging(_config.get("verbose", False),
log_max_mb=_config.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"]))
log.info("TG WS Proxy tray app starting") log.info("TG WS Proxy tray app starting")
log.info("Config: %s", _config) log.info("Config: %s", _config)
log.info("Log file: %s", LOG_FILE) log.info("Log file: %s", LOG_FILE)