Files
openmax-server/src/common/tools.py
zavolo fa0ed34adc MAX: история таки заработала — cid/link/reactionInfo обязательны в схеме
Десктопный MAX подключается через TCP (mobile-протокол) и парсит
msgpack по фиксированной схеме. Если в сообщении выпадает любое из
полей — клиент молча обрывает соединение. После 87cfc19 как раз
такие условные `if elements: ...` / `if link: ...` (а link и
reaction_info там всегда были `{}`, то есть falsy) вырезали поля
из ответа CHAT_HISTORY и MSG_SEND, чем и сломали историю.

- src/common/tools.py: новый build_message_dict() — единая сборка
  тела сообщения, где все поля (id, cid, time, type, sender, text,
  attaches, elements, reactionInfo, link) присутствуют ВСЕГДА.
  get_last_message переписан через него.
- src/oneme/processors/history.py: chat_history использует
  build_message_dict вместо ручной логики с условными if-ками.
- src/oneme/processors/messages.py: msg_send.bodyMessage теперь
  отдает cid / reactionInfo / link даже пустыми и приводит id
  к int для mobile, str для web.

Цепная польза: auth.py LOGIN bootstrap (через generate_chats →
get_last_message) и search.py тоже теперь шлют корректную схему.
2026-05-10 22:17:18 +03:00

598 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import random
import secrets
import time
import geoip2.database
class Tools:
def __init__(self):
pass
def build_message_dict(self, row, protocol_type="mobile"):
"""Унифицированная сборка тела сообщения для отправки клиенту.
Десктоп MAX (TCP, protocol_type='mobile') и официальный
api.oneme.ru ожидают, что в сообщении будут ВСЕГДА присутствовать
поля cid / elements / link / reactionInfo, даже если они пустые.
Любое отсутствие поля приводит к тому, что клиент бросает соединение
при разборе msgpack-схемы (классическая регрессия из коммита 87cfc19).
"""
try:
attaches = json.loads(row.get("attaches") or "[]")
except (TypeError, ValueError):
attaches = []
try:
elements = json.loads(row.get("elements") or "[]")
except (TypeError, ValueError):
elements = []
message = {
"id": row.get("id") if protocol_type == "mobile" else str(row.get("id")),
"cid": int(row.get("cid") or 0),
"time": int(row.get("time")),
"type": row.get("type"),
"sender": row.get("sender"),
"text": row.get("text") or "",
"attaches": attaches if isinstance(attaches, list) else [],
"elements": elements if isinstance(elements, list) else [],
"reactionInfo": {},
"link": {},
}
return message
def generate_profile(
self,
id=1,
phone=70000000000,
avatarUrl=None,
photoId=None,
updateTime=0,
firstName="Test",
lastName="Account",
options=[],
description=None,
accountStatus=0,
profileOptions=[],
includeProfileOptions=True,
username=None,
# для контактов, собственно
custom_firstname=None,
custom_lastname=None,
blocked=False
):
contact = {
"id": id,
"updateTime": updateTime,
"phone": phone,
"names": [
{
"name": firstName,
"firstName": firstName,
"lastName": lastName,
"type": "ONEME",
}
],
"options": options,
"accountStatus": accountStatus,
"location": "RU",
"registrationTime": int(time.time() * 1000)
}
if avatarUrl:
contact["photoId"] = photoId
contact["baseUrl"] = avatarUrl
contact["baseRawUrl"] = avatarUrl
if description:
contact["description"] = description
if username:
contact["link"] = "https://max.ru/" + username
if custom_firstname:
contact["names"].append(
{
"name": custom_firstname,
"firstName": custom_firstname,
"lastName": custom_lastname,
"type": "CUSTOM"
}
)
if blocked:
contact["status"] = "BLOCKED"
if includeProfileOptions:
return {"contact": contact, "profileOptions": profileOptions}
else:
return contact
def generate_profile_tt(
self,
id=1,
phone=70000000000,
avatarUrl=None,
photoId=None,
updateTime=0,
firstName="Test",
lastName="Account",
options=[],
description=None,
username=None,
):
contact = {
"id": id,
"updateTime": updateTime,
"phone": phone,
"names": [{"name": f"{firstName} {lastName}", "type": "TT"}],
"options": options,
}
if avatarUrl:
contact["photoId"] = photoId
contact["baseUrl"] = avatarUrl
contact["baseRawUrl"] = avatarUrl
if description:
contact["description"] = description
if username:
contact["link"] = "https://tamtam.chat/" + username
return contact
def generate_chat(
self, id, owner, type, participants, lastMessage, lastEventTime, prevMessageId=0
):
"""Генерация чата"""
# Генерируем список участников
if isinstance(participants, dict):
result_participants = {
int(k): int(v) if v is not None else 0 for k, v in participants.items()
}
else:
# assume list
result_participants = {int(participant): 0 for participant in participants}
result = None
# Генерируем нужный список в зависимости от типа чата
if type == "DIALOG":
result = {
"id": id,
"type": type,
"status": "ACTIVE",
"owner": owner,
"participants": result_participants,
"lastMessage": lastMessage,
"lastEventTime": lastEventTime,
"lastDelayedUpdateTime": 0,
"lastFireDelayedErrorTime": 0,
"created": 1,
"cid": id,
"prevMessageId": prevMessageId,
"joinTime": 1,
"modified": lastEventTime,
}
# Возвращаем
return result
async def generate_chats(
self,
chatIds,
db_pool,
senderId,
include_favourites=True,
protocol_type="mobile",
):
"""Генерирует чаты для отдачи клиенту"""
# Готовый список с чатами
chats = []
# Формируем список чатов
for chatId in chatIds:
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
# Получаем чат по id
await cursor.execute(
"SELECT * FROM `chats` WHERE id = %s", (chatId,)
)
row = await cursor.fetchone()
if row:
# Получаем последнее сообщение из чата
message, messageTime = await self.get_last_message(
chatId, db_pool, protocol_type=protocol_type
)
# Формируем список участников с временем последней активности
participant_ids = await self.get_chat_participants(
chatId, db_pool
)
participants = await self.get_participant_last_activity(
chatId, participant_ids, db_pool
)
# Получаем ID предыдущего сообщения
prevMessageId = await self.get_previous_message_id(
chatId, db_pool, protocol_type=protocol_type
)
# Выносим результат в лист
chats.append(
self.generate_chat(
row.get("id"),
row.get("owner"),
row.get("type"),
participants,
message,
messageTime,
prevMessageId,
)
)
if include_favourites:
# Получаем последнее сообщение из избранного
favouriteChatId = -senderId
message, messageTime = await self.get_last_message(
favouriteChatId, db_pool, protocol_type=protocol_type
)
# ID избранного для клиента
chatId = senderId ^ senderId
# Получаем последнюю активность в избранном
participants = await self.get_participant_last_activity(
favouriteChatId, [senderId], db_pool
)
# Получаем ID предыдущего сообщения для избранного
prevMessageId = await self.get_previous_message_id(
favouriteChatId, db_pool, protocol_type=protocol_type
)
# Хардкодим в лист чатов избранное
chats.append(
self.generate_chat(
chatId,
senderId,
"DIALOG",
participants,
message,
messageTime,
prevMessageId,
)
)
return chats
async def generate_contacts(
self,
contacts,
db_pool,
avatar_base_url="",
):
"""
Генерация контакт-листа для отдачи клиенту
[notes]
В contacts должен поступать список вида
[
{
"firstname": "test",
"lastname": "testovich",
"id": 4323
}
]
А формировать мы должны его до вызова функции,
ибо я хочу вынести контакты в отдельную таблицу,
по моему мнению так будет намного практичнее и лучше
"""
# Готовый список с контакт-листом
contact_list = []
# Формируем список контактов
for contact in contacts:
# ID контакта
contact_id = contact.get("id")
# Имя и фамилия которые указал юзер для контакта
firstname = contact.get("firstname")
lastname = contact.get("lastname")
blocked = contact.get("blocked", False)
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
# Получаем контакт по id
await cursor.execute(
"SELECT * FROM `users` WHERE id = %s", (contact_id,)
)
user = await cursor.fetchone()
if user:
# Аватарка с биографией
photoId = (
None
if not user.get("avatar_id")
else int(user.get("avatar_id"))
)
avatar_url = (
None
if not photoId
else avatar_base_url + str(photoId)
)
description = (
None
if not user.get("description")
else user.get("description")
)
# Создаем профиль
contact = self.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
description=description,
accountStatus=int(user.get("accountstatus")),
includeProfileOptions=False,
username=user.get("username"),
custom_firstname=firstname,
custom_lastname=lastname,
blocked=blocked,
)
# Выносим результат в лист
contact_list.append(contact)
return contact_list
async def collect_user_contacts(
self,
owner_id,
db_pool,
avatar_base_url="",
):
"""Собирает все контакты пользователя и возвращает готовый контакт-лист"""
contacts = []
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
await cursor.execute(
"SELECT * FROM `contacts` WHERE owner_id = %s AND is_blocked = FALSE",
(owner_id,),
)
rows = await cursor.fetchall()
for row in rows:
contacts.append(
{
"id": int(row.get("contact_id")),
"firstname": row.get("custom_firstname"),
"lastname": row.get("custom_lastname"),
"blocked": bool(row.get("is_blocked")),
}
)
return await self.generate_contacts(
contacts, db_pool, avatar_base_url=avatar_base_url
)
async def insert_message(
self, chatId, senderId, text, attaches, elements, cid, type, db_pool
):
"""Добавление сообщения в историю"""
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
# Получаем id последнего сообщения в чате
await cursor.execute(
"SELECT id FROM `messages` WHERE chat_id = %s ORDER BY time DESC LIMIT 1",
(chatId,),
)
row = await cursor.fetchone() or {}
last_message_id = row.get("id") or 0 # последнее id сообщения в чате
message_time = int(time.time() * 1000) # время отправки сообщения
# Генерируем ID сообщения
message_id = int(time.time() * 1000000) * 1000 + random.randint(100, 999)
# Вносим новое сообщение в таблицу
await cursor.execute(
"INSERT INTO `messages` (id, chat_id, sender, time, text, attaches, cid, elements, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
(
message_id,
chatId,
senderId,
message_time,
text,
json.dumps(attaches),
cid,
json.dumps(elements),
type,
),
)
# Возвращаем айдишки
return int(message_id), int(last_message_id), message_time
async def get_last_message(self, chatId, db_pool, protocol_type="mobile"):
"""Получение последнего сообщения в чате"""
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
# Получаем id последнего сообщения в чате
await cursor.execute(
"SELECT * FROM `messages` WHERE chat_id = %s ORDER BY time DESC LIMIT 1",
(chatId,),
)
row = await cursor.fetchone()
# Если нет результатов - возвращаем None
if not row:
return None, None
# Возвращаем
return self.build_message_dict(row, protocol_type), int(row.get("time"))
async def get_previous_message_id(self, chatId, db_pool, protocol_type="mobile"):
"""Получение ID предыдущего сообщения (второго с конца) в чате."""
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
await cursor.execute(
"SELECT id FROM `messages` WHERE chat_id = %s ORDER BY time DESC LIMIT 1 OFFSET 1",
(chatId,),
)
row = await cursor.fetchone()
# Если результат есть, возвращаем его
if row:
return (
row.get("id")
if protocol_type == "mobile"
else str(row.get("id"))
)
# В ином случае возвращаем 0
return 0 if protocol_type == "mobile" else "0"
async def get_participant_last_activity(self, chatId, participant_ids, db_pool):
"""Возвращает словарь {participant_id: last_activity_time} для участников чата."""
if not participant_ids:
return {}
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
# Собираем всех участников
placeholders = ",".join(["%s"] * len(participant_ids))
query = f"""
SELECT sender, MAX(time) as last_time
FROM messages
WHERE chat_id = %s AND sender IN ({placeholders})
GROUP BY sender
"""
params = (chatId,) + tuple(participant_ids)
await cursor.execute(query, params)
rows = await cursor.fetchall()
# Собираем список участников без времени последней активности в чате
result = {int(pid): 0 for pid in participant_ids}
# Обновляем для каждого участника время последней активности в чате
for row in rows:
sender = int(row["sender"])
last_time = row["last_time"]
if last_time is not None:
result[sender] = int(last_time)
return result
async def get_chat_participants(self, chatId, db_pool):
"""Возвращает список ID участников чата из таблицы chat_participants."""
async with db_pool.acquire() as db_connection:
async with db_connection.cursor() as cursor:
await cursor.execute(
"SELECT user_id FROM chat_participants WHERE chat_id = %s",
(chatId,),
)
rows = await cursor.fetchall()
return [int(row["user_id"]) for row in rows]
async def auth_required(self, userPhone, coro, *args):
if userPhone:
await coro(*args)
async def update_user_config(self, cursor, phone, user_settings, default_settings):
"""Функция для обновления юзер конфига из бд в случае его изменения"""
user_config = json.loads(user_settings)
updated_config = {**default_settings, **user_config}
if updated_config != user_config:
await cursor.execute(
"UPDATE user_data SET user_config = %s WHERE phone = %s",
(json.dumps(updated_config), phone),
)
return updated_config
async def collect_presence(self, contact_ids, clients, db_pool):
"""Собирает статусы пользователей"""
now = int(time.time())
presence = {}
# Список тех, кого нужно поискать в базе данных
db_lookup_ids = []
# Проходимся по всем айдишникам,
# которые передал нам клиент
for contact_id in contact_ids:
contact_id = int(contact_id)
client = clients.get(contact_id)
# Если пользователь онлайн
if client and client.get("status") == 2:
presence[str(contact_id)] = {"seen": now, "status": 2}
# Если пользователь подключен,
# но не взаимодействует с клиентом
elif client and client.get("last_seen"):
presence[str(contact_id)] = {"seen": client.get("last_seen")}
# А если никакое условие не подошло, то добавляем его в лист,
# а позже посмотрим в базе данных
else:
db_lookup_ids.append(contact_id)
# Проходимся по листу и добавляем недостающих,
# если такие существуют конечно
async with db_pool.acquire() as conn:
async with conn.cursor() as cursor:
for contact_id in db_lookup_ids:
await cursor.execute(
"SELECT lastseen FROM users WHERE id = %s",
(contact_id,)
)
row = await cursor.fetchone()
if row:
lastseen = row.get("lastseen")
presence[int(contact_id)] = {"seen": int(lastseen)}
return presence
def get_geo(self, ip, db_path):
"""
Получение страны пользователя по его айпи адресу
Используется во время запуска сессии
"""
try:
with geoip2.database.Reader(db_path) as reader:
response = reader.country(ip)
return response.country.name or "Localhost Federation"
except Exception:
return "Localhost Federation"
async def generate_user_id(self, db_pool):
"""Генерация id пользователя"""
async with db_pool.acquire() as conn:
async with conn.cursor() as cursor:
while True:
user_id = secrets.randbelow(2_147_383_647) + 100_000
await cursor.execute("SELECT id FROM users WHERE id = %s", (user_id,))
if not await cursor.fetchone():
return user_id