Compare commits

...

4 Commits

Author SHA1 Message Date
Alexey Polyakov 4014bab5c9 Правка контроллеров небольшая 2026-03-27 19:35:59 +03:00
Alexey Polyakov 0b7282b284 TamTam && MAX: история (в мохе она вроде теперь получше работает) 2026-03-27 19:26:20 +03:00
Alexey Polyakov 7a2e5a20d6 TamTam: websocket transport for web version 2026-03-27 19:00:14 +03:00
Alexey Polyakov ac76015d08 Common: поправил 1 прикол 2026-03-27 17:41:09 +03:00
12 changed files with 315 additions and 20 deletions

View File

@ -28,8 +28,12 @@ class BaseProcessor:
async def _send(self, writer, packet): async def _send(self, writer, packet):
try: try:
writer.write(packet) # Если объектом является вебсокет, то используем функцию send для отправки
await writer.drain() if hasattr(writer, 'send'):
await writer.send(packet)
else: # В ином случае отправляем как в обычный сокет
writer.write(packet)
await writer.drain()
except Exception: except Exception:
pass pass

View File

@ -71,7 +71,7 @@ class MobileProto:
"payload": payload, "payload": payload,
} }
def pack_packet(self, ver: int = 10, cmd: int = 1, seq: int = 1, opcode: int = 6, payload: dict = None) -> bytes: def pack_packet(self, ver: int = 10, cmd: int = 0x100, seq: int = 1, opcode: int = 6, payload: dict = None) -> bytes:
# Запаковываем заголовок # Запаковываем заголовок
ver_b = ver.to_bytes(1, "big") ver_b = ver.to_bytes(1, "big")
cmd_b = cmd.to_bytes(2, "big") cmd_b = cmd.to_bytes(2, "big")

View File

@ -1,9 +1,9 @@
# Импортирование библиотек # Импортирование библиотек
import ssl, logging, asyncio import ssl, logging, asyncio
from common.config import ServerConfig from common.config import ServerConfig
from oneme.controller import OnemeMobileController from oneme.controller import OnemeController
from telegrambot.controller import TelegramBotController from telegrambot.controller import TelegramBotController
from tamtam.controller import TTMobileController from tamtam.controller import TTController
# Конфиг сервера # Конфиг сервера
server_config = ServerConfig() server_config = ServerConfig()
@ -72,8 +72,8 @@ async def main():
} }
controllers = { controllers = {
"oneme_mobile": OnemeMobileController(), "oneme": OnemeController(),
"tamtam_mobile": TTMobileController(), "tamtam": TTController(),
"telegrambot": TelegramBotController() "telegrambot": TelegramBotController()
} }

View File

@ -6,7 +6,7 @@ from classes.controllerbase import ControllerBase
from common.config import ServerConfig from common.config import ServerConfig
from common.opcodes import Opcodes from common.opcodes import Opcodes
class OnemeMobileController(ControllerBase): class OnemeController(ControllerBase):
def __init__(self): def __init__(self):
self.config = ServerConfig() self.config = ServerConfig()
self.proto = MobileProto() self.proto = MobileProto()

View File

@ -1148,7 +1148,7 @@ class Processors:
if getMessages: if getMessages:
if backward > 0: if backward > 0:
await cursor.execute( await cursor.execute(
"SELECT * FROM messages WHERE chat_id = %s AND time < %s ORDER BY id DESC LIMIT %s", "SELECT * FROM messages WHERE chat_id = %s AND time < %s ORDER BY time ASC LIMIT %s",
(chatId, from_time, backward) (chatId, from_time, backward)
) )
@ -1168,7 +1168,7 @@ class Processors:
if forward > 0: if forward > 0:
await cursor.execute( await cursor.execute(
"SELECT * FROM messages WHERE chat_id = %s AND time > %s ORDER BY id ASC LIMIT %s", "SELECT * FROM messages WHERE chat_id = %s AND time > %s ORDER BY time ASC LIMIT %s",
(chatId, from_time, forward) (chatId, from_time, forward)
) )
@ -1186,6 +1186,9 @@ class Processors:
"reactionInfo": {} "reactionInfo": {}
}) })
# Сортируем сообщения по времени
messages.sort(key=lambda x: x["time"])
# Формируем ответ # Формируем ответ
payload = { payload = {
"messages": messages "messages": messages

View File

@ -188,7 +188,7 @@ class OnemeMobileServer:
"writer": writer, "writer": writer,
"ip": addr[0], "ip": addr[0],
"port": addr[1], "port": addr[1],
"protocol": "oneme_mobile" "protocol": "oneme"
} }
) )
else: else:
@ -200,7 +200,7 @@ class OnemeMobileServer:
"writer": writer, "writer": writer,
"ip": addr[0], "ip": addr[0],
"port": addr[1], "port": addr[1],
"protocol": "oneme_mobile" "protocol": "oneme"
} }
] ]
} }

View File

@ -1,9 +1,10 @@
import asyncio import asyncio
from tamtam.socket import TTMobileServer from tamtam.socket import TTMobileServer
from tamtam.websocket import TTWebSocketServer
from classes.controllerbase import ControllerBase from classes.controllerbase import ControllerBase
from common.config import ServerConfig from common.config import ServerConfig
class TTMobileController(ControllerBase): class TTController(ControllerBase):
def __init__(self): def __init__(self):
self.config = ServerConfig() self.config = ServerConfig()
@ -17,6 +18,14 @@ class TTMobileController(ControllerBase):
db_pool=api['db'], db_pool=api['db'],
clients=api['clients'], clients=api['clients'],
send_event=api['event'] send_event=api['event']
).start(),
TTWebSocketServer(
host=self.config.host,
port=self.config.tamtam_ws_port,
ssl_context=api['ssl'],
db_pool=api['db'],
clients=api['clients'],
send_event=api['event']
).start() ).start()
) )

View File

@ -6,7 +6,7 @@ class UserAgentModel(pydantic.BaseModel):
osVersion: str osVersion: str
timezone: str timezone: str
screen: str screen: str
pushDeviceType: str pushDeviceType: str = None
locale: str locale: str
deviceName: str deviceName: str
deviceLocale: str deviceLocale: str
@ -37,4 +37,8 @@ class SearchUsersPayloadModel(pydantic.BaseModel):
contactIds: list contactIds: list
class PingPayloadModel(pydantic.BaseModel): class PingPayloadModel(pydantic.BaseModel):
interactive: bool interactive: bool
class ChatHistoryPayloadModel(pydantic.BaseModel):
chatId: int
backward: int

View File

@ -1,6 +1,10 @@
from .main import MainProcessors from .main import MainProcessors
from .auth import AuthProcessors from .auth import AuthProcessors
from .search import SearchProcessors from .search import SearchProcessors
from .history import HistoryProcessors
class Processors(MainProcessors, AuthProcessors, SearchProcessors): class Processors(MainProcessors,
AuthProcessors,
SearchProcessors,
HistoryProcessors):
pass pass

View File

@ -0,0 +1,98 @@
import pydantic
import json
from classes.baseprocessor import BaseProcessor
from tamtam.models import ChatHistoryPayloadModel
class HistoryProcessors(BaseProcessor):
async def chat_history(self, payload, seq, writer, senderId):
"""Обработчик получения истории чата"""
# Валидируем данные пакета
try:
ChatHistoryPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем данные из пакета
chatId = payload.get("chatId")
forward = payload.get("forward", 0)
backward = payload.get("backward", 0)
from_time = payload.get("from", 0)
getMessages = payload.get("getMessages", True)
messages = []
# Проверяем, существует ли чат
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,))
chat = await cursor.fetchone()
# Выбрасываем ошибку, если чата нет
if not chat:
await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_FOUND, writer)
return
# Проверяем, является ли пользователь участником чата
participants = await self.tools.get_chat_participants(chatId, self.db_pool)
if int(senderId) not in participants:
await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_ACCESS, writer)
return
# Если запрошены сообщения
if getMessages:
if backward > 0:
await cursor.execute(
"SELECT * FROM messages WHERE chat_id = %s AND time < %s ORDER BY time ASC LIMIT %s",
(chatId, from_time, backward)
)
result = await cursor.fetchall()
for row in result:
messages.append({
"id": row.get("id"),
"time": int(row.get("time")),
"type": row.get("type"),
"sender": row.get("sender"),
"text": row.get("text"),
"attaches": json.loads(row.get("attaches")),
"elements": json.loads(row.get("elements")),
"reactionInfo": {}
})
if forward > 0:
await cursor.execute(
"SELECT * FROM messages WHERE chat_id = %s AND time > %s ORDER BY time ASC LIMIT %s",
(chatId, from_time, forward)
)
result = await cursor.fetchall()
for row in result:
messages.append({
"id": row.get("id"),
"time": int(row.get("time")),
"type": row.get("type"),
"sender": row.get("sender"),
"text": row.get("text"),
"attaches": json.loads(row.get("attaches")),
"elements": json.loads(row.get("elements")),
"reactionInfo": {}
})
# Сортируем сообщения по времени
messages.sort(key=lambda x: x["time"])
# Формируем ответ
payload = {
"messages": messages
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_HISTORY, payload=payload
)
# Отправялем
await self._send(writer, packet)

View File

@ -1,4 +1,6 @@
import asyncio, logging, traceback import asyncio
import logging
import traceback
from common.proto_tcp import MobileProto from common.proto_tcp import MobileProto
from tamtam.processors import Processors from tamtam.processors import Processors
from common.rate_limiter import RateLimiter from common.rate_limiter import RateLimiter
@ -6,7 +8,7 @@ from common.opcodes import Opcodes
from common.tools import Tools from common.tools import Tools
class TTMobileServer: class TTMobileServer:
def __init__(self, host="0.0.0.0", port=443, ssl_context=None, db_pool=None, clients={}, send_event=None): def __init__(self, host, port, ssl_context, db_pool, clients, send_event):
self.host = host self.host = host
self.port = port self.port = port
self.ssl_context = ssl_context self.ssl_context = ssl_context
@ -128,7 +130,7 @@ class TTMobileServer:
"writer": writer, "writer": writer,
"ip": addr[0], "ip": addr[0],
"port": addr[1], "port": addr[1],
"protocol": "tamtam_mobile" "protocol": "tamtam"
} }
) )
else: else:
@ -140,7 +142,7 @@ class TTMobileServer:
"writer": writer, "writer": writer,
"ip": addr[0], "ip": addr[0],
"port": addr[1], "port": addr[1],
"protocol": "tamtam_mobile" "protocol": "tamtam"
} }
] ]
} }

171
src/tamtam/websocket.py Normal file
View File

@ -0,0 +1,171 @@
import logging
import traceback
import websockets
from common.proto_web import WebProto
from tamtam.processors import Processors
from common.rate_limiter import RateLimiter
from common.opcodes import Opcodes
from common.tools import Tools
class TTWebSocketServer:
def __init__(self, host, port, clients, ssl_context, db_pool, send_event):
self.host = host
self.port = port
self.ssl_context = ssl_context
self.server = None
self.logger = logging.getLogger(__name__)
self.db_pool = db_pool
self.clients = clients
self.opcodes = Opcodes()
self.proto = WebProto()
self.processors = Processors(db_pool=db_pool, clients=clients, send_event=send_event, type="web")
self.auth_required = Tools().auth_required
# rate limiter
self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60)
self.read_timeout = 300 # Таймаут чтения из websocket (секунды)
self.max_read_size = 65536 # Максимальный размер данных
async def handle_client(self, websocket, path):
"""Функция для обработки WebSocket подключений"""
# IP-адрес клиента
address = websocket.remote_address
self.logger.info(f"Работаю с клиентом {address[0]}:{address[1]}")
deviceType = None
deviceName = None
userPhone = None
userId = None
hashedToken = None
try:
async for message in websocket:
# Проверяем размер данных
if len(message) > self.max_read_size:
self.logger.warning(f"Пакет от {address[0]}:{address[1]} превышает лимит ({len(message)} байт)")
break
# Распаковываем данные
packet = self.proto.unpack_packet(message)
# Если пакет невалидный — пропускаем
if not packet:
self.logger.warning(f"Невалидный пакет от {address[0]}:{address[1]}")
continue
opcode = packet.get("opcode")
seq = packet.get("seq")
payload = packet.get("payload")
match opcode:
case self.opcodes.SESSION_INIT:
deviceType, deviceName = await self.processors.session_init(payload, seq, websocket)
case self.opcodes.PING:
await self.processors.ping(payload, seq, websocket)
case self.opcodes.LOG:
await self.processors.log(payload, seq, websocket)
case self.opcodes.AUTH_REQUEST:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_REQUEST, self.processors.error_types.RATE_LIMITED, websocket)
else:
await self.processors.auth_request(payload, seq, websocket)
case self.opcodes.AUTH:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH, self.processors.error_types.RATE_LIMITED, websocket)
else:
await self.processors.auth(payload, seq, websocket)
case self.opcodes.AUTH_CONFIRM:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_CONFIRM, self.processors.error_types.RATE_LIMITED, websocket)
else:
await self.processors.auth_confirm(payload, seq, websocket, deviceType, deviceName)
case self.opcodes.LOGIN:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.LOGIN, self.processors.error_types.RATE_LIMITED, websocket)
else:
userPhone, userId, hashedToken = await self.processors.login(payload, seq, websocket)
if userPhone:
await self._finish_auth(websocket, address, userPhone, userId)
case self.opcodes.CONTACT_INFO:
await self.auth_required(
userPhone, self.processors.contact_info, payload, seq, websocket
)
case self.opcodes.CHAT_HISTORY:
await self.auth_required(
userPhone, self.processors.chat_history, payload, seq, websocket, userId
)
case _:
self.logger.warning(f"Неизвестный опкод {opcode}")
except websockets.exceptions.ConnectionClosed:
self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}")
except Exception as e:
self.logger.error(f" Произошла ошибка при работе с клиентом {address[0]}:{address[1]}: {e}")
traceback.print_exc()
# Удаляем клиента из словаря при отключении
if userId:
await self._end_session(userId, address[0], address[1])
self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}")
async def _finish_auth(self, websocket, addr, phone, id):
"""Завершение открытия сессии"""
# Ищем пользователя в словаре
user = self.clients.get(id)
# Добавляем новое подключение в словарь
if user:
user["clients"].append(
{
"writer": websocket,
"ip": addr[0],
"port": addr[1],
"protocol": "tamtam"
}
)
else:
self.clients[id] = {
"phone": phone,
"id": id,
"clients": [
{
"writer": websocket,
"ip": addr[0],
"port": addr[1],
"protocol": "tamtam"
}
]
}
async def _end_session(self, id, ip, port):
"""Завершение сессии"""
# Получаем пользователя в списке
user = self.clients.get(id)
if not user:
return
# Получаем подключения пользователя
clients = user.get("clients", [])
# Удаляем нужное подключение из словаря
for i, client in enumerate(clients):
if (client.get("ip"), client.get("port")) == (ip, port):
clients.pop(i)
async def start(self):
"""Функция для запуска WebSocket сервера"""
self.server = await websockets.serve(
self.handle_client,
self.host,
self.port,
ssl=self.ssl_context
)
self.logger.info(f"TT WebSocket запущен на порту {self.port}")
await self.server.wait_closed()