From 7a2e5a20d6ca17da9ddc4e840f935954c611ba09 Mon Sep 17 00:00:00 2001 From: Alexey Polyakov Date: Fri, 27 Mar 2026 19:00:14 +0300 Subject: [PATCH] TamTam: websocket transport for web version --- src/classes/baseprocessor.py | 8 +- src/main.py | 8 +- src/oneme/controller.py | 2 +- src/tamtam/controller.py | 11 ++- src/tamtam/models.py | 2 +- src/tamtam/socket.py | 6 +- src/tamtam/websocket.py | 167 +++++++++++++++++++++++++++++++++++ 7 files changed, 193 insertions(+), 11 deletions(-) create mode 100644 src/tamtam/websocket.py diff --git a/src/classes/baseprocessor.py b/src/classes/baseprocessor.py index 3bff792..7fc1160 100644 --- a/src/classes/baseprocessor.py +++ b/src/classes/baseprocessor.py @@ -28,8 +28,12 @@ class BaseProcessor: async def _send(self, writer, packet): try: - writer.write(packet) - await writer.drain() + # Если объектом является вебсокет, то используем функцию send для отправки + if hasattr(writer, 'send'): + await writer.send(packet) + else: # В ином случае отправляем как в обычный сокет + writer.write(packet) + await writer.drain() except Exception: pass diff --git a/src/main.py b/src/main.py index b0509c1..d271415 100644 --- a/src/main.py +++ b/src/main.py @@ -1,9 +1,9 @@ # Импортирование библиотек import ssl, logging, asyncio from common.config import ServerConfig -from oneme.controller import OnemeMobileController +from oneme.controller import OnemeController from telegrambot.controller import TelegramBotController -from tamtam.controller import TTMobileController +from tamtam.controller import TTController # Конфиг сервера server_config = ServerConfig() @@ -72,8 +72,8 @@ async def main(): } controllers = { - "oneme_mobile": OnemeMobileController(), - "tamtam_mobile": TTMobileController(), + "oneme": OnemeController(), + "tamtam": TTController(), "telegrambot": TelegramBotController() } diff --git a/src/oneme/controller.py b/src/oneme/controller.py index 924a194..1fadd59 100644 --- a/src/oneme/controller.py +++ b/src/oneme/controller.py @@ -6,7 +6,7 @@ from classes.controllerbase import ControllerBase from common.config import ServerConfig from common.opcodes import Opcodes -class OnemeMobileController(ControllerBase): +class OnemeController(ControllerBase): def __init__(self): self.config = ServerConfig() self.proto = MobileProto() diff --git a/src/tamtam/controller.py b/src/tamtam/controller.py index c21e479..b264766 100644 --- a/src/tamtam/controller.py +++ b/src/tamtam/controller.py @@ -1,9 +1,10 @@ import asyncio from tamtam.socket import TTMobileServer +from tamtam.websocket import TTWebSocketServer from classes.controllerbase import ControllerBase from common.config import ServerConfig -class TTMobileController(ControllerBase): +class TTController(ControllerBase): def __init__(self): self.config = ServerConfig() @@ -17,6 +18,14 @@ class TTMobileController(ControllerBase): db_pool=api['db'], clients=api['clients'], send_event=api['event'] + ).start(), + TTWebSocketServer( + host=self.config.host, + port=self.config.tamtam_ws_port, + ssl_context=api['ssl'], + db_pool=api['db'], + clients=api['clients'], + send_event=api['event'] ).start() ) diff --git a/src/tamtam/models.py b/src/tamtam/models.py index 84df963..7d72e59 100644 --- a/src/tamtam/models.py +++ b/src/tamtam/models.py @@ -6,7 +6,7 @@ class UserAgentModel(pydantic.BaseModel): osVersion: str timezone: str screen: str - pushDeviceType: str + pushDeviceType: str = None locale: str deviceName: str deviceLocale: str diff --git a/src/tamtam/socket.py b/src/tamtam/socket.py index 685c61f..c5f0d1d 100644 --- a/src/tamtam/socket.py +++ b/src/tamtam/socket.py @@ -1,4 +1,6 @@ -import asyncio, logging, traceback +import asyncio +import logging +import traceback from common.proto_tcp import MobileProto from tamtam.processors import Processors from common.rate_limiter import RateLimiter @@ -6,7 +8,7 @@ from common.opcodes import Opcodes from common.tools import Tools class TTMobileServer: - def __init__(self, host="0.0.0.0", port=443, ssl_context=None, db_pool=None, clients={}, send_event=None): + def __init__(self, host, port, ssl_context, db_pool, clients, send_event): self.host = host self.port = port self.ssl_context = ssl_context diff --git a/src/tamtam/websocket.py b/src/tamtam/websocket.py new file mode 100644 index 0000000..7a20da1 --- /dev/null +++ b/src/tamtam/websocket.py @@ -0,0 +1,167 @@ +import logging +import traceback +import websockets +from common.proto_web import WebProto +from tamtam.processors import Processors +from common.rate_limiter import RateLimiter +from common.opcodes import Opcodes +from common.tools import Tools + +class TTWebSocketServer: + def __init__(self, host, port, clients, ssl_context, db_pool, send_event): + self.host = host + self.port = port + self.ssl_context = ssl_context + self.server = None + self.logger = logging.getLogger(__name__) + self.db_pool = db_pool + self.clients = clients + + self.opcodes = Opcodes() + + self.proto = WebProto() + self.processors = Processors(db_pool=db_pool, clients=clients, send_event=send_event, type="web") + self.auth_required = Tools().auth_required + + # rate limiter + self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60) + + self.read_timeout = 300 # Таймаут чтения из websocket (секунды) + self.max_read_size = 65536 # Максимальный размер данных + + async def handle_client(self, websocket, path): + """Функция для обработки WebSocket подключений""" + # IP-адрес клиента + address = websocket.remote_address + self.logger.info(f"Работаю с клиентом {address[0]}:{address[1]}") + + deviceType = None + deviceName = None + + userPhone = None + userId = None + hashedToken = None + + try: + async for message in websocket: + # Проверяем размер данных + if len(message) > self.max_read_size: + self.logger.warning(f"Пакет от {address[0]}:{address[1]} превышает лимит ({len(message)} байт)") + break + + # Распаковываем данные + packet = self.proto.unpack_packet(message) + + # Если пакет невалидный — пропускаем + if not packet: + self.logger.warning(f"Невалидный пакет от {address[0]}:{address[1]}") + continue + + opcode = packet.get("opcode") + seq = packet.get("seq") + payload = packet.get("payload") + + match opcode: + case self.opcodes.SESSION_INIT: + deviceType, deviceName = await self.processors.session_init(payload, seq, websocket) + case self.opcodes.PING: + await self.processors.ping(payload, seq, websocket) + case self.opcodes.LOG: + await self.processors.log(payload, seq, websocket) + case self.opcodes.AUTH_REQUEST: + if not self.auth_rate_limiter.is_allowed(address[0]): + await self.processors._send_error(seq, self.opcodes.AUTH_REQUEST, self.processors.error_types.RATE_LIMITED, websocket) + else: + await self.processors.auth_request(payload, seq, websocket) + case self.opcodes.AUTH: + if not self.auth_rate_limiter.is_allowed(address[0]): + await self.processors._send_error(seq, self.opcodes.AUTH, self.processors.error_types.RATE_LIMITED, websocket) + else: + await self.processors.auth(payload, seq, websocket) + case self.opcodes.AUTH_CONFIRM: + if not self.auth_rate_limiter.is_allowed(address[0]): + await self.processors._send_error(seq, self.opcodes.AUTH_CONFIRM, self.processors.error_types.RATE_LIMITED, websocket) + else: + await self.processors.auth_confirm(payload, seq, websocket, deviceType, deviceName) + case self.opcodes.LOGIN: + if not self.auth_rate_limiter.is_allowed(address[0]): + await self.processors._send_error(seq, self.opcodes.LOGIN, self.processors.error_types.RATE_LIMITED, websocket) + else: + userPhone, userId, hashedToken = await self.processors.login(payload, seq, websocket) + + if userPhone: + await self._finish_auth(websocket, address, userPhone, userId) + case self.opcodes.CONTACT_INFO: + await self.auth_required( + userPhone, self.processors.contact_info, payload, seq, websocket + ) + case _: + self.logger.warning(f"Неизвестный опкод {opcode}") + except websockets.exceptions.ConnectionClosed: + self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}") + except Exception as e: + self.logger.error(f" Произошла ошибка при работе с клиентом {address[0]}:{address[1]}: {e}") + traceback.print_exc() + + # Удаляем клиента из словаря при отключении + if userId: + await self._end_session(userId, address[0], address[1]) + + self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}") + + async def _finish_auth(self, websocket, addr, phone, id): + """Завершение открытия сессии""" + # Ищем пользователя в словаре + user = self.clients.get(id) + + # Добавляем новое подключение в словарь + if user: + user["clients"].append( + { + "writer": websocket, + "ip": addr[0], + "port": addr[1], + "protocol": "tamtam_websocket" + } + ) + else: + self.clients[id] = { + "phone": phone, + "id": id, + "clients": [ + { + "writer": websocket, + "ip": addr[0], + "port": addr[1], + "protocol": "tamtam_websocket" + } + ] + } + + async def _end_session(self, id, ip, port): + """Завершение сессии""" + # Получаем пользователя в списке + user = self.clients.get(id) + if not user: + return + + # Получаем подключения пользователя + clients = user.get("clients", []) + + # Удаляем нужное подключение из словаря + for i, client in enumerate(clients): + if (client.get("ip"), client.get("port")) == (ip, port): + clients.pop(i) + + async def start(self): + """Функция для запуска WebSocket сервера""" + self.server = await websockets.serve( + self.handle_client, + self.host, + self.port, + ssl=self.ssl_context + ) + + self.logger.info(f"TT WebSocket запущен на порту {self.port}") + + await self.server.wait_closed() \ No newline at end of file