TamTam: websocket transport for web version

This commit is contained in:
Alexey Polyakov 2026-03-27 19:00:14 +03:00
parent ac76015d08
commit 7a2e5a20d6
7 changed files with 193 additions and 11 deletions

View File

@ -28,6 +28,10 @@ class BaseProcessor:
async def _send(self, writer, packet):
try:
# Если объектом является вебсокет, то используем функцию send для отправки
if hasattr(writer, 'send'):
await writer.send(packet)
else: # В ином случае отправляем как в обычный сокет
writer.write(packet)
await writer.drain()
except Exception:

View File

@ -1,9 +1,9 @@
# Импортирование библиотек
import ssl, logging, asyncio
from common.config import ServerConfig
from oneme.controller import OnemeMobileController
from oneme.controller import OnemeController
from telegrambot.controller import TelegramBotController
from tamtam.controller import TTMobileController
from tamtam.controller import TTController
# Конфиг сервера
server_config = ServerConfig()
@ -72,8 +72,8 @@ async def main():
}
controllers = {
"oneme_mobile": OnemeMobileController(),
"tamtam_mobile": TTMobileController(),
"oneme": OnemeController(),
"tamtam": TTController(),
"telegrambot": TelegramBotController()
}

View File

@ -6,7 +6,7 @@ from classes.controllerbase import ControllerBase
from common.config import ServerConfig
from common.opcodes import Opcodes
class OnemeMobileController(ControllerBase):
class OnemeController(ControllerBase):
def __init__(self):
self.config = ServerConfig()
self.proto = MobileProto()

View File

@ -1,9 +1,10 @@
import asyncio
from tamtam.socket import TTMobileServer
from tamtam.websocket import TTWebSocketServer
from classes.controllerbase import ControllerBase
from common.config import ServerConfig
class TTMobileController(ControllerBase):
class TTController(ControllerBase):
def __init__(self):
self.config = ServerConfig()
@ -17,6 +18,14 @@ class TTMobileController(ControllerBase):
db_pool=api['db'],
clients=api['clients'],
send_event=api['event']
).start(),
TTWebSocketServer(
host=self.config.host,
port=self.config.tamtam_ws_port,
ssl_context=api['ssl'],
db_pool=api['db'],
clients=api['clients'],
send_event=api['event']
).start()
)

View File

@ -6,7 +6,7 @@ class UserAgentModel(pydantic.BaseModel):
osVersion: str
timezone: str
screen: str
pushDeviceType: str
pushDeviceType: str = None
locale: str
deviceName: str
deviceLocale: str

View File

@ -1,4 +1,6 @@
import asyncio, logging, traceback
import asyncio
import logging
import traceback
from common.proto_tcp import MobileProto
from tamtam.processors import Processors
from common.rate_limiter import RateLimiter
@ -6,7 +8,7 @@ from common.opcodes import Opcodes
from common.tools import Tools
class TTMobileServer:
def __init__(self, host="0.0.0.0", port=443, ssl_context=None, db_pool=None, clients={}, send_event=None):
def __init__(self, host, port, ssl_context, db_pool, clients, send_event):
self.host = host
self.port = port
self.ssl_context = ssl_context

167
src/tamtam/websocket.py Normal file
View File

@ -0,0 +1,167 @@
import logging
import traceback
import websockets
from common.proto_web import WebProto
from tamtam.processors import Processors
from common.rate_limiter import RateLimiter
from common.opcodes import Opcodes
from common.tools import Tools
class TTWebSocketServer:
def __init__(self, host, port, clients, ssl_context, db_pool, send_event):
self.host = host
self.port = port
self.ssl_context = ssl_context
self.server = None
self.logger = logging.getLogger(__name__)
self.db_pool = db_pool
self.clients = clients
self.opcodes = Opcodes()
self.proto = WebProto()
self.processors = Processors(db_pool=db_pool, clients=clients, send_event=send_event, type="web")
self.auth_required = Tools().auth_required
# rate limiter
self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60)
self.read_timeout = 300 # Таймаут чтения из websocket (секунды)
self.max_read_size = 65536 # Максимальный размер данных
async def handle_client(self, websocket, path):
"""Функция для обработки WebSocket подключений"""
# IP-адрес клиента
address = websocket.remote_address
self.logger.info(f"Работаю с клиентом {address[0]}:{address[1]}")
deviceType = None
deviceName = None
userPhone = None
userId = None
hashedToken = None
try:
async for message in websocket:
# Проверяем размер данных
if len(message) > self.max_read_size:
self.logger.warning(f"Пакет от {address[0]}:{address[1]} превышает лимит ({len(message)} байт)")
break
# Распаковываем данные
packet = self.proto.unpack_packet(message)
# Если пакет невалидный — пропускаем
if not packet:
self.logger.warning(f"Невалидный пакет от {address[0]}:{address[1]}")
continue
opcode = packet.get("opcode")
seq = packet.get("seq")
payload = packet.get("payload")
match opcode:
case self.opcodes.SESSION_INIT:
deviceType, deviceName = await self.processors.session_init(payload, seq, websocket)
case self.opcodes.PING:
await self.processors.ping(payload, seq, websocket)
case self.opcodes.LOG:
await self.processors.log(payload, seq, websocket)
case self.opcodes.AUTH_REQUEST:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_REQUEST, self.processors.error_types.RATE_LIMITED, websocket)
else:
await self.processors.auth_request(payload, seq, websocket)
case self.opcodes.AUTH:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH, self.processors.error_types.RATE_LIMITED, websocket)
else:
await self.processors.auth(payload, seq, websocket)
case self.opcodes.AUTH_CONFIRM:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_CONFIRM, self.processors.error_types.RATE_LIMITED, websocket)
else:
await self.processors.auth_confirm(payload, seq, websocket, deviceType, deviceName)
case self.opcodes.LOGIN:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.LOGIN, self.processors.error_types.RATE_LIMITED, websocket)
else:
userPhone, userId, hashedToken = await self.processors.login(payload, seq, websocket)
if userPhone:
await self._finish_auth(websocket, address, userPhone, userId)
case self.opcodes.CONTACT_INFO:
await self.auth_required(
userPhone, self.processors.contact_info, payload, seq, websocket
)
case _:
self.logger.warning(f"Неизвестный опкод {opcode}")
except websockets.exceptions.ConnectionClosed:
self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}")
except Exception as e:
self.logger.error(f" Произошла ошибка при работе с клиентом {address[0]}:{address[1]}: {e}")
traceback.print_exc()
# Удаляем клиента из словаря при отключении
if userId:
await self._end_session(userId, address[0], address[1])
self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}")
async def _finish_auth(self, websocket, addr, phone, id):
"""Завершение открытия сессии"""
# Ищем пользователя в словаре
user = self.clients.get(id)
# Добавляем новое подключение в словарь
if user:
user["clients"].append(
{
"writer": websocket,
"ip": addr[0],
"port": addr[1],
"protocol": "tamtam_websocket"
}
)
else:
self.clients[id] = {
"phone": phone,
"id": id,
"clients": [
{
"writer": websocket,
"ip": addr[0],
"port": addr[1],
"protocol": "tamtam_websocket"
}
]
}
async def _end_session(self, id, ip, port):
"""Завершение сессии"""
# Получаем пользователя в списке
user = self.clients.get(id)
if not user:
return
# Получаем подключения пользователя
clients = user.get("clients", [])
# Удаляем нужное подключение из словаря
for i, client in enumerate(clients):
if (client.get("ip"), client.get("port")) == (ip, port):
clients.pop(i)
async def start(self):
"""Функция для запуска WebSocket сервера"""
self.server = await websockets.serve(
self.handle_client,
self.host,
self.port,
ssl=self.ssl_context
)
self.logger.info(f"TT WebSocket запущен на порту {self.port}")
await self.server.wait_closed()