Швырнул архитектуру, чтобы позже объединить контроллеры веба и сокета в одно, а также разделить процессоры

This commit is contained in:
Alexey Polyakov
2026-03-19 01:13:12 +03:00
parent 2dab853569
commit 6c05b5f1b5
22 changed files with 411 additions and 696 deletions

0
src/oneme/__init__.py Normal file
View File

7
src/oneme/config.py Normal file
View File

@@ -0,0 +1,7 @@
class OnemeConfig:
def __init__(self):
pass
SERVER_CONFIG = {
}

91
src/oneme/controller.py Normal file
View File

@@ -0,0 +1,91 @@
import asyncio
from oneme.socket import OnemeMobileServer
from common.proto_tcp import MobileProto
from common.proto_web import WebProto
from classes.controllerbase import ControllerBase
from common.config import ServerConfig
from common.opcodes import Opcodes
class OnemeMobileController(ControllerBase):
def __init__(self):
self.config = ServerConfig()
self.proto = MobileProto()
self.opcodes = Opcodes()
async def event(self, target, client, eventData):
# Извлекаем тип события и врайтер
eventType = eventData.get("eventType")
writer = client.get("writer")
# Обрабатываем событие
if eventType == "new_msg":
# Данные сообщения
chatId = eventData.get("chatId")
message = eventData.get("message")
prevMessageId = eventData.get("prevMessageId")
time = eventData.get("time")
# Данные пакета
payload = {
"chatId": chatId,
"message": message,
"prevMessageId": prevMessageId,
"ttl": False,
"unread": 0,
"mark": time
}
# Создаем пакет
packet = self.proto.pack_packet(
cmd=0, seq=1, opcode=self.opcodes.NOTIF_MESSAGE, payload=payload
)
elif eventType == "typing":
# Данные события
chatId = eventData.get("chatId")
userId = eventData.get("userId")
type = eventData.get("type")
# Данные пакета
payload = {
"chatId": chatId,
"userId": userId,
"type": type
}
# Создаем пакет
packet = self.proto.pack_packet(
cmd=0, seq=1, opcode=self.opcodes.NOTIF_TYPING, payload=payload
)
elif eventType == "profile_updated":
# Данные события
profile = eventData.get("profile")
# Данные пакета
payload = {
"profile": profile
}
# Создаем пакет
packet = self.proto.pack_packet(
cmd=0, seq=1, opcode=self.opcodes.NOTIF_PROFILE, payload=payload
)
# Отправляем пакет
writer.write(packet)
await writer.drain()
def launch(self, api):
async def _start_all():
await asyncio.gather(
OnemeMobileServer(
host=self.config.host,
port=self.config.oneme_tcp_port,
ssl_context=api['ssl'],
db_pool=api['db'],
clients=api['clients'],
send_event=api['event'],
telegram_bot=api.get('telegram_bot'),
).start()
)
return _start_all()

129
src/oneme/models.py Normal file
View File

@@ -0,0 +1,129 @@
import pydantic
class UserAgentModel(pydantic.BaseModel):
deviceType: str
appVersion: str
osVersion: str
timezone: str
release: int = None
screen: str
pushDeviceType: str
arch: str = None
locale: str
buildNumber: int
deviceName: str
deviceLocale: str
class HelloPayloadModel(pydantic.BaseModel):
clientSessionId: int
mt_instanceid: str = None
userAgent: UserAgentModel
deviceId: str
class RequestCodePayloadModel(pydantic.BaseModel):
phone: str
type: str
@pydantic.field_validator('phone')
def validate_phone(cls, v):
"""Валидация номера телефона"""
if not v.replace("+", "").replace(" ", "").replace("-", "").isdigit():
raise ValueError('phone must be digits')
return v
@pydantic.field_validator('type')
def validate_type(cls, v):
"""Валидация типа запроса"""
if not v in ("START_AUTH", "RESEND"):
raise ValueError('type must be valid')
return v
class VerifyCodePayloadModel(pydantic.BaseModel):
verifyCode: str
authTokenType: str
token: str
class LoginPayloadModel(pydantic.BaseModel):
interactive: bool
token: str
class PingPayloadModel(pydantic.BaseModel):
interactive: bool
class AssetsPayloadModel(pydantic.BaseModel):
sync: int
type: str
class GetCallHistoryPayloadModel(pydantic.BaseModel):
forward: bool
count: int
class MessageModel(pydantic.BaseModel):
isLive: bool
detectShare: bool
elements: list
attaches: list = None
cid: int
text: str = None
class SendMessagePayloadModel(pydantic.BaseModel):
# TODO: пишем сервер макса в 2 ночи и не понимаем как это валидировать (блять)
userId: int = None
chatId: int = None
message: MessageModel
class SyncFoldersPayloadModel(pydantic.BaseModel):
folderSync: int
class SearchChatsPayloadModel(pydantic.BaseModel):
chatIds: list
class SearchByPhonePayloadModel(pydantic.BaseModel):
phone: str
class GetCallTokenPayloadModel(pydantic.BaseModel):
userId: int
value: str
class TypingPayloadModel(pydantic.BaseModel):
chatId: int
type: str = None
class SearchUsersPayloadModel(pydantic.BaseModel):
contactIds: list
class ComplainReasonsGetPayloadModel(pydantic.BaseModel):
complainSync: int
class UpdateProfilePayloadModel(pydantic.BaseModel):
description: str = None
firstName: str = None
lastName: str = None
class AuthConfirmRegisterPayloadModel(pydantic.BaseModel):
token: str
firstName: str
lastName: str = None
tokenType: str
@pydantic.field_validator('firstName')
def validate_first_name(cls, v):
v = v.strip()
if not v:
raise ValueError('firstName must not be empty')
if len(v) > 59:
raise ValueError('firstName too long')
return v
@pydantic.field_validator('lastName')
def validate_last_name(cls, v):
if v is None:
return v
v = v.strip()
if len(v) > 59:
raise ValueError('lastName too long')
return v
class ChatHistoryPayloadModel(pydantic.BaseModel):
chatId: int
backward: int

1251
src/oneme/processors.py Normal file

File diff suppressed because it is too large Load Diff

228
src/oneme/socket.py Normal file
View File

@@ -0,0 +1,228 @@
import asyncio, logging, traceback
from common.proto_tcp import MobileProto
from oneme.processors import Processors
from common.rate_limiter import RateLimiter
from common.tools import Tools
from common.opcodes import Opcodes
class OnemeMobileServer:
def __init__(self, host="0.0.0.0", port=443, ssl_context=None, db_pool=None, clients={}, send_event=None, telegram_bot=None):
self.host = host
self.port = port
self.ssl_context = ssl_context
self.server = None
self.logger = logging.getLogger(__name__)
self.db_pool = db_pool
self.clients = clients
self.proto = MobileProto()
self.auth_required = Tools().auth_required
self.processors = Processors(db_pool=db_pool, clients=clients, send_event=send_event, telegram_bot=telegram_bot)
self.opcodes = Opcodes()
# rate limiter anti ddos brute force protection
self.auth_rate_limiter = RateLimiter(max_attempts=5, window_seconds=60)
self.read_timeout = 300 # Таймаут чтения из сокета (секунды)
self.max_read_size = 65536 # Максимальный размер данных из сокета
async def handle_client(self, reader, writer):
"""Функция для обработки подключений"""
# IP-адрес клиента
address = writer.get_extra_info("peername")
self.logger.info(f"Работаю с клиентом {address[0]}:{address[1]}")
deviceType = None
deviceName = None
userPhone = None
userId = None
hashedToken = None
try:
while True:
# Читаем новые данные из сокета с таймаутом
try:
data = await asyncio.wait_for(
reader.read(self.max_read_size),
timeout=self.read_timeout
)
except asyncio.TimeoutError:
self.logger.info(f"Таймаут соединения для {address[0]}:{address[1]}")
break
# Если сокет закрыт - выходим из цикла
if not data:
break
if len(data) > self.max_read_size:
self.logger.warning(f"Пакет от {address[0]}:{address[1]} превышает лимит ({len(data)} байт)")
break
# Распаковываем данные
packet = self.proto.unpack_packet(data)
# Скип если пакет невалидный
if packet is None:
self.logger.warning(f"Невалидный пакет от {address[0]}:{address[1]}")
continue
opcode = packet.get("opcode")
seq = packet.get("seq")
payload = packet.get("payload")
match opcode:
case self.opcodes.SESSION_INIT:
deviceType, deviceName = await self.processors.session_init(payload, seq, writer)
case self.opcodes.AUTH_REQUEST:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_REQUEST, self.processors.error_types.RATE_LIMITED, writer)
else:
await self.processors.auth_request(payload, seq, writer)
case self.opcodes.AUTH:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH, self.processors.error_types.RATE_LIMITED, writer)
else:
await self.processors.auth(payload, seq, writer, deviceType, deviceName)
case self.opcodes.AUTH_CONFIRM:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.AUTH_CONFIRM, self.processors.error_types.RATE_LIMITED, writer)
elif payload and payload.get("tokenType") == "REGISTER":
await self.processors.auth_confirm(payload, seq, writer, deviceType, deviceName)
else:
self.logger.warning(f"AUTH_CONFIRM с неизвестным tokenType: {payload}")
case self.opcodes.LOGIN:
if not self.auth_rate_limiter.is_allowed(address[0]):
await self.processors._send_error(seq, self.opcodes.LOGIN, self.processors.error_types.RATE_LIMITED, writer)
else:
userPhone, userId, hashedToken = await self.processors.login(payload, seq, writer)
if userPhone:
await self._finish_auth(writer, address, userPhone, userId)
case self.opcodes.LOGOUT:
await self.processors.logout(seq, writer, hashedToken=hashedToken)
break
case self.opcodes.PING:
await self.processors.ping(payload, seq, writer)
case self.opcodes.LOG:
await self.processors.log(payload, seq, writer)
case self.opcodes.ASSETS_UPDATE:
await self.auth_required(
userPhone, self.processors.assets_update, payload, seq, writer
)
case self.opcodes.VIDEO_CHAT_HISTORY:
await self.auth_required(
userPhone, self.processors.video_chat_history, payload, seq, writer
)
case self.opcodes.MSG_SEND:
await self.auth_required(
userPhone, self.processors.msg_send, payload, seq, writer, userId, self.db_pool
)
case self.opcodes.FOLDERS_GET:
await self.auth_required(
userPhone, self.processors.folders_get, payload, seq, writer, userPhone
)
case self.opcodes.SESSIONS_INFO:
await self.auth_required(
userPhone, self.processors.sessions_info, payload, seq, writer, userPhone, hashedToken
)
case self.opcodes.CHAT_INFO:
await self.auth_required(
userPhone, self.processors.chat_info, payload, seq, writer, userId
)
case self.opcodes.CHAT_HISTORY:
await self.auth_required(
userPhone, self.processors.chat_history, payload, seq, writer, userId
)
case self.opcodes.CONTACT_INFO_BY_PHONE:
await self.auth_required(
userPhone, self.processors.contact_info_by_phone, payload, seq, writer, userId
)
case self.opcodes.OK_TOKEN:
await self.auth_required(
userPhone, self.processors.ok_token, payload, seq, writer
)
case self.opcodes.MSG_TYPING:
await self.auth_required(
userPhone, self.processors.msg_typing, payload, seq, writer, userId
)
case self.opcodes.CONTACT_INFO:
await self.auth_required(
userPhone, self.processors.contact_info, payload, seq, writer
)
case self.opcodes.COMPLAIN_REASONS_GET:
await self.auth_required(
userPhone, self.processors.complain_reasons_get, payload, seq, writer
)
case self.opcodes.PROFILE:
await self.processors.profile(
payload, seq, writer, userId=userId, userPhone=userPhone
)
case _:
self.logger.warning(f"Неизвестный опкод {opcode}")
except Exception as e:
self.logger.error(f"Произошла ошибка при работе с клиентом {address[0]}:{address[1]}: {e}")
traceback.print_exc()
# Удаляем клиента из словаря
if userPhone:
await self._end_session(userId, address[0], address[1])
writer.close()
self.logger.info(f"Прекратил работать работать с клиентом {address[0]}:{address[1]}")
async def _finish_auth(self, writer, addr, phone, id):
"""Завершение открытия сессии"""
# Ищем пользователя в словаре
user = self.clients.get(id)
# Добавляем новое подключение в словарь
if user:
user["clients"].append(
{
"writer": writer,
"ip": addr[0],
"port": addr[1],
"protocol": "oneme_mobile"
}
)
else:
self.clients[id] = {
"phone": phone,
"id": id,
"clients": [
{
"writer": writer,
"ip": addr[0],
"port": addr[1],
"protocol": "oneme_mobile"
}
]
}
async def _end_session(self, id, ip, port):
"""Завершение сессии"""
# Получаем пользователя в списке
user = self.clients.get(id)
if not user:
return
# Получаем подключения пользователя
clients = user.get("clients", [])
# Удаляем нужное подключение из словаря
for i, client in enumerate(clients):
if (client.get("ip"), client.get("port")) == (ip, port):
clients.pop(i)
async def start(self):
"""Функция для запуска сервера"""
self.server = await asyncio.start_server(
self.handle_client, self.host, self.port, ssl=self.ssl_context
)
self.logger.info(f"Сокет запущен на порту {self.port}")
async with self.server:
await self.server.serve_forever()