diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 83cc689..e0c5a49 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -33,13 +33,15 @@ from ._aes import Cipher, algorithms, modes log = logging.getLogger('tg-mtproto-proxy') -DC_FAIL_COOLDOWN = 30.0 +IP_FAIL_COOLDOWN = 3600.0 +DC_FAIL_COOLDOWN = 60.0 WS_FAIL_TIMEOUT = 2.0 FRONTING_COOLDOWN = 1800.0 LISTENER_CHECK_INTERVAL = 5.0 LISTENER_RESTART_DELAY = 1.0 ws_blacklist: Set[str] = set() dc_fail_until: Dict[str, float] = {} +ip_fail_until: Dict[str, float] = {} fronting_until: float = 0.0 @@ -295,15 +297,24 @@ async def _handle_client(reader, writer, secret: bytes): dc_key = f'{dc}{"m" if is_media else ""}' media_tag = " media" if is_media else "" + now = time.monotonic() + target = proxy_config.dc_redirects.get(dc) + is_any_cf_fallback = proxy_config.fallback_cfproxy or proxy_config.cfproxy_worker_domains + + # Fallback if DC not in config, if WS blacklisted for this DC/is_media or if connect to ip is timed out + if (dc not in proxy_config.dc_redirects + or dc_key in ws_blacklist + or now < ip_fail_until.get(target, 0) and is_any_cf_fallback): - # Fallback if DC not in config or WS blacklisted for this DC/is_media - if dc not in proxy_config.dc_redirects or dc_key in ws_blacklist: if dc not in proxy_config.dc_redirects: log.info("[%s] DC%d not in config -> fallback", label, dc) - else: + elif dc_key in ws_blacklist: log.info("[%s] DC%d%s WS blacklisted -> fallback", label, dc, media_tag) + else: + log.info("[%s] DC%d%s WS connect to %s was timed out -> fallback", + label, dc, media_tag, target) splitter = None try: splitter = MsgSplitter(relay_init, proto_int) @@ -318,13 +329,10 @@ async def _handle_client(reader, writer, secret: bytes): label, dc, media_tag) return - now = time.monotonic() - fail_until = dc_fail_until.get(dc_key, 0) - ws_timeout = WS_FAIL_TIMEOUT if now < fail_until else 10.0 + ws_timeout = WS_FAIL_TIMEOUT if now < dc_fail_until.get(dc_key, 0) else 5.0 fronting_active = now < fronting_until domains = ws_domains(dc, is_media) - target = proxy_config.dc_redirects[dc] ws = None ws_failed_redirect = False ws_timed_out = False @@ -336,12 +344,11 @@ async def _handle_client(reader, writer, secret: bytes): label, dc, media_tag, target) elif fronting_active: # TODO: Move fronting logic into bridge.py where other fallbacks are handled - # Make less timeout, and don't try it next time for X seconds if it has failed as well log.info("[%s] DC%d%s -> fronting / Host %s", label, dc, media_tag, domains[0]) try: ws = await RawWebSocket.connect(target, domains[0], - timeout=10.0, + timeout=5.0, sni="sprinthost.ru") except Exception as exc: stats.ws_errors += 1 @@ -382,6 +389,7 @@ async def _handle_client(reader, writer, secret: bytes): ws_timed_out = True log.warning("[%s] DC%d%s WS connect timed out via %s", label, dc, media_tag, domain) + break except Exception as exc: stats.ws_errors += 1 all_redirects = False @@ -396,7 +404,7 @@ async def _handle_client(reader, writer, secret: bytes): label, dc, media_tag, domains[0]) try: ws = await RawWebSocket.connect(target, domains[0], - timeout=10.0, + timeout=5.0, sni="sprinthost.ru") except Exception as exc: stats.ws_errors += 1 @@ -411,6 +419,9 @@ async def _handle_client(reader, writer, secret: bytes): # WS failed -> fallback if ws is None: + if ws_timed_out: + ip_fail_until[target] = now + IP_FAIL_COOLDOWN + if ws_failed_redirect and all_redirects: ws_blacklist.add(dc_key) log.warning("[%s] DC%d%s blacklisted for WS (all 302)", @@ -419,8 +430,6 @@ async def _handle_client(reader, writer, secret: bytes): dc_fail_until[dc_key] = now + DC_FAIL_COOLDOWN else: dc_fail_until[dc_key] = now + DC_FAIL_COOLDOWN - # TODO: may be don't try regular WS connection and do fallback instanstly - # instead of waiting for WS_FAIL_TIMEOUT and then fallback log.info("[%s] DC%d%s WS cooldown for %ds", label, dc, media_tag, int(DC_FAIL_COOLDOWN)) @@ -439,6 +448,7 @@ async def _handle_client(reader, writer, secret: bytes): return dc_fail_until.pop(dc_key, None) + ip_fail_until.pop(target, None) stats.connections_ws += 1 splitter = None @@ -492,6 +502,7 @@ async def _run(stop_event: Optional[asyncio.Event] = None): cf_worker_pool.reset() ws_blacklist.clear() dc_fail_until.clear() + ip_fail_until.clear() _client_tasks.clear() fronting_until = 0.0