diff --git a/.gitignore b/.gitignore index b7faf40..336736f 100644 --- a/.gitignore +++ b/.gitignore @@ -205,3 +205,4 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ +*.xml diff --git a/.idea/carbus_lib.iml b/.idea/carbus_lib.iml new file mode 100644 index 0000000..74d515a --- /dev/null +++ b/.idea/carbus_lib.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 0a6d62b..33f37a8 100644 --- a/README.md +++ b/README.md @@ -1 +1,256 @@ -# carbus_lib \ No newline at end of file +# car-bus-lib (async CAN / ISO-TP / UDS stack) + +Асинхронная библиотека на Python для работы с CAN-адаптером **CAN-Hacker / Car Bus Analyzer**: + +- 📡 **`carbus_async`** – низкоуровневая работа с железкой (CAN/LIN, фильтры, терминаторы и т.д.) +- 📦 **`isotp_async`** – ISO-TP (ISO 15765-2) поверх CAN (single + multi-frame) +- 🩺 **`uds_async`** – UDS (ISO 14229) клиент и сервер (диагностика, чтение VIN и т.п.) +- 🌐 **TCP-bridge** – удалённое подключение к адаптеру через сеть (как будто он воткнут локально) + +> Минимальные примеры, никаких «магических» зависимостей — всё на `asyncio`. + +--- + +## Установка + +Пока проект в разработке, можно ставить его как editable-модуль из репозитория: + +```bash +git clone https://github.com/your_name/carbus_lib.git +cd car_bus_lib +pip install -e . +``` + +# car-bus-lib + +Асинхронная библиотека для работы с CAN / CAN-FD, ISO-TP и UDS. +Поддерживает локальное подключение через USB CDC и удалённую работу через TCP-bridge. + +--- + +## Возможности + +- CAN / CAN-FD отправка и приём +- Настройка каналов, скоростей, режимов, BRS +- Фильтры ID, очистка фильтров, управление терминатором 120 Ω +- ISO-TP (single + multi-frame) +- UDS Client и UDS Server (эмуляция ЭБУ) +- TCP-мост: удалённая работа с адаптером так, как будто он подключён локально +- Логирование всего протокольного трафика + +--- + +## Установка +````bat + pip install pyserial pyserial-asyncio + + git clone https://github.com/your_name/carbus_lib.git + cd car-bus-lib + pip install -e . +```` + +## 1. Работа с CAN + +Простейший пример: открыть устройство, настроить канал и отправить / принять кадр. + +````python + import asyncio + from carbus_async.device import CarBusDevice + from carbus_async.messages import CanMessage + + async def main(): + dev = await CarBusDevice.open("COM6", baudrate=115200) + + # классический CAN 500 kbit/s + await dev.open_can_channel( + channel=1, + nominal_bitrate=500_000, + fd=False, + ) + + # отправка кадра 0x7E0 8 байт + msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00") + await dev.send_can(msg, channel=1) + + # приём любого сообщения + ch, rx = await dev.receive_can() + print("RX:", ch, rx) + + await dev.close() + + asyncio.run(main()) +```` + + +## 2. Информация об устройстве и фильтры + +Пример запроса DEVICE_INFO и настройки фильтров: +````python + info = await dev.get_device_info() + + print("HW:", info.hardware_name) + print("FW:", info.firmware_version) + print("Serial:", info.serial_int) + + # очистить все фильтры на канале 1 + await dev.clear_all_filters(1) + + # разрешить только ответы с ID 0x7E8 (11-битный стандартный ID) + await dev.set_std_id_filter( + channel=1, + index=0, + can_id=0x7E8, + mask=0x7FF, + ) + + # включить/выключить терминатор 120 Ω + await dev.set_terminator(channel=1, enabled=True) +```` + +## 3. ISO-TP (isotp_async) +ISO-TP канал строится поверх CarBusDevice: +````python + from isotp_async import IsoTpChannel + + # предполагается, что dev уже открыт и канал CAN настроен + isotp = IsoTpChannel( + device=dev, + channel=1, + tx_id=0x7E0, # наш запрос + rx_id=0x7E8, # ответ ЭБУ + ) + + # отправить запрос ReadDataByIdentifier F190 (VIN) + await isotp.send(b"\x22\xF1\x90") + + # получить полный ответ (single или multi-frame) + resp = await isotp.recv(timeout=1.0) + print("ISO-TP:", resp.hex()) +```` + +## 4. UDS Client (uds_async.client) + +Клиент UDS использует IsoTpChannel: +````python + from uds_async.client import UdsClient + from isotp_async import IsoTpChannel + + isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) + uds = UdsClient(isotp_channel=isotp) + + # переход в расширенную диагностическую сессию + await uds.diagnostic_session_control(0x03) + + # чтение VIN (DID F190) + vin_bytes = await uds.read_data_by_identifier(0xF190) + print("VIN:", vin_bytes.decode(errors="ignore")) +```` + +## 5. UDS Server (эмулятор ЭБУ) + +Простой UDS-сервер, который отвечает на запрос VIN: +````python + from uds_async.server import UdsServer, UdsRequest, UdsPositiveResponse + from isotp_async import IsoTpChannel + + class MyEcuServer(UdsServer): + async def handle_read_data_by_identifier(self, req: UdsRequest): + if req.data_identifier == 0xF190: + # положительный ответ: 62 F1 90 + данные + return UdsPositiveResponse(b"\x62\xF1\x90DEMO-VIN-1234567") + # всё остальное обрабатывается базовой реализацией + return await super().handle_read_data_by_identifier(req) + + async def main(): + dev = await CarBusDevice.open("COM6", baudrate=115200) + await dev.open_can_channel(channel=1, nominal_bitrate=500_000, fd=False) + + isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E8, rx_id=0x7E0) + server = MyEcuServer(isotp_channel=isotp) + await server.serve_forever() + + asyncio.run(main()) +```` + +## 6. Удалённая работа через TCP (tcp_bridge) + +### 6.1. Сервер (рядом с адаптером) + +На машине, где физически подключён CAN-адаптер: + + python -m carbus_async.tcp_bridge --serial COM6 --port 7000 + +Адаптер открывается локально, а поверх него поднимается TCP-мост. + +### 6.2. Клиент (удалённая машина) + +На другой машине можно использовать тот же API, как с локальным COM, но через `open_tcp`: +````python + import asyncio + from carbus_async.device import CarBusDevice + from carbus_async.messages import CanMessage + + async def main(): + dev = await CarBusDevice.open_tcp("192.168.1.10", 7000) + + await dev.open_can_channel( + channel=1, + nominal_bitrate=500_000, + fd=False, + ) + + msg = CanMessage(can_id=0x321, data=b"\x01\x02\x03\x04") + await dev.send_can(msg, channel=1) + + ch, rx = await dev.receive_can() + print("REMOTE RX:", ch, rx) + + await dev.close() + + asyncio.run(main()) +```` + +## 7. Логирование + +Для отладки удобно включить подробное логирование: +````python + import logging + + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) +```` +Логгеры: + +- `carbus_async.wire.*` — сырые кадры по USB/TCP (TX/RX) +- `carbus_async.device.*` — высокоуровневые события, ошибки, BUS_ERROR +- дополнительные логгеры в isotp_async / uds_async + +--- + +## 8. Структура проекта + + carbus_async/ + device.py — асинхронный интерфейс к адаптеру (CAN/CAN-FD) + protocol.py — описания команд, флагов и структур протокола + messages.py — модель CanMessage и вспомогательные типы + tcp_bridge.py — TCP-мост (сервер для удалённой работы) + + isotp_async/ + __init__.py — IsoTpChannel и вспомогательные сущности + + uds_async/ + client.py — UdsClient (клиент UDS) + server.py — UdsServer (сервер / эмулятор ЭБУ) + types.py — структуры запросов/ответов + +--- + +## 9. Лицензия + +MIT (можно поменять под нужды проекта). + +Pull Requests и предложения по улучшению приветствуются 🚗 + + diff --git a/carbus_async/__init__.py b/carbus_async/__init__.py new file mode 100644 index 0000000..a8ef2ad --- /dev/null +++ b/carbus_async/__init__.py @@ -0,0 +1,12 @@ +from .device import CarBusDevice +from .messages import CanMessage, MessageDirection +from .exceptions import CarBusError, CommandError, SyncError + +__all__ = [ + "CarBusDevice", + "CanMessage", + "MessageDirection", + "CarBusError", + "CommandError", + "SyncError", +] diff --git a/carbus_async/device.py b/carbus_async/device.py new file mode 100644 index 0000000..781300d --- /dev/null +++ b/carbus_async/device.py @@ -0,0 +1,1147 @@ +from __future__ import annotations + +import asyncio +import contextlib +import logging +import struct +from dataclasses import dataclass, field +from typing import Dict, Optional, Tuple, List + +import serial_asyncio + +from .exceptions import CarBusError, SyncError, CommandError +from .messages import CanMessage + + +from .protocol import ( + Command, + CommandHeader, + MsgCommandHeader, + HeaderFlags, + BusMessageFlags, + CC_MULTIWORD, + is_ack, + base_command_from_ack, + need_extended_header, +) + + +NOMINAL_BITRATE_INDEX: Dict[int, int] = { + 10_000: 0, + 20_000: 1, + 33_300: 2, + 50_000: 3, + 62_500: 4, + 83_300: 5, + 95_200: 6, + 100_000: 7, + 125_000: 8, + 250_000: 9, + 400_000: 10, + 500_000: 11, + 800_000: 12, + 1_000_000: 13, + # 0xFF = detect, +} + + +DATA_BITRATE_INDEX: Dict[int, int] = { + 500_000: 0, + 1_000_000: 1, + 2_000_000: 2, + 4_000_000: 3, + 5_000_000: 4, +} + + +@dataclass +class DeviceInfoParam: + header: int + data: List[int] + +from .protocol import ( + Command, + CommandHeader, + MsgCommandHeader, + HeaderFlags, + BusMessageFlags, + CC_MULTIWORD, + DI_HARDWARE_ID, + DI_FIRMWARE_VERSION, + DI_DEVICE_SERIAL, + DI_FEATURES, + DI_CHANNEL_MAP, + DI_CHANNEL_FEATURES, + DI_FILTER, + DI_GATEWAY, + DI_CHANNEL_FREQUENCY, + DI_ISOTP, + DI_TX_BUFFER, + DI_TX_TASK, + is_ack, + base_command_from_ack, + need_extended_header, +) + +HW_ID_NAMES: Dict[int, str] = { + 0xFF: "HW_CH30", + 0x02: "HW_ODB_OLD", + 0x01: "HW_CH32", + 0x04: "HW_ODB", + 0x03: "HW_CHP", + 0x11: "HW_CH33", + 0x13: "HW_CHPM03", + 0x14: "HW_ODB_FD", + 0x06: "HW_FDL2_M02", + 0x16: "HW_FDL2_M05", +} + +# DI_FEATURES +DI_FEATURE_GATEWAY = 0x00000001 +DI_FEATURE_ISOTP = 0x00000002 +DI_FEATURE_TX_BUFFER = 0x00000004 +DI_FEATURE_TX_TASK = 0x00000008 + +FLAG_CONFIG_TERMINATOR = 0x05 + +# ChannelType (DI_CHANNEL_MAP) +CHANNEL_TYPE_MAP: Dict[int, str] = { + 0x00: "NONE", # CTE_None + 0x01: "CAN", # CTE_CAN + 0x02: "CANFD", # CTE_CANFD + 0x10: "LIN", # CTE_LIN +} + +# DI_CHANNEL_FEATURES +DI_CHANNEL_CHANNEL_MASK = 0x00FF0000 +DI_CHANNEL_CHANNEL_SHIFT = 16 + +DI_CHANNEL_FEATURE_ALC = 0x00000001 +DI_CHANNEL_FEATURE_TERMINATOR = 0x00000002 +DI_CHANNEL_FEATURE_PULLUP = 0x00000004 +DI_CHANNEL_FEATURE_CSD = 0x00000008 +DI_CHANNEL_FEATURE_IDLE = 0x00000010 +DI_CHANNEL_FEATURE_DSD = 0x00000020 +DI_CHANNEL_FEATURE_NONISO = 0x00000040 + +# DI_FILTER +DI_FILTER_CHANNEL_MASK = 0x00FF0000 +DI_FILTER_CHANNEL_SHIFT = 16 + +DI_FILTER_TYPE_MASK = 0x0000FF00 +DI_FILTER_TYPE_SHIFT = 8 +DI_FILTER_SIZE_MASK = 0x000000FF +DI_FILTER_SIZE_SHIFT = 0 + +DI_FILTER_TYPE_8BIT = 0x01 +DI_FILTER_TYPE_11BIT = 0x02 +DI_FILTER_TYPE_29BIT = 0x04 + +# DI_GATEWAY +DI_GATEWAY_SRC_MASK = 0x00FF0000 +DI_GATEWAY_SRC_SHIFT = 16 +DI_GATEWAY_DST_MASK = 0x0000FF00 +DI_GATEWAY_DST_SHIFT = 8 +DI_GATEWAY_FILTER_MASK = 0x000000FF +DI_GATEWAY_FILTER_SHIFT = 0 + +@dataclass +class DeviceInfo: + params: List[DeviceInfoParam] + raw_payload: bytes + + @classmethod + def from_payload(cls, payload: bytes) -> "DeviceInfo": + if len(payload) % 4 != 0: + raise ValueError( + f"DEVICE_INFO payload length {len(payload)} is not multiple of 4" + ) + + words = [ + int.from_bytes(payload[i: i + 4], "little") + for i in range(0, len(payload), 4) + ] + + params: List[DeviceInfoParam] = [] + i = 0 + n = len(words) + + while i < n: + header = words[i] + i += 1 + + if header & CC_MULTIWORD: + length = (header >> 16) & 0xFF + data = words[i: i + length] + i += length + else: + data = [] + + params.append(DeviceInfoParam(header=header, data=data)) + + return cls(params=params, raw_payload=payload) + + @staticmethod + def _param_code(value: int) -> int: + return value & 0x7F000000 + + def _find_first(self, base: int) -> Optional[DeviceInfoParam]: + base_code = self._param_code(base) + for p in self.params: + if self._param_code(p.header) == base_code: + return p + return None + + def _find_all(self, base: int) -> List[DeviceInfoParam]: + base_code = self._param_code(base) + return [ + p for p in self.params + if self._param_code(p.header) == base_code + ] + + def find_by_prefix(self, prefix: int) -> List[DeviceInfoParam]: + base_code = self._param_code(prefix) + return [ + p for p in self.params + if self._param_code(p.header) == base_code + ] + + @property + def hardware_id(self) -> Optional[int]: + p = self._find_first(DI_HARDWARE_ID) + if not p: + return None + return p.header & 0xFF + + @property + def hardware_name(self) -> Optional[str]: + hw_id = self.hardware_id + if hw_id is None: + return None + return HW_ID_NAMES.get(hw_id, f"0x{hw_id:02X}") + + @property + def firmware_version(self) -> Optional[str]: + p = self._find_first(DI_FIRMWARE_VERSION) + if not p: + return None + if not p.data: + return "" + + b = b"".join(w.to_bytes(4, "little") for w in p.data) + # Обрезаем по 0x00 + if b"\x00" in b: + b = b.split(b"\x00", 1)[0] + try: + return b.decode("ascii", errors="ignore") + except Exception: + return b.hex() + + @property + def serial_bytes(self) -> Optional[bytes]: + p = self._find_first(DI_DEVICE_SERIAL) + if not p or not p.data: + return None + return b"".join(w.to_bytes(4, "little") for w in p.data) + + @property + def serial_int(self) -> Optional[int]: + b = self.serial_bytes + if b is None: + return None + return int.from_bytes(b, "big") + + @property + def features_mask(self) -> int: + p = self._find_first(DI_FEATURES) + if not p: + return 0 + return p.header & 0x00FFFFFF + + @property + def feature_gateway(self) -> bool: + return bool(self.features_mask & DI_FEATURE_GATEWAY) + + @property + def feature_isotp(self) -> bool: + return bool(self.features_mask & DI_FEATURE_ISOTP) + + @property + def feature_tx_buffer(self) -> bool: + return bool(self.features_mask & DI_FEATURE_TX_BUFFER) + + @property + def feature_tx_task(self) -> bool: + return bool(self.features_mask & DI_FEATURE_TX_TASK) + + @property + def channel_types(self) -> Dict[int, str]: + p = self._find_first(DI_CHANNEL_MAP) + if not p: + return {} + + value = p.header + b0 = (value >> 0) & 0xFF + b1 = (value >> 8) & 0xFF + b2 = (value >> 16) & 0xFF + out: Dict[int, str] = {} + + for idx, v in enumerate((b0, b1, b2), start=1): + out[idx] = CHANNEL_TYPE_MAP.get(v, f"0x{v:02X}") + + return out + + @property + def channel_features(self) -> Dict[int, Dict[str, bool]]: + res: Dict[int, Dict[str, bool]] = {} + for p in self._find_all(DI_CHANNEL_FEATURES): + ch = (p.header & DI_CHANNEL_CHANNEL_MASK) >> DI_CHANNEL_CHANNEL_SHIFT + mask = p.header & 0x0000FFFF + if ch == 0: + continue + res[ch] = { + "raw_mask": mask, + "alc": bool(mask & DI_CHANNEL_FEATURE_ALC), + "terminator": bool(mask & DI_CHANNEL_FEATURE_TERMINATOR), + "pullup": bool(mask & DI_CHANNEL_FEATURE_PULLUP), + "csd": bool(mask & DI_CHANNEL_FEATURE_CSD), + "idle": bool(mask & DI_CHANNEL_FEATURE_IDLE), + "dsd": bool(mask & DI_CHANNEL_FEATURE_DSD), + "noniso": bool(mask & DI_CHANNEL_FEATURE_NONISO), + } + return res + + @property + def filters_info(self) -> List[Dict[str, int]]: + out: List[Dict[str, int]] = [] + for p in self._find_all(DI_FILTER): + h = p.header + ch = (h & DI_FILTER_CHANNEL_MASK) >> DI_FILTER_CHANNEL_SHIFT + type_mask = (h & DI_FILTER_TYPE_MASK) >> DI_FILTER_TYPE_SHIFT + size = (h & DI_FILTER_SIZE_MASK) >> DI_FILTER_SIZE_SHIFT + + if ch == 0: + continue + + info = { + "channel": ch, + "type_mask": type_mask, + "size": size, + "has_8bit": bool(type_mask & DI_FILTER_TYPE_8BIT), + "has_11bit": bool(type_mask & DI_FILTER_TYPE_11BIT), + "has_29bit": bool(type_mask & DI_FILTER_TYPE_29BIT), + } + out.append(info) + return out + + @property + def gateway_info(self) -> List[Dict[str, int]]: + out: List[Dict[str, int]] = [] + for p in self._find_all(DI_GATEWAY): + h = p.header + src = (h & DI_GATEWAY_SRC_MASK) >> DI_GATEWAY_SRC_SHIFT + dst = (h & DI_GATEWAY_DST_MASK) >> DI_GATEWAY_DST_SHIFT + flt = (h & DI_GATEWAY_FILTER_MASK) >> DI_GATEWAY_FILTER_SHIFT + if src == 0 or dst == 0: + continue + out.append({"src": src, "dst": dst, "filters": flt}) + return out + + @property + def channel_frequencies(self) -> Dict[int, int]: + out: Dict[int, int] = {} + for p in self._find_all(DI_CHANNEL_FREQUENCY): + h = p.header + ch = (h & DI_GATEWAY_SRC_MASK) >> DI_GATEWAY_SRC_SHIFT # по доке тот же формат + freq = h & 0x0000FFFF # в примере 0x15010078 → 0x0078 → 120 (МГц) + if ch == 0: + continue + # по доке пример: 0x78 (120МГц) — домножаем до Hz + out[ch] = freq * 1_000_000 + return out + + @property + def isotp_buffer_size(self) -> Optional[int]: + p = self._find_first(DI_ISOTP) + if not p: + return None + return p.header & 0x00FFFFFF + + @property + def tx_buffer_size(self) -> Optional[int]: + p = self._find_first(DI_TX_BUFFER) + if not p: + return None + return p.header & 0x00FFFFFF + + @property + def tx_task_count(self) -> Optional[int]: + p = self._find_first(DI_TX_TASK) + if not p: + return None + return p.header & 0x00FFFFFF + + def find_by_prefix(self, prefix: int) -> List[DeviceInfoParam]: + return [ + p for p in self.params + if (p.header & 0xFF000000) == (prefix & 0xFF000000) + ] + + def find_by_prefix(self, prefix: int) -> List[DeviceInfoParam]: + """ + Вернуть все параметры, у которых старший байт совпадает с prefix & 0xFF000000. + Например, prefix=0x01000000 вернёт все DI_*, начинающиеся с 0x01. + """ + return [ + p + for p in self.params + if (p.header & 0xFF000000) == (prefix & 0xFF000000) + ] + + +@dataclass +class _PendingRequest: + future: asyncio.Future + command: int + + +@dataclass +class CarBusDevice: + port: str + baudrate: int = 115200 + loop: Optional[asyncio.AbstractEventLoop] = None + + _reader: asyncio.StreamReader = field(init=False, repr=False) + _writer: asyncio.StreamWriter = field(init=False, repr=False) + _rx_queue: "asyncio.Queue[tuple[int, CanMessage]]" = field(init=False, repr=False) + _rx_channel_queues: Dict[int, "asyncio.Queue[CanMessage]"] = field(init=False, repr=False) + _pending: Dict[int, _PendingRequest] = field(init=False, repr=False) + _seq_counter: int = field(init=False, default=0, repr=False) + _reader_task: Optional[asyncio.Task] = field(init=False, default=None, repr=False) + _closed: bool = field(init=False, default=False, repr=False) + + _log: logging.Logger = field(init=False, repr=False) + _wire_log: logging.Logger = field(init=False, repr=False) + + def _ensure_channel_queue(self, channel: int) -> "asyncio.Queue[CanMessage]": + q = self._rx_channel_queues.get(channel) + if q is None: + q = asyncio.Queue() + self._rx_channel_queues[channel] = q + return q + + @classmethod + async def open( + cls, + port: str, + baudrate: int = 115200, + *, + loop: Optional[asyncio.AbstractEventLoop] = None, + use_can: bool = True, + use_lin: bool = False, + ) -> "CarBusDevice": + self = cls(port=port, baudrate=baudrate, loop=loop) + await self._connect() + await self.sync() + self._start_reader() + await self.device_open(use_can=use_can, use_lin=use_lin) + return self + + @classmethod + async def open_tcp(cls, host: str, port: int, **kwargs) -> "CarBusDevice": + return await cls.open(f"socket://{host}:{port}", **kwargs) + + async def _connect(self) -> None: + loop = self.loop or asyncio.get_running_loop() + + self._log = logging.getLogger(f"carbus_async.device.{self.port}") + self._wire_log = logging.getLogger(f"carbus_async.wire.{self.port}") + + if self.port.startswith("socket://"): + addr = self.port[len("socket://") :] + if ":" not in addr: + raise ValueError( + f"Invalid socket URL '{self.port}'. Expected 'socket://host:port'" + ) + + host, port_str = addr.rsplit(":", 1) + try: + tcp_port = int(port_str) + except ValueError as e: + raise ValueError( + f"Invalid TCP port in '{self.port}': {port_str!r}" + ) from e + + self._log.debug( + "Connecting via TCP to %s:%d (tcp_bridge)", host, tcp_port + ) + + self._reader, self._writer = await asyncio.open_connection( + host=host, + port=tcp_port, + ) + + self._log.debug( + "Connected to TCP %s:%d (logical port=%s)", + host, + tcp_port, + self.port, + ) + + else: + self._log.debug( + "Connecting to serial port %s @ %d using serial_asyncio", + self.port, + self.baudrate, + ) + self._reader, self._writer = await serial_asyncio.open_serial_connection( + loop=loop, + url=self.port, + baudrate=self.baudrate, + ) + self._log.debug("Connected to %s @ %d", self.port, self.baudrate) + + self._rx_queue = asyncio.Queue() + self._rx_channel_queues = {} + self._pending = {} + self._seq_counter = 0 + self._reader_task = None + self._closed = False + + async def close(self) -> None: + if self._closed: + return + + try: + try: + await self.device_close() + except Exception: + self._log.debug("DEVICE_CLOSE failed, ignoring", exc_info=True) + + if self._reader_task: + self._reader_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._reader_task + + self._writer.close() + await self._writer.wait_closed() + finally: + self._closed = True + + for pending in list(self._pending.values()): + if not pending.future.done(): + pending.future.set_exception( + CarBusError("Device closed before response was received") + ) + self._pending.clear() + + def _start_reader(self) -> None: + if self._reader_task is None or self._reader_task.done(): + self._reader_task = asyncio.create_task( + self._read_loop(), + name=f"carbus_read_loop_{self.port}", + ) + + def _next_seq(self) -> int: + self._seq_counter = (self._seq_counter + 1) & 0xFF + if self._seq_counter == 0: + self._seq_counter = 1 + return self._seq_counter + + async def sync(self) -> None: + frame = bytes((Command.SYNC, 0x00, Command.SYNC, 0x00)) + self._wire_log.debug("TX SYNC: %s", frame.hex(" ")) + + self._writer.write(frame) + await self._writer.drain() + + resp = await self._reader.readexactly(4) + self._wire_log.debug("RX SYNC: %s", resp.hex(" ")) + + if resp != bytes((0x5A, 0x00, 0x5A, 0x00)): + raise SyncError(f"Unexpected SYNC response: {resp!r}") + + async def _send_raw( + self, + command: int, + *, + header_flags: int = 0, + payload: bytes = b"", + expect_response: bool = True, + ) -> Tuple[int, int, bytes]: + if self._closed: + raise CarBusError("Device is closed") + + seq = self._next_seq() + dsize = len(payload) + + if need_extended_header(command): + header = MsgCommandHeader( + command=command, + sequence=seq, + flags=header_flags, + dsize=dsize, + ) + else: + if dsize > 0xFF: + raise ValueError( + f"dsize={dsize} too big for CommandHeader (max 255)" + ) + header = CommandHeader( + command=command, + sequence=seq, + flags=header_flags, + dsize=dsize, + ) + + header_bytes = header.to_bytes() + frame = header_bytes + payload + + fut: asyncio.Future = asyncio.get_running_loop().create_future() + if expect_response: + self._pending[seq] = _PendingRequest(future=fut, command=command) + + self._wire_log.debug( + "TX cmd=0x%02X seq=%d flags=0x%04X dsize=%d :: %s", + command, + seq, + header_flags, + dsize, + frame.hex(" "), + ) + + self._writer.write(frame) + await self._writer.drain() + + if not expect_response: + return 0, 0, b"" + + cmd_resp, flags_resp, payload_resp = await fut + return cmd_resp, flags_resp, payload_resp + + async def get_device_info(self) -> DeviceInfo: + cmd, flags, payload = await self._send_raw( + Command.DEVICE_INFO, + header_flags=0, + payload=b"", + expect_response=True, + ) + + if cmd != Command.DEVICE_INFO: + raise CommandError( + f"Unexpected DEVICE_INFO response: cmd=0x{cmd:02X}, flags=0x{flags:04X}" + ) + + return DeviceInfo.from_payload(payload) + + async def device_open(self, *, use_can: bool = True, use_lin: bool = False) -> None: + if use_can and use_lin: + mode_val = 0x00 # FULL + elif use_can and not use_lin: + mode_val = 0x01 # CAN only + elif not use_can and use_lin: + mode_val = 0x02 # LIN only + else: + mode_val = 0x00 # FULL по умолчанию + + dc_mode = 0x01000000 | mode_val + payload = dc_mode.to_bytes(4, "little") + + cmd, flags, resp_payload = await self._send_raw( + Command.DEVICE_OPEN, + header_flags=0, + payload=payload, + expect_response=True, + ) + + if not is_ack(cmd) or base_command_from_ack(cmd) != Command.DEVICE_OPEN: + raise CommandError( + f"Unexpected DEVICE_OPEN response: cmd=0x{cmd:02X}, flags=0x{flags:04X}" + ) + + async def device_close(self) -> None: + cmd, flags, _ = await self._send_raw( + Command.DEVICE_CLOSE, + header_flags=0, + payload=b"", + expect_response=True, + ) + + if not is_ack(cmd) or base_command_from_ack(cmd) != Command.DEVICE_CLOSE: + raise CommandError( + f"Unexpected DEVICE_CLOSE response: cmd=0x{cmd:02X}, flags=0x{flags:04X}" + ) + + async def open_can_channel( + self, + channel: int = 1, + *, + nominal_bitrate: int = 500_000, + fd: bool = False, + data_bitrate: Optional[int] = None, + brs: bool = False, + listen_only: bool = False, + loopback: bool = False, + auto_detect: bool = False, + retransmit: bool = False, + non_iso: bool = False, + nominal_index: Optional[int] = None, + data_index: Optional[int] = None, + ) -> None: + + if loopback: + mode_val = 0x02 + elif listen_only: + mode_val = 0x01 + else: + mode_val = 0x00 + + cc_can_mode = 0x11000000 | mode_val + + if not fd: + frame_mode = 0x00 + else: + frame_mode = 0x02 if brs else 0x01 + + cc_can_frame = 0x12000000 | frame_mode + + if auto_detect: + n_index = 0xFF + elif nominal_index is not None: + n_index = nominal_index & 0xFF + else: + try: + n_index = NOMINAL_BITRATE_INDEX[nominal_bitrate] + except KeyError: + raise ValueError( + f"Unsupported nominal bitrate {nominal_bitrate}. " + f"Known: {sorted(NOMINAL_BITRATE_INDEX.keys())}" + ) from None + + cc_bus_speed_n = 0x01000000 | (n_index & 0xFF) + + params: List[int] = [cc_can_mode, cc_can_frame, cc_bus_speed_n] + + if fd: + if data_index is not None: + d_index = data_index & 0xFF + else: + if data_bitrate is None: + raise ValueError("fd=True требует указать data_bitrate или data_index") + try: + d_index = DATA_BITRATE_INDEX[data_bitrate] + except KeyError: + raise ValueError( + f"Unsupported data bitrate {data_bitrate}. " + f"Known: {sorted(DATA_BITRATE_INDEX.keys())}" + ) from None + + cc_bus_speed_d = 0x02000000 | (d_index & 0xFF) + params.append(cc_bus_speed_d) + + if retransmit: + params.append(0x13000001) + + if fd and non_iso: + params.append(0x14000001) + + payload = b"".join(p.to_bytes(4, "little") for p in params) + + header_flags = (channel & 0x0F) * 0x20 + + cmd, flags, resp_payload = await self._send_raw( + Command.CHANNEL_OPEN, + header_flags=header_flags, + payload=payload, + expect_response=True, + ) + + if not is_ack(cmd) or base_command_from_ack(cmd) != Command.CHANNEL_OPEN: + raise CommandError( + f"Unexpected CHANNEL_OPEN response: cmd=0x{cmd:02X}, flags=0x{flags:04X}" + ) + + async def set_can_filter( + self, + channel: int, + index: int, + *, + can_id: int, + mask: int | None = None, + extended: bool = False, + ) -> None: + + if index < 0: + raise ValueError("filter index must be >= 0") + + filter_type = 0x01 if extended else 0x00 + + if mask is None: + mask = 0x1FFFFFFF if extended else 0x7FF + + payload = struct.pack(" None: + + if can_id < 0 or can_id > 0x7FF: + raise ValueError("can_id for standard ID must be in 0x000..0x7FF") + + await self.set_can_filter( + channel=channel, + index=index, + can_id=can_id, + mask=mask, + extended=False, + ) + + async def set_ext_id_filter( + self, + channel: int, + index: int, + can_id: int, + mask: int = 0x1FFFFFFF, + ) -> None: + + if can_id < 0 or can_id > 0x1FFFFFFF: + raise ValueError("can_id for extended ID must be in 0x00000000..0x1FFFFFFF") + + await self.set_can_filter( + channel=channel, + index=index, + can_id=can_id, + mask=mask, + extended=True, + ) + + async def clear_all_filters( + self, + channel: int, + *, + max_filters: int = 64, + stop_on_error: bool = True, + ) -> int: + + cleared = 0 + for idx in range(max_filters): + try: + await self.clear_can_filter(channel=channel, index=idx) + cleared += 1 + except CommandError as e: + if stop_on_error: + self._log.debug( + "Stop clearing filters on channel %d at index %d due to error: %s", + channel, + idx, + e, + ) + break + else: + self._log.warning( + "Error while clearing filter %d on channel %d: %s", + idx, + channel, + e, + ) + continue + + return cleared + + async def clear_can_filter( + self, + channel: int, + index: int, + ) -> None: + + if index < 0: + raise ValueError("filter index must be >= 0") + + payload = struct.pack(" None: + + state = 0x01 if enabled else 0x00 + channel_flag = (channel & 0x0F) * 0x20 + header_flags = channel_flag | FLAG_CONFIG_TERMINATOR + + payload = bytes((state,)) + + cmd, flags, resp_payload = await self._send_raw( + Command.CHANNEL_CONFIG, + header_flags=header_flags, + payload=payload, + expect_response=True, + ) + + if not is_ack(cmd) or base_command_from_ack(cmd) != Command.CHANNEL_CONFIG: + raise CommandError( + f"Unexpected TERMINATOR response: cmd=0x{cmd:02X}, flags=0x{flags:04X}" + ) + + + async def send_can( + self, + msg: CanMessage, + *, + channel: int, + confirm: bool = False, + echo: bool = False, + ) -> None: + + if channel == 1: + hflags = int(HeaderFlags.CHANNEL_1) + elif channel == 2: + hflags = int(HeaderFlags.CHANNEL_2) + elif channel == 3: + hflags = int(HeaderFlags.CHANNEL_3) + elif channel == 4: + hflags = int(HeaderFlags.CHANNEL_4) + else: + hflags = (channel & 0x0F) * 0x20 + + if confirm: + hflags |= int(HeaderFlags.CONFIRM_REQUIRED) + + mflags = BusMessageFlags.NONE + if msg.extended: + mflags |= BusMessageFlags.EXTID + if msg.rtr: + mflags |= BusMessageFlags.RTR + if msg.fd: + mflags |= BusMessageFlags.FDF + if msg.brs: + mflags |= BusMessageFlags.BRS + if not echo: + mflags |= BusMessageFlags.BLOCK_TX + + if msg.extended: + raw_id = msg.can_id & 0x1FFFFFFF + else: + raw_id = (msg.can_id & 0x7FF) #<< 16 + + timestamp = 0 + dlc = len(msg.data) & 0xFF + + header_struct = struct.pack( + " tuple[int, CanMessage]: + return await self._rx_queue.get() + + async def receive_can_on(self, channel: int = 1) -> CanMessage: + q = self._ensure_channel_queue(channel) + return await q.get() + + async def receive_can_on_timeout( + self, + channel: int = 1, + timeout: float = 1.0, + ) -> CanMessage | None: + try: + return await asyncio.wait_for(self.receive_can_on(channel), timeout=timeout) + except asyncio.TimeoutError: + return None + + async def _read_loop(self) -> None: + try: + while not self._closed: + cmd_bytes = await self._reader.readexactly(1) + if not cmd_bytes: + break + cmd = cmd_bytes[0] + + if need_extended_header(cmd): + header_rest = await self._reader.readexactly(5) + header = MsgCommandHeader.from_bytes(cmd_bytes + header_rest) + flags = header.flags + dsize = header.dsize + seq = header.sequence + else: + header_rest = await self._reader.readexactly(3) + header = CommandHeader.from_bytes(cmd_bytes + header_rest) + flags = header.flags + dsize = header.dsize + seq = header.sequence + + payload = b"" + if dsize: + payload = await self._reader.readexactly(dsize) + + full_frame = cmd_bytes + header_rest + payload + self._wire_log.debug( + "RX cmd=0x%02X seq=%d flags=0x%04X dsize=%d :: %s", + cmd, + seq, + flags, + dsize, + full_frame.hex(" "), + ) + + if cmd == Command.ERROR: + pending = self._pending.pop(seq, None) + if pending is not None and not pending.future.done(): + pending.future.set_exception( + CommandError( + f"Device ERROR for seq={seq}, " + f"flags=0x{flags:04X}, payload={payload.hex()}" + ) + ) + continue + + if is_ack(cmd): + base_cmd = base_command_from_ack(cmd) + pending = self._pending.pop(seq, None) + if pending is not None and not pending.future.done(): + pending.future.set_result((cmd, flags, payload)) + continue + + if seq in self._pending: + pending = self._pending.pop(seq) + if not pending.future.done(): + pending.future.set_result((cmd, flags, payload)) + continue + + if cmd == Command.MESSAGE: + await self._handle_bus_message(flags, payload) + elif cmd == Command.BUS_ERROR: + await self._handle_bus_error(flags, payload) + else: + self._log.debug( + "Unhandled async command: cmd=0x%02X, flags=0x%04X, payload=%s", + cmd, + flags, + payload.hex(" "), + ) + continue + + except asyncio.IncompleteReadError: + self._closed = True + except Exception as e: + self._closed = True + self._log.exception("Read loop exception: %s", e) + finally: + for pending in list(self._pending.values()): + if not pending.future.done(): + pending.future.set_exception( + CarBusError("Read loop terminated before response was received") + ) + self._pending.clear() + + async def _handle_bus_message(self, header_flags: int, payload: bytes) -> None: + if len(payload) < 16: + return + + flags_val, timestamp_us, _reserved, id_raw, dlc = struct.unpack_from(" None: + self._log.warning( + "BUS_ERROR: flags=0x%04X, payload=%s", header_flags, payload.hex(" ") + ) + + +async def _example() -> None: + dev = await CarBusDevice.open("COM6", baudrate=115200) + + info = await dev.get_device_info() + print("DEVICE_INFO raw:", info.raw_payload.hex(" ")) + for p in info.params: + print(f"DI header=0x{p.header:08X}, data={[f'0x{w:08X}' for w in p.data]}") + + await dev.open_can_channel(channel=1, nominal_bitrate=500_000, fd=False) + + msg = CanMessage(can_id=0x123, data=b"\x01\x02\x03\x04", dlc=4, channel=1) + await dev.send_can(msg, confirm=False, echo=True) + + rx = await dev.receive_can() + print("RX CAN:", rx) + + await dev.close() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + with contextlib.suppress(KeyboardInterrupt): + asyncio.run(_example()) diff --git a/carbus_async/exceptions.py b/carbus_async/exceptions.py new file mode 100644 index 0000000..1397146 --- /dev/null +++ b/carbus_async/exceptions.py @@ -0,0 +1,10 @@ +class CarBusError(Exception): + ... + + +class SyncError(CarBusError): + ... + + +class CommandError(CarBusError): + ... \ No newline at end of file diff --git a/carbus_async/messages.py b/carbus_async/messages.py new file mode 100644 index 0000000..a16dc47 --- /dev/null +++ b/carbus_async/messages.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +from .protocol import BusMessageFlags + + +class MessageDirection(str, Enum): + RX = "rx" + TX = "tx" + UNKNOWN = "unknown" + + +@dataclass +class CanMessage: + can_id: int + data: bytes = b"" + extended: bool = False + rtr: bool = False + fd: bool = False + brs: bool = False + timestamp_us: int = 0 + + @property + def dlc(self) -> int: + return len(self.data) + + @classmethod + def from_bus_payload( + cls, + *, + flags: BusMessageFlags, + timestamp_us: int, + can_id: int, + dlc: int, + data: bytes, + ) -> "CanMessage": + extended = bool(flags & BusMessageFlags.EXTID) + rtr = bool(flags & BusMessageFlags.RTR) + fd = bool(flags & BusMessageFlags.FDF) + brs = bool(flags & BusMessageFlags.BRS) + + payload = data[:dlc] + + return cls( + can_id=can_id, + data=payload, + extended=extended, + rtr=rtr, + fd=fd, + brs=brs, + timestamp_us=timestamp_us, + ) diff --git a/carbus_async/protocol.py b/carbus_async/protocol.py new file mode 100644 index 0000000..f615349 --- /dev/null +++ b/carbus_async/protocol.py @@ -0,0 +1,144 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import IntEnum, IntFlag + + +class Command(IntEnum): + SYNC = 0xA5 + DEVICE_INFO = 0x06 + DEVICE_OPEN = 0x08 + DEVICE_CLOSE = 0x09 + CHANNEL_CONFIG = 0x11 + CHANNEL_OPEN = 0x18 + + FILTER_SET = 0x21 # COMMAND_FILTER_SET + FILTER_CLEAR = 0x22 # COMMAND_FILTER_CLEAR + + MESSAGE = 0x40 + BUS_ERROR = 0x48 + + ERROR = 0xFF + + +class FilterType(IntEnum): + STD_11BIT = 0x00 + EXT_29BIT = 0x01 + + +EXTENDED_HEADER_COMMANDS = {Command.MESSAGE, Command.BUS_ERROR} + + +class HeaderFlags(IntFlag): + NONE = 0x0000 + + CHANNEL_1 = 0x2000 + CHANNEL_2 = 0x4000 + CHANNEL_3 = 0x6000 + CHANNEL_4 = 0x8000 + + CONFIRM_REQUIRED = 0x0001 + + +class BusMessageFlags(IntFlag): + NONE = 0x00000000 + + EXTID = 0x00000001 + RTR = 0x00000002 + FDF = 0x00000004 + BRS = 0x00000008 + ESI = 0x00000010 + + ERROR_FRAME = 0x01000000 + RX = 0x10000000 + TX = 0x20000000 + BLOCK_TX = 0x30000000 + + +CC_MULTIWORD = 0x80000000 + +DI_MULTIWORD = CC_MULTIWORD + +DI_HARDWARE_ID = 0x01000000 # тип устройства (HWIdentifiers) +DI_FIRMWARE_VERSION = 0x02000000 # строка версии прошивки (ASCII, MULTIWORD) +DI_DEVICE_SERIAL = 0x03000000 # серийный номер (бинарно, MULTIWORD) +DI_FEATURES = 0x11000000 # битовая маска общих фич +DI_CHANNEL_MAP = 0x12000000 # карта каналов (тип канала) +DI_CHANNEL_FEATURES = 0x13000000 # опции по каналам (ALC, TERMINATOR, ...) +DI_FILTER = 0x14000000 # настройки фильтров по каналам +DI_GATEWAY = 0x15000000 # возможные пробросы между каналами +DI_CHANNEL_FREQUENCY = 0x16000000 # частота работы CAN/CAN-FD модуля +DI_ISOTP = 0x21000000 # размер ISO-TP буфера +DI_TX_BUFFER = 0x22000000 # размер буфера трейсера (в сообщениях) +DI_TX_TASK = 0x23000000 # количество задач периодической отправки + + +@dataclass +class CommandHeader: + command: int + sequence: int + flags: int + dsize: int + + @classmethod + def from_bytes(cls, data: bytes) -> "CommandHeader": + if len(data) != 4: + raise ValueError("CommandHeader требует ровно 4 байта") + cmd, seq, flg, size = data + return cls(cmd, seq, flg, size) + + def to_bytes(self) -> bytes: + return bytes( + ( + self.command & 0xFF, + self.sequence & 0xFF, + self.flags & 0xFF, + self.dsize & 0xFF, + ) + ) + + +@dataclass +class MsgCommandHeader: + command: int + sequence: int + flags: int + dsize: int + + @classmethod + def from_bytes(cls, data: bytes) -> "MsgCommandHeader": + if len(data) != 6: + raise ValueError("MsgCommandHeader требует ровно 6 байт") + cmd = data[0] + seq = data[1] + flags = int.from_bytes(data[2:4], "little") + dsize = int.from_bytes(data[4:6], "little") + return cls(cmd, seq, flags, dsize) + + def to_bytes(self) -> bytes: + return ( + bytes( + ( + self.command & 0xFF, + self.sequence & 0xFF, + ) + ) + + self.flags.to_bytes(2, "little") + + self.dsize.to_bytes(2, "little") + ) + + +def is_ack(cmd: int) -> bool: + return 0x80 <= cmd < 0xFF + + +def base_command_from_ack(cmd: int) -> int: + return cmd & 0x7F + + +def need_extended_header(command: int) -> bool: + try: + c = Command(command) + except ValueError: + return False + return c in EXTENDED_HEADER_COMMANDS diff --git a/carbus_async/tcp_bridge.py b/carbus_async/tcp_bridge.py new file mode 100644 index 0000000..5311e8d --- /dev/null +++ b/carbus_async/tcp_bridge.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +import asyncio +import logging + +import serial_asyncio + +log = logging.getLogger("carbus_async.tcp_bridge") + + +async def _pump( + src: asyncio.StreamReader, + dst: asyncio.StreamWriter, + direction: str, + chunk_size: int = 4096, +) -> None: + try: + while True: + data = await src.read(chunk_size) + if not data: + log.debug("%s: EOF", direction) + break + dst.write(data) + await dst.drain() + except asyncio.CancelledError: + log.debug("%s: cancelled", direction) + raise + except Exception as e: + log.exception("%s: exception: %s", direction, e) + finally: + try: + dst.close() + except Exception: + pass + + +async def handle_client( + tcp_reader: asyncio.StreamReader, + tcp_writer: asyncio.StreamWriter, + *, + serial_port: str, + baudrate: int = 115200, +) -> None: + peer = tcp_writer.get_extra_info("peername") + log.info("Client connected: %s", peer) + + try: + serial_reader, serial_writer = await serial_asyncio.open_serial_connection( + url=serial_port, + baudrate=baudrate, + ) + log.info("Opened local serial %s @ %d for %s", serial_port, baudrate, peer) + + pump_tcp_to_serial = asyncio.create_task( + _pump(tcp_reader, serial_writer, f"{peer} tcp->serial") + ) + pump_serial_to_tcp = asyncio.create_task( + _pump(serial_reader, tcp_writer, f"{peer} serial->tcp") + ) + + done, pending = await asyncio.wait( + [pump_tcp_to_serial, pump_serial_to_tcp], + return_when=asyncio.FIRST_COMPLETED, + ) + + for task in pending: + task.cancel() + with asyncio.SuppressCancelledError if hasattr(asyncio, "SuppressCancelledError") else nullcontext(): + try: + await task + except asyncio.CancelledError: + pass + + except Exception as e: + log.exception("Error in handle_client %s: %s", peer, e) + finally: + try: + tcp_writer.close() + except Exception: + pass + log.info("Client disconnected: %s", peer) + + +async def run_tcp_bridge( + *, + listen_host: str = "0.0.0.0", + listen_port: int = 7000, + serial_port: str = "COM6", + baudrate: int = 115200, +) -> None: + server = await asyncio.start_server( + lambda r, w: handle_client(r, w, serial_port=serial_port, baudrate=baudrate), + listen_host, + listen_port, + ) + + addr = ", ".join(str(sock.getsockname()) for sock in server.sockets or []) + log.info( + "TCP bridge listening on %s -> serial %s @ %d", + addr, + serial_port, + baudrate, + ) + + async with server: + await server.serve_forever() + + +if __name__ == "__main__": + import argparse + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + parser = argparse.ArgumentParser( + description="TCP bridge for CarBus device (raw byte forwarder)." + ) + parser.add_argument("--host", default="0.0.0.0", help="Listen host (default 0.0.0.0)") + parser.add_argument("--port", type=int, default=7000, help="Listen TCP port (default 7000)") + parser.add_argument("--serial", required=True, help="Local serial port (e.g. COM6, /dev/ttyACM0)") + parser.add_argument("--baudrate", type=int, default=115200, help="Serial baudrate (default 115200)") + + args = parser.parse_args() + + async def _main() -> None: + await run_tcp_bridge( + listen_host=args.host, + listen_port=args.port, + serial_port=args.serial, + baudrate=args.baudrate, + ) + + asyncio.run(_main()) diff --git a/isotp_async/__init__.py b/isotp_async/__init__.py new file mode 100644 index 0000000..f5d69eb --- /dev/null +++ b/isotp_async/__init__.py @@ -0,0 +1,6 @@ + +from .carbus_iface import CarBusCanTransport + +__all__ = [ + "CarBusCanTransport", +] diff --git a/isotp_async/carbus_iface.py b/isotp_async/carbus_iface.py new file mode 100644 index 0000000..c646118 --- /dev/null +++ b/isotp_async/carbus_iface.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import asyncio +from typing import Optional + +from carbus_async.device import CarBusDevice +from carbus_async.messages import CanMessage + + +from .iface import CanTransport + + +class CarBusCanTransport(CanTransport): + def __init__(self, dev: CarBusDevice, channel: int, rx_id: int) -> None: + self._dev = dev + self._channel = channel + self._rx_id = rx_id + + async def send(self, msg: CanMessage) -> None: + await self._dev.send_can( + msg, + channel=self._channel, + confirm=False, + echo=False, + ) + + async def recv(self, timeout: Optional[float] = None) -> Optional[CanMessage]: + while True: + if timeout is None: + ch, msg = await self._dev.receive_can() + else: + try: + ch, msg = await asyncio.wait_for( + self._dev.receive_can(), + timeout=timeout, + ) + except asyncio.TimeoutError: + return None + + if ch != self._channel: + continue + if msg.can_id != self._rx_id: + continue + + return msg diff --git a/isotp_async/iface.py b/isotp_async/iface.py new file mode 100644 index 0000000..ea0fdf4 --- /dev/null +++ b/isotp_async/iface.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from typing import Protocol, Optional +from carbus_async.messages import CanMessage + + +class CanTransport(Protocol): + + async def send(self, msg: CanMessage) -> None: + ... + + async def recv(self, timeout: Optional[float] = None) -> Optional[CanMessage]: + ... diff --git a/isotp_async/transport.py b/isotp_async/transport.py new file mode 100644 index 0000000..efa681f --- /dev/null +++ b/isotp_async/transport.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from typing import Optional + +from carbus_async.messages import CanMessage +from .iface import CanTransport + + +def _st_min_to_seconds(st_min: int) -> float: + + if 0x00 <= st_min <= 0x7F: + return st_min / 1000.0 + if 0xF1 <= st_min <= 0xF9: + return (st_min - 0xF0) * 100e-6 # 0xF1 -> 1*100us, ... + return 0.0 + + +@dataclass +class IsoTpChannel: + + can: CanTransport + tx_id: int + rx_id: int + + block_size: int = 8 + st_min_ms: int = 0 + fc_timeout: float = 1.0 + cf_timeout: float = 1.0 + + async def send_pdu(self, data: bytes) -> None: + length = len(data) + if length <= 7: + # Single Frame + pci = bytes([0x00 | (length & 0x0F)]) + + if len(data) < 7: + for _ in range(7-len(data)): + data = data + b"\xaa" + + msg = CanMessage( + can_id=self.tx_id, + data=pci + data, + ) + await self.can.send(msg) + return + + # --- Multi-frame: First Frame --- + len_hi = (length >> 8) & 0x0F + len_lo = length & 0xFF + ff_pci0 = 0x10 | len_hi # high nibble = 0x1 (FF), low=high bits len + ff_pci1 = len_lo + + first_data = data[:6] + ff_payload = bytes([ff_pci0, ff_pci1]) + first_data + msg = CanMessage( + can_id=self.tx_id, + data=ff_payload, + ) + await self.can.send(msg) + + offset = 6 + + # --- FlowControl от peer --- + fc_msg = await self.can.recv(timeout=self.fc_timeout) + if fc_msg is None: + raise asyncio.TimeoutError("ISO-TP: FlowControl timeout (N_Bs)") + + if not fc_msg.data: + raise RuntimeError("ISO-TP: empty FlowControl frame") + + fc_pci = fc_msg.data[0] + fc_type = fc_pci >> 4 # 3 = FC + + if fc_type != 0x3: + raise RuntimeError(f"ISO-TP: expected FlowControl, got PCI=0x{fc_pci:02X}") + + fs = fc_pci & 0x0F # FS (0=CTS, 1=WT, 2=OVFLW) + bs = fc_msg.data[1] if len(fc_msg.data) > 1 else 0 + st_min_raw = fc_msg.data[2] if len(fc_msg.data) > 2 else self.st_min_ms + + if fs == 0x2: + raise RuntimeError("ISO-TP: FlowControl OVFLW from peer") + + # Для простоты: поддерживаем только FS=CTS (0x0) + if fs != 0x0: + raise RuntimeError(f"ISO-TP: unsupported FlowStatus=0x{fs:02X}") + + if bs == 0x00: + # 0 => "unlimited" + bs = 0 + + st_min = _st_min_to_seconds(st_min_raw) + + # --- Consecutive Frames --- + seq_num = 1 + frames_in_block = 0 + + while offset < length: + if bs != 0 and frames_in_block >= bs: + # BS, FC + fc_msg = await self.can.recv(timeout=self.fc_timeout) + if fc_msg is None: + raise asyncio.TimeoutError("ISO-TP: second FlowControl timeout (N_Bs)") + + fc_pci = fc_msg.data[0] + fc_type = fc_pci >> 4 + if fc_type != 0x3: + raise RuntimeError(f"ISO-TP: expected FlowControl, got PCI=0x{fc_pci:02X}") + fs = fc_pci & 0x0F + bs = fc_msg.data[1] if len(fc_msg.data) > 1 else 0 + st_min_raw = fc_msg.data[2] if len(fc_msg.data) > 2 else self.st_min_ms + + if fs == 0x2: + raise RuntimeError("ISO-TP: FlowControl OVFLW from peer") + if fs != 0x0: + raise RuntimeError(f"ISO-TP: unsupported FlowStatus=0x{fs:02X}") + + if bs == 0x00: + bs = 0 + st_min = _st_min_to_seconds(st_min_raw) + frames_in_block = 0 + + chunk = data[offset:offset + 7] + offset += len(chunk) + + cf_pci = 0x20 | (seq_num & 0x0F) # high nibble = 0x2 (CF) + cf_payload = bytes([cf_pci]) + chunk + + if len(cf_payload) < 8: + for _ in range(8 - len(cf_payload)): + cf_payload = cf_payload + b"\xaa" + + msg = CanMessage( + can_id=self.tx_id, + data=cf_payload, + ) + await self.can.send(msg) + + seq_num = (seq_num + 1) & 0x0F + if seq_num == 0: + seq_num = 0x1 + + frames_in_block += 1 + + if st_min > 0: + await asyncio.sleep(st_min) + + async def recv_pdu(self, timeout: float = 1.0) -> Optional[bytes]: + + first = await self.can.recv(timeout=timeout) + if first is None: + return None + + data = first.data + if not data: + return None + + pci = data[0] + frame_type = pci >> 4 + + # --- Single Frame --- + if frame_type == 0x0: + length = pci & 0x0F + return data[1:1 + length] + + # --- First Frame --- + if frame_type != 0x1: + return None + + # FF length + length_hi = pci & 0x0F + length_lo = data[1] if len(data) > 1 else 0 + total_length = (length_hi << 8) | length_lo + + payload = bytearray(data[2:]) + + bs = self.block_size + st_raw = self.st_min_ms + fc_pci = 0x30 # type=3 (FC), FS=0 (CTS) + fc_payload = bytes([fc_pci, bs & 0xFF, st_raw & 0xFF]) + + if len(fc_payload) < 8: + fc_payload = fc_payload + b"\x00" * (8 - len(fc_payload)) + + fc_frame = CanMessage( + can_id=self.tx_id, + data=fc_payload, + ) + await self.can.send(fc_frame) + + st_min = _st_min_to_seconds(st_raw) + + expected_sn = 1 + while len(payload) < total_length: + cf = await self.can.recv(timeout=self.cf_timeout) + if cf is None: + raise asyncio.TimeoutError("ISO-TP: CF timeout (N_Cr)") + + if not cf.data: + continue + + cf_pci = cf.data[0] + cf_type = cf_pci >> 4 + if cf_type != 0x2: + continue + + sn = cf_pci & 0x0F + if sn != (expected_sn & 0x0F): + raise RuntimeError( + f"ISO-TP: wrong sequence number: got {sn}, expected {expected_sn}" + ) + + payload.extend(cf.data[1:]) + expected_sn = (expected_sn + 1) & 0x0F + if expected_sn == 0: + expected_sn = 1 + + if st_min > 0: + await asyncio.sleep(st_min) + + return bytes(payload[:total_length]) diff --git a/main.py b/main.py new file mode 100644 index 0000000..c89fe5d --- /dev/null +++ b/main.py @@ -0,0 +1,56 @@ +import asyncio + +from carbus_async.device import CarBusDevice +from isotp_async.carbus_iface import CarBusCanTransport +from isotp_async.transport import IsoTpChannel +from uds_async.client import UdsClient + +# import logging +# +# logging.basicConfig( +# level=logging.DEBUG, +# format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +# ) +# +# logging.getLogger("carbus_async.wire").setLevel(logging.DEBUG) + + +async def main(): + + dev = await CarBusDevice.open("COM6") + + await dev.open_can_channel( + channel=1, + nominal_bitrate=500_000, + ) + + await dev.set_terminator(channel=1, enabled=True) + + await asyncio.sleep(0.5) + + info = await dev.get_device_info() + print("HW:", info.hardware_id, info.hardware_name) + print("FW:", info.firmware_version) + print("Serial #", info.serial_int) + + print("Features:", + "gateway" if info.feature_gateway else "", + "isotp" if info.feature_isotp else "", + "txbuf" if info.feature_tx_buffer else "", + "txtask" if info.feature_tx_task else "", + ) + + await dev.clear_all_filters(1) + await dev.set_std_id_filter(1, index=0, can_id=0x7E8, mask=0x7FF) + + can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8) + isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8) + uds = UdsClient(isotp) + + vin = await uds.read_data_by_identifier(0xF190) + print("VIN:", vin.decode(errors="ignore")) + + await dev.close() + + +asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ceaea1f --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,27 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "carbus-lib" +version = "0.1.0" +description = "Async CAN / ISO-TP / UDS library for Car Bus Analyzer" +readme = "README.md" +requires-python = ">=3.10" +authors = [{ name = "Mike" }] +license = "MIT" + +dependencies = [ + "pyserial>=3.5", + "pyserial-asyncio>=0.6", +] + +[project.urls] +Homepage = "https://github.com/controllerzz/car_bus_lib" + +[tool.setuptools] +package-dir = {"" = "."} + +[tool.setuptools.packages.find] +where = ["."] +include = ["carbus_async", "isotp_async", "uds_async"] diff --git a/uds_async/__init__.py b/uds_async/__init__.py new file mode 100644 index 0000000..7111068 --- /dev/null +++ b/uds_async/__init__.py @@ -0,0 +1,22 @@ +from .client import UdsClient +from .server import UdsServer + +try: + from .types import ( + UdsRequest, + UdsResponse, + UdsPositiveResponse, + UdsNegativeResponse, + ResponseCode, + ) + + __all__ = [ + "UdsClient", + "UdsServer", + ] +except ImportError: + # если types.py нет или переименован — хотя бы клиент и сервер доступны + __all__ = [ + "UdsClient", + "UdsServer", + ] diff --git a/uds_async/client.py b/uds_async/client.py new file mode 100644 index 0000000..491d552 --- /dev/null +++ b/uds_async/client.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from isotp_async.transport import IsoTpChannel +from .exceptions import UdsError, UdsNegativeResponse + + +@dataclass +class UdsClient: + + isotp: IsoTpChannel + p2_timeout: float = 1.0 + + async def _request(self, payload: bytes) -> bytes: + await self.isotp.send_pdu(payload) + resp = await self.isotp.recv_pdu(timeout=self.p2_timeout) + if resp is None: + raise TimeoutError("UDS response timeout") + + if resp[0] == 0x7F: + if len(resp) < 3: + raise UdsError("Malformed UDS negative response") + sid = resp[1] + nrc = resp[2] + raise UdsNegativeResponse(req_sid=sid, nrc=nrc) + + return resp + + async def diagnostic_session_control(self, session: int) -> bytes: + req = bytes([0x10, session & 0xFF]) + resp = await self._request(req) + if resp[0] != 0x50: + raise UdsError(f"Unexpected SID 0x{resp[0]:02X} for DSC") + return resp + + async def tester_present(self, suppress_response: bool = False) -> Optional[bytes]: + + sub = 0x80 if suppress_response else 0x00 + req = bytes([0x3E, sub]) + + if suppress_response: + try: + resp = await self._request(req) + except TimeoutError: + return None + return resp + + resp = await self._request(req) + if resp[0] != 0x7E: + raise UdsError(f"Unexpected SID 0x{resp[0]:02X} for TesterPresent") + return resp + + async def read_data_by_identifier(self, did: int) -> bytes: + + req = bytes([0x22, (did >> 8) & 0xFF, did & 0xFF]) + resp = await self._request(req) + if resp[0] != 0x62: + raise UdsError(f"Unexpected SID 0x{resp[0]:02X} for RDBI") + if len(resp) < 3: + raise UdsError("Malformed RDBI response") + return resp[3:] diff --git a/uds_async/exceptions.py b/uds_async/exceptions.py new file mode 100644 index 0000000..03a5968 --- /dev/null +++ b/uds_async/exceptions.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +class UdsError(Exception): + ... + +@dataclass +class UdsNegativeResponse(UdsError): + + req_sid: int + nrc: int + + def __str__(self) -> str: + return f"UDS NRC 0x{self.nrc:02X} for SID 0x{self.req_sid:02X}" diff --git a/uds_async/server.py b/uds_async/server.py new file mode 100644 index 0000000..5cc87ad --- /dev/null +++ b/uds_async/server.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Awaitable, Callable, Dict, Optional + +from isotp_async.transport import IsoTpChannel +from .exceptions import UdsNegativeResponse + +UdsHandler = Callable[[bytes], Awaitable[Optional[bytes]]] + + +@dataclass +class UdsServer: + + isotp: IsoTpChannel + p2_timeout: float = 1.0 + handlers: Dict[int, UdsHandler] = field(default_factory=dict) + + async def serve_forever(self) -> None: + while True: + try: + req = await self.isotp.recv_pdu(timeout=self.p2_timeout) + except asyncio.CancelledError: + break + + if not req: + continue + + sid = req[0] + handler = self.handlers.get(sid) + if handler is None: + # ServiceNotSupported + await self._send_negative_response(sid, 0x11) + continue + + try: + resp = await handler(req) + if resp is None: + continue + + await self.isotp.send_pdu(resp) + + except UdsNegativeResponse as e: + await self._send_negative_response(e.req_sid, e.nrc) + + except Exception: + # General programming failure (0x72) + await self._send_negative_response(sid, 0x72) + + async def _send_negative_response(self, sid: int, nrc: int) -> None: + payload = bytes([0x7F, sid & 0xFF, nrc & 0xFF]) + await self.isotp.send_pdu(payload) + + def add_handler(self, sid: int, handler: UdsHandler) -> None: + self.handlers[sid & 0xFF] = handler + + def service(self, sid: int): + def decorator(func: UdsHandler) -> UdsHandler: + self.add_handler(sid, func) + return func + return decorator