Fronting fallback

This commit is contained in:
Flowseal
2026-06-26 18:37:30 +03:00
parent d3d30799a1
commit c8f6f8caf4
4 changed files with 64 additions and 6 deletions
+5 -3
View File
@@ -19,6 +19,7 @@ class _WsPool:
def __init__(self):
self._idle: Dict[Tuple[int, bool], deque] = {}
self._refilling: Set[Tuple[int, bool]] = set()
self.fronting_until: float = 0.0
async def get(self, dc: int, is_media: bool,
target_ip: str, domains: List[str]
@@ -61,7 +62,7 @@ class _WsPool:
if needed <= 0:
return
tasks = [asyncio.create_task(
self._connect_one(target_ip, domains))
self._connect_one(target_ip, domains, time.monotonic() < self.fronting_until))
for _ in range(needed)]
for t in tasks:
try:
@@ -76,11 +77,11 @@ class _WsPool:
self._refilling.discard(key)
@staticmethod
async def _connect_one(target_ip, domains) -> Optional[RawWebSocket]:
async def _connect_one(target_ip, domains, fronting_active) -> Optional[RawWebSocket]:
for domain in domains:
try:
return await RawWebSocket.connect(
target_ip, domain, timeout=8)
target_ip, domain, timeout=8, sni="sprinthost.ru" if fronting_active else None)
except WsHandshakeError as exc:
if exc.is_redirect:
continue
@@ -108,6 +109,7 @@ class _WsPool:
def reset(self):
self._idle.clear()
self._refilling.clear()
self.fronting_until = 0.0
class _CfWorkerPool:
+7 -2
View File
@@ -82,10 +82,14 @@ class RawWebSocket:
@staticmethod
async def connect(host: str, domain: str, timeout: float = 10.0,
path: str = '/apiws') -> 'RawWebSocket':
path: str = '/apiws', *,
sni: Optional[str] = None) -> 'RawWebSocket':
if sni is None:
sni = domain
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, 443, ssl=_ssl_ctx,
server_hostname=domain),
server_hostname=sni),
timeout=min(timeout, 10))
set_sock_opts(writer.transport, proxy_config.buffer_size)
@@ -102,6 +106,7 @@ class RawWebSocket:
f'Sec-WebSocket-Protocol: binary\r\n'
f'\r\n'
)
writer.write(req.encode())
await writer.drain()
+2
View File
@@ -7,6 +7,7 @@ class _Stats:
self.connections_ws = 0
self.connections_tcp_fallback = 0
self.connections_cfproxy = 0
self.connections_fronting = 0
self.connections_bad = 0
self.connections_masked = 0
self.ws_errors = 0
@@ -29,6 +30,7 @@ class _Stats:
f"ws={self.connections_ws} "
f"tcp_fb={self.connections_tcp_fallback} "
f"cf={self.connections_cfproxy} "
f"front={self.connections_fronting} "
f"bad={self.connections_bad} "
f"masked={self.connections_masked} "
f"err={self.ws_errors} "
+50 -1
View File
@@ -35,10 +35,12 @@ log = logging.getLogger('tg-mtproto-proxy')
DC_FAIL_COOLDOWN = 30.0
WS_FAIL_TIMEOUT = 2.0
FRONTING_COOLDOWN = 1800.0
LISTENER_CHECK_INTERVAL = 5.0
LISTENER_RESTART_DELAY = 1.0
ws_blacklist: Set[str] = set()
dc_fail_until: Dict[str, float] = {}
fronting_until: float = 0.0
def _try_handshake(handshake: bytes, secret: bytes) -> Optional[Tuple[int, bool, bytes, bytes]]:
@@ -246,6 +248,8 @@ def _build_crypto_ctx(client_dec_prekey_iv, secret, relay_init):
async def _handle_client(reader, writer, secret: bytes):
global fronting_until
stats.connections_total += 1
stats.connections_active += 1
peer = writer.get_extra_info('peername')
@@ -317,17 +321,37 @@ async def _handle_client(reader, writer, secret: bytes):
now = time.monotonic()
fail_until = dc_fail_until.get(dc_key, 0)
ws_timeout = WS_FAIL_TIMEOUT if now < fail_until else 10.0
fronting_active = now < fronting_until
domains = ws_domains(dc, is_media)
target = proxy_config.dc_redirects[dc]
ws = None
ws_failed_redirect = False
ws_timed_out = False
all_redirects = True
ws = await ws_pool.get(dc, is_media, target, domains)
if ws:
log.info("[%s] DC%d%s -> pool hit via %s",
label, dc, media_tag, target)
elif fronting_active:
log.info("[%s] DC%d%s -> fronting / Host %s",
label, dc, media_tag, domains[0])
try:
ws = await RawWebSocket.connect(target, domains[0],
timeout=10.0,
sni="sprinthost.ru")
except Exception as exc:
stats.ws_errors += 1
log.warning("[%s] DC%d%s fronting failed: %s",
label, dc, media_tag, repr(exc))
if ws:
stats.connections_fronting += 1
fronting_until = now + FRONTING_COOLDOWN
ws_pool.fronting_until = fronting_until
else:
fronting_until = 0.0
ws_pool.fronting_until = 0.0
else:
for domain in domains:
url = f'wss://{domain}/apiws'
@@ -351,12 +375,36 @@ async def _handle_client(reader, writer, secret: bytes):
all_redirects = False
log.warning("[%s] DC%d%s WS handshake: %s",
label, dc, media_tag, exc.status_line)
except asyncio.TimeoutError:
stats.ws_errors += 1
ws_timed_out = True
log.warning("[%s] DC%d%s WS connect timed out via %s",
label, dc, media_tag, domain)
except Exception as exc:
stats.ws_errors += 1
all_redirects = False
log.warning("[%s] DC%d%s WS connect failed: %s",
label, dc, media_tag, repr(exc))
# Fronting fallback if WS timed out
if ws is None and ws_timed_out and not fronting_active:
log.info("[%s] DC%d%s -> fronting fallback (Host %s)",
label, dc, media_tag, domains[0])
try:
ws = await RawWebSocket.connect(target, domains[0],
timeout=10.0,
sni="sprinthost.ru")
except Exception as exc:
stats.ws_errors += 1
log.warning("[%s] DC%d%s fronting failed: %s",
label, dc, media_tag, repr(exc))
if ws:
fronting_until = now + FRONTING_COOLDOWN
ws_pool.fronting_until = now + FRONTING_COOLDOWN
stats.connections_fronting += 1
log.info("[%s] DC%d%s fronting OK for %ds",
label, dc, media_tag, int(FRONTING_COOLDOWN))
# WS failed -> fallback
if ws is None:
if ws_failed_redirect and all_redirects:
@@ -431,7 +479,7 @@ _client_tasks: Set[asyncio.Task] = set()
async def _run(stop_event: Optional[asyncio.Event] = None):
global _server_instance, _server_stop_event
global _server_instance, _server_stop_event, fronting_until
_server_stop_event = stop_event
ws_pool.reset()
@@ -439,6 +487,7 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
ws_blacklist.clear()
dc_fail_until.clear()
_client_tasks.clear()
fronting_until = 0.0
if proxy_config.fallback_cfproxy:
user = proxy_config.cfproxy_user_domains