Added optional authentication args for SOCKS5-proxy (in 1 commit)

This commit is contained in:
edapin 2026-03-25 18:56:18 +03:00
parent 80e8997721
commit 5bc00d158c
2 changed files with 82 additions and 6 deletions

View File

@ -124,6 +124,7 @@ tg-ws-proxy [--port PORT] [--host HOST] [--dc-ip DC:IP ...] [-v]
|---|---|---| |---|---|---|
| `--port` | `1080` | Порт SOCKS5-прокси | | `--port` | `1080` | Порт SOCKS5-прокси |
| `--host` | `127.0.0.1` | Хост SOCKS5-прокси | | `--host` | `127.0.0.1` | Хост SOCKS5-прокси |
| `-u`, `-user`<br/>`-P`,`--password` | выкл. | Логин и Пароль для авторизации в SOCKS5-прокси |
| `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) | | `--dc-ip` | `2:149.154.167.220`, `4:149.154.167.220` | Целевой IP для DC (можно указать несколько раз) |
| `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) | | `-v`, `--verbose` | выкл. | Подробное логирование (DEBUG) |
| `--host` | `127.0.0.1`| IP-адрес прокси | | `--host` | `127.0.0.1`| IP-адрес прокси |

View File

@ -13,6 +13,7 @@ import sys
import time import time
from typing import Dict, List, Optional, Set, Tuple from typing import Dict, List, Optional, Set, Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from urllib.parse import quote
DEFAULT_PORT = 1080 DEFAULT_PORT = 1080
@ -76,6 +77,9 @@ _DC_OVERRIDES: Dict[int, int] = {
_dc_opt: Dict[int, Optional[str]] = {} _dc_opt: Dict[int, Optional[str]] = {}
_auth_user = ''
_auth_password = ''
# DCs where WS is known to fail (302 redirect) # DCs where WS is known to fail (302 redirect)
# Raw TCP fallback will be used instead # Raw TCP fallback will be used instead
# Keyed by (dc, is_media) # Keyed by (dc, is_media)
@ -802,10 +806,57 @@ async def _handle_client(reader, writer):
log.debug("[%s] not SOCKS5 (ver=%d)", label, hdr[0]) log.debug("[%s] not SOCKS5 (ver=%d)", label, hdr[0])
writer.close() writer.close()
return return
# Auth check
nmethods = hdr[1] nmethods = hdr[1]
await reader.readexactly(nmethods) auth_methods = await reader.readexactly(nmethods)
writer.write(b'\x05\x00') # no-auth
await writer.drain() if not _auth_user and not _auth_password:
writer.write(b'\x05\x00') # no-auth
await writer.drain()
elif _auth_user and _auth_password:
if b'\x02' not in auth_methods:
log.debug("[%s] auth methods %s", label, auth_methods)
writer.write(b'\x05\xff')
await writer.drain()
writer.close()
return
log.debug("[%s] auth requested", label)
writer.write(b'\x05\x02') # username/password requested
await writer.drain()
# Username/password subnegotiation
auth_hdr = await asyncio.wait_for(reader.readexactly(1), timeout=10)
if auth_hdr[0] != 1:
log.debug("[%s] bad auth request ver: %s", label, auth_hdr)
writer.write(b'\x01\x01')
await writer.drain()
writer.close()
return
ulen = (await reader.readexactly(1))[0]
client_username = (await reader.readexactly(ulen)).decode('utf-8', errors='ignore')
plen = (await reader.readexactly(1))[0]
client_password = (await reader.readexactly(plen)).decode('utf-8', errors='ignore')
if client_username != _auth_user or client_password != _auth_password:
log.warning("[%s] auth failed with creds: %s/%s", label, client_username, client_password)
writer.write(b'\x01\x01')
await writer.drain()
writer.close()
return
log.debug("[%s] auth success", label)
writer.write(b'\x01\x00') # Success
await writer.drain()
else:
# If have some problems
writer.write(b'\x05\xff')
await writer.drain()
writer.close()
return
# -- SOCKS5 CONNECT request -- # -- SOCKS5 CONNECT request --
req = await asyncio.wait_for(reader.readexactly(4), timeout=10) req = await asyncio.wait_for(reader.readexactly(4), timeout=10)
@ -1064,7 +1115,12 @@ async def _run(port: int, dc_opt: Dict[int, Optional[str]],
log.info(" DC%d: %s", dc, ip) log.info(" DC%d: %s", dc, ip)
log.info("=" * 60) log.info("=" * 60)
log.info(" Configure Telegram Desktop:") log.info(" Configure Telegram Desktop:")
log.info(" SOCKS5 proxy -> %s:%d (no user/pass)", host, port) if _auth_user and _auth_password:
log.info(" SOCKS5 proxy -> %s:%d ( %s / %s )", host, port, _auth_user, _auth_password)
log.info(" tg://socks/?server=%s&port=%s&user=%s&pass=%s", host, port, quote(_auth_user), quote(_auth_password))
else:
log.info(" SOCKS5 proxy -> %s:%d (no user/pass)", host, port)
log.info(" tg://socks/?server=%s&port=%s", host, port)
log.info("=" * 60) log.info("=" * 60)
async def log_stats(): async def log_stats():
@ -1101,6 +1157,20 @@ async def _run(port: int, dc_opt: Dict[int, Optional[str]],
_server_instance = None _server_instance = None
def set_auth_credentials(username: str, password: str):
"""Validate 'user' and 'password': both are specified, or both aren't"""
empty_user = username is None or not str(username).strip()
empty_pass = password is None or not str(password).strip()
if empty_user != empty_pass:
raise ValueError(
"Both --username (-u) and --password (-P) must be specified together, or neither should be used")
global _auth_user, _auth_password
_auth_user = username
_auth_password = password
def parse_dc_ip_list(dc_ip_list: List[str]) -> Dict[int, str]: def parse_dc_ip_list(dc_ip_list: List[str]) -> Dict[int, str]:
"""Parse list of 'DC:IP' strings into {dc: ip} dict.""" """Parse list of 'DC:IP' strings into {dc: ip} dict."""
dc_opt: Dict[int, str] = {} dc_opt: Dict[int, str] = {}
@ -1127,10 +1197,14 @@ def run_proxy(port: int, dc_opt: Dict[int, str],
def main(): def main():
ap = argparse.ArgumentParser( ap = argparse.ArgumentParser(
description='Telegram Desktop WebSocket Bridge Proxy') description='Telegram Desktop WebSocket Bridge Proxy')
ap.add_argument('--port', type=int, default=DEFAULT_PORT,
help=f'Listen port (default {DEFAULT_PORT})')
ap.add_argument('--host', type=str, default='127.0.0.1', ap.add_argument('--host', type=str, default='127.0.0.1',
help='Listen host (default 127.0.0.1)') help='Listen host (default 127.0.0.1)')
ap.add_argument('--port', type=int, default=DEFAULT_PORT,
help=f'Listen port (default {DEFAULT_PORT})')
ap.add_argument('-u', '--user', type=str, default='',
help='User for proxy')
ap.add_argument('-P', '--password', type=str, default='',
help='Password for proxy')
ap.add_argument('--dc-ip', metavar='DC:IP', action='append', ap.add_argument('--dc-ip', metavar='DC:IP', action='append',
default=[], default=[],
help='Target IP for a DC, e.g. --dc-ip 1:149.154.175.205' help='Target IP for a DC, e.g. --dc-ip 1:149.154.175.205'
@ -1154,6 +1228,7 @@ def main():
try: try:
dc_opt = parse_dc_ip_list(args.dc_ip) dc_opt = parse_dc_ip_list(args.dc_ip)
set_auth_credentials(args.user, args.password)
except ValueError as e: except ValueError as e:
log.error(str(e)) log.error(str(e))
sys.exit(1) sys.exit(1)