openmax-server/src/oneme/processors.py

1299 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import secrets
import hashlib
import time
import logging
from oneme.models import *
from common.proto_tcp import MobileProto
from common.proto_web import WebProto
from common.opcodes import Opcodes
from oneme.config import OnemeConfig
from common.tools import Tools
from common.config import ServerConfig
from common.static import Static
from common.sms import send_sms_code
class Processors:
def __init__(self, db_pool=None, clients={}, send_event=None, telegram_bot=None, type="socket"):
self.tools = Tools()
self.config = ServerConfig()
self.static = Static()
self.opcodes = Opcodes()
self.server_config = OnemeConfig().SERVER_CONFIG
self.error_types = self.static.ErrorTypes()
self.chat_types = self.static.ChatTypes()
self.db_pool = db_pool
self.event = send_event
self.clients = clients
self.telegram_bot = telegram_bot
self.logger = logging.getLogger(__name__)
if type == "socket":
self.proto = MobileProto()
elif type == "web":
self.proto = WebProto()
async def _send(self, writer, packet):
try:
writer.write(packet)
await writer.drain()
except Exception as error:
self.logger.error(f"Ошибка при отправке пакета - {error}")
async def _send_error(self, seq, opcode, type, writer):
payload = self.static.ERROR_TYPES.get(type, {
"localizedMessage": "Неизвестная ошибка",
"error": "unknown.error",
"message": "Unknown error",
"title": "Неизвестная ошибка"
})
packet = self.proto.pack_packet(
cmd=self.proto.CMD_ERR, seq=seq, opcode=opcode, payload=payload
)
await self._send(writer, packet)
async def session_init(self, payload, seq, writer):
"""Обработчик приветствия"""
# Валидируем данные пакета
try:
HelloPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.SESSION_INIT, self.error_types.INVALID_PAYLOAD, writer)
return None, None
# Получаем данные из пакета
deviceType = payload.get("userAgent").get("deviceType")
deviceName = payload.get("userAgent").get("deviceName")
# Данные пакета
payload = {
"location": "RU",
"app-update-type": 0, # 1 = принудительное обновление
"reg-country-code": self.static.REG_COUNTRY_CODES,
"phone-auto-complete-enabled": False,
"lang": True
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.SESSION_INIT, payload=payload
)
# Отправляем
await self._send(writer, packet)
return deviceType, deviceName
async def ping(self, payload, seq, writer):
"""Обработчик пинга"""
# Валидируем данные пакета
try:
PingPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.PING, self.error_types.INVALID_PAYLOAD, writer)
return
# Собираем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PING, payload=None
)
# Отправляем
await self._send(writer, response)
async def log(self, payload, seq, writer):
"""Обработчик телеметрии"""
# TODO: можно было бы реализовать валидацию телеметрии, но сейчас это не особо важно
# Собираем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOG, payload=None
)
# Отправляем
await self._send(writer, response)
async def auth_request(self, payload, seq, writer):
"""Обработчик запроса кода"""
try:
RequestCodePayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.AUTH_REQUEST, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем телефон из пакета
phone = payload.get("phone").replace("+", "").replace(" ", "").replace("-", "")
# Генерируем токен
token = secrets.token_urlsafe(128)
token_hash = hashlib.sha256(token.encode()).hexdigest()
# Время истечения токена
expires = int(time.time()) + 300
user_exists = False
# Ищем пользователя
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM users WHERE phone = %s", (phone,))
user = await cursor.fetchone()
# Получаем код через SMS шлюз или генерируем локально (безопасность прежде всего)
if self.config.sms_gateway_url:
code = await send_sms_code(self.config.sms_gateway_url, phone)
if code is None:
code = str(secrets.randbelow(900000) + 100000)
else:
code = str(secrets.randbelow(900000) + 100000)
# Хешируем
code_hash = hashlib.sha256(code.encode()).hexdigest()
# Сохраняем токен и если нужно отправляем код через тг
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
if user:
user_exists = True
# Сохраняем токен
await cursor.execute(
"INSERT INTO auth_tokens (phone, token_hash, code_hash, expires) VALUES (%s, %s, %s, %s)",
(phone, token_hash, code_hash, expires,)
)
# Если тг бот включен, и тг привязан к аккаунту - отправляем туда сообщение
if not self.config.sms_gateway_url and self.telegram_bot and user.get("telegram_id"):
await self.telegram_bot.send_code(chat_id=int(user.get("telegram_id")), phone=phone, code=code)
else:
# Пользователь не найден - сохраняем токен со state='register'
# чтобы после верификации кода направить на экран регистрации
await cursor.execute(
"INSERT INTO auth_tokens (phone, token_hash, code_hash, expires, state) VALUES (%s, %s, %s, %s, %s)",
(phone, token_hash, code_hash, expires, "register",)
)
# Данные пакета
payload = {
"requestMaxDuration": 60000,
"requestCountLeft": 10,
"altActionDuration": 60000,
"codeLength": 6,
"token": token
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH_REQUEST, payload=payload
)
# Отправляем
await self._send(writer, packet)
self.logger.debug(f"Код для {phone}: {code} (существующий={user_exists})")
async def auth(self, payload, seq, writer, deviceType, deviceName):
"""Обработчик проверки кода"""
try:
VerifyCodePayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.AUTH, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем данные из пакета
code = payload.get("verifyCode")
token = payload.get("token")
# Хешируем токен с кодом
hashed_code = hashlib.sha256(code.encode()).hexdigest()
hashed_token = hashlib.sha256(token.encode()).hexdigest()
# Генерируем постоянный токен
login = secrets.token_urlsafe(128)
hashed_login = hashlib.sha256(login.encode()).hexdigest()
# Ищем токен с кодом
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
# Ищем токен
await cursor.execute(
"SELECT * FROM auth_tokens WHERE token_hash = %s AND expires > UNIX_TIMESTAMP()",
(hashed_token,)
)
stored_token = await cursor.fetchone()
# Если токен просрочен, или его нет - отправляем ошибку
if stored_token is None:
await self._send_error(seq, self.opcodes.AUTH, self.error_types.CODE_EXPIRED, writer)
return
# Проверяем код
if stored_token.get("code_hash") != hashed_code:
await self._send_error(seq, self.opcodes.AUTH, self.error_types.INVALID_CODE, writer)
return
# Если это новый пользователь - переводим токен в state='verified'
# и отдаём клиенту REGISTER токен, чтобы он показал экран ввода имени
if stored_token.get("state") == "register":
await cursor.execute(
"UPDATE auth_tokens SET state = %s WHERE token_hash = %s",
("verified", hashed_token,)
)
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH,
payload={
"tokenAttrs": {
"REGISTER": {
"token": token
}
},
"presetAvatars": []
}
)
await self._send(writer, packet)
return
# Ищем аккаунт
await cursor.execute("SELECT * FROM users WHERE phone = %s", (stored_token.get("phone"),))
account = await cursor.fetchone()
# Удаляем токен
await cursor.execute("DELETE FROM auth_tokens WHERE token_hash = %s", (hashed_token,))
# Создаем сессию
await cursor.execute(
"INSERT INTO tokens (phone, token_hash, device_type, device_name, location, time) VALUES (%s, %s, %s, %s, %s, %s)",
(stored_token.get("phone"), hashed_login, deviceType, deviceName, "Little Saint James Island", int(time.time()),) # весь покрытый зеленью, абсолютно весь, остров невезения в океане есть
)
# Генерируем профиль
# Аватарка с биографией
photoId = None if not account.get("avatar_id") else int(account.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + photoId
description = None if not account.get("description") else account.get("description")
# Собираем данные пакета
payload = {
"tokenAttrs": {
"LOGIN": {
"token": login
}
},
"profile": self.tools.generate_profile(
id=account.get("id"),
phone=int(account.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(account.get("updatetime")),
firstName=account.get("firstname"),
lastName=account.get("lastname"),
options=json.loads(account.get("options")),
description=description,
accountStatus=int(account.get("accountstatus")),
profileOptions=json.loads(account.get("profileoptions")),
includeProfileOptions=True,
username=account.get("username")
)
}
# Создаем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH, payload=payload
)
# Отправляем
await self._send(writer, packet)
async def auth_confirm(self, payload, seq, writer, deviceType, deviceName):
"""Обработчик подтверждения регистрации нового пользователя"""
# Валидируем данные пакета
try:
AuthConfirmRegisterPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем данные из пакета
token = payload.get("token")
first_name = payload.get("firstName").strip()
last_name = (payload.get("lastName") or "").strip()
# Хешируем токен
hashed_token = hashlib.sha256(token.encode()).hexdigest()
# Генерируем постоянный логин-токен
login = secrets.token_urlsafe(128)
hashed_login = hashlib.sha256(login.encode()).hexdigest()
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
# Ищем токен - он должен быть в state='verified'
await cursor.execute(
"SELECT * FROM auth_tokens WHERE token_hash = %s AND expires > UNIX_TIMESTAMP() AND state = %s",
(hashed_token, "verified",)
)
stored_token = await cursor.fetchone()
# Если токен не найден или просрочен - отправляем ошибку
if stored_token is None:
await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.CODE_EXPIRED, writer)
return
phone = stored_token.get("phone")
# Проверяем что пользователь с таким телефоном ещё не существует
await cursor.execute("SELECT id FROM users WHERE phone = %s", (phone,))
if await cursor.fetchone():
await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.INVALID_PAYLOAD, writer)
return
now_ms = int(time.time() * 1000)
now_s = int(time.time())
# Создаем пользователя
await cursor.execute(
"""
INSERT INTO users
(id, phone, telegram_id, firstname, lastname, username,
profileoptions, options, accountstatus, updatetime, lastseen)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
self.tools.generate_id(), phone, None, first_name, last_name, None,
json.dumps([]), json.dumps(["TT", "ONEME"]),
0, str(now_ms), str(now_s),
)
)
user_id = cursor.lastrowid
# Добавляем данные аккаунта
await cursor.execute(
"""
INSERT INTO user_data
(phone, contacts, folders, user_config, chat_config)
VALUES (%s %s, %s, %s, %s)
""",
(
phone,
json.dumps([]),
json.dumps(self.static.USER_FOLDERS),
json.dumps(self.static.USER_SETTINGS),
json.dumps({}),
)
)
# Удаляем токен
await cursor.execute("DELETE FROM auth_tokens WHERE token_hash = %s", (hashed_token,))
# Создаем сессию
await cursor.execute(
"INSERT INTO tokens (phone, token_hash, device_type, device_name, location, time) VALUES (%s, %s, %s, %s, %s, %s)",
(phone, hashed_login, deviceType or "ANDROID", deviceName or "Unknown", "Little Saint James Island", now_s,)
)
# Генерируем профиль
profile = self.tools.generate_profile(
id=user_id,
phone=int(phone),
avatarUrl=None,
photoId=None,
updateTime=now_ms,
firstName=first_name,
lastName=last_name,
options=["ONEME"],
description=None,
accountStatus=0,
profileOptions=[],
includeProfileOptions=True,
username=None
)
# Собираем данные пакета
payload = {
"userToken": "0",
"profile": profile,
"tokenType": "LOGIN",
"token": login
}
# Создаем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH_CONFIRM, payload=payload
)
# Отправляем
await self._send(writer, packet)
self.logger.info(f"Новый пользователь зарегистрирован: phone={phone} id={user_id} name={first_name} {last_name}")
async def login(self, payload, seq, writer):
"""Обработчик авторизации клиента на сервере"""
# Валидируем данные пакета
try:
LoginPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.LOGIN, self.error_types.INVALID_PAYLOAD, writer)
return
# Чаты, где состоит пользователь
chats = []
# Получаем данные из пакета
token = payload.get("token")
# Хешируем токен
hashed_token = hashlib.sha256(token.encode()).hexdigest()
# Ищем токен в бд
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM tokens WHERE token_hash = %s", (hashed_token,))
token_data = await cursor.fetchone()
# Если токен не найден, отправляем ошибку
if token_data is None:
await self._send_error(seq, self.opcodes.LOGIN, self.error_types.INVALID_TOKEN, writer)
return
# Ищем аккаунт пользователя в бд
await cursor.execute("SELECT * FROM users WHERE phone = %s", (token_data.get("phone"),))
user = await cursor.fetchone()
# Ищем данные пользователя в бд
await cursor.execute("SELECT * FROM user_data WHERE phone = %s", (token_data.get("phone"),))
user_data = await cursor.fetchone()
# Ищем все чаты, где состоит пользователь
await cursor.execute(
"SELECT * FROM chat_participants WHERE user_id = %s",
(user.get('id'))
)
user_chats = await cursor.fetchall()
for chat in user_chats:
chats.append(
chat.get("chat_id")
)
# Аватарка с биографией
photoId = None if not user.get("avatar_id") else int(user.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + photoId
description = None if not user.get("description") else user.get("description")
# Генерируем профиль
profile = self.tools.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
description=description,
accountStatus=int(user.get("accountstatus")),
profileOptions=json.loads(user.get("profileoptions")),
includeProfileOptions=True,
username=user.get("username")
)
chats = await self.tools.generate_chats(
chats, self.db_pool, user.get("id")
)
# Формируем данные пакета
payload = {
"profile": profile,
"chats": chats,
"chatMarker": 0,
"messages": {},
"contacts": [],
"presence": {},
"config": {
"server": self.server_config,
"user": json.loads(user_data.get("user_config"))
},
"token": token,
"videoChatHistory": False,
"time": int(time.time() * 1000)
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGIN, payload=payload
)
# Отправляем
await self._send(writer, packet)
return int(user.get("phone")), int(user.get("id")), hashed_token
async def logout(self, seq, writer, hashedToken):
"""Обработчик завершения сессии"""
# Удаляем токен из бд
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("DELETE FROM tokens WHERE token_hash = %s", (hashedToken,))
# Создаем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGOUT, payload=None
)
# Отправляем
await self._send(writer, response)
async def assets_update(self, payload, seq, writer):
"""Обработчик запроса ассетов клиента на сервере"""
# Валидируем данные пакета
try:
AssetsPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.ASSETS_UPDATE, self.error_types.INVALID_PAYLOAD, writer)
return
# TODO: сейчас это заглушка, а попозже нужно сделать полноценную реализацию
# Данные пакета
payload = {
"sections": [],
"sync": int(time.time() * 1000)
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.ASSETS_UPDATE, payload=payload
)
# Отправляем
await self._send(writer, packet)
async def video_chat_history(self, payload, seq, writer):
"""Обработчик получения истории звонков"""
# Валидируем данные пакета
try:
GetCallHistoryPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.VIDEO_CHAT_HISTORY, self.error_types.INVALID_PAYLOAD, writer)
return
# TODO: сейчас это заглушка, а попозже нужно сделать полноценную реализацию
# Данные пакета
payload = {
"hasMore": False,
"history": [],
"backwardMarker": 0,
"forwardMarker": 0
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.VIDEO_CHAT_HISTORY, payload=payload
)
# Отправляем
await self._send(writer, packet)
async def msg_send(self, payload, seq, writer, senderId, db_pool):
"""Функция отправки сообщения"""
# Валидируем данные пакета
try:
SendMessagePayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем данные из пакета
userId = payload.get("userId")
chatId = payload.get("chatId")
message = payload.get("message")
elements = message.get("elements") or []
attaches = message.get("attaches") or []
cid = message.get("cid") or 0
text = message.get("text") or ""
# Вычисляем ID чата по ID пользователя и ID отправителя,
# в случае отсутствия ID чата
if not chatId:
chatId = userId ^ senderId
# Если клиент хочет отправить сообщение в избранное,
# то выставляем в качестве ID чата ID отправителя
# (А ещё используем это, если клиент вообще ничего не указал)
if chatId == 0 or not chatId:
chatId = senderId
participants = [senderId]
else:
# Если все таки клиент хочет отправить сообщение в нормальный чат,
# то ищем его в базе данных (извлекать список участников все таки тоже надо)
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,))
chat = await cursor.fetchone()
# Если нет такого чата - выбрасываем ошибку
if not chat:
await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.CHAT_NOT_FOUND, writer)
return
# Список участников
participants = await self.tools.get_chat_participants(chatId, db_pool)
# Проверяем, является ли отправитель участником чата
if int(senderId) not in participants:
await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.CHAT_NOT_ACCESS, writer)
return
# Добавляем сообщение в историю
messageId, lastMessageId, messageTime = await self.tools.insert_message(
chatId=chatId,
senderId=senderId,
text=text,
attaches=attaches,
elements=elements,
cid=cid,
type="USER",
db_pool=self.db_pool
)
# Готовое тело сообщения
bodyMessage = {
"id": messageId,
"time": messageTime,
"type": "USER",
"sender": senderId,
"cid": cid,
"text": text,
"attaches": attaches,
"elements": elements
}
# Отправляем событие всем участникам чата
for participant in participants:
await self.event(
participant,
{
"eventType": "new_msg",
"chatId": 0 if chatId == senderId else chatId,
"message": bodyMessage,
"prevMessageId": lastMessageId,
"time": messageTime,
"writer": writer
}
)
# Данные пакета
payload = {
"chatId": 0 if chatId == senderId else chatId,
"message": bodyMessage,
"unread": 0,
"mark": messageTime
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.MSG_SEND, payload=payload
)
# Отправляем
await self._send(writer, packet)
async def folders_get(self, payload, seq, writer, senderPhone):
"""Синхронизация папок с сервером"""
# Валидируем данные пакета
try:
SyncFoldersPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.FOLDERS_GET, self.error_types.INVALID_PAYLOAD, writer)
return
# Ищем папки в бд
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT folders FROM user_data WHERE phone = %s", (int(senderPhone),))
result_folders = await cursor.fetchone()
user_folders = json.loads(result_folders.get("folders"))
# Создаем данные пакета
payload = {
"folderSync": int(time.time() * 1000),
"folders": self.static.ALL_CHAT_FOLDER + user_folders.get("folders"),
"foldersOrder": self.static.ALL_CHAT_FOLDER_ORDER + user_folders.get("foldersOrder"),
"allFilterExcludeFolders": user_folders.get("allFilterExcludeFolders")
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.FOLDERS_GET, payload=payload
)
# Отправляем
await self._send(writer, packet)
async def sessions_info(self, payload, seq, writer, senderPhone, hashedToken):
"""Получение активных сессий на аккаунте"""
# Готовый список сессий
sessions = []
# Ищем сессии в бд
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM tokens WHERE phone = %s", (str(senderPhone),))
user_sessions = await cursor.fetchall()
# Собираем сессии в список
for session in user_sessions:
sessions.append(
{
"time": int(session.get("time")),
"client": f"MAX {session.get('device_type')}",
"info": session.get("device_name"),
"location": session.get("location"),
"current": True if session.get("token_hash") == hashedToken else False
}
)
# Создаем данные пакета
payload = {
"sessions": sessions
}
# Создаем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.SESSIONS_INFO, payload=payload
)
# Отправляем
await self._send(writer, response)
async def contact_info(self, payload, seq, writer):
"""Поиск пользователей по ID"""
# Валидируем данные пакета
try:
SearchUsersPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.CONTACT_INFO, self.error_types.INVALID_PAYLOAD, writer)
return
# Итоговый список пользователей
users = []
# ID пользователей, которые нам предстоит найти
contactIds = payload.get("contactIds")
# Ищем пользователей в бд
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
for contactId in contactIds:
await cursor.execute("SELECT * FROM users WHERE id = %s", (contactId,))
user = await cursor.fetchone()
# Если такой пользователь есть, добавляем его в список
if user:
# Аватарка с биографией
photoId = None if not user.get("avatar_id") else int(user.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + photoId
description = None if not user.get("description") else user.get("description")
# Генерируем профиль
users.append(
self.tools.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
description=description,
accountStatus=int(user.get("accountstatus")),
profileOptions=json.loads(user.get("profileoptions")),
includeProfileOptions=False,
username=user.get("username")
)
)
# Создаем данные пакета
payload = {
"contacts": users
}
# Создаем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_INFO, payload=payload
)
# Отправляем
await self._send(writer, response)
async def chat_info(self, payload, seq, writer, senderId):
"""Поиск чатов по ID"""
# Валидируем данные пакета
try:
SearchChatsPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.CHAT_INFO, self.error_types.INVALID_PAYLOAD, writer)
return
# Итоговый список чатов
chats = []
# ID чатов, которые нам предстоит найти
chatIds = payload.get("chatIds")
# Ищем чаты в бд
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
for chatId in chatIds:
if chatId != 0:
await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,))
chat = await cursor.fetchone()
if chat:
# Проверяем, является ли пользователь участником чата
participants = await self.tools.get_chat_participants(chatId, self.db_pool)
# (в max нельзя смотреть и отправлять сообщения в чат, в котором ты не участник, в отличие от tg (например, комментарии в каналах),
# так что надо тоже так делать)
if int(senderId) not in participants:
continue
# Получаем последнее сообщение из чата
message, messageTime = await self.tools.get_last_message(
chatId, self.db_pool
)
# Добавляем чат в список
chats.append(
self.tools.generate_chat(
chatId, chat.get("owner"),
chat.get("type"), participants,
message, messageTime
)
)
else:
# Получаем последнее сообщение из чата
message, messageTime = await self.tools.get_last_message(
senderId, self.db_pool
)
# ID избранного
chatId = senderId ^ senderId
# Добавляем чат в список
chats.append(
self.tools.generate_chat(
chatId, senderId,
"DIALOG", [senderId],
message, messageTime
)
)
# Создаем данные пакета
payload = {
"chats": chats
}
# Собираем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_INFO, payload=payload
)
# Отправляем
await self._send(writer, response)
async def contact_info_by_phone(self, payload, seq, writer, senderId):
"""Поиск по номеру телефона"""
# Валидируем данные пакета
try:
SearchByPhonePayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.CONTACT_INFO_BY_PHONE, self.error_types.INVALID_PAYLOAD, writer)
return
# Ищем пользователя в бд
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM users WHERE phone = %s", (int(payload.get("phone")),))
user = await cursor.fetchone()
# Если пользователь не найден, отправляем ошибку
if not user:
await self._send_error(seq, self.opcodes.CONTACT_INFO_BY_PHONE, self.error_types.USER_NOT_FOUND, writer)
return
# ID чата
chatId = senderId ^ user.get("id")
# Ищем диалог в бд
await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,))
chat = await cursor.fetchone()
# Если диалога нет - создаем
if not chat:
await cursor.execute(
"INSERT INTO chats (id, owner, type) VALUES (%s, %s, %s)",
(chatId, senderId, "DIALOG")
)
# Добавляем участников в таблицу chat_participants
participants = [int(senderId), int(user.get("id"))]
for user_id in participants:
await cursor.execute(
"INSERT INTO chat_participants (chat_id, user_id) VALUES (%s, %s)",
(chatId, user_id)
)
# Аватарка с биографией
photoId = None if not user.get("avatar_id") else int(user.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + photoId
description = None if not user.get("description") else user.get("description")
# Генерируем профиль
profile = self.tools.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
description=description,
accountStatus=int(user.get("accountstatus")),
profileOptions=json.loads(user.get("profileoptions")),
includeProfileOptions=False,
username=user.get("username")
)
# Создаем данные пакета
payload = {
"contact": profile
}
# Создаем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_INFO_BY_PHONE, payload=payload
)
# Отправляем
await self._send(writer, response)
async def ok_token(self, payload, seq, writer):
"""Получение токена для звонка"""
# Валидируем данные пакета
try:
GetCallTokenPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.OK_TOKEN, self.error_types.INVALID_PAYLOAD, writer)
return
# TODO: когда-то взяться за звонки
# Данные пакета
payload = {
"token": secrets.token_urlsafe(128),
"token_lifetime_ts": int(time.time() * 1000),
"token_refresh_ts": int(time.time() * 1000)
}
# Создаем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.OK_TOKEN, payload=payload
)
# Отправляем
await self._send(writer, packet)
async def msg_typing(self, payload, seq, writer, senderId):
"""Обработчик события печатания"""
# Валидируем данные пакета
try:
TypingPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем данные из пакета
chatId = payload.get("chatId")
type = payload.get("type") or "TYPING"
# Ищем чат в базе данных
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,))
chat = await cursor.fetchone()
# Если чат не найден, отправляем ошибку
if not chat:
await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.CHAT_NOT_FOUND, writer)
return
# Участники чата
participants = await self.tools.get_chat_participants(chatId, self.db_pool)
# Проверяем, является ли отправитель участником чата
if int(senderId) not in participants:
await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.CHAT_NOT_ACCESS, writer)
return
# Рассылаем событие участникам чата
for participant in participants:
if participant != senderId:
# Если участник не является отправителем, отправляем
await self.event(
participant,
{
"eventType": "typing",
"chatId": chatId,
"type": type,
"userId": senderId
}
)
# Создаем пакет
packet = self.proto.pack_packet(
seq=seq, opcode=self.opcodes.MSG_TYPING
)
# Отправляем пакет
await self._send(writer, packet)
async def complain_reasons_get(self, payload, seq, writer):
"""Обработчик получения причин жалоб"""
# Валидируем данные пакета
try:
ComplainReasonsGetPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.COMPLAIN_REASONS_GET, self.error_types.INVALID_PAYLOAD, writer)
return
# Собираем данные пакета
payload = {
"complains": self.static.COMPLAIN_REASONS,
"complainSync": int(time.time())
}
# Создаем пакет
packet = self.proto.pack_packet(
seq=seq, opcode=self.opcodes.COMPLAIN_REASONS_GET, payload=payload
)
# Отправляем пакет
await self._send(writer, packet)
async def chat_history(self, payload, seq, writer, senderId):
"""Обработчик получения истории чата"""
# Валидируем данные пакета
try:
ChatHistoryPayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем данные из пакета
chatId = payload.get("chatId")
forward = payload.get("forward", 0)
backward = payload.get("backward", 0)
from_time = payload.get("from", 0)
getMessages = payload.get("getMessages", True)
messages = []
# Если пользователь хочет получить историю из избранного,
# то выставляем в качестве ID чата его ID
if chatId == 0:
chatId = senderId
# Проверяем, существует ли чат
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
# Проверяем состоит ли пользователь в чате,
# только в случае того, если это не избранное
if chatId != senderId:
await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,))
chat = await cursor.fetchone()
# Выбрасываем ошибку, если чата нет
if not chat:
await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_FOUND, writer)
return
# Проверяем, является ли пользователь участником чата
participants = await self.tools.get_chat_participants(chatId, self.db_pool)
if int(senderId) not in participants:
await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_ACCESS, writer)
return
# Если запрошены сообщения
if getMessages:
if backward > 0:
await cursor.execute(
"SELECT * FROM messages WHERE chat_id = %s AND time < %s ORDER BY time ASC LIMIT %s",
(chatId, from_time, backward)
)
result = await cursor.fetchall()
for row in result:
messages.append({
"id": row.get("id"),
"time": int(row.get("time")),
"type": row.get("type"),
"sender": row.get("sender"),
"text": row.get("text"),
"attaches": json.loads(row.get("attaches")),
"elements": json.loads(row.get("elements")),
"reactionInfo": {}
})
if forward > 0:
await cursor.execute(
"SELECT * FROM messages WHERE chat_id = %s AND time > %s ORDER BY time ASC LIMIT %s",
(chatId, from_time, forward)
)
result = await cursor.fetchall()
for row in result:
messages.append({
"id": row.get("id"),
"time": int(row.get("time")),
"type": row.get("type"),
"sender": row.get("sender"),
"text": row.get("text"),
"attaches": json.loads(row.get("attaches")),
"elements": json.loads(row.get("elements")),
"reactionInfo": {}
})
# Сортируем сообщения по времени
messages.sort(key=lambda x: x["time"])
# Формируем ответ
payload = {
"messages": messages
}
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_HISTORY, payload=payload
)
# Отправялем
await self._send(writer, packet)
async def profile(self, payload, seq, writer, userId, userPhone):
# Валидируем входные данные
try:
UpdateProfilePayloadModel.model_validate(payload)
except Exception as e:
await self._send_error(seq, self.opcodes.PROFILE, self.error_types.INVALID_PAYLOAD, writer)
return
# Извлекаем поля из пакета (каждое может быть None)
description = payload.get("description")
firstName = payload.get("firstName")
lastName = payload.get("lastName")
# Обновляем только те поля, которые пришли в запросе
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
if description is not None:
# При изменении описания также обновляем время последнего изменения профиля
await cursor.execute(
"UPDATE users SET description = %s, updatetime = %s WHERE id = %s",
(description, int(time.time() * 1000), userId)
)
if firstName is not None:
await cursor.execute(
"UPDATE users SET firstname = %s WHERE id = %s",
(firstName, userId)
)
if lastName is not None:
await cursor.execute(
"UPDATE users SET lastname = %s WHERE id = %s",
(lastName, userId)
)
# Получаем актуальные данные пользователя после обновления
await cursor.execute("SELECT * FROM users WHERE id = %s", (userId,))
user = await cursor.fetchone()
# Формируем URL аватарки если она есть
photoId = None if not user.get("avatar_id") else int(user.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + str(photoId)
# Генерируем профиль для отправки клиенту
profile = self.tools.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
description=user.get("description"),
accountStatus=int(user.get("accountstatus")),
profileOptions=json.loads(user.get("profileoptions")),
includeProfileOptions=True,
username=user.get("username")
)
# Данные пакета
payload = {
"profile": profile
}
# Отправляем ответ на запрос (CMD_OK)
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PROFILE, payload=payload
)
await self._send(writer, packet)
# Отправляем всем сессиям о изменении профиля
await self.event(
user.get('id'),
{
"eventType": "profile_updated",
"profile": profile,
"writer": writer
}
)
async def chat_subscribe(self, payload, seq, writer):
# Валидируем входные данные
try:
ChatSubscribePayloadModel.model_validate(payload)
except Exception as e:
await self._send_error(seq, self.opcodes.CHAT_SUBSCRIBE, self.error_types.INVALID_PAYLOAD, writer)
return
# Созадаем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_SUBSCRIBE, payload=None
)
# Отправялем
await self._send(writer, packet)