From 23f0e4d4269266f62898bbf12ef8396231deedf5 Mon Sep 17 00:00:00 2001 From: Erik <46452865+k1zn@users.noreply.github.com> Date: Sat, 30 May 2026 19:31:47 +0300 Subject: [PATCH] Fall back to system libcrypto when `cryptography` is unavailable (#894) --- proxy/_aes.py | 130 +++++++++++++++++++++++++++++++++++++++++++ proxy/bridge.py | 2 +- proxy/tg_ws_proxy.py | 2 +- 3 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 proxy/_aes.py diff --git a/proxy/_aes.py b/proxy/_aes.py new file mode 100644 index 0000000..ccdb2ca --- /dev/null +++ b/proxy/_aes.py @@ -0,0 +1,130 @@ +""" +AES-CTR shim. + +Prefers `cryptography` if available (desktop / Docker). Falls back to a +ctypes wrapper over the system OpenSSL `libcrypto` for environments where +installing `cryptography` is painful (Entware on routers, embedded boxes +without a Rust toolchain). The public surface mimics the small subset of +`cryptography.hazmat.primitives.ciphers` that this project actually uses: + Cipher(algorithms.AES(key), modes.CTR(iv)).encryptor().update(data) +""" +from __future__ import annotations + +try: + from cryptography.hazmat.primitives.ciphers import ( # noqa: F401 + Cipher, algorithms, modes, + ) +except ImportError: + import ctypes + import ctypes.util + + def _load_libcrypto(): + name = ctypes.util.find_library("crypto") + candidates = [] + if name: + candidates.append(name) + candidates += [ + "libcrypto.so.3", "libcrypto.so.1.1", "libcrypto.so.1.0.0", + "libcrypto.so", "/opt/lib/libcrypto.so", + "/opt/lib/libcrypto.so.1.1", "/opt/lib/libcrypto.so.3", + ] + last_err = None + for c in candidates: + try: + return ctypes.CDLL(c) + except OSError as e: + last_err = e + raise RuntimeError( + "libcrypto not found; install openssl-util or " + "`opkg install libopenssl`. Last error: %r" % last_err + ) + + _libcrypto = _load_libcrypto() + + _libcrypto.EVP_CIPHER_CTX_new.restype = ctypes.c_void_p + _libcrypto.EVP_CIPHER_CTX_free.argtypes = [ctypes.c_void_p] + _libcrypto.EVP_aes_128_ctr.restype = ctypes.c_void_p + _libcrypto.EVP_aes_192_ctr.restype = ctypes.c_void_p + _libcrypto.EVP_aes_256_ctr.restype = ctypes.c_void_p + _libcrypto.EVP_EncryptInit_ex.argtypes = [ + ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, + ctypes.c_char_p, ctypes.c_char_p, + ] + _libcrypto.EVP_EncryptInit_ex.restype = ctypes.c_int + _libcrypto.EVP_EncryptUpdate.argtypes = [ + ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), + ctypes.c_char_p, ctypes.c_int, + ] + _libcrypto.EVP_EncryptUpdate.restype = ctypes.c_int + + _EVP_BY_KEY = { + 16: _libcrypto.EVP_aes_128_ctr, + 24: _libcrypto.EVP_aes_192_ctr, + 32: _libcrypto.EVP_aes_256_ctr, + } + + class algorithms: + class AES: + __slots__ = ("key",) + + def __init__(self, key: bytes): + if len(key) not in _EVP_BY_KEY: + raise ValueError("AES key must be 16/24/32 bytes") + self.key = bytes(key) + + class modes: + class CTR: + __slots__ = ("iv",) + + def __init__(self, iv: bytes): + if len(iv) != 16: + raise ValueError("CTR IV must be 16 bytes") + self.iv = bytes(iv) + + class _CtrStream: + __slots__ = ("_ctx",) + + def __init__(self, key: bytes, iv: bytes): + ctx = _libcrypto.EVP_CIPHER_CTX_new() + if not ctx: + raise RuntimeError("EVP_CIPHER_CTX_new failed") + self._ctx = ctx + evp = _EVP_BY_KEY[len(key)]() + if _libcrypto.EVP_EncryptInit_ex(ctx, evp, None, key, iv) != 1: + _libcrypto.EVP_CIPHER_CTX_free(ctx) + self._ctx = None + raise RuntimeError("EVP_EncryptInit_ex failed") + + def update(self, data: bytes) -> bytes: + if not data: + return b"" + outlen = ctypes.c_int(0) + buf = ctypes.create_string_buffer(len(data) + 16) + if _libcrypto.EVP_EncryptUpdate( + self._ctx, buf, ctypes.byref(outlen), bytes(data), len(data) + ) != 1: + raise RuntimeError("EVP_EncryptUpdate failed") + return buf.raw[:outlen.value] + + def __del__(self): + ctx = getattr(self, "_ctx", None) + if ctx: + _libcrypto.EVP_CIPHER_CTX_free(ctx) + self._ctx = None + + class Cipher: + __slots__ = ("_key", "_iv") + + def __init__(self, algorithm, mode): + if not isinstance(algorithm, algorithms.AES): + raise TypeError("only AES is supported") + if not isinstance(mode, modes.CTR): + raise TypeError("only CTR mode is supported") + self._key = algorithm.key + self._iv = mode.iv + + def encryptor(self) -> _CtrStream: + return _CtrStream(self._key, self._iv) + + # CTR is symmetric — decryption == encryption with the same keystream. + decryptor = encryptor diff --git a/proxy/bridge.py b/proxy/bridge.py index eff59eb..a68d1b3 100644 --- a/proxy/bridge.py +++ b/proxy/bridge.py @@ -2,7 +2,7 @@ import asyncio import logging import struct -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from ._aes import Cipher, algorithms, modes from typing import Dict, List, Optional from urllib.parse import urlencode diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 8e875c3..ba07d63 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -14,7 +14,7 @@ import socket as _socket from collections import deque from typing import Dict, List, Optional, Set, Tuple -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from ._aes import Cipher, algorithms, modes if __name__ == '__main__' and (__package__ is None or __package__ == ''): _repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))