diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 21ba025..1adb747 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -29,6 +29,9 @@ class ProxyConfig: dc_overrides: Dict[int, int] = field(default_factory=lambda: {203: 2}) buffer_size: int = 256 * 1024 pool_size: int = 4 + fallback_cfproxy: bool = True + fallback_cfproxy_priority: bool = True + fallback_cfproxy_domain: str = 'pclead.co.uk' proxy_config = ProxyConfig() @@ -40,7 +43,7 @@ DC_DEFAULT_IPS: Dict[int, str] = { 3: '149.154.175.100', 4: '149.154.167.91', 5: '149.154.171.5', - 203: '91.105.192.100' + 203: '149.154.175.50' } HANDSHAKE_LEN = 64 @@ -487,6 +490,7 @@ class Stats: self.connections_active = 0 self.connections_ws = 0 self.connections_tcp_fallback = 0 + self.connections_cfproxy = 0 self.connections_bad = 0 self.ws_errors = 0 self.bytes_up = 0 @@ -502,6 +506,7 @@ class Stats: f"active={self.connections_active} " f"ws={self.connections_ws} " f"tcp_fb={self.connections_tcp_fallback} " + f"cf={self.connections_cfproxy} " f"bad={self.connections_bad} " f"err={self.ws_errors} " f"pool={pool_s} " @@ -784,6 +789,88 @@ def _fallback_ip(dc: int) -> Optional[str]: return DC_DEFAULT_IPS.get(dc) +def _cfproxy_domains(dc: int) -> List[str]: + base = proxy_config.fallback_cfproxy_domain + return [f'kws{dc}.{base}'] + + +async def _cfproxy_fallback(reader, writer, relay_init, label, + dc=None, is_media=False, + clt_decryptor=None, clt_encryptor=None, + tg_encryptor=None, tg_decryptor=None, + splitter=None): + domains = _cfproxy_domains(dc) + media_tag = ' media' if is_media else '' + ws = None + for domain in domains: + log.info("[%s] DC%d%s -> CF proxy wss://%s/apiws", + label, dc, media_tag, domain) + try: + ws = await RawWebSocket.connect(domain, domain, + timeout=10.0) + break + except Exception as exc: + log.warning("[%s] DC%d%s CF proxy %s failed: %s", + label, dc, media_tag, domain, exc) + if ws is None: + return False + _stats.connections_cfproxy += 1 + await ws.send(relay_init) + await _bridge_ws_reencrypt(reader, writer, ws, label, + dc=dc, is_media=is_media, + clt_decryptor=clt_decryptor, + clt_encryptor=clt_encryptor, + tg_encryptor=tg_encryptor, + tg_decryptor=tg_decryptor, + splitter=splitter) + return True + + +async def _do_fallback(reader, writer, relay_init, label, + dc, is_media, media_tag, + clt_decryptor, clt_encryptor, + tg_encryptor, tg_decryptor, + splitter=None): + """Try CF proxy and/or TCP fallback based on config priority.""" + fallback_dst = _fallback_ip(dc) + use_cf = proxy_config.fallback_cfproxy + cf_first = proxy_config.fallback_cfproxy_priority + + methods: List[str] = [] + if use_cf and cf_first: + methods = ['cf', 'tcp'] + elif use_cf: + methods = ['tcp', 'cf'] + else: + methods = ['tcp'] + + for method in methods: + if method == 'cf': + ok = await _cfproxy_fallback( + reader, writer, relay_init, label, + dc=dc, is_media=is_media, + clt_decryptor=clt_decryptor, + clt_encryptor=clt_encryptor, + tg_encryptor=tg_encryptor, + tg_decryptor=tg_decryptor, + splitter=splitter) + if ok: + return True + elif method == 'tcp' and fallback_dst: + log.info("[%s] DC%d%s -> TCP fallback to %s:443", + label, dc, media_tag, fallback_dst) + ok = await _tcp_fallback( + reader, writer, fallback_dst, 443, + relay_init, label, dc=dc, is_media=is_media, + clt_decryptor=clt_decryptor, + clt_encryptor=clt_encryptor, + tg_encryptor=tg_encryptor, + tg_decryptor=tg_decryptor) + if ok: + return True + return False + + async def _handle_client(reader, writer, secret: bytes): _stats.connections_total += 1 _stats.connections_active += 1 @@ -872,22 +959,24 @@ async def _handle_client(reader, writer, secret: bytes): # Fallback if DC not in config or WS blacklisted for this DC/is_media if dc not in proxy_config.dc_redirects or dc_key in ws_blacklist: - fallback_dst = _fallback_ip(dc) - if fallback_dst: - if dc not in proxy_config.dc_redirects: - log.info("[%s] DC%d not in config -> TCP fallback %s:443", - label, dc, fallback_dst) - else: - log.info("[%s] DC%d%s WS blacklisted -> TCP fallback %s:443", - label, dc, media_tag, fallback_dst) - await _tcp_fallback(reader, writer, fallback_dst, 443, - relay_init, label, dc=dc, - is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor) + if dc not in proxy_config.dc_redirects: + log.info("[%s] DC%d not in config -> fallback", + label, dc) else: + log.info("[%s] DC%d%s WS blacklisted -> fallback", + label, dc, media_tag) + splitter = None + try: + splitter = _MsgSplitter(relay_init, proto_int) + except Exception: + pass + ok = await _do_fallback( + reader, writer, relay_init, label, + dc, is_media, media_tag, + clt_decryptor, clt_encryptor, + tg_encryptor, tg_decryptor, + splitter=splitter) + if not ok: log.warning("[%s] DC%d%s no fallback available", label, dc, media_tag) return @@ -948,18 +1037,19 @@ async def _handle_client(reader, writer, secret: bytes): log.info("[%s] DC%d%s WS cooldown for %ds", label, dc, media_tag, int(DC_FAIL_COOLDOWN)) - fallback_dst = _fallback_ip(dc) or target - log.info("[%s] DC%d%s -> TCP fallback to %s:443", - label, dc, media_tag, fallback_dst) - ok = await _tcp_fallback(reader, writer, fallback_dst, 443, - relay_init, label, dc=dc, - is_media=is_media, - clt_decryptor=clt_decryptor, - clt_encryptor=clt_encryptor, - tg_encryptor=tg_encryptor, - tg_decryptor=tg_decryptor) + splitter_fb = None + try: + splitter_fb = _MsgSplitter(relay_init, proto_int) + except Exception: + pass + ok = await _do_fallback( + reader, writer, relay_init, label, + dc, is_media, media_tag, + clt_decryptor, clt_encryptor, + tg_encryptor, tg_decryptor, + splitter=splitter_fb) if ok: - log.info("[%s] DC%d%s TCP fallback closed", + log.info("[%s] DC%d%s fallback closed", label, dc, media_tag) return @@ -1040,6 +1130,10 @@ async def _run(stop_event: Optional[asyncio.Event] = None): for dc in sorted(proxy_config.dc_redirects.keys()): ip = proxy_config.dc_redirects.get(dc) log.info(" DC%d: %s", dc, ip) + if proxy_config.fallback_cfproxy: + prio = 'CF first' if proxy_config.fallback_cfproxy_priority else 'TCP first' + log.info(" CF proxy: %s (%s)", + proxy_config.fallback_cfproxy_domain, prio) log.info("=" * 60) log.info(" Connect link:") log.info(" %s", tg_link) @@ -1119,7 +1213,7 @@ def main(): ap = argparse.ArgumentParser( description='Telegram MTProto WebSocket Bridge Proxy') ap.add_argument('--port', type=int, default=1443, - help=f'Listen port (default 1443)') + help='Listen port (default 1443)') ap.add_argument('--host', type=str, default='127.0.0.1', help='Listen host (default 127.0.0.1)') ap.add_argument('--secret', type=str, default=None, @@ -1139,6 +1233,14 @@ def main(): help='Socket send/recv buffer size in KB (default 256)') ap.add_argument('--pool-size', type=int, default=4, metavar='N', help='WS connection pool size per DC (default 4, min 0)') + ap.add_argument('--cfproxy-domain', type=str, default='pclead.co.uk', + metavar='DOMAIN', + help='Cloudflare-proxied domain for WS fallback ' + '(default: pclead.co.uk)') + ap.add_argument('--no-cfproxy', action='store_true', + help='Disable Cloudflare proxy fallback') + ap.add_argument('--cfproxy-priority', type=bool, default=True, + help='Try cfproxy before tcp fallback (default: true)') args = ap.parse_args() if not args.dc_ip: @@ -1171,7 +1273,10 @@ def main(): secret=secret_hex, dc_redirects=dc_redirects, buffer_size=max(4, args.buf_kb) * 1024, - pool_size=max(0, args.pool_size) + pool_size=max(0, args.pool_size), + fallback_cfproxy=not args.no_cfproxy, + fallback_cfproxy_priority=args.cfproxy_priority, + fallback_cfproxy_domain=args.cfproxy_domain, ) log_level = logging.DEBUG if args.verbose else logging.INFO