From c716520ca41f506a6625cff2f81a2a0bf22845ed Mon Sep 17 00:00:00 2001 From: Alexey Polyakov Date: Tue, 28 Apr 2026 06:56:29 +0300 Subject: [PATCH] =?UTF-8?q?MAX:=20=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D0=B5/=D1=83=D0=B4=D0=B0=D0=BB=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D0=B5=20=D0=BA=D0=BE=D0=BD=D1=82=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D0=B2,=20=D1=81=D1=82=D0=B0=D1=82=D1=83=D1=81=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/tools.py | 47 +++++++++++- src/oneme/controller.py | 14 ++++ src/oneme/models.py | 11 ++- src/oneme/processors/auth.py | 10 ++- src/oneme/processors/contacts.py | 124 ++++++++++++++++++++++++++++++- src/oneme/processors/main.py | 60 ++++++++++++++- src/oneme/socket.py | 70 ++++++++++++++++- src/oneme/websocket.py | 68 ++++++++++++++++- 8 files changed, 392 insertions(+), 12 deletions(-) diff --git a/src/common/tools.py b/src/common/tools.py index ab7cdff..3024427 100644 --- a/src/common/tools.py +++ b/src/common/tools.py @@ -332,7 +332,7 @@ class Tools: async with db_pool.acquire() as db_connection: async with db_connection.cursor() as cursor: await cursor.execute( - "SELECT * FROM `contacts` WHERE owner_id = %s", + "SELECT * FROM `contacts` WHERE owner_id = %s AND is_blocked = FALSE", (owner_id,), ) rows = await cursor.fetchall() @@ -501,3 +501,48 @@ class Tools: ) return updated_config + + async def collect_presence(self, contact_ids, clients, db_pool): + """Собирает статусы пользователей""" + now = int(time.time()) + presence = {} + + # Список тех, кого нужно поискать в базе данных + db_lookup_ids = [] + + # Проходимся по всем айдишникам, + # которые передал нам клиент + for contact_id in contact_ids: + contact_id = int(contact_id) + + client = clients.get(contact_id) + + # Если пользователь онлайн + if client and client.get("status") == 2: + presence[str(contact_id)] = {"seen": now, "status": 2} + # Если пользователь подключен, + # но не взаимодействует с клиентом + elif client and client.get("last_seen"): + presence[str(contact_id)] = {"seen": client.get("last_seen")} + # А если никакое условие не подошло, то добавляем его в лист, + # а позже посмотрим в базе данных + else: + db_lookup_ids.append(contact_id) + + # Проходимся по листу и добавляем недостающих, + # если такие существуют конечно + async with db_pool.acquire() as conn: + async with conn.cursor() as cursor: + for contact_id in db_lookup_ids: + await cursor.execute( + "SELECT lastseen FROM users WHERE id = %s", + (contact_id,) + ) + + row = await cursor.fetchone() + + if row: + lastseen = row.get("lastseen") + presence[int(contact_id)] = {"seen": int(lastseen)} + + return presence diff --git a/src/oneme/controller.py b/src/oneme/controller.py index df48761..44a236b 100644 --- a/src/oneme/controller.py +++ b/src/oneme/controller.py @@ -74,6 +74,20 @@ class OnemeController(ControllerBase): packet = self.proto.pack_packet( cmd=0, seq=1, opcode=self.opcodes.NOTIF_PROFILE, payload=payload ) + elif eventType == "presence": + userId = eventData.get("userId") + presence = eventData.get("presence") + event_time = eventData.get("time") + + payload = { + "userId": userId, + "presence": presence, + "time": event_time + } + + packet = self.proto.pack_packet( + cmd=0, seq=1, opcode=self.opcodes.NOTIF_PRESENCE, payload=payload + ) # Отправляем пакет writer.write(packet) diff --git a/src/oneme/models.py b/src/oneme/models.py index 74d6927..8a858c5 100644 --- a/src/oneme/models.py +++ b/src/oneme/models.py @@ -140,4 +140,13 @@ class ChatSubscribePayloadModel(pydantic.BaseModel): class ContactListPayloadModel(pydantic.BaseModel): status: str - count: int = None \ No newline at end of file + count: int = None + +class ContactPresencePayloadModel(pydantic.BaseModel): + contactIds: list + +class ContactUpdatePayloadModel(pydantic.BaseModel): + action: str + contactId: int + firstName: str + lastName: str = None \ No newline at end of file diff --git a/src/oneme/processors/auth.py b/src/oneme/processors/auth.py index 47b275d..fdce2ca 100644 --- a/src/oneme/processors/auth.py +++ b/src/oneme/processors/auth.py @@ -607,6 +607,10 @@ class AuthProcessors(BaseProcessor): user.get("id"), self.db_pool, self.config.avatar_base_url ) + # Собираем статусы контактов + contact_ids = [c.get("id") for c in contacts if c.get("id") is not None] + presence = await self.tools.collect_presence(contact_ids, self.clients, self.db_pool) + # Формируем данные пакета payload = { "profile": profile, @@ -614,17 +618,17 @@ class AuthProcessors(BaseProcessor): "chatMarker": 0, "messages": {}, "contacts": contacts, - "presence": {}, + "presence": presence, "config": { + "hash": "0", "server": self.server_config, "user": updated_user_config, }, "token": token, "videoChatHistory": False, "time": int(time.time() * 1000), - } + } # Собираем пакет - # Собираем пакет packet = self.proto.pack_packet( cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGIN, payload=payload ) diff --git a/src/oneme/processors/contacts.py b/src/oneme/processors/contacts.py index b94abf5..eb6e4ef 100644 --- a/src/oneme/processors/contacts.py +++ b/src/oneme/processors/contacts.py @@ -1,6 +1,8 @@ import pydantic +import json +import time from classes.baseprocessor import BaseProcessor -from oneme.models import ContactListPayloadModel +from oneme.models import ContactListPayloadModel, ContactPresencePayloadModel, ContactUpdatePayloadModel class ContactsProcessors(BaseProcessor): async def contact_list(self, payload, seq, writer, userId): @@ -64,3 +66,123 @@ class ContactsProcessors(BaseProcessor): # Отправляем пакет await self._send(writer, packet) + + async def contact_update(self, payload, seq, writer, userId): + """ + Обработчик опкода какого-то там + (их хуй запомнишь, даже в мриме команды помню, бля) + + Отвечает за добавку, удаление, блокировку и разблокировку контакта + """ + # Валидируем данные пакета + try: + ContactUpdatePayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.CONTACT_UPDATE, self.error_types.INVALID_PAYLOAD, writer) + return + + action = payload.get("action") + contactId = payload.get("contactId") + firstName = payload.get("firstName") + lastName = payload.get("lastName", "") + + if action == "ADD": + # Проверяем, существует ли пользователь с таким ID + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM users WHERE id = %s", (contactId,)) + user = await cursor.fetchone() + + if not user: + await self._send_error(seq, self.opcodes.CONTACT_UPDATE, self.error_types.USER_NOT_FOUND, writer) + return + + # Проверяем, не добавлен ли уже контакт + await cursor.execute( + "SELECT * FROM contacts WHERE owner_id = %s AND contact_id = %s", + (userId, contactId) + ) + row = await cursor.fetchone() + + # Если контакта не существует, то можем продолжать, + if not row: + # Добавляем контакт + await cursor.execute( + "INSERT INTO contacts (owner_id, contact_id, custom_firstname, custom_lastname, is_blocked) VALUES (%s, %s, %s, %s, FALSE)", + (userId, contactId, firstName, lastName) + ) + # а если уже существует, отправляем ошибку + else: + await self._send_error(seq, self.opcodes.CONTACT_UPDATE, self.error_types.CONTACT_ALREADY_EXISTS, writer) + return + + # Генерируем профиль + photoId = None if not user.get("avatar_id") else int(user.get("avatar_id")) + avatar_url = None if not photoId else self.config.avatar_base_url + str(photoId) + + contact = self.tools.generate_profile( + id=user.get("id"), + phone=int(user.get("phone")), + avatarUrl=avatar_url, + photoId=photoId, + updateTime=int(user.get("updatetime")), + firstName=user.get("firstname"), + lastName=user.get("lastname"), + options=json.loads(user.get("options")), + accountStatus=int(user.get("accountstatus")), + includeProfileOptions=False, + custom_firstname=firstName, + custom_lastname=lastName, + ) + + response_payload = { + "contact": contact + } + + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_UPDATE, payload=response_payload + ) + + await self._send(writer, packet) + + elif action == "REMOVE": + # Удаляем контакт + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "DELETE FROM contacts WHERE owner_id = %s AND contact_id = %s", + (userId, contactId) + ) + + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_UPDATE, payload=None + ) + + await self._send(writer, packet) + + async def contact_presence(self, payload, seq, writer): + """Обработчик получения статуса контактов""" + # Валидируем данные пакета + try: + ContactPresencePayloadModel.model_validate(payload) + except pydantic.ValidationError as error: + self.logger.error(f"Возникли ошибки при валидации пакета: {error}") + await self._send_error(seq, self.opcodes.CONTACT_PRESENCE, self.error_types.INVALID_PAYLOAD, writer) + return + + contact_ids = payload.get("contactIds", []) + now_ms = int(time.time() * 1000) + + presence = await self.tools.collect_presence(contact_ids, self.clients, self.db_pool) + + response_payload = { + "presence": presence, + "time": now_ms + } + + packet = self.proto.pack_packet( + cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_PRESENCE, payload=response_payload + ) + + await self._send(writer, packet) diff --git a/src/oneme/processors/main.py b/src/oneme/processors/main.py index c4e42fa..92ec267 100644 --- a/src/oneme/processors/main.py +++ b/src/oneme/processors/main.py @@ -1,9 +1,10 @@ import pydantic import json +import time from classes.baseprocessor import BaseProcessor from oneme.models import ( - HelloPayloadModel, - PingPayloadModel, + HelloPayloadModel, + PingPayloadModel, UpdateProfilePayloadModel ) @@ -30,6 +31,7 @@ class MainProcessors(BaseProcessor): # Данные пакета payload = { + "callsSeed": int(time.time() * 1000), "location": "RU", "app-update-type": 0, # 1 = принудительное обновление "reg-country-code": self.static.REG_COUNTRY_CODES, @@ -47,7 +49,7 @@ class MainProcessors(BaseProcessor): await self._send(writer, packet) return deviceType, deviceName, appVersion - async def ping(self, payload, seq, writer): + async def ping(self, payload, seq, writer, userId=None): """Обработчик пинга""" # Валидируем данные пакета try: @@ -57,6 +59,58 @@ class MainProcessors(BaseProcessor): await self._send_error(seq, self.opcodes.PING, self.error_types.INVALID_PAYLOAD, writer) return + # Обновляем статус пользователя, если он авторизован + # и в пакете отправлен интерактив + interactive = payload.get("interactive") if payload else None + if userId and interactive is not None: + now = int(time.time()) + user = self.clients.get(userId) + if user: + if interactive: + user["status"] = 2 + user["last_seen"] = now + else: + user["status"] = 0 + user["last_seen"] = now + + # Сохраняем последнее время посещения + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "UPDATE users SET lastseen = %s WHERE id = %s", + (str(now), userId) + ) + + # Рассылаем статус контактам пользователя + now_ms = int(time.time() * 1000) + if interactive: + presence_data = {"on": "ON", "seen": now, "status": 1} + else: + presence_data = {"seen": now} + + # Находим всех, у кого этот пользователь в контактах + async with self.db_pool.acquire() as conn2: + async with conn2.cursor() as cursor2: + await cursor2.execute( + "SELECT owner_id FROM contacts WHERE contact_id = %s", + (userId,) + ) + contact_owners = await cursor2.fetchall() + + # Рассылаем + for row in contact_owners: + owner_id = int(row.get("owner_id")) + if owner_id in self.clients: + await self.event( + owner_id, + { + "eventType": "presence", + "userId": userId, + "presence": presence_data, + "time": now_ms, + } + ) + # Собираем пакет response = self.proto.pack_packet( cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PING, payload=None diff --git a/src/oneme/socket.py b/src/oneme/socket.py index 8834133..ee9b7e9 100644 --- a/src/oneme/socket.py +++ b/src/oneme/socket.py @@ -1,5 +1,6 @@ import asyncio import logging +import time import traceback from common.opcodes import Opcodes @@ -32,7 +33,7 @@ class OnemeMobile: self.opcodes = Opcodes() # rate limiter anti ddos brute force protection - self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60) + self.auth_rate_limiter = RateLimiter(max_attempts=15, window_seconds=60) self.read_timeout = 300 # Таймаут чтения из сокета (секунды) self.max_read_size = 65536 # Максимальный размер данных из сокета @@ -156,7 +157,7 @@ class OnemeMobile: ) break case self.opcodes.PING: - await self.processors.ping(payload, seq, writer) + await self.processors.ping(payload, seq, writer, userId) case self.opcodes.LOG: await self.processors.log(payload, seq, writer) case self.opcodes.ASSETS_UPDATE: @@ -301,6 +302,23 @@ class OnemeMobile: userPhone, hashedToken, ) + case self.opcodes.CONTACT_UPDATE: + await self.auth_required( + userPhone, + self.processors.contact_update, + payload, + seq, + writer, + userId, + ) + case self.opcodes.CONTACT_PRESENCE: + await self.auth_required( + userPhone, + self.processors.contact_presence, + payload, + seq, + writer + ) case _: self.logger.warning(f"Неизвестный опкод {opcode}") except Exception as e: @@ -332,6 +350,8 @@ class OnemeMobile: self.clients[id] = { "phone": phone, "id": id, + "status": 2, + "last_seen": 0, "clients": [ { "writer": writer, @@ -342,6 +362,38 @@ class OnemeMobile: ], } + await self._broadcast_presence(id, True) + + async def _broadcast_presence(self, userId, online): + now = int(time.time()) + now_ms = int(time.time() * 1000) + + if online: + presence_data = {"on": "ON", "seen": now, "status": 1} + else: + presence_data = {"seen": now} + + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT owner_id FROM contacts WHERE contact_id = %s", + (userId,) + ) + contact_owners = await cursor.fetchall() + + for row in contact_owners: + owner_id = int(row.get("owner_id")) + if owner_id in self.clients: + await self.processors.event( + owner_id, + { + "eventType": "presence", + "userId": userId, + "presence": presence_data, + "time": now_ms, + } + ) + async def _end_session(self, id, ip, port): """Завершение сессии""" # Получаем пользователя в списке @@ -357,6 +409,20 @@ class OnemeMobile: if (client.get("ip"), client.get("port")) == (ip, port): clients.pop(i) + if not clients: + now = int(time.time()) + user["status"] = 0 + user["last_seen"] = now + + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "UPDATE users SET lastseen = %s WHERE id = %s", + (str(now), id) + ) + + await self._broadcast_presence(id, False) + async def start(self): """Функция для запуска сервера""" self.server = await asyncio.start_server( diff --git a/src/oneme/websocket.py b/src/oneme/websocket.py index 9ce1dac..88a630a 100644 --- a/src/oneme/websocket.py +++ b/src/oneme/websocket.py @@ -1,4 +1,5 @@ import logging +import time import traceback import websockets from common.proto_web import WebProto @@ -130,7 +131,7 @@ class OnemeWS: ) break case self.opcodes.PING: - await self.processors.ping(payload, seq, websocket) + await self.processors.ping(payload, seq, websocket, userId) case self.opcodes.LOG: await self.processors.log(payload, seq, websocket) case self.opcodes.ASSETS_UPDATE: @@ -275,6 +276,23 @@ class OnemeWS: userPhone, hashedToken, ) + case self.opcodes.CONTACT_UPDATE: + await self.auth_required( + userPhone, + self.processors.contact_update, + payload, + seq, + websocket, + userId, + ) + case self.opcodes.CONTACT_PRESENCE: + await self.auth_required( + userPhone, + self.processors.contact_presence, + payload, + seq, + websocket + ) case _: self.logger.warning(f"Неизвестный опкод {opcode}") except websockets.exceptions.ConnectionClosed: @@ -308,6 +326,8 @@ class OnemeWS: self.clients[id] = { "phone": phone, "id": id, + "status": 2, + "last_seen": 0, "clients": [ { "writer": websocket, @@ -318,6 +338,38 @@ class OnemeWS: ] } + await self._broadcast_presence(id, True) + + async def _broadcast_presence(self, userId, online): + now = int(time.time()) + now_ms = int(time.time() * 1000) + + if online: + presence_data = {"on": "ON", "seen": now, "status": 1} + else: + presence_data = {"seen": now} + + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "SELECT owner_id FROM contacts WHERE contact_id = %s", + (userId,) + ) + contact_owners = await cursor.fetchall() + + for row in contact_owners: + owner_id = int(row.get("owner_id")) + if owner_id in self.clients: + await self.processors.event( + owner_id, + { + "eventType": "presence", + "userId": userId, + "presence": presence_data, + "time": now_ms, + } + ) + async def _end_session(self, id, ip, port): """Завершение сессии""" # Получаем пользователя в списке @@ -333,6 +385,20 @@ class OnemeWS: if (client.get("ip"), client.get("port")) == (ip, port): clients.pop(i) + if not clients: + now = int(time.time()) + user["status"] = 0 + user["last_seen"] = now + + async with self.db_pool.acquire() as conn: + async with conn.cursor() as cursor: + await cursor.execute( + "UPDATE users SET lastseen = %s WHERE id = %s", + (str(now), id) + ) + + await self._broadcast_presence(id, False) + async def start(self): """Функция для запуска WebSocket сервера""" self.server = await websockets.serve(