MAX: добавление/удаление контактов, статусы

This commit is contained in:
Alexey Polyakov
2026-04-28 06:56:29 +03:00
parent ff46e417f4
commit c716520ca4
8 changed files with 392 additions and 12 deletions

View File

@@ -332,7 +332,7 @@ class Tools:
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
await cursor.execute(
"SELECT * FROM `contacts` WHERE owner_id = %s",
"SELECT * FROM `contacts` WHERE owner_id = %s AND is_blocked = FALSE",
(owner_id,),
)
rows = await cursor.fetchall()
@@ -501,3 +501,48 @@ class Tools:
)
return updated_config
async def collect_presence(self, contact_ids, clients, db_pool):
"""Собирает статусы пользователей"""
now = int(time.time())
presence = {}
# Список тех, кого нужно поискать в базе данных
db_lookup_ids = []
# Проходимся по всем айдишникам,
# которые передал нам клиент
for contact_id in contact_ids:
contact_id = int(contact_id)
client = clients.get(contact_id)
# Если пользователь онлайн
if client and client.get("status") == 2:
presence[str(contact_id)] = {"seen": now, "status": 2}
# Если пользователь подключен,
# но не взаимодействует с клиентом
elif client and client.get("last_seen"):
presence[str(contact_id)] = {"seen": client.get("last_seen")}
# А если никакое условие не подошло, то добавляем его в лист,
# а позже посмотрим в базе данных
else:
db_lookup_ids.append(contact_id)
# Проходимся по листу и добавляем недостающих,
# если такие существуют конечно
async with db_pool.acquire() as conn:
async with conn.cursor() as cursor:
for contact_id in db_lookup_ids:
await cursor.execute(
"SELECT lastseen FROM users WHERE id = %s",
(contact_id,)
)
row = await cursor.fetchone()
if row:
lastseen = row.get("lastseen")
presence[int(contact_id)] = {"seen": int(lastseen)}
return presence

View File

@@ -74,6 +74,20 @@ class OnemeController(ControllerBase):
packet = self.proto.pack_packet(
cmd=0, seq=1, opcode=self.opcodes.NOTIF_PROFILE, payload=payload
)
elif eventType == "presence":
userId = eventData.get("userId")
presence = eventData.get("presence")
event_time = eventData.get("time")
payload = {
"userId": userId,
"presence": presence,
"time": event_time
}
packet = self.proto.pack_packet(
cmd=0, seq=1, opcode=self.opcodes.NOTIF_PRESENCE, payload=payload
)
# Отправляем пакет
writer.write(packet)

View File

@@ -141,3 +141,12 @@ class ChatSubscribePayloadModel(pydantic.BaseModel):
class ContactListPayloadModel(pydantic.BaseModel):
status: str
count: int = None
class ContactPresencePayloadModel(pydantic.BaseModel):
contactIds: list
class ContactUpdatePayloadModel(pydantic.BaseModel):
action: str
contactId: int
firstName: str
lastName: str = None

View File

@@ -607,6 +607,10 @@ class AuthProcessors(BaseProcessor):
user.get("id"), self.db_pool, self.config.avatar_base_url
)
# Собираем статусы контактов
contact_ids = [c.get("id") for c in contacts if c.get("id") is not None]
presence = await self.tools.collect_presence(contact_ids, self.clients, self.db_pool)
# Формируем данные пакета
payload = {
"profile": profile,
@@ -614,17 +618,17 @@ class AuthProcessors(BaseProcessor):
"chatMarker": 0,
"messages": {},
"contacts": contacts,
"presence": {},
"presence": presence,
"config": {
"hash": "0",
"server": self.server_config,
"user": updated_user_config,
},
"token": token,
"videoChatHistory": False,
"time": int(time.time() * 1000),
}
} # Собираем пакет
# Собираем пакет
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.LOGIN, payload=payload
)

View File

@@ -1,6 +1,8 @@
import pydantic
import json
import time
from classes.baseprocessor import BaseProcessor
from oneme.models import ContactListPayloadModel
from oneme.models import ContactListPayloadModel, ContactPresencePayloadModel, ContactUpdatePayloadModel
class ContactsProcessors(BaseProcessor):
async def contact_list(self, payload, seq, writer, userId):
@@ -64,3 +66,123 @@ class ContactsProcessors(BaseProcessor):
# Отправляем пакет
await self._send(writer, packet)
async def contact_update(self, payload, seq, writer, userId):
"""
Обработчик опкода какого-то там
(их хуй запомнишь, даже в мриме команды помню, бля)
Отвечает за добавку, удаление, блокировку и разблокировку контакта
"""
# Валидируем данные пакета
try:
ContactUpdatePayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.CONTACT_UPDATE, self.error_types.INVALID_PAYLOAD, writer)
return
action = payload.get("action")
contactId = payload.get("contactId")
firstName = payload.get("firstName")
lastName = payload.get("lastName", "")
if action == "ADD":
# Проверяем, существует ли пользователь с таким ID
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM users WHERE id = %s", (contactId,))
user = await cursor.fetchone()
if not user:
await self._send_error(seq, self.opcodes.CONTACT_UPDATE, self.error_types.USER_NOT_FOUND, writer)
return
# Проверяем, не добавлен ли уже контакт
await cursor.execute(
"SELECT * FROM contacts WHERE owner_id = %s AND contact_id = %s",
(userId, contactId)
)
row = await cursor.fetchone()
# Если контакта не существует, то можем продолжать,
if not row:
# Добавляем контакт
await cursor.execute(
"INSERT INTO contacts (owner_id, contact_id, custom_firstname, custom_lastname, is_blocked) VALUES (%s, %s, %s, %s, FALSE)",
(userId, contactId, firstName, lastName)
)
# а если уже существует, отправляем ошибку
else:
await self._send_error(seq, self.opcodes.CONTACT_UPDATE, self.error_types.CONTACT_ALREADY_EXISTS, writer)
return
# Генерируем профиль
photoId = None if not user.get("avatar_id") else int(user.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + str(photoId)
contact = self.tools.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
accountStatus=int(user.get("accountstatus")),
includeProfileOptions=False,
custom_firstname=firstName,
custom_lastname=lastName,
)
response_payload = {
"contact": contact
}
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_UPDATE, payload=response_payload
)
await self._send(writer, packet)
elif action == "REMOVE":
# Удаляем контакт
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"DELETE FROM contacts WHERE owner_id = %s AND contact_id = %s",
(userId, contactId)
)
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_UPDATE, payload=None
)
await self._send(writer, packet)
async def contact_presence(self, payload, seq, writer):
"""Обработчик получения статуса контактов"""
# Валидируем данные пакета
try:
ContactPresencePayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
await self._send_error(seq, self.opcodes.CONTACT_PRESENCE, self.error_types.INVALID_PAYLOAD, writer)
return
contact_ids = payload.get("contactIds", [])
now_ms = int(time.time() * 1000)
presence = await self.tools.collect_presence(contact_ids, self.clients, self.db_pool)
response_payload = {
"presence": presence,
"time": now_ms
}
packet = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.CONTACT_PRESENCE, payload=response_payload
)
await self._send(writer, packet)

View File

@@ -1,5 +1,6 @@
import pydantic
import json
import time
from classes.baseprocessor import BaseProcessor
from oneme.models import (
HelloPayloadModel,
@@ -30,6 +31,7 @@ class MainProcessors(BaseProcessor):
# Данные пакета
payload = {
"callsSeed": int(time.time() * 1000),
"location": "RU",
"app-update-type": 0, # 1 = принудительное обновление
"reg-country-code": self.static.REG_COUNTRY_CODES,
@@ -47,7 +49,7 @@ class MainProcessors(BaseProcessor):
await self._send(writer, packet)
return deviceType, deviceName, appVersion
async def ping(self, payload, seq, writer):
async def ping(self, payload, seq, writer, userId=None):
"""Обработчик пинга"""
# Валидируем данные пакета
try:
@@ -57,6 +59,58 @@ class MainProcessors(BaseProcessor):
await self._send_error(seq, self.opcodes.PING, self.error_types.INVALID_PAYLOAD, writer)
return
# Обновляем статус пользователя, если он авторизован
# и в пакете отправлен интерактив
interactive = payload.get("interactive") if payload else None
if userId and interactive is not None:
now = int(time.time())
user = self.clients.get(userId)
if user:
if interactive:
user["status"] = 2
user["last_seen"] = now
else:
user["status"] = 0
user["last_seen"] = now
# Сохраняем последнее время посещения
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"UPDATE users SET lastseen = %s WHERE id = %s",
(str(now), userId)
)
# Рассылаем статус контактам пользователя
now_ms = int(time.time() * 1000)
if interactive:
presence_data = {"on": "ON", "seen": now, "status": 1}
else:
presence_data = {"seen": now}
# Находим всех, у кого этот пользователь в контактах
async with self.db_pool.acquire() as conn2:
async with conn2.cursor() as cursor2:
await cursor2.execute(
"SELECT owner_id FROM contacts WHERE contact_id = %s",
(userId,)
)
contact_owners = await cursor2.fetchall()
# Рассылаем
for row in contact_owners:
owner_id = int(row.get("owner_id"))
if owner_id in self.clients:
await self.event(
owner_id,
{
"eventType": "presence",
"userId": userId,
"presence": presence_data,
"time": now_ms,
}
)
# Собираем пакет
response = self.proto.pack_packet(
cmd=self.proto.CMD_OK, seq=seq, opcode=self.opcodes.PING, payload=None

View File

@@ -1,5 +1,6 @@
import asyncio
import logging
import time
import traceback
from common.opcodes import Opcodes
@@ -32,7 +33,7 @@ class OnemeMobile:
self.opcodes = Opcodes()
# rate limiter anti ddos brute force protection
self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60)
self.auth_rate_limiter = RateLimiter(max_attempts=15, window_seconds=60)
self.read_timeout = 300 # Таймаут чтения из сокета (секунды)
self.max_read_size = 65536 # Максимальный размер данных из сокета
@@ -156,7 +157,7 @@ class OnemeMobile:
)
break
case self.opcodes.PING:
await self.processors.ping(payload, seq, writer)
await self.processors.ping(payload, seq, writer, userId)
case self.opcodes.LOG:
await self.processors.log(payload, seq, writer)
case self.opcodes.ASSETS_UPDATE:
@@ -301,6 +302,23 @@ class OnemeMobile:
userPhone,
hashedToken,
)
case self.opcodes.CONTACT_UPDATE:
await self.auth_required(
userPhone,
self.processors.contact_update,
payload,
seq,
writer,
userId,
)
case self.opcodes.CONTACT_PRESENCE:
await self.auth_required(
userPhone,
self.processors.contact_presence,
payload,
seq,
writer
)
case _:
self.logger.warning(f"Неизвестный опкод {opcode}")
except Exception as e:
@@ -332,6 +350,8 @@ class OnemeMobile:
self.clients[id] = {
"phone": phone,
"id": id,
"status": 2,
"last_seen": 0,
"clients": [
{
"writer": writer,
@@ -342,6 +362,38 @@ class OnemeMobile:
],
}
await self._broadcast_presence(id, True)
async def _broadcast_presence(self, userId, online):
now = int(time.time())
now_ms = int(time.time() * 1000)
if online:
presence_data = {"on": "ON", "seen": now, "status": 1}
else:
presence_data = {"seen": now}
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"SELECT owner_id FROM contacts WHERE contact_id = %s",
(userId,)
)
contact_owners = await cursor.fetchall()
for row in contact_owners:
owner_id = int(row.get("owner_id"))
if owner_id in self.clients:
await self.processors.event(
owner_id,
{
"eventType": "presence",
"userId": userId,
"presence": presence_data,
"time": now_ms,
}
)
async def _end_session(self, id, ip, port):
"""Завершение сессии"""
# Получаем пользователя в списке
@@ -357,6 +409,20 @@ class OnemeMobile:
if (client.get("ip"), client.get("port")) == (ip, port):
clients.pop(i)
if not clients:
now = int(time.time())
user["status"] = 0
user["last_seen"] = now
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"UPDATE users SET lastseen = %s WHERE id = %s",
(str(now), id)
)
await self._broadcast_presence(id, False)
async def start(self):
"""Функция для запуска сервера"""
self.server = await asyncio.start_server(

View File

@@ -1,4 +1,5 @@
import logging
import time
import traceback
import websockets
from common.proto_web import WebProto
@@ -130,7 +131,7 @@ class OnemeWS:
)
break
case self.opcodes.PING:
await self.processors.ping(payload, seq, websocket)
await self.processors.ping(payload, seq, websocket, userId)
case self.opcodes.LOG:
await self.processors.log(payload, seq, websocket)
case self.opcodes.ASSETS_UPDATE:
@@ -275,6 +276,23 @@ class OnemeWS:
userPhone,
hashedToken,
)
case self.opcodes.CONTACT_UPDATE:
await self.auth_required(
userPhone,
self.processors.contact_update,
payload,
seq,
websocket,
userId,
)
case self.opcodes.CONTACT_PRESENCE:
await self.auth_required(
userPhone,
self.processors.contact_presence,
payload,
seq,
websocket
)
case _:
self.logger.warning(f"Неизвестный опкод {opcode}")
except websockets.exceptions.ConnectionClosed:
@@ -308,6 +326,8 @@ class OnemeWS:
self.clients[id] = {
"phone": phone,
"id": id,
"status": 2,
"last_seen": 0,
"clients": [
{
"writer": websocket,
@@ -318,6 +338,38 @@ class OnemeWS:
]
}
await self._broadcast_presence(id, True)
async def _broadcast_presence(self, userId, online):
now = int(time.time())
now_ms = int(time.time() * 1000)
if online:
presence_data = {"on": "ON", "seen": now, "status": 1}
else:
presence_data = {"seen": now}
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"SELECT owner_id FROM contacts WHERE contact_id = %s",
(userId,)
)
contact_owners = await cursor.fetchall()
for row in contact_owners:
owner_id = int(row.get("owner_id"))
if owner_id in self.clients:
await self.processors.event(
owner_id,
{
"eventType": "presence",
"userId": userId,
"presence": presence_data,
"time": now_ms,
}
)
async def _end_session(self, id, ip, port):
"""Завершение сессии"""
# Получаем пользователя в списке
@@ -333,6 +385,20 @@ class OnemeWS:
if (client.get("ip"), client.get("port")) == (ip, port):
clients.pop(i)
if not clients:
now = int(time.time())
user["status"] = 0
user["last_seen"] = now
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"UPDATE users SET lastseen = %s WHERE id = %s",
(str(now), id)
)
await self._broadcast_presence(id, False)
async def start(self):
"""Функция для запуска WebSocket сервера"""
self.server = await websockets.serve(