From e5607adb9b56d4322ef908f8d7e14663b9112efe Mon Sep 17 00:00:00 2001 From: Alexey Polyakov Date: Sun, 29 Mar 2026 11:04:45 +0300 Subject: [PATCH] =?UTF-8?q?MAX:=20=D0=A0=D0=B0=D0=B7=D0=B4=D0=B5=D0=BB?= =?UTF-8?q?=D0=B8=D0=BB=20=D0=BF=D1=80=D0=BE=D1=86=D0=B5=D1=81=D1=81=D0=BE?= =?UTF-8?q?=D1=80=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/oneme/processors.py | 1298 ----------------------------- src/oneme/processors/__init__.py | 26 + src/oneme/processors/assets.py | 31 + src/oneme/processors/auth.py | 454 ++++++++++ src/oneme/processors/calls.py | 64 ++ src/oneme/processors/chats.py | 20 + src/oneme/processors/complains.py | 29 + src/oneme/processors/folders.py | 39 + src/oneme/processors/history.py | 106 +++ src/oneme/processors/main.py | 130 +++ src/oneme/processors/messages.py | 168 ++++ src/oneme/processors/search.py | 205 +++++ src/oneme/processors/sessions.py | 38 + src/oneme/socket.py | 4 +- 14 files changed, 1312 insertions(+), 1300 deletions(-) delete mode 100644 src/oneme/processors.py create mode 100644 src/oneme/processors/__init__.py create mode 100644 src/oneme/processors/assets.py create mode 100644 src/oneme/processors/auth.py create mode 100644 src/oneme/processors/calls.py create mode 100644 src/oneme/processors/chats.py create mode 100644 src/oneme/processors/complains.py create mode 100644 src/oneme/processors/folders.py create mode 100644 src/oneme/processors/history.py create mode 100644 src/oneme/processors/main.py create mode 100644 src/oneme/processors/messages.py create mode 100644 src/oneme/processors/search.py create mode 100644 src/oneme/processors/sessions.py diff --git a/src/oneme/processors.py b/src/oneme/processors.py deleted file mode 100644 index 2b76391..0000000 --- a/src/oneme/processors.py +++ /dev/null @@ -1,1298 +0,0 @@ -import json -import secrets -import hashlib -import time -import logging -from oneme.models import * -from common.proto_tcp import MobileProto -from common.proto_web import WebProto -from common.opcodes import Opcodes -from oneme.config import OnemeConfig -from common.tools import Tools -from common.config import ServerConfig -from common.static import Static -from common.sms import send_sms_code - -class Processors: - def __init__(self, db_pool=None, clients={}, send_event=None, telegram_bot=None, type="socket"): - self.tools = Tools() - self.config = ServerConfig() - self.static = Static() - self.opcodes = Opcodes() - self.server_config = OnemeConfig().SERVER_CONFIG - self.error_types = self.static.ErrorTypes() - self.chat_types = self.static.ChatTypes() - - self.db_pool = db_pool - self.event = send_event - self.clients = clients - self.telegram_bot = telegram_bot - self.logger = logging.getLogger(__name__) - - if type == "socket": - self.proto = MobileProto() - elif type == "web": - self.proto = WebProto() - - async def _send(self, writer, packet): - try: - writer.write(packet) - await writer.drain() - except Exception as error: - self.logger.error(f"Ошибка при отправке пакета - {error}") - - async def _send_error(self, seq, opcode, type, writer): - payload = self.static.ERROR_TYPES.get(type, { - "localizedMessage": "Неизвестная ошибка", - "error": "unknown.error", - "message": "Unknown error", - "title": "Неизвестная ошибка" - }) - - packet = self.proto.pack_packet( - cmd=self.proto.CMD_ERR, seq=seq, opcode=opcode, payload=payload - ) - - await self._send(writer, packet) - - async def session_init(self, payload, seq, writer): - """Обработчик приветствия""" - # Валидируем данные пакета - try: - HelloPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.SESSION_INIT, self.error_types.INVALID_PAYLOAD, writer) - return None, None - - # Получаем данные из пакета - deviceType = payload.get("userAgent").get("deviceType") - deviceName = payload.get("userAgent").get("deviceName") - - # Данные пакета - payload = { - "location": "RU", - "app-update-type": 0, # 1 = принудительное обновление - "reg-country-code": self.static.REG_COUNTRY_CODES, - "phone-auto-complete-enabled": False, - "lang": True - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.SESSION_INIT, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - return deviceType, deviceName - - async def ping(self, payload, seq, writer): - """Обработчик пинга""" - # Валидируем данные пакета - try: - PingPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.PING, self.error_types.INVALID_PAYLOAD, writer) - return - - # Собираем пакет - response = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PING, payload=None - ) - - # Отправляем - await self._send(writer, response) - - async def log(self, payload, seq, writer): - """Обработчик телеметрии""" - # TODO: можно было бы реализовать валидацию телеметрии, но сейчас это не особо важно - - # Собираем пакет - response = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOG, payload=None - ) - - # Отправляем - await self._send(writer, response) - - async def auth_request(self, payload, seq, writer): - """Обработчик запроса кода""" - try: - RequestCodePayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.AUTH_REQUEST, self.error_types.INVALID_PAYLOAD, writer) - return - - # Извлекаем телефон из пакета - phone = payload.get("phone").replace("+", "").replace(" ", "").replace("-", "") - - # Генерируем токен - token = secrets.token_urlsafe(128) - token_hash = hashlib.sha256(token.encode()).hexdigest() - - # Время истечения токена - expires = int(time.time()) + 300 - - user_exists = False - - # Ищем пользователя - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT * FROM users WHERE phone = %s", (phone,)) - user = await cursor.fetchone() - - # Получаем код через SMS шлюз или генерируем локально (безопасность прежде всего) - if self.config.sms_gateway_url: - code = await send_sms_code(self.config.sms_gateway_url, phone) - - if code is None: - code = str(secrets.randbelow(900000) + 100000) - else: - code = str(secrets.randbelow(900000) + 100000) - - # Хешируем - code_hash = hashlib.sha256(code.encode()).hexdigest() - - # Сохраняем токен и если нужно отправляем код через тг - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - if user: - user_exists = True - # Сохраняем токен - await cursor.execute( - "INSERT INTO auth_tokens (phone, token_hash, code_hash, expires) VALUES (%s, %s, %s, %s)", - (phone, token_hash, code_hash, expires,) - ) - - # Если тг бот включен, и тг привязан к аккаунту - отправляем туда сообщение - if not self.config.sms_gateway_url and self.telegram_bot and user.get("telegram_id"): - await self.telegram_bot.send_code(chat_id=int(user.get("telegram_id")), phone=phone, code=code) - else: - # Пользователь не найден - сохраняем токен со state='register' - # чтобы после верификации кода направить на экран регистрации - await cursor.execute( - "INSERT INTO auth_tokens (phone, token_hash, code_hash, expires, state) VALUES (%s, %s, %s, %s, %s)", - (phone, token_hash, code_hash, expires, "register",) - ) - - # Данные пакета - payload = { - "requestMaxDuration": 60000, - "requestCountLeft": 10, - "altActionDuration": 60000, - "codeLength": 6, - "token": token - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH_REQUEST, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - self.logger.debug(f"Код для {phone}: {code} (существующий={user_exists})") - - async def auth(self, payload, seq, writer, deviceType, deviceName): - """Обработчик проверки кода""" - try: - VerifyCodePayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.AUTH, self.error_types.INVALID_PAYLOAD, writer) - return - - # Извлекаем данные из пакета - code = payload.get("verifyCode") - token = payload.get("token") - - # Хешируем токен с кодом - hashed_code = hashlib.sha256(code.encode()).hexdigest() - hashed_token = hashlib.sha256(token.encode()).hexdigest() - - # Генерируем постоянный токен - login = secrets.token_urlsafe(128) - hashed_login = hashlib.sha256(login.encode()).hexdigest() - - # Ищем токен с кодом - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - # Ищем токен - await cursor.execute( - "SELECT * FROM auth_tokens WHERE token_hash = %s AND expires > UNIX_TIMESTAMP()", - (hashed_token,) - ) - stored_token = await cursor.fetchone() - - # Если токен просрочен, или его нет - отправляем ошибку - if stored_token is None: - await self._send_error(seq, self.opcodes.AUTH, self.error_types.CODE_EXPIRED, writer) - return - - # Проверяем код - if stored_token.get("code_hash") != hashed_code: - await self._send_error(seq, self.opcodes.AUTH, self.error_types.INVALID_CODE, writer) - return - - # Если это новый пользователь - переводим токен в state='verified' - # и отдаём клиенту REGISTER токен, чтобы он показал экран ввода имени - if stored_token.get("state") == "register": - await cursor.execute( - "UPDATE auth_tokens SET state = %s WHERE token_hash = %s", - ("verified", hashed_token,) - ) - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH, - payload={ - "tokenAttrs": { - "REGISTER": { - "token": token - } - }, - "presetAvatars": [] - } - ) - await self._send(writer, packet) - return - - # Ищем аккаунт - await cursor.execute("SELECT * FROM users WHERE phone = %s", (stored_token.get("phone"),)) - account = await cursor.fetchone() - - # Удаляем токен - await cursor.execute("DELETE FROM auth_tokens WHERE token_hash = %s", (hashed_token,)) - - # Создаем сессию - await cursor.execute( - "INSERT INTO tokens (phone, token_hash, device_type, device_name, location, time) VALUES (%s, %s, %s, %s, %s, %s)", - (stored_token.get("phone"), hashed_login, deviceType, deviceName, "Little Saint James Island", int(time.time()),) # весь покрытый зеленью, абсолютно весь, остров невезения в океане есть - ) - - # Генерируем профиль - # Аватарка с биографией - photoId = None if not account.get("avatar_id") else int(account.get("avatar_id")) - avatar_url = None if not photoId else self.config.avatar_base_url + photoId - description = None if not account.get("description") else account.get("description") - - # Собираем данные пакета - payload = { - "tokenAttrs": { - "LOGIN": { - "token": login - } - }, - "profile": self.tools.generate_profile( - id=account.get("id"), - phone=int(account.get("phone")), - avatarUrl=avatar_url, - photoId=photoId, - updateTime=int(account.get("updatetime")), - firstName=account.get("firstname"), - lastName=account.get("lastname"), - options=json.loads(account.get("options")), - description=description, - accountStatus=int(account.get("accountstatus")), - profileOptions=json.loads(account.get("profileoptions")), - includeProfileOptions=True, - username=account.get("username") - ) - } - - # Создаем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - - async def auth_confirm(self, payload, seq, writer, deviceType, deviceName): - """Обработчик подтверждения регистрации нового пользователя""" - # Валидируем данные пакета - try: - AuthConfirmRegisterPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.INVALID_PAYLOAD, writer) - return - - # Извлекаем данные из пакета - token = payload.get("token") - first_name = payload.get("firstName").strip() - last_name = (payload.get("lastName") or "").strip() - - # Хешируем токен - hashed_token = hashlib.sha256(token.encode()).hexdigest() - - # Генерируем постоянный логин-токен - login = secrets.token_urlsafe(128) - hashed_login = hashlib.sha256(login.encode()).hexdigest() - - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - # Ищем токен - он должен быть в state='verified' - await cursor.execute( - "SELECT * FROM auth_tokens WHERE token_hash = %s AND expires > UNIX_TIMESTAMP() AND state = %s", - (hashed_token, "verified",) - ) - stored_token = await cursor.fetchone() - - # Если токен не найден или просрочен - отправляем ошибку - if stored_token is None: - await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.CODE_EXPIRED, writer) - return - - phone = stored_token.get("phone") - - # Проверяем что пользователь с таким телефоном ещё не существует - await cursor.execute("SELECT id FROM users WHERE phone = %s", (phone,)) - if await cursor.fetchone(): - await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.INVALID_PAYLOAD, writer) - return - - now_ms = int(time.time() * 1000) - now_s = int(time.time()) - - # Создаем пользователя - await cursor.execute( - """ - INSERT INTO users - (id, phone, telegram_id, firstname, lastname, username, - profileoptions, options, accountstatus, updatetime, lastseen) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """, - ( - self.tools.generate_id(), phone, None, first_name, last_name, None, - json.dumps([]), json.dumps(["TT", "ONEME"]), - 0, str(now_ms), str(now_s), - ) - ) - - user_id = cursor.lastrowid - - # Добавляем данные аккаунта - await cursor.execute( - """ - INSERT INTO user_data - (phone, contacts, folders, user_config, chat_config) - VALUES (%s %s, %s, %s, %s) - """, - ( - phone, - json.dumps([]), - json.dumps(self.static.USER_FOLDERS), - json.dumps(self.static.USER_SETTINGS), - json.dumps({}), - ) - ) - - # Удаляем токен - await cursor.execute("DELETE FROM auth_tokens WHERE token_hash = %s", (hashed_token,)) - - # Создаем сессию - await cursor.execute( - "INSERT INTO tokens (phone, token_hash, device_type, device_name, location, time) VALUES (%s, %s, %s, %s, %s, %s)", - (phone, hashed_login, deviceType or "ANDROID", deviceName or "Unknown", "Little Saint James Island", now_s,) - ) - - # Генерируем профиль - profile = self.tools.generate_profile( - id=user_id, - phone=int(phone), - avatarUrl=None, - photoId=None, - updateTime=now_ms, - firstName=first_name, - lastName=last_name, - options=["ONEME"], - description=None, - accountStatus=0, - profileOptions=[], - includeProfileOptions=True, - username=None - ) - - # Собираем данные пакета - payload = { - "userToken": "0", - "profile": profile, - "tokenType": "LOGIN", - "token": login - } - - # Создаем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH_CONFIRM, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - self.logger.info(f"Новый пользователь зарегистрирован: phone={phone} id={user_id} name={first_name} {last_name}") - - async def login(self, payload, seq, writer): - """Обработчик авторизации клиента на сервере""" - # Валидируем данные пакета - try: - LoginPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.LOGIN, self.error_types.INVALID_PAYLOAD, writer) - return - - # Чаты, где состоит пользователь - chats = [] - - # Получаем данные из пакета - token = payload.get("token") - - # Хешируем токен - hashed_token = hashlib.sha256(token.encode()).hexdigest() - - # Ищем токен в бд - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT * FROM tokens WHERE token_hash = %s", (hashed_token,)) - token_data = await cursor.fetchone() - - # Если токен не найден, отправляем ошибку - if token_data is None: - await self._send_error(seq, self.opcodes.LOGIN, self.error_types.INVALID_TOKEN, writer) - return - - # Ищем аккаунт пользователя в бд - await cursor.execute("SELECT * FROM users WHERE phone = %s", (token_data.get("phone"),)) - user = await cursor.fetchone() - - # Ищем данные пользователя в бд - await cursor.execute("SELECT * FROM user_data WHERE phone = %s", (token_data.get("phone"),)) - user_data = await cursor.fetchone() - - # Ищем все чаты, где состоит пользователь - await cursor.execute( - "SELECT * FROM chat_participants WHERE user_id = %s", - (user.get('id')) - ) - user_chats = await cursor.fetchall() - - for chat in user_chats: - chats.append( - chat.get("chat_id") - ) - - # Аватарка с биографией - photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) - avatar_url = None if not photoId else self.config.avatar_base_url + photoId - description = None if not user.get("description") else user.get("description") - - # Генерируем профиль - profile = self.tools.generate_profile( - id=user.get("id"), - phone=int(user.get("phone")), - avatarUrl=avatar_url, - photoId=photoId, - updateTime=int(user.get("updatetime")), - firstName=user.get("firstname"), - lastName=user.get("lastname"), - options=json.loads(user.get("options")), - description=description, - accountStatus=int(user.get("accountstatus")), - profileOptions=json.loads(user.get("profileoptions")), - includeProfileOptions=True, - username=user.get("username") - ) - - chats = await self.tools.generate_chats( - chats, self.db_pool, user.get("id") - ) - - # Формируем данные пакета - payload = { - "profile": profile, - "chats": chats, - "chatMarker": 0, - "messages": {}, - "contacts": [], - "presence": {}, - "config": { - "server": self.server_config, - "user": json.loads(user_data.get("user_config")) - }, - "token": token, - "videoChatHistory": False, - "time": int(time.time() * 1000) - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGIN, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - return int(user.get("phone")), int(user.get("id")), hashed_token - - async def logout(self, seq, writer, hashedToken): - """Обработчик завершения сессии""" - # Удаляем токен из бд - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("DELETE FROM tokens WHERE token_hash = %s", (hashedToken,)) - - # Создаем пакет - response = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGOUT, payload=None - ) - - # Отправляем - await self._send(writer, response) - - async def assets_update(self, payload, seq, writer): - """Обработчик запроса ассетов клиента на сервере""" - # Валидируем данные пакета - try: - AssetsPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.ASSETS_UPDATE, self.error_types.INVALID_PAYLOAD, writer) - return - - # TODO: сейчас это заглушка, а попозже нужно сделать полноценную реализацию - - # Данные пакета - payload = { - "sections": [], - "sync": int(time.time() * 1000) - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.ASSETS_UPDATE, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - - async def video_chat_history(self, payload, seq, writer): - """Обработчик получения истории звонков""" - # Валидируем данные пакета - try: - GetCallHistoryPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.VIDEO_CHAT_HISTORY, self.error_types.INVALID_PAYLOAD, writer) - return - - # TODO: сейчас это заглушка, а попозже нужно сделать полноценную реализацию - - # Данные пакета - payload = { - "hasMore": False, - "history": [], - "backwardMarker": 0, - "forwardMarker": 0 - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.VIDEO_CHAT_HISTORY, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - - async def msg_send(self, payload, seq, writer, senderId, db_pool): - """Функция отправки сообщения""" - # Валидируем данные пакета - try: - SendMessagePayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.INVALID_PAYLOAD, writer) - return - - # Извлекаем данные из пакета - userId = payload.get("userId") - chatId = payload.get("chatId") - message = payload.get("message") - - elements = message.get("elements") or [] - attaches = message.get("attaches") or [] - cid = message.get("cid") or 0 - text = message.get("text") or "" - - # Вычисляем ID чата по ID пользователя и ID отправителя, - # в случае отсутствия ID чата - if not chatId: - chatId = userId ^ senderId - - # Если клиент хочет отправить сообщение в избранное, - # то выставляем в качестве ID чата ID отправителя - # (А ещё используем это, если клиент вообще ничего не указал) - if chatId == 0 or not chatId: - chatId = senderId - participants = [senderId] - else: - # Если все таки клиент хочет отправить сообщение в нормальный чат, - # то ищем его в базе данных (извлекать список участников все таки тоже надо) - async with db_pool.acquire() as db_connection: - async with db_connection.cursor() as cursor: - await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) - chat = await cursor.fetchone() - - # Если нет такого чата - выбрасываем ошибку - if not chat: - await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.CHAT_NOT_FOUND, writer) - return - - # Список участников - participants = await self.tools.get_chat_participants(chatId, db_pool) - - # Проверяем, является ли отправитель участником чата - if int(senderId) not in participants: - await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.CHAT_NOT_ACCESS, writer) - return - - # Добавляем сообщение в историю - messageId, lastMessageId, messageTime = await self.tools.insert_message( - chatId=chatId, - senderId=senderId, - text=text, - attaches=attaches, - elements=elements, - cid=cid, - type="USER", - db_pool=self.db_pool - ) - - # Готовое тело сообщения - bodyMessage = { - "id": messageId, - "time": messageTime, - "type": "USER", - "sender": senderId, - "cid": cid, - "text": text, - "attaches": attaches, - "elements": elements - } - - # Отправляем событие всем участникам чата - for participant in participants: - await self.event( - participant, - { - "eventType": "new_msg", - "chatId": 0 if chatId == senderId else chatId, - "message": bodyMessage, - "prevMessageId": lastMessageId, - "time": messageTime, - "writer": writer - } - ) - - # Данные пакета - payload = { - "chatId": 0 if chatId == senderId else chatId, - "message": bodyMessage, - "unread": 0, - "mark": messageTime - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.MSG_SEND, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - - async def folders_get(self, payload, seq, writer, senderPhone): - """Синхронизация папок с сервером""" - # Валидируем данные пакета - try: - SyncFoldersPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.FOLDERS_GET, self.error_types.INVALID_PAYLOAD, writer) - return - - # Ищем папки в бд - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT folders FROM user_data WHERE phone = %s", (int(senderPhone),)) - result_folders = await cursor.fetchone() - user_folders = json.loads(result_folders.get("folders")) - - # Создаем данные пакета - payload = { - "folderSync": int(time.time() * 1000), - "folders": self.static.ALL_CHAT_FOLDER + user_folders.get("folders"), - "foldersOrder": self.static.ALL_CHAT_FOLDER_ORDER + user_folders.get("foldersOrder"), - "allFilterExcludeFolders": user_folders.get("allFilterExcludeFolders") - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.FOLDERS_GET, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - - async def sessions_info(self, payload, seq, writer, senderPhone, hashedToken): - """Получение активных сессий на аккаунте""" - # Готовый список сессий - sessions = [] - - # Ищем сессии в бд - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT * FROM tokens WHERE phone = %s", (str(senderPhone),)) - user_sessions = await cursor.fetchall() - - # Собираем сессии в список - for session in user_sessions: - sessions.append( - { - "time": int(session.get("time")), - "client": f"MAX {session.get('device_type')}", - "info": session.get("device_name"), - "location": session.get("location"), - "current": True if session.get("token_hash") == hashedToken else False - } - ) - - # Создаем данные пакета - payload = { - "sessions": sessions - } - - # Создаем пакет - response = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.SESSIONS_INFO, payload=payload - ) - - # Отправляем - await self._send(writer, response) - - async def contact_info(self, payload, seq, writer): - """Поиск пользователей по ID""" - # Валидируем данные пакета - try: - SearchUsersPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.CONTACT_INFO, self.error_types.INVALID_PAYLOAD, writer) - return - - # Итоговый список пользователей - users = [] - - # ID пользователей, которые нам предстоит найти - contactIds = payload.get("contactIds") - - # Ищем пользователей в бд - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - for contactId in contactIds: - await cursor.execute("SELECT * FROM users WHERE id = %s", (contactId,)) - user = await cursor.fetchone() - - # Если такой пользователь есть, добавляем его в список - if user: - # Аватарка с биографией - photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) - avatar_url = None if not photoId else self.config.avatar_base_url + photoId - description = None if not user.get("description") else user.get("description") - - # Генерируем профиль - users.append( - self.tools.generate_profile( - id=user.get("id"), - phone=int(user.get("phone")), - avatarUrl=avatar_url, - photoId=photoId, - updateTime=int(user.get("updatetime")), - firstName=user.get("firstname"), - lastName=user.get("lastname"), - options=json.loads(user.get("options")), - description=description, - accountStatus=int(user.get("accountstatus")), - profileOptions=json.loads(user.get("profileoptions")), - includeProfileOptions=False, - username=user.get("username") - ) - ) - - # Создаем данные пакета - payload = { - "contacts": users - } - - # Создаем пакет - response = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_INFO, payload=payload - ) - - # Отправляем - await self._send(writer, response) - - async def chat_info(self, payload, seq, writer, senderId): - """Поиск чатов по ID""" - # Валидируем данные пакета - try: - SearchChatsPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.CHAT_INFO, self.error_types.INVALID_PAYLOAD, writer) - return - - # Итоговый список чатов - chats = [] - - # ID чатов, которые нам предстоит найти - chatIds = payload.get("chatIds") - - # Ищем чаты в бд - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - for chatId in chatIds: - if chatId != 0: - await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) - chat = await cursor.fetchone() - - if chat: - # Проверяем, является ли пользователь участником чата - participants = await self.tools.get_chat_participants(chatId, self.db_pool) - # (в max нельзя смотреть и отправлять сообщения в чат, в котором ты не участник, в отличие от tg (например, комментарии в каналах), - # так что надо тоже так делать) - if int(senderId) not in participants: - continue - - # Получаем последнее сообщение из чата - message, messageTime = await self.tools.get_last_message( - chatId, self.db_pool - ) - - # Добавляем чат в список - chats.append( - self.tools.generate_chat( - chatId, chat.get("owner"), - chat.get("type"), participants, - message, messageTime - ) - ) - else: - # Получаем последнее сообщение из чата - message, messageTime = await self.tools.get_last_message( - senderId, self.db_pool - ) - - # ID избранного - chatId = senderId ^ senderId - - # Добавляем чат в список - chats.append( - self.tools.generate_chat( - chatId, senderId, - "DIALOG", [senderId], - message, messageTime - ) - ) - - # Создаем данные пакета - payload = { - "chats": chats - } - - # Собираем пакет - response = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_INFO, payload=payload - ) - - # Отправляем - await self._send(writer, response) - - async def contact_info_by_phone(self, payload, seq, writer, senderId): - """Поиск по номеру телефона""" - # Валидируем данные пакета - try: - SearchByPhonePayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.CONTACT_INFO_BY_PHONE, self.error_types.INVALID_PAYLOAD, writer) - return - - # Ищем пользователя в бд - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT * FROM users WHERE phone = %s", (int(payload.get("phone")),)) - user = await cursor.fetchone() - - # Если пользователь не найден, отправляем ошибку - if not user: - await self._send_error(seq, self.opcodes.CONTACT_INFO_BY_PHONE, self.error_types.USER_NOT_FOUND, writer) - return - - # ID чата - chatId = senderId ^ user.get("id") - - # Ищем диалог в бд - await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) - chat = await cursor.fetchone() - - # Если диалога нет - создаем - if not chat: - await cursor.execute( - "INSERT INTO chats (id, owner, type) VALUES (%s, %s, %s)", - (chatId, senderId, "DIALOG") - ) - - # Добавляем участников в таблицу chat_participants - participants = [int(senderId), int(user.get("id"))] - - for user_id in participants: - await cursor.execute( - "INSERT INTO chat_participants (chat_id, user_id) VALUES (%s, %s)", - (chatId, user_id) - ) - - # Аватарка с биографией - photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) - avatar_url = None if not photoId else self.config.avatar_base_url + photoId - description = None if not user.get("description") else user.get("description") - - # Генерируем профиль - profile = self.tools.generate_profile( - id=user.get("id"), - phone=int(user.get("phone")), - avatarUrl=avatar_url, - photoId=photoId, - updateTime=int(user.get("updatetime")), - firstName=user.get("firstname"), - lastName=user.get("lastname"), - options=json.loads(user.get("options")), - description=description, - accountStatus=int(user.get("accountstatus")), - profileOptions=json.loads(user.get("profileoptions")), - includeProfileOptions=False, - username=user.get("username") - ) - - # Создаем данные пакета - payload = { - "contact": profile - } - - # Создаем пакет - response = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_INFO_BY_PHONE, payload=payload - ) - - # Отправляем - await self._send(writer, response) - - async def ok_token(self, payload, seq, writer): - """Получение токена для звонка""" - # Валидируем данные пакета - try: - GetCallTokenPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.OK_TOKEN, self.error_types.INVALID_PAYLOAD, writer) - return - - # TODO: когда-то взяться за звонки - - # Данные пакета - payload = { - "token": secrets.token_urlsafe(128), - "token_lifetime_ts": int(time.time() * 1000), - "token_refresh_ts": int(time.time() * 1000) - } - - # Создаем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.OK_TOKEN, payload=payload - ) - - # Отправляем - await self._send(writer, packet) - - async def msg_typing(self, payload, seq, writer, senderId): - """Обработчик события печатания""" - # Валидируем данные пакета - try: - TypingPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.INVALID_PAYLOAD, writer) - return - - # Извлекаем данные из пакета - chatId = payload.get("chatId") - type = payload.get("type") or "TYPING" - - # Ищем чат в базе данных - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) - chat = await cursor.fetchone() - - # Если чат не найден, отправляем ошибку - if not chat: - await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.CHAT_NOT_FOUND, writer) - return - - # Участники чата - participants = await self.tools.get_chat_participants(chatId, self.db_pool) - - # Проверяем, является ли отправитель участником чата - if int(senderId) not in participants: - await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.CHAT_NOT_ACCESS, writer) - return - - # Рассылаем событие участникам чата - for participant in participants: - if participant != senderId: - # Если участник не является отправителем, отправляем - await self.event( - participant, - { - "eventType": "typing", - "chatId": chatId, - "type": type, - "userId": senderId - } - ) - - # Создаем пакет - packet = self.proto.pack_packet( - seq=seq, opcode=self.opcodes.MSG_TYPING - ) - - # Отправляем пакет - await self._send(writer, packet) - - async def complain_reasons_get(self, payload, seq, writer): - """Обработчик получения причин жалоб""" - # Валидируем данные пакета - try: - ComplainReasonsGetPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.COMPLAIN_REASONS_GET, self.error_types.INVALID_PAYLOAD, writer) - return - - # Собираем данные пакета - payload = { - "complains": self.static.COMPLAIN_REASONS, - "complainSync": int(time.time()) - } - - # Создаем пакет - packet = self.proto.pack_packet( - seq=seq, opcode=self.opcodes.COMPLAIN_REASONS_GET, payload=payload - ) - - # Отправляем пакет - await self._send(writer, packet) - - async def chat_history(self, payload, seq, writer, senderId): - """Обработчик получения истории чата""" - # Валидируем данные пакета - try: - ChatHistoryPayloadModel.model_validate(payload) - except pydantic.ValidationError as error: - self.logger.error(f"Возникли ошибки при валидации пакета: {error}") - await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.INVALID_PAYLOAD, writer) - return - - # Извлекаем данные из пакета - chatId = payload.get("chatId") - forward = payload.get("forward", 0) - backward = payload.get("backward", 0) - from_time = payload.get("from", 0) - getMessages = payload.get("getMessages", True) - messages = [] - - # Если пользователь хочет получить историю из избранного, - # то выставляем в качестве ID чата его ID - if chatId == 0: - chatId = senderId - - # Проверяем, существует ли чат - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - # Проверяем состоит ли пользователь в чате, - # только в случае того, если это не избранное - if chatId != senderId: - await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) - chat = await cursor.fetchone() - - # Выбрасываем ошибку, если чата нет - if not chat: - await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_FOUND, writer) - return - - # Проверяем, является ли пользователь участником чата - participants = await self.tools.get_chat_participants(chatId, self.db_pool) - if int(senderId) not in participants: - await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_ACCESS, writer) - return - - # Если запрошены сообщения - if getMessages: - if backward > 0: - await cursor.execute( - "SELECT * FROM messages WHERE chat_id = %s AND time < %s ORDER BY time ASC LIMIT %s", - (chatId, from_time, backward) - ) - - result = await cursor.fetchall() - - for row in result: - messages.append({ - "id": row.get("id"), - "time": int(row.get("time")), - "type": row.get("type"), - "sender": row.get("sender"), - "text": row.get("text"), - "attaches": json.loads(row.get("attaches")), - "elements": json.loads(row.get("elements")), - "reactionInfo": {} - }) - - if forward > 0: - await cursor.execute( - "SELECT * FROM messages WHERE chat_id = %s AND time > %s ORDER BY time ASC LIMIT %s", - (chatId, from_time, forward) - ) - - result = await cursor.fetchall() - - for row in result: - messages.append({ - "id": row.get("id"), - "time": int(row.get("time")), - "type": row.get("type"), - "sender": row.get("sender"), - "text": row.get("text"), - "attaches": json.loads(row.get("attaches")), - "elements": json.loads(row.get("elements")), - "reactionInfo": {} - }) - - # Сортируем сообщения по времени - messages.sort(key=lambda x: x["time"]) - - # Формируем ответ - payload = { - "messages": messages - } - - # Собираем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_HISTORY, payload=payload - ) - - # Отправялем - await self._send(writer, packet) - - async def profile(self, payload, seq, writer, userId, userPhone): - # Валидируем входные данные - try: - UpdateProfilePayloadModel.model_validate(payload) - except Exception as e: - await self._send_error(seq, self.opcodes.PROFILE, self.error_types.INVALID_PAYLOAD, writer) - return - - # Извлекаем поля из пакета (каждое может быть None) - description = payload.get("description") - firstName = payload.get("firstName") - lastName = payload.get("lastName") - - # Обновляем только те поля, которые пришли в запросе - async with self.db_pool.acquire() as conn: - async with conn.cursor() as cursor: - if description is not None: - # При изменении описания также обновляем время последнего изменения профиля - await cursor.execute( - "UPDATE users SET description = %s, updatetime = %s WHERE id = %s", - (description, int(time.time() * 1000), userId) - ) - if firstName is not None: - await cursor.execute( - "UPDATE users SET firstname = %s WHERE id = %s", - (firstName, userId) - ) - if lastName is not None: - await cursor.execute( - "UPDATE users SET lastname = %s WHERE id = %s", - (lastName, userId) - ) - - # Получаем актуальные данные пользователя после обновления - await cursor.execute("SELECT * FROM users WHERE id = %s", (userId,)) - user = await cursor.fetchone() - - # Формируем URL аватарки если она есть - photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) - avatar_url = None if not photoId else self.config.avatar_base_url + str(photoId) - - # Генерируем профиль для отправки клиенту - profile = self.tools.generate_profile( - id=user.get("id"), - phone=int(user.get("phone")), - avatarUrl=avatar_url, - photoId=photoId, - updateTime=int(user.get("updatetime")), - firstName=user.get("firstname"), - lastName=user.get("lastname"), - options=json.loads(user.get("options")), - description=user.get("description"), - accountStatus=int(user.get("accountstatus")), - profileOptions=json.loads(user.get("profileoptions")), - includeProfileOptions=True, - username=user.get("username") - ) - - # Данные пакета - payload = { - "profile": profile - } - - # Отправляем ответ на запрос (CMD_OK) - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PROFILE, payload=payload - ) - await self._send(writer, packet) - - # Отправляем всем сессиям о изменении профиля - await self.event( - user.get('id'), - { - "eventType": "profile_updated", - "profile": profile, - "writer": writer - } - ) - - async def chat_subscribe(self, payload, seq, writer): - # Валидируем входные данные - try: - ChatSubscribePayloadModel.model_validate(payload) - except Exception as e: - await self._send_error(seq, self.opcodes.CHAT_SUBSCRIBE, self.error_types.INVALID_PAYLOAD, writer) - return - - # Созадаем пакет - packet = self.proto.pack_packet( - cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_SUBSCRIBE, payload=None - ) - - # Отправялем - await self._send(writer, packet) diff --git a/src/oneme/processors/__init__.py b/src/oneme/processors/__init__.py new file mode 100644 index 0000000..87d877a --- /dev/null +++ b/src/oneme/processors/__init__.py @@ -0,0 +1,26 @@ +from .assets import AssetsProcessors +from .auth import AuthProcessors +from .calls import CallsProcessors +from .chats import ChatsProcessors +from .complains import ComplainsProcessors +from .folders import FoldersProcessors +from .history import HistoryProcessors +from .main import MainProcessors +from .messages import MessagesProcessors +from .search import SearchProcessors +from .sessions import SessionsProcessors + +class Processors( + AssetsProcessors, + AuthProcessors, + CallsProcessors, + ChatsProcessors, + ComplainsProcessors, + FoldersProcessors, + HistoryProcessors, + MainProcessors, + MessagesProcessors, + SearchProcessors, + SessionsProcessors +): + pass \ No newline at end of file diff --git a/src/oneme/processors/assets.py b/src/oneme/processors/assets.py new file mode 100644 index 0000000..45e1c15 --- /dev/null +++ b/src/oneme/processors/assets.py @@ -0,0 +1,31 @@ +import pydantic +import time +from classes.baseprocessor import BaseProcessor +from oneme.models import AssetsPayloadModel + +class AssetsProcessors(BaseProcessor): + async def assets_update(self, payload, seq, writer): + """Обработчик запроса ассетов клиента на сервере""" + # Валидируем данные пакета + try: + AssetsPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.ASSETS_UPDATE, self.error_types.INVALID_PAYLOAD, writer) + return + + # TODO: сейчас это заглушка, а попозже нужно сделать полноценную реализацию + + # Данные пакета + payload = { + "sections": [], + "sync": int(time.time() * 1000) + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.ASSETS_UPDATE, payload=payload + ) + + # Отправляем + await self._send(writer, packet) \ No newline at end of file diff --git a/src/oneme/processors/auth.py b/src/oneme/processors/auth.py new file mode 100644 index 0000000..58fb8cb --- /dev/null +++ b/src/oneme/processors/auth.py @@ -0,0 +1,454 @@ +import json +import secrets +import hashlib +import time +import logging +import pydantic +from classes.baseprocessor import BaseProcessor +from oneme.models import ( + RequestCodePayloadModel, + VerifyCodePayloadModel, + AuthConfirmRegisterPayloadModel, + LoginPayloadModel, +) +from oneme.config import OnemeConfig +from common.sms import send_sms_code + + +class AuthProcessors(BaseProcessor): + def __init__(self, db_pool=None, clients=None, send_event=None, telegram_bot=None, type="socket"): + super().__init__(db_pool, clients, send_event, type) + self.server_config = OnemeConfig().SERVER_CONFIG + self.telegram_bot = telegram_bot + + async def auth_request(self, payload, seq, writer): + """Обработчик запроса кода""" + try: + RequestCodePayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.AUTH_REQUEST, self.error_types.INVALID_PAYLOAD, writer) + return + + # Извлекаем телефон из пакета + phone = payload.get("phone").replace("+", "").replace(" ", "").replace("-", "") + + # Генерируем токен + token = secrets.token_urlsafe(128) + token_hash = hashlib.sha256(token.encode()).hexdigest() + + # Время истечения токена + expires = int(time.time()) + 300 + + user_exists = False + + # Ищем пользователя + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM users WHERE phone = %s", (phone,)) + user = await cursor.fetchone() + + # Получаем код через SMS шлюз или генерируем локально (безопасность прежде всего) + if self.config.sms_gateway_url: + code = await send_sms_code(self.config.sms_gateway_url, phone) + + if code is None: + code = str(secrets.randbelow(900000) + 100000) + else: + code = str(secrets.randbelow(900000) + 100000) + + # Хешируем + code_hash = hashlib.sha256(code.encode()).hexdigest() + + # Сохраняем токен и если нужно отправляем код через тг + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + if user: + user_exists = True + # Сохраняем токен + await cursor.execute( + "INSERT INTO auth_tokens (phone, token_hash, code_hash, expires) VALUES (%s, %s, %s, %s)", + (phone, token_hash, code_hash, expires,) + ) + + # Если тг бот включен, и тг привязан к аккаунту - отправляем туда сообщение + if not self.config.sms_gateway_url and self.telegram_bot and user.get("telegram_id"): + await self.telegram_bot.send_code(chat_id=int(user.get("telegram_id")), phone=phone, code=code) + else: + # Пользователь не найден - сохраняем токен со state='register' + # чтобы после верификации кода направить на экран регистрации + await cursor.execute( + "INSERT INTO auth_tokens (phone, token_hash, code_hash, expires, state) VALUES (%s, %s, %s, %s, %s)", + (phone, token_hash, code_hash, expires, "register",) + ) + + # Данные пакета + payload = { + "requestMaxDuration": 60000, + "requestCountLeft": 10, + "altActionDuration": 60000, + "codeLength": 6, + "token": token + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH_REQUEST, payload=payload + ) + + # Отправляем + await self._send(writer, packet) + self.logger.debug(f"Код для {phone}: {code} (существующий={user_exists})") + + async def auth(self, payload, seq, writer, deviceType, deviceName): + """Обработчик проверки кода""" + try: + VerifyCodePayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.AUTH, self.error_types.INVALID_PAYLOAD, writer) + return + + # Извлекаем данные из пакета + code = payload.get("verifyCode") + token = payload.get("token") + + # Хешируем токен с кодом + hashed_code = hashlib.sha256(code.encode()).hexdigest() + hashed_token = hashlib.sha256(token.encode()).hexdigest() + + # Генерируем постоянный токен + login = secrets.token_urlsafe(128) + hashed_login = hashlib.sha256(login.encode()).hexdigest() + + # Ищем токен с кодом + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + # Ищем токен + await cursor.execute( + "SELECT * FROM auth_tokens WHERE token_hash = %s AND expires > UNIX_TIMESTAMP()", + (hashed_token,) + ) + stored_token = await cursor.fetchone() + + # Если токен просрочен, или его нет - отправляем ошибку + if stored_token is None: + await self._send_error(seq, self.opcodes.AUTH, self.error_types.CODE_EXPIRED, writer) + return + + # Проверяем код + if stored_token.get("code_hash") != hashed_code: + await self._send_error(seq, self.opcodes.AUTH, self.error_types.INVALID_CODE, writer) + return + + # Если это новый пользователь - переводим токен в state='verified' + # и отдаём клиенту REGISTER токен, чтобы он показал экран ввода имени + if stored_token.get("state") == "register": + await cursor.execute( + "UPDATE auth_tokens SET state = %s WHERE token_hash = %s", + ("verified", hashed_token,) + ) + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH, + payload={ + "tokenAttrs": { + "REGISTER": { + "token": token + } + }, + "presetAvatars": [] + } + ) + await self._send(writer, packet) + return + + # Ищем аккаунт + await cursor.execute("SELECT * FROM users WHERE phone = %s", (stored_token.get("phone"),)) + account = await cursor.fetchone() + + # Удаляем токен + await cursor.execute("DELETE FROM auth_tokens WHERE token_hash = %s", (hashed_token,)) + + # Создаем сессию + await cursor.execute( + "INSERT INTO tokens (phone, token_hash, device_type, device_name, location, time) VALUES (%s, %s, %s, %s, %s, %s)", + (stored_token.get("phone"), hashed_login, deviceType, deviceName, "Little Saint James Island", int(time.time()),) # весь покрытый зеленью, абсолютно весь, остров невезения в океане есть + ) + + # Генерируем профиль + # Аватарка с биографией + photoId = None if not account.get("avatar_id") else int(account.get("avatar_id")) + avatar_url = None if not photoId else self.config.avatar_base_url + photoId + description = None if not account.get("description") else account.get("description") + + # Собираем данные пакета + payload = { + "tokenAttrs": { + "LOGIN": { + "token": login + } + }, + "profile": self.tools.generate_profile( + id=account.get("id"), + phone=int(account.get("phone")), + avatarUrl=avatar_url, + photoId=photoId, + updateTime=int(account.get("updatetime")), + firstName=account.get("firstname"), + lastName=account.get("lastname"), + options=json.loads(account.get("options")), + description=description, + accountStatus=int(account.get("accountstatus")), + profileOptions=json.loads(account.get("profileoptions")), + includeProfileOptions=True, + username=account.get("username") + ) + } + + # Создаем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH, payload=payload + ) + + # Отправляем + await self._send(writer, packet) + + async def auth_confirm(self, payload, seq, writer, deviceType, deviceName): + """Обработчик подтверждения регистрации нового пользователя""" + # Валидируем данные пакета + try: + AuthConfirmRegisterPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.INVALID_PAYLOAD, writer) + return + + # Извлекаем данные из пакета + token = payload.get("token") + first_name = payload.get("firstName").strip() + last_name = (payload.get("lastName") or "").strip() + + # Хешируем токен + hashed_token = hashlib.sha256(token.encode()).hexdigest() + + # Генерируем постоянный логин-токен + login = secrets.token_urlsafe(128) + hashed_login = hashlib.sha256(login.encode()).hexdigest() + + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + # Ищем токен - он должен быть в state='verified' + await cursor.execute( + "SELECT * FROM auth_tokens WHERE token_hash = %s AND expires > UNIX_TIMESTAMP() AND state = %s", + (hashed_token, "verified",) + ) + stored_token = await cursor.fetchone() + + # Если токен не найден или просрочен - отправляем ошибку + if stored_token is None: + await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.CODE_EXPIRED, writer) + return + + phone = stored_token.get("phone") + + # Проверяем что пользователь с таким телефоном ещё не существует + await cursor.execute("SELECT id FROM users WHERE phone = %s", (phone,)) + if await cursor.fetchone(): + await self._send_error(seq, self.opcodes.AUTH_CONFIRM, self.error_types.INVALID_PAYLOAD, writer) + return + + now_ms = int(time.time() * 1000) + now_s = int(time.time()) + + # Создаем пользователя + await cursor.execute( + """ + INSERT INTO users + (id, phone, telegram_id, firstname, lastname, username, + profileoptions, options, accountstatus, updatetime, lastseen) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """, + ( + self.tools.generate_id(), phone, None, first_name, last_name, None, + json.dumps([]), json.dumps(["TT", "ONEME"]), + 0, str(now_ms), str(now_s), + ) + ) + + user_id = cursor.lastrowid + + # Добавляем данные аккаунта + await cursor.execute( + """ + INSERT INTO user_data + (phone, contacts, folders, user_config, chat_config) + VALUES (%s %s, %s, %s, %s) + """, + ( + phone, + json.dumps([]), + json.dumps(self.static.USER_FOLDERS), + json.dumps(self.static.USER_SETTINGS), + json.dumps({}), + ) + ) + + # Удаляем токен + await cursor.execute("DELETE FROM auth_tokens WHERE token_hash = %s", (hashed_token,)) + + # Создаем сессию + await cursor.execute( + "INSERT INTO tokens (phone, token_hash, device_type, device_name, location, time) VALUES (%s, %s, %s, %s, %s, %s)", + (phone, hashed_login, deviceType or "ANDROID", deviceName or "Unknown", "Little Saint James Island", now_s,) + ) + + # Генерируем профиль + profile = self.tools.generate_profile( + id=user_id, + phone=int(phone), + avatarUrl=None, + photoId=None, + updateTime=now_ms, + firstName=first_name, + lastName=last_name, + options=["ONEME"], + description=None, + accountStatus=0, + profileOptions=[], + includeProfileOptions=True, + username=None + ) + + # Собираем данные пакета + payload = { + "userToken": "0", + "profile": profile, + "tokenType": "LOGIN", + "token": login + } + + # Создаем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.AUTH_CONFIRM, payload=payload + ) + + # Отправляем + await self._send(writer, packet) + self.logger.info(f"Новый пользователь зарегистрирован: phone={phone} id={user_id} name={first_name} {last_name}") + + async def login(self, payload, seq, writer): + """Обработчик авторизации клиента на сервере""" + # Валидируем данные пакета + try: + LoginPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.LOGIN, self.error_types.INVALID_PAYLOAD, writer) + return + + # Чаты, где состоит пользователь + chats = [] + + # Получаем данные из пакета + token = payload.get("token") + + # Хешируем токен + hashed_token = hashlib.sha256(token.encode()).hexdigest() + + # Ищем токен в бд + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM tokens WHERE token_hash = %s", (hashed_token,)) + token_data = await cursor.fetchone() + + # Если токен не найден, отправляем ошибку + if token_data is None: + await self._send_error(seq, self.opcodes.LOGIN, self.error_types.INVALID_TOKEN, writer) + return + + # Ищем аккаунт пользователя в бд + await cursor.execute("SELECT * FROM users WHERE phone = %s", (token_data.get("phone"),)) + user = await cursor.fetchone() + + # Ищем данные пользователя в бд + await cursor.execute("SELECT * FROM user_data WHERE phone = %s", (token_data.get("phone"),)) + user_data = await cursor.fetchone() + + # Ищем все чаты, где состоит пользователь + await cursor.execute( + "SELECT * FROM chat_participants WHERE user_id = %s", + (user.get('id')) + ) + user_chats = await cursor.fetchall() + + for chat in user_chats: + chats.append( + chat.get("chat_id") + ) + + # Аватарка с биографией + photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) + avatar_url = None if not photoId else self.config.avatar_base_url + photoId + description = None if not user.get("description") else user.get("description") + + # Генерируем профиль + profile = self.tools.generate_profile( + id=user.get("id"), + phone=int(user.get("phone")), + avatarUrl=avatar_url, + photoId=photoId, + updateTime=int(user.get("updatetime")), + firstName=user.get("firstname"), + lastName=user.get("lastname"), + options=json.loads(user.get("options")), + description=description, + accountStatus=int(user.get("accountstatus")), + profileOptions=json.loads(user.get("profileoptions")), + includeProfileOptions=True, + username=user.get("username") + ) + + chats = await self.tools.generate_chats( + chats, self.db_pool, user.get("id") + ) + + # Формируем данные пакета + payload = { + "profile": profile, + "chats": chats, + "chatMarker": 0, + "messages": {}, + "contacts": [], + "presence": {}, + "config": { + "server": self.server_config, + "user": json.loads(user_data.get("user_config")) + }, + "token": token, + "videoChatHistory": False, + "time": int(time.time() * 1000) + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGIN, payload=payload + ) + + # Отправляем + await self._send(writer, packet) + return int(user.get("phone")), int(user.get("id")), hashed_token + + async def logout(self, seq, writer, hashedToken): + """Обработчик завершения сессии""" + # Удаляем токен из бд + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("DELETE FROM tokens WHERE token_hash = %s", (hashedToken,)) + + # Создаем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGOUT, payload=None + ) + + # Отправляем + await self._send(writer, response) \ No newline at end of file diff --git a/src/oneme/processors/calls.py b/src/oneme/processors/calls.py new file mode 100644 index 0000000..4ff02fe --- /dev/null +++ b/src/oneme/processors/calls.py @@ -0,0 +1,64 @@ +import pydantic +import secrets +import time +from classes.baseprocessor import BaseProcessor +from oneme.models import ( + GetCallTokenPayloadModel, + GetCallHistoryPayloadModel +) + +class CallsProcessors(BaseProcessor): + async def ok_token(self, payload, seq, writer): + """Получение токена для звонка""" + # Валидируем данные пакета + try: + GetCallTokenPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.OK_TOKEN, self.error_types.INVALID_PAYLOAD, writer) + return + + # TODO: когда-то взяться за звонки + + # Данные пакета + payload = { + "token": secrets.token_urlsafe(128), + "token_lifetime_ts": int(time.time() * 1000), + "token_refresh_ts": int(time.time() * 1000) + } + + # Создаем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.OK_TOKEN, payload=payload + ) + + # Отправляем + await self._send(writer, packet) + + async def video_chat_history(self, payload, seq, writer): + """Обработчик получения истории звонков""" + # Валидируем данные пакета + try: + GetCallHistoryPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.VIDEO_CHAT_HISTORY, self.error_types.INVALID_PAYLOAD, writer) + return + + # TODO: сейчас это заглушка, а попозже нужно сделать полноценную реализацию + + # Данные пакета + payload = { + "hasMore": False, + "history": [], + "backwardMarker": 0, + "forwardMarker": 0 + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.VIDEO_CHAT_HISTORY, payload=payload + ) + + # Отправляем + await self._send(writer, packet) \ No newline at end of file diff --git a/src/oneme/processors/chats.py b/src/oneme/processors/chats.py new file mode 100644 index 0000000..7a57be7 --- /dev/null +++ b/src/oneme/processors/chats.py @@ -0,0 +1,20 @@ +import pydantic +from classes.baseprocessor import BaseProcessor +from oneme.models import ChatSubscribePayloadModel + +class ChatsProcessors(BaseProcessor): + async def chat_subscribe(self, payload, seq, writer): + # Валидируем входные данные + try: + ChatSubscribePayloadModel.model_validate(payload) + except Exception as e: + await self._send_error(seq, self.opcodes.CHAT_SUBSCRIBE, self.error_types.INVALID_PAYLOAD, writer) + return + + # Созадаем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_SUBSCRIBE, payload=None + ) + + # Отправялем + await self._send(writer, packet) diff --git a/src/oneme/processors/complains.py b/src/oneme/processors/complains.py new file mode 100644 index 0000000..1b5eb80 --- /dev/null +++ b/src/oneme/processors/complains.py @@ -0,0 +1,29 @@ +import pydantic +import time +from classes.baseprocessor import BaseProcessor +from oneme.models import ComplainReasonsGetPayloadModel + +class ComplainsProcessors(BaseProcessor): + async def complain_reasons_get(self, payload, seq, writer): + """Обработчик получения причин жалоб""" + # Валидируем данные пакета + try: + ComplainReasonsGetPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.COMPLAIN_REASONS_GET, self.error_types.INVALID_PAYLOAD, writer) + return + + # Собираем данные пакета + payload = { + "complains": self.static.COMPLAIN_REASONS, + "complainSync": int(time.time()) + } + + # Создаем пакет + packet = self.proto.pack_packet( + seq=seq, opcode=self.opcodes.COMPLAIN_REASONS_GET, payload=payload + ) + + # Отправляем пакет + await self._send(writer, packet) diff --git a/src/oneme/processors/folders.py b/src/oneme/processors/folders.py new file mode 100644 index 0000000..fccda9f --- /dev/null +++ b/src/oneme/processors/folders.py @@ -0,0 +1,39 @@ +import pydantic +import json +import time +from classes.baseprocessor import BaseProcessor +from oneme.models import SyncFoldersPayloadModel + +class FoldersProcessors(BaseProcessor): + async def folders_get(self, payload, seq, writer, senderPhone): + """Синхронизация папок с сервером""" + # Валидируем данные пакета + try: + SyncFoldersPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.FOLDERS_GET, self.error_types.INVALID_PAYLOAD, writer) + return + + # Ищем папки в бд + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT folders FROM user_data WHERE phone = %s", (int(senderPhone),)) + result_folders = await cursor.fetchone() + user_folders = json.loads(result_folders.get("folders")) + + # Создаем данные пакета + payload = { + "folderSync": int(time.time() * 1000), + "folders": self.static.ALL_CHAT_FOLDER + user_folders.get("folders"), + "foldersOrder": self.static.ALL_CHAT_FOLDER_ORDER + user_folders.get("foldersOrder"), + "allFilterExcludeFolders": user_folders.get("allFilterExcludeFolders") + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.FOLDERS_GET, payload=payload + ) + + # Отправляем + await self._send(writer, packet) \ No newline at end of file diff --git a/src/oneme/processors/history.py b/src/oneme/processors/history.py new file mode 100644 index 0000000..9cd63a6 --- /dev/null +++ b/src/oneme/processors/history.py @@ -0,0 +1,106 @@ +import pydantic +import json +from classes.baseprocessor import BaseProcessor +from oneme.models import ChatHistoryPayloadModel + +class HistoryProcessors(BaseProcessor): + async def chat_history(self, payload, seq, writer, senderId): + """Обработчик получения истории чата""" + # Валидируем данные пакета + try: + ChatHistoryPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.INVALID_PAYLOAD, writer) + return + + # Извлекаем данные из пакета + chatId = payload.get("chatId") + forward = payload.get("forward", 0) + backward = payload.get("backward", 0) + from_time = payload.get("from", 0) + getMessages = payload.get("getMessages", True) + messages = [] + + # Если пользователь хочет получить историю из избранного, + # то выставляем в качестве ID чата его ID + if chatId == 0: + chatId = senderId + + # Проверяем, существует ли чат + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + # Проверяем состоит ли пользователь в чате, + # только в случае того, если это не избранное + if chatId != senderId: + await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) + chat = await cursor.fetchone() + + # Выбрасываем ошибку, если чата нет + if not chat: + await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_FOUND, writer) + return + + # Проверяем, является ли пользователь участником чата + participants = await self.tools.get_chat_participants(chatId, self.db_pool) + if int(senderId) not in participants: + await self._send_error(seq, self.opcodes.CHAT_HISTORY, self.error_types.CHAT_NOT_ACCESS, writer) + return + + # Если запрошены сообщения + if getMessages: + if backward > 0: + await cursor.execute( + "SELECT * FROM messages WHERE chat_id = %s AND time < %s ORDER BY time ASC LIMIT %s", + (chatId, from_time, backward) + ) + + result = await cursor.fetchall() + + for row in result: + messages.append({ + "id": row.get("id"), + "time": int(row.get("time")), + "type": row.get("type"), + "sender": row.get("sender"), + "text": row.get("text"), + "attaches": json.loads(row.get("attaches")), + "elements": json.loads(row.get("elements")), + "reactionInfo": {} + }) + + if forward > 0: + await cursor.execute( + "SELECT * FROM messages WHERE chat_id = %s AND time > %s ORDER BY time ASC LIMIT %s", + (chatId, from_time, forward) + ) + + result = await cursor.fetchall() + + for row in result: + messages.append({ + "id": row.get("id"), + "time": int(row.get("time")), + "type": row.get("type"), + "sender": row.get("sender"), + "text": row.get("text"), + "attaches": json.loads(row.get("attaches")), + "elements": json.loads(row.get("elements")), + "reactionInfo": {} + }) + + # Сортируем сообщения по времени + messages.sort(key=lambda x: x["time"]) + + # Формируем ответ + payload = { + "messages": messages + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_HISTORY, payload=payload + ) + + # Отправялем + await self._send(writer, packet) \ No newline at end of file diff --git a/src/oneme/processors/main.py b/src/oneme/processors/main.py new file mode 100644 index 0000000..65d895c --- /dev/null +++ b/src/oneme/processors/main.py @@ -0,0 +1,130 @@ +import pydantic +import json +from classes.baseprocessor import BaseProcessor +from oneme.models import ( + HelloPayloadModel, + PingPayloadModel, + UpdateProfilePayloadModel +) + + +class MainProcessors(BaseProcessor): + def __init__(self, db_pool=None, clients=None, send_event=None, type="socket"): + super().__init__(db_pool, clients, send_event, type) + + async def session_init(self, payload, seq, writer): + """Обработчик приветствия""" + # Валидируем данные пакета + try: + HelloPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.SESSION_INIT, self.error_types.INVALID_PAYLOAD, writer) + return None, None + + # Получаем данные из пакета + deviceType = payload.get("userAgent").get("deviceType") + deviceName = payload.get("userAgent").get("deviceName") + + # Данные пакета + payload = { + "location": "RU", + "app-update-type": 0, # 1 = принудительное обновление + "reg-country-code": self.static.REG_COUNTRY_CODES, + "phone-auto-complete-enabled": False, + "lang": True + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.SESSION_INIT, payload=payload + ) + + # Отправляем + await self._send(writer, packet) + return deviceType, deviceName + + async def ping(self, payload, seq, writer): + """Обработчик пинга""" + # Валидируем данные пакета + try: + PingPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.PING, self.error_types.INVALID_PAYLOAD, writer) + return + + # Собираем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PING, payload=None + ) + + # Отправляем + await self._send(writer, response) + + async def log(self, payload, seq, writer): + """Обработчик телеметрии""" + # TODO: можно было бы реализовать валидацию телеметрии, но сейчас это не особо важно + + # Собираем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOG, payload=None + ) + + # Отправляем + await self._send(writer, response) + + async def profile(self, payload, seq, writer, userId): + # Валидируем входные данные + try: + UpdateProfilePayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.PROFILE, self.error_types.INVALID_PAYLOAD, writer) + return + + # Ищем пользователя в бд + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM users WHERE id = %s", (userId,)) + user = await cursor.fetchone() + + # Если пользователь не найден + if not user: + await self._send_error(seq, self.opcodes.PROFILE, self.error_types.USER_NOT_FOUND, writer) + return + + # Аватарка с биографией + photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) + avatar_url = None if not photoId else self.config.avatar_base_url + photoId + description = None if not user.get("description") else user.get("description") + + # Генерируем профиль + profile = self.tools.generate_profile( + id=user.get("id"), + phone=int(user.get("phone")), + avatarUrl=avatar_url, + photoId=photoId, + updateTime=int(user.get("updatetime")), + firstName=user.get("firstname"), + lastName=user.get("lastname"), + options=json.loads(user.get("options")), + description=description, + accountStatus=int(user.get("accountstatus")), + profileOptions=json.loads(user.get("profileoptions")), + includeProfileOptions=True, + username=user.get("username") + ) + + # Создаем данные пакета + payload = { + "profile": profile + } + + # Собираем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PROFILE, payload=payload + ) + + # Отправляем + await self._send(writer, response) \ No newline at end of file diff --git a/src/oneme/processors/messages.py b/src/oneme/processors/messages.py new file mode 100644 index 0000000..44f46ae --- /dev/null +++ b/src/oneme/processors/messages.py @@ -0,0 +1,168 @@ +import pydantic +from classes.baseprocessor import BaseProcessor +from oneme.models import ( + TypingPayloadModel, + SendMessagePayloadModel +) + +class MessagesProcessors(BaseProcessor): + async def msg_typing(self, payload, seq, writer, senderId): + """Обработчик события печатания""" + # Валидируем данные пакета + try: + TypingPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.INVALID_PAYLOAD, writer) + return + + # Извлекаем данные из пакета + chatId = payload.get("chatId") + type = payload.get("type") or "TYPING" + + # Ищем чат в базе данных + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) + chat = await cursor.fetchone() + + # Если чат не найден, отправляем ошибку + if not chat: + await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.CHAT_NOT_FOUND, writer) + return + + # Участники чата + participants = await self.tools.get_chat_participants(chatId, self.db_pool) + + # Проверяем, является ли отправитель участником чата + if int(senderId) not in participants: + await self._send_error(seq, self.opcodes.MSG_TYPING, self.error_types.CHAT_NOT_ACCESS, writer) + return + + # Рассылаем событие участникам чата + for participant in participants: + if participant != senderId: + # Если участник не является отправителем, отправляем + await self.event( + participant, + { + "eventType": "typing", + "chatId": chatId, + "type": type, + "userId": senderId + } + ) + + # Создаем пакет + packet = self.proto.pack_packet( + seq=seq, opcode=self.opcodes.MSG_TYPING + ) + + # Отправляем пакет + await self._send(writer, packet) + + async def msg_send(self, payload, seq, writer, senderId, db_pool): + """Функция отправки сообщения""" + # Валидируем данные пакета + try: + SendMessagePayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.INVALID_PAYLOAD, writer) + return + + # Извлекаем данные из пакета + userId = payload.get("userId") + chatId = payload.get("chatId") + message = payload.get("message") + + elements = message.get("elements") or [] + attaches = message.get("attaches") or [] + cid = message.get("cid") or 0 + text = message.get("text") or "" + + # Вычисляем ID чата по ID пользователя и ID отправителя, + # в случае отсутствия ID чата + if not chatId: + chatId = userId ^ senderId + + # Если клиент хочет отправить сообщение в избранное, + # то выставляем в качестве ID чата ID отправителя + # (А ещё используем это, если клиент вообще ничего не указал) + if chatId == 0 or not chatId: + chatId = senderId + participants = [senderId] + else: + # Если все таки клиент хочет отправить сообщение в нормальный чат, + # то ищем его в базе данных (извлекать список участников все таки тоже надо) + async with db_pool.acquire() as db_connection: + async with db_connection.cursor() as cursor: + await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) + chat = await cursor.fetchone() + + # Если нет такого чата - выбрасываем ошибку + if not chat: + await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.CHAT_NOT_FOUND, writer) + return + + # Список участников + participants = await self.tools.get_chat_participants(chatId, db_pool) + + # Проверяем, является ли отправитель участником чата + if int(senderId) not in participants: + await self._send_error(seq, self.opcodes.MSG_SEND, self.error_types.CHAT_NOT_ACCESS, writer) + return + + # Добавляем сообщение в историю + messageId, lastMessageId, messageTime = await self.tools.insert_message( + chatId=chatId, + senderId=senderId, + text=text, + attaches=attaches, + elements=elements, + cid=cid, + type="USER", + db_pool=self.db_pool + ) + + # Готовое тело сообщения + bodyMessage = { + "id": messageId, + "time": messageTime, + "type": "USER", + "sender": senderId, + "cid": cid, + "text": text, + "attaches": attaches, + "elements": elements + } + + # Отправляем событие всем участникам чата + for participant in participants: + await self.event( + participant, + { + "eventType": "new_msg", + "chatId": 0 if chatId == senderId else chatId, + "message": bodyMessage, + "prevMessageId": lastMessageId, + "time": messageTime, + "writer": writer + } + ) + + # Данные пакета + payload = { + "chatId": 0 if chatId == senderId else chatId, + "message": bodyMessage, + "unread": 0, + "mark": messageTime + } + + # Собираем пакет + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.MSG_SEND, payload=payload + ) + + # Отправляем + await self._send(writer, packet) \ No newline at end of file diff --git a/src/oneme/processors/search.py b/src/oneme/processors/search.py new file mode 100644 index 0000000..d9ba9fe --- /dev/null +++ b/src/oneme/processors/search.py @@ -0,0 +1,205 @@ +import json +import pydantic +from classes.baseprocessor import BaseProcessor +from oneme.models import ( + SearchUsersPayloadModel, + SearchChatsPayloadModel, + SearchByPhonePayloadModel +) + + +class SearchProcessors(BaseProcessor): + async def contact_info(self, payload, seq, writer): + """Поиск пользователей по ID""" + # Валидируем данные пакета + try: + SearchUsersPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.CONTACT_INFO, self.error_types.INVALID_PAYLOAD, writer) + return + + # Итоговый список пользователей + users = [] + + # ID пользователей, которые нам предстоит найти + contactIds = payload.get("contactIds") + + # Ищем пользователей в бд + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + for contactId in contactIds: + await cursor.execute("SELECT * FROM users WHERE id = %s", (contactId,)) + user = await cursor.fetchone() + + # Если такой пользователь есть, добавляем его в список + if user: + # Аватарка с биографией + photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) + avatar_url = None if not photoId else self.config.avatar_base_url + photoId + description = None if not user.get("description") else user.get("description") + + # Генерируем профиль + users.append( + self.tools.generate_profile( + id=user.get("id"), + phone=int(user.get("phone")), + avatarUrl=avatar_url, + photoId=photoId, + updateTime=int(user.get("updatetime")), + firstName=user.get("firstname"), + lastName=user.get("lastname"), + options=json.loads(user.get("options")), + description=description, + accountStatus=int(user.get("accountstatus")), + profileOptions=json.loads(user.get("profileoptions")), + includeProfileOptions=False, + username=user.get("username") + ) + ) + + # Создаем данные пакета + payload = { + "contacts": users + } + + # Создаем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_INFO, payload=payload + ) + + # Отправляем + await self._send(writer, response) + + async def contact_info_by_phone(self, payload, seq, writer, senderId): + """Поиск по номеру телефона""" + # Валидируем данные пакета + try: + SearchByPhonePayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.CONTACT_INFO_BY_PHONE, self.error_types.INVALID_PAYLOAD, writer) + return + + # Ищем пользователя в бд + phone = payload.get("phone").replace("+", "").replace(" ", "").replace("-", "") + + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM users WHERE phone = %s", (phone,)) + user = await cursor.fetchone() + + # Если пользователь найден + if user: + # Аватарка с биографией + photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) + avatar_url = None if not photoId else self.config.avatar_base_url + photoId + description = None if not user.get("description") else user.get("description") + + # Генерируем профиль + profile = self.tools.generate_profile( + id=user.get("id"), + phone=int(user.get("phone")), + avatarUrl=avatar_url, + photoId=photoId, + updateTime=int(user.get("updatetime")), + firstName=user.get("firstname"), + lastName=user.get("lastname"), + options=json.loads(user.get("options")), + description=description, + accountStatus=int(user.get("accountstatus")), + profileOptions=json.loads(user.get("profileoptions")), + includeProfileOptions=False, + username=user.get("username") + ) + else: + profile = None + + # Создаем данные пакета + payload = { + "profile": profile + } + + # Создаем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_INFO_BY_PHONE, payload=payload + ) + + # Отправляем + await self._send(writer, response) + + async def chat_info(self, payload, seq, writer, senderId): + """Поиск чатов по ID""" + # Валидируем данные пакета + try: + SearchChatsPayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.CHAT_INFO, self.error_types.INVALID_PAYLOAD, writer) + return + + # Итоговый список чатов + chats = [] + + # ID чатов, которые нам предстоит найти + chatIds = payload.get("chatIds") + + # Ищем чаты в бд + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + for chatId in chatIds: + if chatId != 0: + await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,)) + chat = await cursor.fetchone() + + if chat: + # Проверяем, является ли пользователь участником чата + participants = await self.tools.get_chat_participants(chatId, self.db_pool) + # (в max нельзя смотреть и отправлять сообщения в чат, в котором ты не участник, в отличие от tg (например, комментарии в каналах), + # так что надо тоже так делать) + if int(senderId) not in participants: + continue + + # Получаем последнее сообщение из чата + message, messageTime = await self.tools.get_last_message( + chatId, self.db_pool + ) + + # Добавляем чат в список + chats.append( + self.tools.generate_chat( + chatId, chat.get("owner"), + chat.get("type"), participants, + message, messageTime + ) + ) + else: + # Получаем последнее сообщение из чата + message, messageTime = await self.tools.get_last_message( + senderId, self.db_pool + ) + + # ID избранного + chatId = senderId ^ senderId + + # Добавляем чат в список + chats.append( + self.tools.generate_chat( + chatId, senderId, + "DIALOG", [senderId], + message, messageTime + ) + ) + + # Создаем данные пакета + payload = { + "chats": chats + } + + # Собираем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CHAT_INFO, payload=payload + ) + + # Отправляем + await self._send(writer, response) diff --git a/src/oneme/processors/sessions.py b/src/oneme/processors/sessions.py new file mode 100644 index 0000000..6218397 --- /dev/null +++ b/src/oneme/processors/sessions.py @@ -0,0 +1,38 @@ +from classes.baseprocessor import BaseProcessor + +class SessionsProcessors(BaseProcessor): + async def sessions_info(self, payload, seq, writer, senderPhone, hashedToken): + """Получение активных сессий на аккаунте""" + # Готовый список сессий + sessions = [] + + # Ищем сессии в бд + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM tokens WHERE phone = %s", (str(senderPhone),)) + user_sessions = await cursor.fetchall() + + # Собираем сессии в список + for session in user_sessions: + sessions.append( + { + "time": int(session.get("time")), + "client": f"MAX {session.get('device_type')}", + "info": session.get("device_name"), + "location": session.get("location"), + "current": True if session.get("token_hash") == hashedToken else False + } + ) + + # Создаем данные пакета + payload = { + "sessions": sessions + } + + # Создаем пакет + response = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.SESSIONS_INFO, payload=payload + ) + + # Отправляем + await self._send(writer, response) diff --git a/src/oneme/socket.py b/src/oneme/socket.py index 37884b9..d8196ad 100644 --- a/src/oneme/socket.py +++ b/src/oneme/socket.py @@ -6,7 +6,7 @@ from common.tools import Tools from common.opcodes import Opcodes class OnemeMobileServer: - def __init__(self, host="0.0.0.0", port=443, ssl_context=None, db_pool=None, clients={}, send_event=None, telegram_bot=None): + def __init__(self, host, port, ssl_context, db_pool, clients, send_event, telegram_bot): self.host = host self.port = port self.ssl_context = ssl_context @@ -157,7 +157,7 @@ class OnemeMobileServer: ) case self.opcodes.PROFILE: await self.processors.profile( - payload, seq, writer, userId=userId, userPhone=userPhone + payload, seq, writer, userId=userId ) case self.opcodes.CHAT_SUBSCRIBE: await self.auth_required(