11 Commits

Author SHA1 Message Date
Flowseal
d4f8b51326 version bump 2026-05-30 20:34:26 +03:00
Flowseal
ca431633d7 Version bump 2026-05-30 20:32:11 +03:00
Flowseal
ea4e8e790a Possibility to pass few cfproxy and worker domains 2026-05-30 20:30:47 +03:00
Flowseal
05d6de269b import path fixes 2026-05-30 19:39:58 +03:00
Flowseal
1c4b103df2 Pool for cloudflare worker 2026-05-30 19:34:47 +03:00
Erik
23f0e4d426 Fall back to system libcrypto when cryptography is unavailable (#894) 2026-05-30 19:31:47 +03:00
Konukhov Yaroslav
49e62ca142 perf(bridge): split MTProto packets in O(N) instead of O(N^2) (#913) 2026-05-30 19:25:56 +03:00
delewer
5915a0e1f3 docs: update images (#858) 2026-05-17 01:04:37 +03:00
Flowseal
7bc9e133c8 Update README.md 2026-05-16 20:01:26 +03:00
Flowseal
12d3d5e478 Update README.md 2026-05-16 20:00:34 +03:00
Flowseal
b7cca232ea Update CfWorker.md 2026-05-16 11:47:56 +03:00
19 changed files with 608 additions and 232 deletions

View File

@@ -47,8 +47,8 @@ tg-ws-proxy [--port PORT] [--host HOST] [--dc-ip DC:IP ...] [-v]
| `--secret` | `random` | 32-значный hex-ключ для авторизации клиентов |
| `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (параметр можно указывать несколько раз) |
| `--no-cfproxy` | `false` | Отключить попытку [проксирования через Cloudflare](./CfProxy.md) |
| `--cfproxy-domain` | | Указать свой домен для проксирования через Cloudflare. [Подробнее](./CfProxy.md) |
| `--cfproxy-worker-domain` | | Домен Cloudflare Worker [Подробнее](./CfWorker.md) |
| `--cfproxy-domain` | | Указать свой домен для проксирования через Cloudflare [Подробнее](./CfProxy.md). Можно указать несколько через повторение аргумента. |
| `--cfproxy-worker-domain` | | Домен Cloudflare Worker [Подробнее](./CfWorker.md). Можно указать несколько через повторение аргумента. |
| `--fake-tls-domain` | | Включить маскировку Fake TLS (ee-secret) с указанным SNI-доменом |
| `--proxy-protocol` | выкл. | Принимать HAProxy PROXY protocol v1 (для работы за nginx/haproxy с `proxy_protocol on`) |
| `--buf-kb` | `256` | Размер буфера в КБ |

View File

@@ -13,6 +13,7 @@ cloudflare.dev
workers.dev
```
2. Создайте аккаунт в [Cloudflare](https://dash.cloudflare.com/) (или войдите в существующий)
* **После создания аккаунта подтвердите почту с помощью письма, который вам пришел на email**
3. Слева в панели выберите `Compute``Workers & Pages`
<img width="250" height="768" alt="image" src="https://github.com/user-attachments/assets/d81e3522-045a-4e65-9c2e-5545b7ad409a" />

View File

@@ -1,3 +1,12 @@
<div align="center">
<br />
<p>
<img width="1729" height="910" alt="tgwsproxy" src="./images/workflow.png" />
</p>
</div>
##
> [!TIP]
>
> ### [🎉 Поддержать меня](./Funding.md)
@@ -24,8 +33,8 @@
**Локальный MTProto-прокси** для Telegram Desktop, который **ускоряет работу Telegram**, перенаправляя трафик через WebSocket-соединения. Данные передаются в том же зашифрованном виде, а для работы не нужны сторонние серверы.
<picture>
<source srcset="https://github.com/user-attachments/assets/17f1d15e-e1c2-41ea-a452-220d13359262" media="(prefers-color-scheme: dark)">
<img src="https://github.com/user-attachments/assets/8d595468-83a1-4e4f-bac4-9ce4a07027bd">
<source srcset="./images/preview-dark.png" media="(prefers-color-scheme: dark)">
<img src="./images/preview-white.png">
</picture>
## Навигация

Binary file not shown.

After

Width:  |  Height:  |  Size: 245 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 233 KiB

BIN
docs/images/workflow.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 MiB

View File

@@ -24,7 +24,7 @@ try:
except ImportError:
pyperclip = None
from proxy import __version__, get_link_host, parse_dc_ip_list, proxy_config
from proxy import __version__, get_link_host, parse_dc_ip_list, proxy_config, coerce_domain_list
from proxy.tg_ws_proxy import _run
from utils.tray_common import (
@@ -115,7 +115,7 @@ def _ask_cfworker_domain(default: str) -> Optional[str]:
value = default
while True:
script = (
f'set d to display dialog "{_esc("Cloudflare Worker домен (например, name.account.workers.dev):")}" '
f'set d to display dialog "{_esc("Cloudflare Worker домены через запятую (например, name.account.workers.dev):")}" '
f'default answer "{_esc(value)}" '
f'with title "TG WS Proxy" '
f'buttons {{"Закрыть", "?", "OK"}} '
@@ -425,19 +425,24 @@ def _edit_config_dialog() -> None:
return
cfproxy_domain = _osascript_input(
"Свой CF-домен (оставьте пустым для автоматического выбора):\n"
"Свои CF-домены через запятую (оставьте пустым для автоматического выбора):\n"
"DNS записи kws1-kws5,kws203 должны указывать на IP датацентров Telegram через Cloudflare.",
cfg.get("cfproxy_user_domain", DEFAULT_CONFIG.get("cfproxy_user_domain", "")),
", ".join(coerce_domain_list(
cfg.get("cfproxy_user_domain", DEFAULT_CONFIG.get("cfproxy_user_domain", []))
)),
)
if cfproxy_domain is None:
return
cfproxy_domain = cfproxy_domain.strip()
cfproxy_domains = coerce_domain_list(cfproxy_domain)
cfworker_domain = _ask_cfworker_domain(
cfg.get("cfproxy_worker_domain", DEFAULT_CONFIG.get("cfproxy_worker_domain", ""))
", ".join(coerce_domain_list(
cfg.get("cfproxy_worker_domain", DEFAULT_CONFIG.get("cfproxy_worker_domain", []))
))
)
if cfworker_domain is None:
return
cfworker_domains = coerce_domain_list(cfworker_domain)
new_cfg = {
"host": host,
@@ -450,8 +455,8 @@ def _edit_config_dialog() -> None:
"log_max_mb": adv.get("log_max_mb", cfg.get("log_max_mb", DEFAULT_CONFIG["log_max_mb"])),
"check_updates": cfg.get("check_updates", True),
"cfproxy": cfproxy,
"cfproxy_user_domain": cfproxy_domain,
"cfproxy_worker_domain": cfworker_domain,
"cfproxy_user_domain": cfproxy_domains,
"cfproxy_worker_domain": cfworker_domains,
}
save_config(new_cfg)
log.info("Config saved: %s", new_cfg)

View File

@@ -4,8 +4,8 @@
# http://msdn.microsoft.com/en-us/library/ms646997.aspx
VSVersionInfo(
ffi=FixedFileInfo(
filevers=(1, 7, 0, 0),
prodvers=(1, 7, 0, 0),
filevers=(1, 7, 1, 0),
prodvers=(1, 7, 1, 0),
mask=0x3f,
flags=0x0,
OS=0x40004,
@@ -21,12 +21,12 @@ VSVersionInfo(
[
StringStruct(u'CompanyName', u'Flowseal'),
StringStruct(u'FileDescription', u'Telegram Desktop WebSocket Bridge Proxy'),
StringStruct(u'FileVersion', u'1.7.0.0'),
StringStruct(u'FileVersion', u'1.7.1.0'),
StringStruct(u'InternalName', u'TgWsProxy'),
StringStruct(u'LegalCopyright', u'Copyright (c) Flowseal. MIT License.'),
StringStruct(u'OriginalFilename', u'TgWsProxy.exe'),
StringStruct(u'ProductName', u'TG WS Proxy'),
StringStruct(u'ProductVersion', u'1.7.0.0'),
StringStruct(u'ProductVersion', u'1.7.1.0'),
]
)
]

View File

@@ -1,6 +1,6 @@
from .config import parse_dc_ip_list, proxy_config
from .config import parse_dc_ip_list, proxy_config, coerce_domain_list
from .utils import get_link_host, build_github_opener
__version__ = "1.7.0"
__version__ = "1.7.1"
__all__ = ["__version__", "get_link_host", "proxy_config", "parse_dc_ip_list", "build_github_opener"]
__all__ = ["__version__", "get_link_host", "proxy_config", "parse_dc_ip_list", "build_github_opener", "coerce_domain_list"]

130
proxy/_aes.py Normal file
View File

@@ -0,0 +1,130 @@
"""
AES-CTR shim.
Prefers `cryptography` if available (desktop / Docker). Falls back to a
ctypes wrapper over the system OpenSSL `libcrypto` for environments where
installing `cryptography` is painful (Entware on routers, embedded boxes
without a Rust toolchain). The public surface mimics the small subset of
`cryptography.hazmat.primitives.ciphers` that this project actually uses:
Cipher(algorithms.AES(key), modes.CTR(iv)).encryptor().update(data)
"""
from __future__ import annotations
try:
from cryptography.hazmat.primitives.ciphers import ( # noqa: F401
Cipher, algorithms, modes,
)
except ImportError:
import ctypes
import ctypes.util
def _load_libcrypto():
name = ctypes.util.find_library("crypto")
candidates = []
if name:
candidates.append(name)
candidates += [
"libcrypto.so.3", "libcrypto.so.1.1", "libcrypto.so.1.0.0",
"libcrypto.so", "/opt/lib/libcrypto.so",
"/opt/lib/libcrypto.so.1.1", "/opt/lib/libcrypto.so.3",
]
last_err = None
for c in candidates:
try:
return ctypes.CDLL(c)
except OSError as e:
last_err = e
raise RuntimeError(
"libcrypto not found; install openssl-util or "
"`opkg install libopenssl`. Last error: %r" % last_err
)
_libcrypto = _load_libcrypto()
_libcrypto.EVP_CIPHER_CTX_new.restype = ctypes.c_void_p
_libcrypto.EVP_CIPHER_CTX_free.argtypes = [ctypes.c_void_p]
_libcrypto.EVP_aes_128_ctr.restype = ctypes.c_void_p
_libcrypto.EVP_aes_192_ctr.restype = ctypes.c_void_p
_libcrypto.EVP_aes_256_ctr.restype = ctypes.c_void_p
_libcrypto.EVP_EncryptInit_ex.argtypes = [
ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p,
ctypes.c_char_p, ctypes.c_char_p,
]
_libcrypto.EVP_EncryptInit_ex.restype = ctypes.c_int
_libcrypto.EVP_EncryptUpdate.argtypes = [
ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int),
ctypes.c_char_p, ctypes.c_int,
]
_libcrypto.EVP_EncryptUpdate.restype = ctypes.c_int
_EVP_BY_KEY = {
16: _libcrypto.EVP_aes_128_ctr,
24: _libcrypto.EVP_aes_192_ctr,
32: _libcrypto.EVP_aes_256_ctr,
}
class algorithms:
class AES:
__slots__ = ("key",)
def __init__(self, key: bytes):
if len(key) not in _EVP_BY_KEY:
raise ValueError("AES key must be 16/24/32 bytes")
self.key = bytes(key)
class modes:
class CTR:
__slots__ = ("iv",)
def __init__(self, iv: bytes):
if len(iv) != 16:
raise ValueError("CTR IV must be 16 bytes")
self.iv = bytes(iv)
class _CtrStream:
__slots__ = ("_ctx",)
def __init__(self, key: bytes, iv: bytes):
ctx = _libcrypto.EVP_CIPHER_CTX_new()
if not ctx:
raise RuntimeError("EVP_CIPHER_CTX_new failed")
self._ctx = ctx
evp = _EVP_BY_KEY[len(key)]()
if _libcrypto.EVP_EncryptInit_ex(ctx, evp, None, key, iv) != 1:
_libcrypto.EVP_CIPHER_CTX_free(ctx)
self._ctx = None
raise RuntimeError("EVP_EncryptInit_ex failed")
def update(self, data: bytes) -> bytes:
if not data:
return b""
outlen = ctypes.c_int(0)
buf = ctypes.create_string_buffer(len(data) + 16)
if _libcrypto.EVP_EncryptUpdate(
self._ctx, buf, ctypes.byref(outlen), bytes(data), len(data)
) != 1:
raise RuntimeError("EVP_EncryptUpdate failed")
return buf.raw[:outlen.value]
def __del__(self):
ctx = getattr(self, "_ctx", None)
if ctx:
_libcrypto.EVP_CIPHER_CTX_free(ctx)
self._ctx = None
class Cipher:
__slots__ = ("_key", "_iv")
def __init__(self, algorithm, mode):
if not isinstance(algorithm, algorithms.AES):
raise TypeError("only AES is supported")
if not isinstance(mode, modes.CTR):
raise TypeError("only CTR mode is supported")
self._key = algorithm.key
self._iv = mode.iv
def encryptor(self) -> _CtrStream:
return _CtrStream(self._key, self._iv)
# CTR is symmetric — decryption == encryption with the same keystream.
decryptor = encryptor

View File

@@ -2,8 +2,7 @@ import asyncio
import logging
import struct
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from typing import Dict, List, Optional
from typing import List, Optional
from urllib.parse import urlencode
from .utils import *
@@ -11,20 +10,14 @@ from .stats import stats
from .balancer import balancer
from .config import proxy_config
from .raw_websocket import RawWebSocket
from .pool import cf_worker_pool
from ._aes import Cipher, algorithms, modes
log = logging.getLogger('tg-mtproto-proxy')
_st_I_le = struct.Struct('<I')
ZERO_64 = b'\x00' * 64
DC_DEFAULT_IPS: Dict[int, str] = {
1: '149.154.175.50',
2: '149.154.167.51',
3: '149.154.175.100',
4: '149.154.167.91',
5: '149.154.171.5',
203: '91.105.192.100'
}
class CryptoCtx:
@@ -64,19 +57,27 @@ class MsgSplitter:
self._plain_buf.extend(self._dec.update(chunk))
parts = []
while self._cipher_buf:
packet_len = self._next_packet_len()
offset = 0
buf_len = len(self._cipher_buf)
# Walk the buffer with an offset instead of deleting each packet from
# the front. Front-deletion on a bytearray shifts the remaining bytes,
# so a chunk holding many small packets degrades to O(N^2); a single
# trailing del keeps splitting O(N).
while offset < buf_len:
packet_len = self._next_packet_len(offset, buf_len - offset)
if packet_len is None:
break
if packet_len <= 0:
parts.append(bytes(self._cipher_buf))
self._cipher_buf.clear()
self._plain_buf.clear()
parts.append(bytes(self._cipher_buf[offset:]))
offset = buf_len
self._disabled = True
break
parts.append(bytes(self._cipher_buf[:packet_len]))
del self._cipher_buf[:packet_len]
del self._plain_buf[:packet_len]
parts.append(bytes(self._cipher_buf[offset:offset + packet_len]))
offset += packet_len
if offset:
del self._cipher_buf[:offset]
del self._plain_buf[:offset]
return parts
def flush(self) -> List[bytes]:
@@ -87,22 +88,23 @@ class MsgSplitter:
self._plain_buf.clear()
return [tail]
def _next_packet_len(self) -> Optional[int]:
if not self._plain_buf:
def _next_packet_len(self, offset: int, avail: int) -> Optional[int]:
if avail <= 0:
return None
if self._proto == PROTO_ABRIDGED_INT:
return self._next_abridged_len()
return self._next_abridged_len(offset, avail)
if self._proto in (PROTO_INTERMEDIATE_INT,
PROTO_PADDED_INTERMEDIATE_INT):
return self._next_intermediate_len()
return self._next_intermediate_len(offset, avail)
return 0
def _next_abridged_len(self) -> Optional[int]:
first = self._plain_buf[0]
def _next_abridged_len(self, offset: int, avail: int) -> Optional[int]:
first = self._plain_buf[offset]
if first in (0x7F, 0xFF):
if len(self._plain_buf) < 4:
if avail < 4:
return None
payload_len = int.from_bytes(self._plain_buf[1:4], 'little') * 4
payload_len = int.from_bytes(
self._plain_buf[offset + 1:offset + 4], 'little') * 4
header_len = 4
else:
payload_len = (first & 0x7F) * 4
@@ -110,33 +112,32 @@ class MsgSplitter:
if payload_len <= 0:
return 0
packet_len = header_len + payload_len
if len(self._plain_buf) < packet_len:
if avail < packet_len:
return None
return packet_len
def _next_intermediate_len(self) -> Optional[int]:
if len(self._plain_buf) < 4:
def _next_intermediate_len(self, offset: int, avail: int) -> Optional[int]:
if avail < 4:
return None
payload_len = _st_I_le.unpack_from(self._plain_buf, 0)[0] & 0x7FFFFFFF
payload_len = _st_I_le.unpack_from(self._plain_buf, offset)[0] & 0x7FFFFFFF
if payload_len <= 0:
return 0
packet_len = 4 + payload_len
if len(self._plain_buf) < packet_len:
if avail < packet_len:
return None
return packet_len
async def do_fallback(reader, writer, relay_init, label,
dc: int, is_media: bool, media_tag: str,
ctx: CryptoCtx, splitter=None):
fallback_dst = DC_DEFAULT_IPS.get(dc)
use_cf = proxy_config.fallback_cfproxy
worker_domain = proxy_config.cfproxy_worker_domain
worker_domains = proxy_config.cfproxy_worker_domains
methods: List[str] = []
if worker_domain and fallback_dst:
if worker_domains and fallback_dst:
methods.append('cf_worker')
if use_cf:
methods.append('cf')
@@ -175,34 +176,40 @@ async def _cfproxy_worker_fallback(reader, writer, relay_init, label,
fallback_dst: str,
splitter=None):
media_tag = ' media' if is_media else ''
worker_domain = proxy_config.cfproxy_worker_domain
if not worker_domain:
worker_domains = proxy_config.cfproxy_worker_domains
if not worker_domains:
return False
query = urlencode({
'dst': fallback_dst,
'dc': str(dc),
'media': '1' if is_media else '0',
})
path = f'/apiws?{query}'
for worker_domain in worker_domains:
ws = await cf_worker_pool.get(dc, worker_domain, fallback_dst)
if ws:
log.info("[%s] DC%d%s -> CF worker pool hit for %s",
label, dc, media_tag, fallback_dst)
else:
query = urlencode({
'dst': fallback_dst,
'dc': str(dc),
})
path = f'/apiws?{query}'
log.info("[%s] DC%d%s -> trying CF worker for %s",
label, dc, media_tag, fallback_dst)
log.info("[%s] DC%d%s -> trying CF worker %s for %s",
label, dc, media_tag, worker_domain, fallback_dst)
try:
ws = await RawWebSocket.connect(worker_domain, worker_domain,
timeout=10.0, path=path)
except Exception as exc:
log.warning("[%s] DC%d%s CF worker failed: %s",
label, dc, media_tag, repr(exc))
return False
try:
ws = await RawWebSocket.connect(worker_domain, worker_domain,
timeout=10.0, path=path)
except Exception as exc:
log.warning("[%s] DC%d%s CF worker %s failed: %s",
label, dc, media_tag, worker_domain, repr(exc))
continue
stats.connections_cfproxy += 1
await ws.send(relay_init)
await bridge_ws_reencrypt(reader, writer, ws, label, ctx,
dc=dc, is_media=is_media,
splitter=splitter)
return True
stats.connections_cfproxy += 1
await ws.send(relay_init)
await bridge_ws_reencrypt(reader, writer, ws, label, ctx,
dc=dc, is_media=is_media,
splitter=splitter)
return True
return False
async def _cfproxy_fallback(reader, writer, relay_init, label,

View File

@@ -58,8 +58,8 @@ class ProxyConfig:
buffer_size: int = 256 * 1024
pool_size: int = 4
fallback_cfproxy: bool = True
cfproxy_user_domain: str = ''
cfproxy_worker_domain: str = ''
cfproxy_user_domains: List[str] = field(default_factory=list)
cfproxy_worker_domains: List[str] = field(default_factory=list)
fake_tls_domain: str = ''
proxy_protocol: bool = False
@@ -67,6 +67,30 @@ class ProxyConfig:
proxy_config = ProxyConfig()
def coerce_domain_list(value) -> List[str]:
if isinstance(value, str):
items = value.replace(',', ' ').replace(';', ' ').split()
elif isinstance(value, (list, tuple)):
items: List[str] = []
for entry in value:
if isinstance(entry, str):
items.extend(entry.replace(',', ' ').replace(';', ' ').split())
else:
return []
seen = set()
result: List[str] = []
for item in items:
item = item.strip()
if not item:
continue
key = item.lower()
if key in seen:
continue
seen.add(key)
result.append(item)
return result
def _fetch_cfproxy_domain_list() -> List[str]:
try:
req = Request(CFPROXY_DOMAINS_URL + "?" + "".join(random.choices(string.ascii_letters, k=7)),
@@ -120,7 +144,7 @@ def _normalize_domain_pool(domains: List[str]) -> List[str]:
def refresh_cfproxy_domains() -> None:
if proxy_config.cfproxy_user_domain:
if proxy_config.cfproxy_user_domains:
return
fetched = _fetch_cfproxy_domain_list()

214
proxy/pool.py Normal file
View File

@@ -0,0 +1,214 @@
import asyncio
import logging
import time
from collections import deque
from urllib.parse import urlencode
from typing import Dict, List, Optional, Tuple, Set
from .raw_websocket import RawWebSocket, WsHandshakeError
from .stats import stats
from .config import proxy_config
from .utils import ws_domains, DC_DEFAULT_IPS
log = logging.getLogger('tg-mtproto-proxy')
class _WsPool:
WS_POOL_MAX_AGE = 120.0
def __init__(self):
self._idle: Dict[Tuple[int, bool], deque] = {}
self._refilling: Set[Tuple[int, bool]] = set()
async def get(self, dc: int, is_media: bool,
target_ip: str, domains: List[str]
) -> Optional[RawWebSocket]:
key = (dc, is_media)
now = time.monotonic()
bucket = self._idle.get(key)
if bucket is None:
bucket = deque()
self._idle[key] = bucket
while bucket:
ws, created = bucket.popleft()
age = now - created
if (age > self.WS_POOL_MAX_AGE or ws._closed
or ws.writer.transport.is_closing()):
asyncio.create_task(self._quiet_close(ws))
continue
stats.pool_hits += 1
log.debug("WS pool hit DC%d%s (age=%.1fs, left=%d)",
dc, 'm' if is_media else '', age, len(bucket))
self._schedule_refill(key, target_ip, domains)
return ws
stats.pool_misses += 1
self._schedule_refill(key, target_ip, domains)
return None
def _schedule_refill(self, key, target_ip, domains):
if key in self._refilling:
return
self._refilling.add(key)
asyncio.create_task(self._refill(key, target_ip, domains))
async def _refill(self, key, target_ip, domains):
dc, is_media = key
try:
bucket = self._idle.setdefault(key, deque())
needed = proxy_config.pool_size - len(bucket)
if needed <= 0:
return
tasks = [asyncio.create_task(
self._connect_one(target_ip, domains))
for _ in range(needed)]
for t in tasks:
try:
ws = await t
if ws:
bucket.append((ws, time.monotonic()))
except Exception:
pass
log.debug("WS pool refilled DC%d%s: %d ready",
dc, 'm' if is_media else '', len(bucket))
finally:
self._refilling.discard(key)
@staticmethod
async def _connect_one(target_ip, domains) -> Optional[RawWebSocket]:
for domain in domains:
try:
return await RawWebSocket.connect(
target_ip, domain, timeout=8)
except WsHandshakeError as exc:
if exc.is_redirect:
continue
return None
except Exception:
return None
return None
@staticmethod
async def _quiet_close(ws):
try:
await ws.close()
except Exception:
pass
async def warmup(self):
for dc, target_ip in proxy_config.dc_redirects.items():
if target_ip is None:
continue
for is_media in (False, True):
domains = ws_domains(dc, is_media)
self._schedule_refill((dc, is_media), target_ip, domains)
log.info("WS pool warmup started for %d DC(s)", len(proxy_config.dc_redirects))
def reset(self):
self._idle.clear()
self._refilling.clear()
class _CfWorkerPool:
WS_POOL_MAX_AGE = 120.0
def __init__(self):
self._idle: Dict[Tuple[int, str], deque] = {}
self._refilling: Set[Tuple[int, str]] = set()
async def get(self, dc: int, worker_domain: str, fallback_dst: str) -> Optional[RawWebSocket]:
now = time.monotonic()
key = (dc, worker_domain)
bucket = self._idle.get(key)
if bucket is None:
bucket = deque()
self._idle[key] = bucket
while bucket:
ws, created = bucket.popleft()
age = now - created
if (age > self.WS_POOL_MAX_AGE or ws._closed
or ws.writer.transport.is_closing()):
asyncio.create_task(self._quiet_close(ws))
continue
stats.cf_pool_hits += 1
log.debug("CF worker pool hit DC%d (age=%.1fs, left=%d)",
dc, age, len(bucket))
self._schedule_refill(key, fallback_dst)
return ws
stats.cf_pool_misses += 1
self._schedule_refill(key, fallback_dst)
return None
def _schedule_refill(self, key, fallback_dst):
if key in self._refilling:
return
self._refilling.add(key)
asyncio.create_task(self._refill(key, fallback_dst))
async def _refill(self, key, fallback_dst):
dc, worker_domain = key
try:
bucket = self._idle.setdefault(key, deque())
needed = proxy_config.pool_size - len(bucket)
if needed <= 0:
return
tasks = [asyncio.create_task(
self._connect_one(worker_domain, fallback_dst, dc))
for _ in range(needed)]
for t in tasks:
try:
ws = await t
if ws:
bucket.append((ws, time.monotonic()))
except Exception:
pass
log.debug("CF worker pool refilled DC%d: %d ready",
dc, len(bucket))
finally:
self._refilling.discard(key)
@staticmethod
async def _connect_one(worker_domain, fallback_dst, dc) -> Optional[RawWebSocket]:
query = urlencode({
'dst': fallback_dst,
'dc': str(dc),
})
path = f'/apiws?{query}'
try:
return await RawWebSocket.connect(
worker_domain, worker_domain, timeout=8, path=path)
except Exception:
return None
@staticmethod
async def _quiet_close(ws):
try:
await ws.close()
except Exception:
pass
async def warmup(self):
cf_fallbacks = {
dc: ip for dc, ip in DC_DEFAULT_IPS.items()
if dc not in proxy_config.dc_redirects
}
if not cf_fallbacks or not proxy_config.cfproxy_worker_domains:
return
for worker_domain in proxy_config.cfproxy_worker_domains:
for dc, fallback_dst in cf_fallbacks.items():
self._schedule_refill((dc, worker_domain), fallback_dst)
log.info("CF worker pool warmup started for %d DC(s)", len(cf_fallbacks))
def reset(self):
self._idle.clear()
self._refilling.clear()
ws_pool = _WsPool()
cf_worker_pool = _CfWorkerPool()

View File

@@ -14,11 +14,16 @@ class _Stats:
self.bytes_down = 0
self.pool_hits = 0
self.pool_misses = 0
self.cf_pool_hits = 0
self.cf_pool_misses = 0
def summary(self) -> str:
pool_total = self.pool_hits + self.pool_misses
pool_s = (f"{self.pool_hits}/{pool_total}"
if pool_total else "n/a")
cf_pool_total = self.cf_pool_hits + self.cf_pool_misses
cf_pool_s = (f"{self.cf_pool_hits}/{cf_pool_total}"
if cf_pool_total else "n/a")
return (f"total={self.connections_total} "
f"active={self.connections_active} "
f"ws={self.connections_ws} "
@@ -28,6 +33,7 @@ class _Stats:
f"masked={self.connections_masked} "
f"err={self.ws_errors} "
f"pool={pool_s} "
f"cf_pool={cf_pool_s} "
f"up={human_bytes(self.bytes_up)} "
f"down={human_bytes(self.bytes_down)}")

View File

@@ -11,10 +11,8 @@ import logging
import logging.handlers
import socket as _socket
from collections import deque
from typing import Dict, List, Optional, Set, Tuple
from typing import Dict, Optional, Set, Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
if __name__ == '__main__' and (__package__ is None or __package__ == ''):
_repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -24,11 +22,13 @@ if __name__ == '__main__' and (__package__ is None or __package__ == ''):
from .utils import *
from .stats import stats
from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh
from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh, coerce_domain_list
from .bridge import MsgSplitter, CryptoCtx, do_fallback, bridge_ws_reencrypt
from .raw_websocket import RawWebSocket, WsHandshakeError, set_sock_opts
from .fake_tls import proxy_to_masking_domain, verify_client_hello, build_server_hello, FakeTlsStream, TLS_RECORD_HANDSHAKE
from .balancer import balancer
from .pool import ws_pool, cf_worker_pool
from ._aes import Cipher, algorithms, modes
log = logging.getLogger('tg-mtproto-proxy')
@@ -100,112 +100,8 @@ def _generate_relay_init(proto_tag: bytes, dc_idx: int) -> bytes:
return bytes(result)
def _ws_domains(dc: int, is_media) -> List[str]:
if dc == 203:
dc = 2
if is_media is None or is_media:
return [f'kws{dc}-1.web.telegram.org', f'kws{dc}.web.telegram.org']
return [f'kws{dc}.web.telegram.org', f'kws{dc}-1.web.telegram.org']
class _WsPool:
WS_POOL_MAX_AGE = 120.0
def __init__(self):
self._idle: Dict[Tuple[int, bool], deque] = {}
self._refilling: Set[Tuple[int, bool]] = set()
async def get(self, dc: int, is_media: bool,
target_ip: str, domains: List[str]
) -> Optional[RawWebSocket]:
key = (dc, is_media)
now = time.monotonic()
bucket = self._idle.get(key)
if bucket is None:
bucket = deque()
self._idle[key] = bucket
while bucket:
ws, created = bucket.popleft()
age = now - created
if (age > self.WS_POOL_MAX_AGE or ws._closed
or ws.writer.transport.is_closing()):
asyncio.create_task(self._quiet_close(ws))
continue
stats.pool_hits += 1
log.debug("WS pool hit DC%d%s (age=%.1fs, left=%d)",
dc, 'm' if is_media else '', age, len(bucket))
self._schedule_refill(key, target_ip, domains)
return ws
stats.pool_misses += 1
self._schedule_refill(key, target_ip, domains)
return None
def _schedule_refill(self, key, target_ip, domains):
if key in self._refilling:
return
self._refilling.add(key)
asyncio.create_task(self._refill(key, target_ip, domains))
async def _refill(self, key, target_ip, domains):
dc, is_media = key
try:
bucket = self._idle.setdefault(key, deque())
needed = proxy_config.pool_size - len(bucket)
if needed <= 0:
return
tasks = [asyncio.create_task(
self._connect_one(target_ip, domains))
for _ in range(needed)]
for t in tasks:
try:
ws = await t
if ws:
bucket.append((ws, time.monotonic()))
except Exception:
pass
log.debug("WS pool refilled DC%d%s: %d ready",
dc, 'm' if is_media else '', len(bucket))
finally:
self._refilling.discard(key)
@staticmethod
async def _connect_one(target_ip, domains) -> Optional[RawWebSocket]:
for domain in domains:
try:
return await RawWebSocket.connect(
target_ip, domain, timeout=8)
except WsHandshakeError as exc:
if exc.is_redirect:
continue
return None
except Exception:
return None
return None
@staticmethod
async def _quiet_close(ws):
try:
await ws.close()
except Exception:
pass
async def warmup(self, dc_redirects: Dict[int, str]):
for dc, target_ip in dc_redirects.items():
if target_ip is None:
continue
for is_media in (False, True):
domains = _ws_domains(dc, is_media)
self._schedule_refill((dc, is_media), target_ip, domains)
log.info("WS pool warmup started for %d DC(s)", len(dc_redirects))
def reset(self):
self._idle.clear()
self._refilling.clear()
_ws_pool = _WsPool()
async def _read_client_init(reader, writer, secret, label, masking):
if proxy_config.proxy_protocol:
@@ -420,13 +316,13 @@ async def _handle_client(reader, writer, secret: bytes):
fail_until = dc_fail_until.get(dc_key, 0)
ws_timeout = WS_FAIL_TIMEOUT if now < fail_until else 10.0
domains = _ws_domains(dc, is_media)
domains = ws_domains(dc, is_media)
target = proxy_config.dc_redirects[dc]
ws = None
ws_failed_redirect = False
all_redirects = True
ws = await _ws_pool.get(dc, is_media, target, domains)
ws = await ws_pool.get(dc, is_media, target, domains)
if ws:
log.info("[%s] DC%d%s -> pool hit via %s",
label, dc, media_tag, target)
@@ -536,15 +432,16 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
global _server_instance, _server_stop_event
_server_stop_event = stop_event
_ws_pool.reset()
ws_pool.reset()
cf_worker_pool.reset()
ws_blacklist.clear()
dc_fail_until.clear()
_client_tasks.clear()
if proxy_config.fallback_cfproxy:
user = proxy_config.cfproxy_user_domain
user = proxy_config.cfproxy_user_domains
if user:
balancer.update_domains_list([user])
balancer.update_domains_list(user)
else:
start_cfproxy_domain_refresh()
@@ -587,11 +484,11 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
ip = proxy_config.dc_redirects.get(dc)
log.info(" DC%d: %s", dc, ip)
if proxy_config.fallback_cfproxy:
user_domain = "user" if proxy_config.cfproxy_user_domain else "auto"
user_domain = "user" if proxy_config.cfproxy_user_domains else "auto"
log.info(" CF proxy: enabled (%s)", user_domain)
if proxy_config.cfproxy_worker_domain:
if proxy_config.cfproxy_worker_domains:
log.info(" CF worker: enabled (%s)",
proxy_config.cfproxy_worker_domain)
", ".join(proxy_config.cfproxy_worker_domains))
log.info("=" * 60)
log.info(" Connect:")
if ftls:
@@ -611,7 +508,8 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
log_stats_task = asyncio.create_task(log_stats())
await _ws_pool.warmup(proxy_config.dc_redirects)
await ws_pool.warmup()
await cf_worker_pool.warmup()
try:
async with server:
@@ -676,13 +574,15 @@ def main():
help='Socket send/recv buffer size in KB (default 256)')
ap.add_argument('--pool-size', type=int, default=4, metavar='N',
help='WS connection pool size per DC (default 4, min 0)')
ap.add_argument('--cfproxy-domain', type=str, default='',
ap.add_argument('--cfproxy-domain', action='append', default=None,
metavar='DOMAIN',
help='User defined Cloudflare-proxied domain for WS fallback')
ap.add_argument('--cfproxy-worker-domain', type=str, default='',
help='User defined Cloudflare-proxied domain for WS fallback '
'(repeatable for multiple domains)')
ap.add_argument('--cfproxy-worker-domain', action='append', default=None,
metavar='DOMAIN',
help='Cloudflare Worker domain for WS fallback '
'(tried before other fallback methods)')
'(tried before other fallback methods, '
'repeatable for multiple domains)')
ap.add_argument('--no-cfproxy', action='store_true',
help='Disable Cloudflare proxy fallback')
ap.add_argument('--fake-tls-domain', type=str, default='',
@@ -724,8 +624,8 @@ def main():
proxy_config.buffer_size = max(4, args.buf_kb) * 1024
proxy_config.pool_size = max(0, args.pool_size)
proxy_config.fallback_cfproxy = not args.no_cfproxy
proxy_config.cfproxy_user_domain = args.cfproxy_domain.strip()
proxy_config.cfproxy_worker_domain = args.cfproxy_worker_domain.strip()
proxy_config.cfproxy_user_domains = coerce_domain_list(args.cfproxy_domain)
proxy_config.cfproxy_worker_domains = coerce_domain_list(args.cfproxy_worker_domain)
proxy_config.fake_tls_domain = args.fake_tls_domain.strip()
proxy_config.proxy_protocol = args.proxy_protocol

View File

@@ -2,7 +2,7 @@ import socket as _socket
import urllib.request
import http.client
from typing import Optional, Dict
from typing import Optional, Dict, List
from urllib.request import Request
@@ -34,6 +34,23 @@ _GITHUB_IPS: Dict[str, str] = {
"raw.githubusercontent.com": "185.199.109.133",
}
DC_DEFAULT_IPS: Dict[int, str] = {
1: '149.154.175.50',
2: '149.154.167.51',
3: '149.154.175.100',
4: '149.154.167.91',
5: '149.154.171.5',
203: '91.105.192.100'
}
def ws_domains(dc: int, is_media) -> List[str]:
if dc == 203:
dc = 2
if is_media is None or is_media:
return [f'kws{dc}-1.web.telegram.org', f'kws{dc}.web.telegram.org']
return [f'kws{dc}.web.telegram.org', f'kws{dc}-1.web.telegram.org']
def human_bytes(n: int) -> str:
for unit in ('B', 'KB', 'MB', 'GB'):

View File

@@ -6,7 +6,7 @@ import webbrowser
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from proxy import __version__, get_link_host, parse_dc_ip_list
from proxy import __version__, get_link_host, parse_dc_ip_list, coerce_domain_list
from proxy.balancer import balancer
from utils.update_check import RELEASES_PAGE_URL, get_status
@@ -59,15 +59,17 @@ _TIP_CFPROXY = (
"Использовать Cloudflare прокси для недоступных датацентров"
)
_TIP_CFPROXY_DOMAIN = (
"Ваш собственный домен, проксируемый через Cloudflare, для WS-подключения.\n"
"Если не указан — выбирается автоматически из поддерживаемых доменов"
"Ваши собственные домены, проксируемые через Cloudflare, для WS-подключения.\n"
"Несколько доменов указывайте через запятую.\n"
"Если не указаны — выбираются автоматически из поддерживаемых доменов"
)
_TIP_CFPROXY_USER_DOMAIN_CB = (
"Указать свой домен вместо автоматического выбора"
"Указать свои домены вместо автоматического выбора"
)
_TIP_CFWORKER_DOMAIN = (
"Домен Cloudflare Worker (например, name.account.workers.dev).\n"
"Прокси передает через него подключение к Telegram DC по IP"
"Домены Cloudflare Worker (например, name.account.workers.dev).\n"
"Несколько доменов указывайте через запятую.\n"
"Прокси передает через них подключение к Telegram DC по IP"
)
_TIP_SAVE = "Сохранить настройки"
_TIP_CANCEL = "Закрыть окно без сохранения изменений"
@@ -149,6 +151,14 @@ def _run_cfworker_connectivity_test(domain: str) -> dict:
return _run_connectivity_test(cases)
def _run_cfproxy_multi_test(domains: list) -> dict:
return {domain: _run_cfproxy_connectivity_test(domain) for domain in domains}
def _run_cfworker_multi_test(domains: list) -> dict:
return {domain: _run_cfworker_connectivity_test(domain) for domain in domains}
def _run_cfproxy_auto_test(domains: list) -> tuple:
merged: dict = {}
best_domain = None
@@ -207,6 +217,52 @@ def _show_connectivity_results(title_base: str, results: dict,
_mb.showinfo(title, msg, parent=root)
root.destroy()
def _show_multi_connectivity_results(title_base: str, per_domain: dict,
label_prefix: str = 'DC') -> None:
import tkinter as _tk
from tkinter import messagebox as _mb
total = len(_CFPROXY_TEST_DCS)
all_ok = True
any_ok = False
blocks = []
for domain, results in per_domain.items():
ok = [dc for dc, v in results.items() if v is True]
fail = [(dc, v) for dc, v in results.items() if v is not True]
if len(ok) == total:
any_ok = True
blocks.append(f"\u2713 {domain}: все {total} серверов доступны")
elif not ok:
all_ok = False
blocks.append(f"\u2717 {domain}: недоступен")
else:
all_ok = False
any_ok = True
blocks.append(
f"~ {domain}: работают "
f"{', '.join(f'{label_prefix}{dc}' for dc in ok)}; "
f"недоступны "
f"{', '.join(f'{label_prefix}{dc}' for dc, _ in fail)}"
)
if all_ok:
title = f"{title_base}: всё работает"
elif any_ok:
title = f"{title_base}: частично работает"
else:
title = f"{title_base}: недоступен"
msg = "\n\n".join(blocks)
root = _tk.Tk()
root.withdraw()
try:
root.attributes("-topmost", True)
except Exception:
pass
_mb.showinfo(title, msg, parent=root)
root.destroy()
_INNER_W = 396
_APPEARANCE_OPTIONS = ["Авто", "Светлая", "Тёмная"]
@@ -450,20 +506,23 @@ def install_tray_config_form(
_cf_test_btn = [None]
def _on_cf_test():
user_domain = cfproxy_user_domain_var.get().strip() if cf_custom_cb_var.get() else ""
user_domains = (
coerce_domain_list(cfproxy_user_domain_var.get())
if cf_custom_cb_var.get() else []
)
btn = _cf_test_btn[0]
if btn:
btn.configure(text="...", state="disabled")
import threading as _threading
if user_domain:
if user_domains:
def _worker():
try:
res = _run_cfproxy_connectivity_test(user_domain)
per = _run_cfproxy_multi_test(user_domains)
if btn:
btn.after(
0,
lambda: _show_connectivity_results(
"CF-прокси", res, domain=user_domain, label_prefix='kws',
lambda: _show_multi_connectivity_results(
"CF-прокси", per, label_prefix='kws',
),
)
except Exception as exc:
@@ -508,8 +567,10 @@ def install_tray_config_form(
cf_custom_row = ctk.CTkFrame(cf_inner, fg_color="transparent")
cf_custom_row.pack(fill="x")
saved_user_domain = cfg.get("cfproxy_user_domain", default_config.get("cfproxy_user_domain", ""))
cf_custom_cb_var = ctk.BooleanVar(value=bool(saved_user_domain))
saved_user_domains = coerce_domain_list(
cfg.get("cfproxy_user_domain", default_config.get("cfproxy_user_domain", ""))
)
cf_custom_cb_var = ctk.BooleanVar(value=bool(saved_user_domains))
cf_custom_cb = _checkbox(ctk, cf_custom_row, theme, "Свой домен", cf_custom_cb_var)
cf_custom_cb.pack(side="left", padx=(0, 10))
attach_ctk_tooltip(cf_custom_cb, _TIP_CFPROXY_USER_DOMAIN_CB)
@@ -522,7 +583,7 @@ def install_tray_config_form(
command=lambda: webbrowser.open(_CFPROXY_HELP_URL),
).pack(side="right")
cfproxy_user_domain_var = ctk.StringVar(value=saved_user_domain)
cfproxy_user_domain_var = ctk.StringVar(value=", ".join(saved_user_domains))
cf_domain_entry = _entry(
ctk, cf_custom_row, theme, var=cfproxy_user_domain_var,
height=32, radius=8,
@@ -543,14 +604,16 @@ def install_tray_config_form(
cf_worker_row = ctk.CTkFrame(cf_worker_inner, fg_color="transparent")
cf_worker_row.pack(fill="x", pady=(0, 4))
cf_worker_lbl = _label(ctk, cf_worker_row, theme, "Cloudflare Worker домен", size=11)
cf_worker_lbl = _label(ctk, cf_worker_row, theme, "Cloudflare Worker домены (через запятую)", size=11)
cf_worker_lbl.pack(anchor="w", pady=(0, 2))
cf_worker_input = ctk.CTkFrame(cf_worker_inner, fg_color="transparent")
cf_worker_input.pack(fill="x")
cfproxy_worker_domain_var = ctk.StringVar(
value=cfg.get("cfproxy_worker_domain", default_config.get("cfproxy_worker_domain", ""))
value=", ".join(coerce_domain_list(
cfg.get("cfproxy_worker_domain", default_config.get("cfproxy_worker_domain", ""))
))
)
cf_worker_entry = _entry(
ctk, cf_worker_input, theme, var=cfproxy_worker_domain_var,
@@ -565,24 +628,24 @@ def install_tray_config_form(
btn = _cfworker_test_btn[0]
if btn is None:
return
enabled = bool(cfproxy_worker_domain_var.get().strip())
enabled = bool(coerce_domain_list(cfproxy_worker_domain_var.get()))
btn.configure(state="normal" if enabled else "disabled")
def _on_cfworker_test():
domain = cfproxy_worker_domain_var.get().strip()
domains = coerce_domain_list(cfproxy_worker_domain_var.get())
btn = _cfworker_test_btn[0]
if not domain or btn is None:
if not domains or btn is None:
return
btn.configure(text="...", state="disabled")
import threading as _threading
def _worker():
try:
res = _run_cfworker_connectivity_test(domain)
per = _run_cfworker_multi_test(domains)
btn.after(
0,
lambda: _show_connectivity_results(
"CF Worker", res, domain=domain, label_prefix='DC',
lambda: _show_multi_connectivity_results(
"CF Worker", per, label_prefix='DC',
),
)
except Exception as exc:
@@ -784,9 +847,9 @@ def validate_config_form(
if widgets.cfproxy_var is not None:
new_cfg["cfproxy"] = bool(widgets.cfproxy_var.get())
if widgets.cfproxy_user_domain_var is not None:
new_cfg["cfproxy_user_domain"] = widgets.cfproxy_user_domain_var.get().strip()
new_cfg["cfproxy_user_domain"] = coerce_domain_list(widgets.cfproxy_user_domain_var.get())
if widgets.cfproxy_worker_domain_var is not None:
new_cfg["cfproxy_worker_domain"] = widgets.cfproxy_worker_domain_var.get().strip()
new_cfg["cfproxy_worker_domain"] = coerce_domain_list(widgets.cfproxy_worker_domain_var.get())
if widgets.appearance_var is not None:
new_cfg["appearance"] = _APPEARANCE_TO_CFG.get(widgets.appearance_var.get(), "auto")
return new_cfg

View File

@@ -18,8 +18,8 @@ _TRAY_DEFAULTS_COMMON: Dict[str, Any] = {
"buf_kb": 256,
"pool_size": 4,
"cfproxy": True,
"cfproxy_user_domain": "",
"cfproxy_worker_domain": "",
"cfproxy_user_domain": [],
"cfproxy_worker_domain": [],
}

View File

@@ -14,7 +14,7 @@ from typing import Any, Callable, Dict, Optional, Tuple
import psutil
from proxy import __version__, get_link_host, parse_dc_ip_list, proxy_config
from proxy import __version__, get_link_host, parse_dc_ip_list, proxy_config, coerce_domain_list
from proxy.tg_ws_proxy import _run
from utils.default_config import default_tray_config
@@ -271,8 +271,8 @@ def apply_proxy_config(cfg: dict) -> bool:
pc.buffer_size = max(4, cfg.get("buf_kb", DEFAULT_CONFIG["buf_kb"])) * 1024
pc.pool_size = max(0, cfg.get("pool_size", DEFAULT_CONFIG["pool_size"]))
pc.fallback_cfproxy = cfg.get("cfproxy", DEFAULT_CONFIG["cfproxy"])
pc.cfproxy_user_domain = cfg.get("cfproxy_user_domain", DEFAULT_CONFIG["cfproxy_user_domain"])
pc.cfproxy_worker_domain = cfg.get("cfproxy_worker_domain", DEFAULT_CONFIG["cfproxy_worker_domain"])
pc.cfproxy_user_domains = coerce_domain_list(cfg.get("cfproxy_user_domain", DEFAULT_CONFIG["cfproxy_user_domain"]))
pc.cfproxy_worker_domains = coerce_domain_list(cfg.get("cfproxy_worker_domain", DEFAULT_CONFIG["cfproxy_worker_domain"]))
return True