Начальная реализация транспорта ws для max web и прочие улучшения

This commit is contained in:
Alexey Polyakov
2026-04-07 12:36:30 +03:00
parent 52949602af
commit 0ffc649dd9
19 changed files with 873 additions and 228 deletions

View File

@@ -3,5 +3,120 @@ class OnemeConfig:
pass
SERVER_CONFIG = {
"async-tracer": 0,
"presence-ttl": 300,
"non-contact-sync-time": 86400,
"contact-batching-variant": 0,
"account-nickname-enabled": True,
"web-ad-banner": {
"enabled": False
},
"edit-timeout": 0,
"reactions-menu": [],
"invite-long": "",
"calls-endpoint": "",
"calls-test-domain": "",
"max-readmarks": 100,
"max-cname-length": 200,
"max-description-length": 400,
"new-avatar-gradient-colors-enabled": True,
"max-msg-length": 4000,
"file-upload-unsupported-types": [],
"file-upload-max-size": 4294967296,
"image-quality": 0.8,
"image-width": 1920,
"image-height": 1920,
"image-size": 10000000,
"max-favorite-chats": 5,
"bot-complaint-enabled": True,
"reactions-max": 8,
"welcome-sticker-ids": [],
"edit-chat-type-screen-enabled": True,
"edit-channel-type-screen-enabled": True,
"esia-verify-botId": 0,
"official-org": False,
"esia-enabled": False,
"calls-debug-mode": False,
"channels-suggests-folder": True,
"delete-msg-fys-large-chat-disabled": False,
"calls-web-download-logs": False,
"calls-web-upload-logs": False,
"calls-video-zoom": False,
"calls-fullscreen-mode": False,
"group-call-part-limit": 100,
"call-chat-members-load-config": {},
"cfs": False,
"cse": False,
"calls-hotkeys": True,
"gc-link-pre-settings": False,
"gc-from-p2p": False,
"call-rate": {},
"channels-enabled": True,
"max-participants": 20000,
"max-added-participants": 100,
"saved-messages-aliases": [],
"author-visibility-forward-enabled": False,
"official-bot-naming-enabled": False,
"search-webapps-showcase": {
"items": []
},
"settings-entry-banners": [],
"settings-business": "https://telegram.org/blog/telegram-business",
"appearance-multi-theme-screen-enabled": True,
"moscow-theme-enabled": True,
"creation-2fa-config": {
"enabled": False,
"pass_min_len": 6,
"pass_max_len": 64,
"hint_max_len": 30
},
"lebedev-theme-enabled": True,
"quotes-enabled": True,
"channels-complaint-enabled": True,
"reactions-settings-enabled": True,
"channel-statistics-botid": 0,
"enable-unknown-contact-bottom-sheet": 0,
"informer-enabled": True,
"family-protection-botid": 0,
"new-year-theme-2026": True,
"scheduled-messages-enabled": True,
"scheduled-posts-enabled": True,
"scheduled-faves-enabled": True,
"non-contact-complaints-enabled": True,
"join-requests": True,
"web-persistent-cache": False,
"create-channel-type-screen": True,
"show-warning-links": True,
"white-list-links": [],
"february-23-26-theme": True,
"march-8-26-theme": True,
"audio-play-cmd": False,
"audio-play-opus": False,
"bots-channel-adding": True,
"stickers-botid": 0,
"sticker-set-edit-enabled": True,
"calls-new-history-enabled": True,
"y-map": {
"tile": "",
"geocoder": "",
"static": ""
},
"enable-audio-messages-transcription": True,
"enable-video-messages-transcription": True,
"retry-transcribe-attempt": 5,
"retry-transcribe-timeout": 2000,
"org-profile": False,
"media-not-ready-retry-delay": 2000,
"polls-in-chats": True,
"polls-in-channels": True,
"render-polls": True,
"poll-ttl": {
"chat": 5000,
"bigchat": 15000,
"channel": 25000
},
"new-collage": False,
"channel-profile-invite-link": False,
"rename-profile-to-settings": True,
"live-streams": True
}

View File

@@ -1,5 +1,6 @@
import asyncio
from oneme.socket import OnemeMobileServer
from oneme.socket import OnemeMobile
from oneme.websocket import OnemeWS
from common.proto_tcp import MobileProto
from common.proto_web import WebProto
from classes.controllerbase import ControllerBase
@@ -78,7 +79,7 @@ class OnemeController(ControllerBase):
def launch(self, api):
async def _start_all():
await asyncio.gather(
OnemeMobileServer(
OnemeMobile(
host=self.config.host,
port=self.config.oneme_tcp_port,
ssl_context=api['ssl'],
@@ -86,6 +87,14 @@ class OnemeController(ControllerBase):
clients=api['clients'],
send_event=api['event'],
telegram_bot=api.get('telegram_bot'),
).start(),
OnemeWS(
host=self.config.host,
port=self.config.oneme_ws_port,
clients=api['clients'],
ssl_context=api['ssl'],
db_pool=api['db'],
send_event=api['event']
).start()
)

View File

@@ -48,7 +48,7 @@ class LoginPayloadModel(pydantic.BaseModel):
token: str
class PingPayloadModel(pydantic.BaseModel):
interactive: bool
interactive: bool = None
class AssetsPayloadModel(pydantic.BaseModel):
sync: int
@@ -59,11 +59,11 @@ class GetCallHistoryPayloadModel(pydantic.BaseModel):
count: int
class MessageModel(pydantic.BaseModel):
isLive: bool
detectShare: bool
elements: list
isLive: bool = None
detectShare: bool = None
elements: list = None
attaches: list = None
cid: int
cid: int = None
text: str = None
class SendMessagePayloadModel(pydantic.BaseModel):

View File

@@ -34,7 +34,7 @@ class AuthProcessors(BaseProcessor):
phone = payload.get("phone").replace("+", "").replace(" ", "").replace("-", "")
# Генерируем токен
token = secrets.token_urlsafe(128)
token = secrets.token_urlsafe(102)
token_hash = hashlib.sha256(token.encode()).hexdigest()
# Время истечения токена
@@ -282,7 +282,7 @@ class AuthProcessors(BaseProcessor):
"""
INSERT INTO user_data
(phone, contacts, folders, user_config, chat_config)
VALUES (%s %s, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s)
""",
(
phone,
@@ -409,7 +409,7 @@ class AuthProcessors(BaseProcessor):
)
chats = await self.tools.generate_chats(
chats, self.db_pool, user.get("id")
chats, self.db_pool, user.get("id"), protocol_type=self.type
)
# Формируем данные пакета

View File

@@ -58,8 +58,9 @@ class HistoryProcessors(BaseProcessor):
result = await cursor.fetchall()
for row in result:
# TODO: Сборку тела сообщения нужно вынести в отдельную функцию
messages.append({
"id": row.get("id"),
"id": row.get("id") if self.type == 'mobile' else str(row.get('id')),
"time": int(row.get("time")),
"type": row.get("type"),
"sender": row.get("sender"),

View File

@@ -32,6 +32,7 @@ class MainProcessors(BaseProcessor):
"app-update-type": 0, # 1 = принудительное обновление
"reg-country-code": self.static.REG_COUNTRY_CODES,
"phone-auto-complete-enabled": False,
"qr-auth-enabled": False,
"lang": True
}

View File

@@ -83,7 +83,7 @@ class MessagesProcessors(BaseProcessor):
# Вычисляем ID чата по ID пользователя и ID отправителя,
# в случае отсутствия ID чата
if not chatId:
if chatId is None:
chatId = userId ^ senderId
# Если клиент хочет отправить сообщение в избранное,

View File

@@ -76,48 +76,69 @@ class SearchProcessors(BaseProcessor):
# Валидируем данные пакета
try:
SearchByPhonePayloadModel.model_validate(payload)
except pydantic.ValidationError as error:
self.logger.error(f"Возникли ошибки при валидации пакета: {error}")
except Exception as e:
await self._send_error(seq, self.opcodes.CONTACT_INFO_BY_PHONE, self.error_types.INVALID_PAYLOAD, writer)
return
# Ищем пользователя в бд
phone = payload.get("phone").replace("+", "").replace(" ", "").replace("-", "")
async with self.db_pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM users WHERE phone = %s", (phone,))
await cursor.execute("SELECT * FROM users WHERE phone = %s", (int(payload.get("phone")),))
user = await cursor.fetchone()
# Если пользователь найден
if user:
# Аватарка с биографией
photoId = None if not user.get("avatar_id") else int(user.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + photoId
description = None if not user.get("description") else user.get("description")
# Если пользователь не найден, отправляем ошибку
if not user:
await self._send_error(seq, self.opcodes.CONTACT_INFO_BY_PHONE, self.error_types.USER_NOT_FOUND, writer)
return
# ID чата
chatId = senderId ^ user.get("id")
# Генерируем профиль
profile = self.tools.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
description=description,
accountStatus=int(user.get("accountstatus")),
profileOptions=json.loads(user.get("profileoptions")),
includeProfileOptions=False,
username=user.get("username")
# Ищем диалог в бд
await cursor.execute("SELECT * FROM chats WHERE id = %s", (chatId,))
chat = await cursor.fetchone()
# Если диалога нет - создаем
if not chat:
await cursor.execute(
"INSERT INTO chats (id, owner, type) VALUES (%s, %s, %s)",
(chatId, senderId, "DIALOG")
)
else:
profile = None
# Добавляем участников в таблицу chat_participants
participants = [int(senderId), int(user.get("id"))]
for user_id in participants:
await cursor.execute(
"INSERT INTO chat_participants (chat_id, user_id) VALUES (%s, %s)",
(chatId, user_id)
)
# Аватарка с биографией
photoId = None if not user.get("avatar_id") else int(user.get("avatar_id"))
avatar_url = None if not photoId else self.config.avatar_base_url + photoId
description = None if not user.get("description") else user.get("description")
# Генерируем профиль
profile = self.tools.generate_profile(
id=user.get("id"),
phone=int(user.get("phone")),
avatarUrl=avatar_url,
photoId=photoId,
updateTime=int(user.get("updatetime")),
firstName=user.get("firstname"),
lastName=user.get("lastname"),
options=json.loads(user.get("options")),
description=description,
accountStatus=int(user.get("accountstatus")),
profileOptions=json.loads(user.get("profileoptions")),
includeProfileOptions=False,
username=user.get("username")
)
# Создаем данные пакета
payload = {
"profile": profile
"contact": profile
}
# Создаем пакет
@@ -162,7 +183,7 @@ class SearchProcessors(BaseProcessor):
# Получаем последнее сообщение из чата
message, messageTime = await self.tools.get_last_message(
chatId, self.db_pool
chatId, self.db_pool, protocol_type=self.type
)
# Добавляем чат в список
@@ -176,7 +197,7 @@ class SearchProcessors(BaseProcessor):
else:
# Получаем последнее сообщение из чата
message, messageTime = await self.tools.get_last_message(
senderId, self.db_pool
senderId, self.db_pool, protocol_type=self.type
)
# ID избранного

View File

@@ -1,12 +1,18 @@
import asyncio, logging, traceback
import asyncio
import logging
import traceback
from common.opcodes import Opcodes
from common.proto_tcp import MobileProto
from oneme.processors import Processors
from common.rate_limiter import RateLimiter
from common.tools import Tools
from common.opcodes import Opcodes
from oneme.processors import Processors
class OnemeMobileServer:
def __init__(self, host, port, ssl_context, db_pool, clients, send_event, telegram_bot):
class OnemeMobile:
def __init__(
self, host, port, ssl_context, db_pool, clients, send_event, telegram_bot
):
self.host = host
self.port = port
self.ssl_context = ssl_context
@@ -17,14 +23,19 @@ class OnemeMobileServer:
self.proto = MobileProto()
self.auth_required = Tools().auth_required
self.processors = Processors(db_pool=db_pool, clients=clients, send_event=send_event, telegram_bot=telegram_bot)
self.processors = Processors(
db_pool=db_pool,
clients=clients,
send_event=send_event,
telegram_bot=telegram_bot,
)
self.opcodes = Opcodes()
# rate limiter anti ddos brute force protection
self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60)
self.read_timeout = 300 # Таймаут чтения из сокета (секунды)
self.max_read_size = 65536 # Максимальный размер данных из сокета
self.read_timeout = 300 # Таймаут чтения из сокета (секунды)
self.max_read_size = 65536 # Максимальный размер данных из сокета
async def handle_client(self, reader, writer):
"""Функция для обработки подключений"""
@@ -44,28 +55,32 @@ class OnemeMobileServer:
# Читаем новые данные из сокета с таймаутом
try:
data = await asyncio.wait_for(
reader.read(self.max_read_size),
timeout=self.read_timeout
reader.read(self.max_read_size), timeout=self.read_timeout
)
except asyncio.TimeoutError:
self.logger.info(f"Таймаут соединения для {address[0]}:{address[1]}")
self.logger.info(
f"Таймаут соединения для {address[0]}:{address[1]}"
)
break
# Если сокет закрыт - выходим из цикла
if not data:
break
if len(data) > self.max_read_size:
self.logger.warning(f"Пакет от {address[0]}:{address[1]} превышает лимит ({len(data)} байт)")
self.logger.warning(
f"Пакет от {address[0]}:{address[1]} превышает лимит ({len(data)} байт)"
)
break
# Распаковываем данные
packet = self.proto.unpack_packet(data)
# Скип если пакет невалидный
# Скип если пакет невалидный
if packet is None:
self.logger.warning(f"Невалидный пакет от {address[0]}:{address[1]}")
self.logger.warning(
f"Невалидный пакет от {address[0]}:{address[1]}"
)
continue
opcode = packet.get("opcode")
@@ -74,34 +89,70 @@ class OnemeMobileServer:
match opcode:
case self.opcodes.SESSION_INIT:
deviceType, deviceName = await self.processors.session_init(payload, seq, writer)
deviceType, deviceName = await self.processors.session_init(
payload, seq, writer
)
case self.opcodes.AUTH_REQUEST:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_REQUEST, self.processors.error_types.RATE_LIMITED, writer)
await self.processors._send_error(
seq,
self.opcodes.AUTH_REQUEST,
self.processors.error_types.RATE_LIMITED,
writer,
)
else:
await self.processors.auth_request(payload, seq, writer)
case self.opcodes.AUTH:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH, self.processors.error_types.RATE_LIMITED, writer)
await self.processors._send_error(
seq,
self.opcodes.AUTH,
self.processors.error_types.RATE_LIMITED,
writer,
)
else:
await self.processors.auth(payload, seq, writer, deviceType, deviceName)
await self.processors.auth(
payload, seq, writer, deviceType, deviceName
)
case self.opcodes.AUTH_CONFIRM:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_CONFIRM, self.processors.error_types.RATE_LIMITED, writer)
await self.processors._send_error(
seq,
self.opcodes.AUTH_CONFIRM,
self.processors.error_types.RATE_LIMITED,
writer,
)
elif payload and payload.get("tokenType") == "REGISTER":
await self.processors.auth_confirm(payload, seq, writer, deviceType, deviceName)
await self.processors.auth_confirm(
payload, seq, writer, deviceType, deviceName
)
else:
self.logger.warning(f"AUTH_CONFIRM с неизвестным tokenType: {payload}")
self.logger.warning(
f"AUTH_CONFIRM с неизвестным tokenType: {payload}"
)
case self.opcodes.LOGIN:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.LOGIN, self.processors.error_types.RATE_LIMITED, writer)
await self.processors._send_error(
seq,
self.opcodes.LOGIN,
self.processors.error_types.RATE_LIMITED,
writer,
)
else:
userPhone, userId, hashedToken = await self.processors.login(payload, seq, writer)
(
userPhone,
userId,
hashedToken,
) = await self.processors.login(payload, seq, writer)
if userPhone:
await self._finish_auth(writer, address, userPhone, userId)
await self._finish_auth(
writer, address, userPhone, userId
)
case self.opcodes.LOGOUT:
await self.processors.logout(seq, writer, hashedToken=hashedToken)
await self.processors.logout(
seq, writer, hashedToken=hashedToken
)
break
case self.opcodes.PING:
await self.processors.ping(payload, seq, writer)
@@ -109,35 +160,75 @@ class OnemeMobileServer:
await self.processors.log(payload, seq, writer)
case self.opcodes.ASSETS_UPDATE:
await self.auth_required(
userPhone, self.processors.assets_update, payload, seq, writer
userPhone,
self.processors.assets_update,
payload,
seq,
writer,
)
case self.opcodes.VIDEO_CHAT_HISTORY:
await self.auth_required(
userPhone, self.processors.video_chat_history, payload, seq, writer
userPhone,
self.processors.video_chat_history,
payload,
seq,
writer,
)
case self.opcodes.MSG_SEND:
await self.auth_required(
userPhone, self.processors.msg_send, payload, seq, writer, userId, self.db_pool
userPhone,
self.processors.msg_send,
payload,
seq,
writer,
userId,
self.db_pool,
)
case self.opcodes.FOLDERS_GET:
await self.auth_required(
userPhone, self.processors.folders_get, payload, seq, writer, userPhone
userPhone,
self.processors.folders_get,
payload,
seq,
writer,
userPhone,
)
case self.opcodes.SESSIONS_INFO:
await self.auth_required(
userPhone, self.processors.sessions_info, payload, seq, writer, userPhone, hashedToken
userPhone,
self.processors.sessions_info,
payload,
seq,
writer,
userPhone,
hashedToken,
)
case self.opcodes.CHAT_INFO:
await self.auth_required(
userPhone, self.processors.chat_info, payload, seq, writer, userId
userPhone,
self.processors.chat_info,
payload,
seq,
writer,
userId,
)
case self.opcodes.CHAT_HISTORY:
await self.auth_required(
userPhone, self.processors.chat_history, payload, seq, writer, userId
userPhone,
self.processors.chat_history,
payload,
seq,
writer,
userId,
)
case self.opcodes.CONTACT_INFO_BY_PHONE:
await self.auth_required(
userPhone, self.processors.contact_info_by_phone, payload, seq, writer, userId
userPhone,
self.processors.contact_info_by_phone,
payload,
seq,
writer,
userId,
)
case self.opcodes.OK_TOKEN:
await self.auth_required(
@@ -145,15 +236,28 @@ class OnemeMobileServer:
)
case self.opcodes.MSG_TYPING:
await self.auth_required(
userPhone, self.processors.msg_typing, payload, seq, writer, userId
userPhone,
self.processors.msg_typing,
payload,
seq,
writer,
userId,
)
case self.opcodes.CONTACT_INFO:
await self.auth_required(
userPhone, self.processors.contact_info, payload, seq, writer
userPhone,
self.processors.contact_info,
payload,
seq,
writer,
)
case self.opcodes.COMPLAIN_REASONS_GET:
await self.auth_required(
userPhone, self.processors.complain_reasons_get, payload, seq, writer
userPhone,
self.processors.complain_reasons_get,
payload,
seq,
writer,
)
case self.opcodes.PROFILE:
await self.processors.profile(
@@ -161,12 +265,18 @@ class OnemeMobileServer:
)
case self.opcodes.CHAT_SUBSCRIBE:
await self.auth_required(
userPhone, self.processors.chat_subscribe, payload, seq, writer
userPhone,
self.processors.chat_subscribe,
payload,
seq,
writer,
)
case _:
self.logger.warning(f"Неизвестный опкод {opcode}")
except Exception as e:
self.logger.error(f"Произошла ошибка при работе с клиентом {address[0]}:{address[1]}: {e}")
self.logger.error(
f"Произошла ошибка при работе с клиентом {address[0]}:{address[1]}: {e}"
)
traceback.print_exc()
# Удаляем клиента из словаря
@@ -174,7 +284,9 @@ class OnemeMobileServer:
await self._end_session(userId, address[0], address[1])
writer.close()
self.logger.info(f"Прекратил работать работать с клиентом {address[0]}:{address[1]}")
self.logger.info(
f"Прекратил работать работать с клиентом {address[0]}:{address[1]}"
)
async def _finish_auth(self, writer, addr, phone, id):
"""Завершение открытия сессии"""
@@ -184,12 +296,7 @@ class OnemeMobileServer:
# Добавляем новое подключение в словарь
if user:
user["clients"].append(
{
"writer": writer,
"ip": addr[0],
"port": addr[1],
"protocol": "oneme"
}
{"writer": writer, "ip": addr[0], "port": addr[1], "protocol": "oneme"}
)
else:
self.clients[id] = {
@@ -200,9 +307,9 @@ class OnemeMobileServer:
"writer": writer,
"ip": addr[0],
"port": addr[1],
"protocol": "oneme"
"protocol": "oneme",
}
]
],
}
async def _end_session(self, id, ip, port):
@@ -229,4 +336,4 @@ class OnemeMobileServer:
self.logger.info(f"Сокет запущен на порту {self.port}")
async with self.server:
await self.server.serve_forever()
await self.server.serve_forever()

317
src/oneme/websocket.py Normal file
View File

@@ -0,0 +1,317 @@
import logging
import traceback
import websockets
from common.proto_web import WebProto
from oneme.processors import Processors
from common.rate_limiter import RateLimiter
from common.opcodes import Opcodes
from common.tools import Tools
class OnemeWS:
def __init__(self, host, port, clients, ssl_context, db_pool, send_event):
self.host = host
self.port = port
self.ssl_context = ssl_context
self.server = None
self.logger = logging.getLogger(__name__)
self.db_pool = db_pool
self.clients = clients
self.opcodes = Opcodes()
self.proto = WebProto()
self.processors = Processors(db_pool=db_pool, clients=clients, send_event=send_event, type="web")
self.auth_required = Tools().auth_required
# rate limiter
self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60)
self.read_timeout = 300 # Таймаут чтения из websocket (секунды)
self.max_read_size = 65536 # Максимальный размер данных
async def handle_client(self, websocket):
"""Функция для обработки WebSocket подключений"""
# IP-адрес клиента
address = websocket.remote_address
self.logger.info(f"Работаю с клиентом {address[0]}:{address[1]}")
deviceType = None
deviceName = None
userPhone = None
userId = None
hashedToken = None
try:
async for message in websocket:
# Проверяем размер данных
if len(message) > self.max_read_size:
self.logger.warning(f"Пакет от {address[0]}:{address[1]} превышает лимит ({len(message)} байт)")
break
# Распаковываем данные
packet = self.proto.unpack_packet(message)
# Если пакет невалидный — пропускаем
if not packet:
self.logger.warning(f"Невалидный пакет от {address[0]}:{address[1]}")
continue
opcode = packet.get("opcode")
seq = packet.get("seq")
payload = packet.get("payload")
match opcode:
case self.opcodes.SESSION_INIT:
deviceType, deviceName = await self.processors.session_init(
payload, seq, websocket
)
case self.opcodes.AUTH_REQUEST:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(
seq,
self.opcodes.AUTH_REQUEST,
self.processors.error_types.RATE_LIMITED,
websocket,
)
else:
await self.processors.auth_request(payload, seq, websocket)
case self.opcodes.AUTH:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(
seq,
self.opcodes.AUTH,
self.processors.error_types.RATE_LIMITED,
websocket,
)
else:
await self.processors.auth(
payload, seq, websocket, deviceType, deviceName
)
case self.opcodes.AUTH_CONFIRM:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(
seq,
self.opcodes.AUTH_CONFIRM,
self.processors.error_types.RATE_LIMITED,
websocket,
)
elif payload and payload.get("tokenType") == "REGISTER":
await self.processors.auth_confirm(
payload, seq, websocket, deviceType, deviceName
)
else:
self.logger.warning(
f"AUTH_CONFIRM с неизвестным tokenType: {payload}"
)
case self.opcodes.LOGIN:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(
seq,
self.opcodes.LOGIN,
self.processors.error_types.RATE_LIMITED,
websocket,
)
else:
(
userPhone,
userId,
hashedToken,
) = await self.processors.login(payload, seq, websocket)
if userPhone:
await self._finish_auth(
websocket, address, userPhone, userId
)
case self.opcodes.LOGOUT:
await self.processors.logout(
seq, websocket, hashedToken=hashedToken
)
break
case self.opcodes.PING:
await self.processors.ping(payload, seq, websocket)
case self.opcodes.LOG:
await self.processors.log(payload, seq, websocket)
case self.opcodes.ASSETS_UPDATE:
await self.auth_required(
userPhone,
self.processors.assets_update,
payload,
seq,
websocket,
)
case self.opcodes.VIDEO_CHAT_HISTORY:
await self.auth_required(
userPhone,
self.processors.video_chat_history,
payload,
seq,
websocket,
)
case self.opcodes.MSG_SEND:
await self.auth_required(
userPhone,
self.processors.msg_send,
payload,
seq,
websocket,
userId,
self.db_pool,
)
case self.opcodes.FOLDERS_GET:
await self.auth_required(
userPhone,
self.processors.folders_get,
payload,
seq,
websocket,
userPhone,
)
case self.opcodes.SESSIONS_INFO:
await self.auth_required(
userPhone,
self.processors.sessions_info,
payload,
seq,
websocket,
userPhone,
hashedToken,
)
case self.opcodes.CHAT_INFO:
await self.auth_required(
userPhone,
self.processors.chat_info,
payload,
seq,
websocket,
userId,
)
case self.opcodes.CHAT_HISTORY:
await self.auth_required(
userPhone,
self.processors.chat_history,
payload,
seq,
websocket,
userId,
)
case self.opcodes.CONTACT_INFO_BY_PHONE:
await self.auth_required(
userPhone,
self.processors.contact_info_by_phone,
payload,
seq,
websocket,
userId,
)
case self.opcodes.OK_TOKEN:
await self.auth_required(
userPhone, self.processors.ok_token, payload, seq, websocket
)
case self.opcodes.MSG_TYPING:
await self.auth_required(
userPhone,
self.processors.msg_typing,
payload,
seq,
websocket,
userId,
)
case self.opcodes.CONTACT_INFO:
await self.auth_required(
userPhone,
self.processors.contact_info,
payload,
seq,
websocket,
)
case self.opcodes.COMPLAIN_REASONS_GET:
await self.auth_required(
userPhone,
self.processors.complain_reasons_get,
payload,
seq,
websocket,
)
case self.opcodes.PROFILE:
await self.processors.profile(
payload, seq, websocket, userId=userId
)
case self.opcodes.CHAT_SUBSCRIBE:
await self.auth_required(
userPhone,
self.processors.chat_subscribe,
payload,
seq,
websocket,
)
case _:
self.logger.warning(f"Неизвестный опкод {opcode}")
except websockets.exceptions.ConnectionClosed:
self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}")
except Exception as e:
self.logger.error(f" Произошла ошибка при работе с клиентом {address[0]}:{address[1]}: {e}")
traceback.print_exc()
# Удаляем клиента из словаря при отключении
if userId:
await self._end_session(userId, address[0], address[1])
self.logger.info(f"Прекратил работать с клиентом {address[0]}:{address[1]}")
async def _finish_auth(self, websocket, addr, phone, id):
"""Завершение открытия сессии"""
# Ищем пользователя в словаре
user = self.clients.get(id)
# Добавляем новое подключение в словарь
if user:
user["clients"].append(
{
"writer": websocket,
"ip": addr[0],
"port": addr[1],
"protocol": "oneme"
}
)
else:
self.clients[id] = {
"phone": phone,
"id": id,
"clients": [
{
"writer": websocket,
"ip": addr[0],
"port": addr[1],
"protocol": "oneme"
}
]
}
async def _end_session(self, id, ip, port):
"""Завершение сессии"""
# Получаем пользователя в списке
user = self.clients.get(id)
if not user:
return
# Получаем подключения пользователя
clients = user.get("clients", [])
# Удаляем нужное подключение из словаря
for i, client in enumerate(clients):
if (client.get("ip"), client.get("port")) == (ip, port):
clients.pop(i)
async def start(self):
"""Функция для запуска WebSocket сервера"""
self.server = await websockets.serve(
self.handle_client,
self.host,
self.port,
ssl=self.ssl_context
)
self.logger.info(f"WebSocket запущен на порту {self.port}")
await self.server.wait_closed()