Files
openmax-server/src/common/proto_tcp.py
2026-04-10 17:43:35 +03:00

148 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import lz4.block
import msgpack
class MobileProto:
def __init__(self) -> None:
self.logger = logging.getLogger(__name__)
# TODO узнать какие должны быть лимиты и поменять,
# сейчас это больше заглушка
MAX_PAYLOAD_SIZE = 1048576 # 1 MB
MAX_DECOMPRESSED_SIZE = 1048576 # 1 MB
HEADER_SIZE = 10 # 1+2+1+2+4
### Работа с протоколом
def unpack_packet(self, data: bytes) -> dict | None:
# Проверяем минимальный размер пакета
if len(data) < self.HEADER_SIZE:
self.logger.warning(f"Пакет слишком маленький: {len(data)} байт")
return None
# Распаковываем заголовок
ver = int.from_bytes(data[0:1], "big", signed=False)
cmd = int.from_bytes(data[1:2], "big", signed=False)
seq = int.from_bytes(data[2:4], "big", signed=False)
opcode = int.from_bytes(data[4:6], "big", signed=False)
packed_len = int.from_bytes(data[6:10], "big", signed=False)
# Флаг упаковки
comp_flag = packed_len >> 24
# Парсим данные пакета
payload_length = packed_len & 0xFFFFFF
# Проверяем размер payload
if payload_length > self.MAX_PAYLOAD_SIZE:
self.logger.warning(
f"Payload слишком большой: {payload_length} B (лимит {self.MAX_PAYLOAD_SIZE})"
)
return None
# Проверяем длину пакета
if len(data) < self.HEADER_SIZE + payload_length:
self.logger.warning(
f"Пакет неполный: требуется {self.HEADER_SIZE + payload_length} B, получено {len(data)}"
)
return None
payload_bytes = data[10 : 10 + payload_length]
payload = None
# Декодируем данные пакета
if payload_bytes:
# Разжимаем данные пакета, если требуется
if comp_flag != 0:
compressed_data = payload_bytes
try:
payload_bytes = lz4.block.decompress(
compressed_data,
uncompressed_size=self.MAX_DECOMPRESSED_SIZE,
)
except lz4.block.LZ4BlockError:
self.logger.warning("Ошибка декомпрессии LZ4")
return None
# Распаковываем msgpack
payload = msgpack.unpackb(payload_bytes, raw=False, strict_map_key=False)
self.logger.debug(
f"Распаковал - ver={ver} cmd={cmd} seq={seq} opcode={opcode} payload={payload} comp_flag={comp_flag}"
)
# Возвращаем
return {
"ver": ver,
"cmd": cmd,
"seq": seq,
"opcode": opcode,
"payload": payload,
}
def pack_packet(
self,
ver: int = 10,
cmd: int = 1,
seq: int = 1,
opcode: int = 6,
payload: dict = {},
) -> bytes:
# Запаковываем заголовок
ver_b = ver.to_bytes(1, "big")
cmd_b = cmd.to_bytes(1, "big")
seq_b = seq.to_bytes(2, "big")
opcode_b = opcode.to_bytes(2, "big")
# Запаковываем данные пакета
payload_bytes: bytes | None = msgpack.packb(payload)
if payload_bytes is None:
payload_bytes = b""
# Флаг сжатия
comp_flag = 0
# Пробуем сжать данные пакета
try:
payload_comp = lz4.block.compress(
payload_bytes,
mode='high_compression',
store_size=False,
)
# Если сжатие нам выгодно, то используем его
if len(payload_bytes) > len(payload_comp):
final_payload = payload_comp
# Официальный сервер MAX отправлял мне в качестве
# флага сжатия 2, поэтому думаю стоит использовать ее
comp_flag = 2
else:
# В случае если сжатие нам не выгодно, используем
# только запакованные данные через msgpack
final_payload = payload_bytes
except Exception as e:
self.logger.warning(f"Ошибка сжатия LZ4: {e}")
# В случае ошибки сжатия используем
# только запакованные данные через msgpack
final_payload = payload_bytes
payload_len = len(final_payload) & 0xFFFFFF
packed_len = (comp_flag << 24) | payload_len
payload_len_b = packed_len.to_bytes(4, "big")
self.logger.debug(
f"Упаковал - ver={ver} cmd={cmd} seq={seq} opcode={opcode} payload={payload} comp_flag={comp_flag}"
)
# Возвращаем пакет
return ver_b + cmd_b + seq_b + opcode_b + payload_len_b + final_payload
### Констаты протокола
CMD_OK = 1
CMD_NOF = 2
CMD_ERR = 3
PROTO_VER = 10