From 957d825a3b189b8cfbd8c0244ab8cb6d3596d003 Mon Sep 17 00:00:00 2001
From: controllerzz <79202101363@mail.ru>
Date: Thu, 11 Dec 2025 09:04:25 +0300
Subject: [PATCH] save
---
.gitignore | 1 +
.idea/carbus_lib.iml | 10 +
README.md | 257 +++++++-
carbus_async/__init__.py | 12 +
carbus_async/device.py | 1147 +++++++++++++++++++++++++++++++++++
carbus_async/exceptions.py | 10 +
carbus_async/messages.py | 55 ++
carbus_async/protocol.py | 144 +++++
carbus_async/tcp_bridge.py | 135 +++++
isotp_async/__init__.py | 6 +
isotp_async/carbus_iface.py | 45 ++
isotp_async/iface.py | 13 +
isotp_async/transport.py | 223 +++++++
main.py | 56 ++
pyproject.toml | 27 +
uds_async/__init__.py | 22 +
uds_async/client.py | 63 ++
uds_async/exceptions.py | 16 +
uds_async/server.py | 62 ++
19 files changed, 2303 insertions(+), 1 deletion(-)
create mode 100644 .idea/carbus_lib.iml
create mode 100644 carbus_async/__init__.py
create mode 100644 carbus_async/device.py
create mode 100644 carbus_async/exceptions.py
create mode 100644 carbus_async/messages.py
create mode 100644 carbus_async/protocol.py
create mode 100644 carbus_async/tcp_bridge.py
create mode 100644 isotp_async/__init__.py
create mode 100644 isotp_async/carbus_iface.py
create mode 100644 isotp_async/iface.py
create mode 100644 isotp_async/transport.py
create mode 100644 main.py
create mode 100644 pyproject.toml
create mode 100644 uds_async/__init__.py
create mode 100644 uds_async/client.py
create mode 100644 uds_async/exceptions.py
create mode 100644 uds_async/server.py
diff --git a/.gitignore b/.gitignore
index b7faf40..336736f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -205,3 +205,4 @@ cython_debug/
marimo/_static/
marimo/_lsp/
__marimo__/
+*.xml
diff --git a/.idea/carbus_lib.iml b/.idea/carbus_lib.iml
new file mode 100644
index 0000000..74d515a
--- /dev/null
+++ b/.idea/carbus_lib.iml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index 0a6d62b..33f37a8 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,256 @@
-# carbus_lib
\ No newline at end of file
+# car-bus-lib (async CAN / ISO-TP / UDS stack)
+
+Асинхронная библиотека на Python для работы с CAN-адаптером **CAN-Hacker / Car Bus Analyzer**:
+
+- 📡 **`carbus_async`** – низкоуровневая работа с железкой (CAN/LIN, фильтры, терминаторы и т.д.)
+- 📦 **`isotp_async`** – ISO-TP (ISO 15765-2) поверх CAN (single + multi-frame)
+- 🩺 **`uds_async`** – UDS (ISO 14229) клиент и сервер (диагностика, чтение VIN и т.п.)
+- 🌐 **TCP-bridge** – удалённое подключение к адаптеру через сеть (как будто он воткнут локально)
+
+> Минимальные примеры, никаких «магических» зависимостей — всё на `asyncio`.
+
+---
+
+## Установка
+
+Пока проект в разработке, можно ставить его как editable-модуль из репозитория:
+
+```bash
+git clone https://github.com/your_name/carbus_lib.git
+cd car_bus_lib
+pip install -e .
+```
+
+# car-bus-lib
+
+Асинхронная библиотека для работы с CAN / CAN-FD, ISO-TP и UDS.
+Поддерживает локальное подключение через USB CDC и удалённую работу через TCP-bridge.
+
+---
+
+## Возможности
+
+- CAN / CAN-FD отправка и приём
+- Настройка каналов, скоростей, режимов, BRS
+- Фильтры ID, очистка фильтров, управление терминатором 120 Ω
+- ISO-TP (single + multi-frame)
+- UDS Client и UDS Server (эмуляция ЭБУ)
+- TCP-мост: удалённая работа с адаптером так, как будто он подключён локально
+- Логирование всего протокольного трафика
+
+---
+
+## Установка
+````bat
+ pip install pyserial pyserial-asyncio
+
+ git clone https://github.com/your_name/carbus_lib.git
+ cd car-bus-lib
+ pip install -e .
+````
+
+## 1. Работа с CAN
+
+Простейший пример: открыть устройство, настроить канал и отправить / принять кадр.
+
+````python
+ import asyncio
+ from carbus_async.device import CarBusDevice
+ from carbus_async.messages import CanMessage
+
+ async def main():
+ dev = await CarBusDevice.open("COM6", baudrate=115200)
+
+ # классический CAN 500 kbit/s
+ await dev.open_can_channel(
+ channel=1,
+ nominal_bitrate=500_000,
+ fd=False,
+ )
+
+ # отправка кадра 0x7E0 8 байт
+ msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00")
+ await dev.send_can(msg, channel=1)
+
+ # приём любого сообщения
+ ch, rx = await dev.receive_can()
+ print("RX:", ch, rx)
+
+ await dev.close()
+
+ asyncio.run(main())
+````
+
+
+## 2. Информация об устройстве и фильтры
+
+Пример запроса DEVICE_INFO и настройки фильтров:
+````python
+ info = await dev.get_device_info()
+
+ print("HW:", info.hardware_name)
+ print("FW:", info.firmware_version)
+ print("Serial:", info.serial_int)
+
+ # очистить все фильтры на канале 1
+ await dev.clear_all_filters(1)
+
+ # разрешить только ответы с ID 0x7E8 (11-битный стандартный ID)
+ await dev.set_std_id_filter(
+ channel=1,
+ index=0,
+ can_id=0x7E8,
+ mask=0x7FF,
+ )
+
+ # включить/выключить терминатор 120 Ω
+ await dev.set_terminator(channel=1, enabled=True)
+````
+
+## 3. ISO-TP (isotp_async)
+ISO-TP канал строится поверх CarBusDevice:
+````python
+ from isotp_async import IsoTpChannel
+
+ # предполагается, что dev уже открыт и канал CAN настроен
+ isotp = IsoTpChannel(
+ device=dev,
+ channel=1,
+ tx_id=0x7E0, # наш запрос
+ rx_id=0x7E8, # ответ ЭБУ
+ )
+
+ # отправить запрос ReadDataByIdentifier F190 (VIN)
+ await isotp.send(b"\x22\xF1\x90")
+
+ # получить полный ответ (single или multi-frame)
+ resp = await isotp.recv(timeout=1.0)
+ print("ISO-TP:", resp.hex())
+````
+
+## 4. UDS Client (uds_async.client)
+
+Клиент UDS использует IsoTpChannel:
+````python
+ from uds_async.client import UdsClient
+ from isotp_async import IsoTpChannel
+
+ isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8)
+ uds = UdsClient(isotp_channel=isotp)
+
+ # переход в расширенную диагностическую сессию
+ await uds.diagnostic_session_control(0x03)
+
+ # чтение VIN (DID F190)
+ vin_bytes = await uds.read_data_by_identifier(0xF190)
+ print("VIN:", vin_bytes.decode(errors="ignore"))
+````
+
+## 5. UDS Server (эмулятор ЭБУ)
+
+Простой UDS-сервер, который отвечает на запрос VIN:
+````python
+ from uds_async.server import UdsServer, UdsRequest, UdsPositiveResponse
+ from isotp_async import IsoTpChannel
+
+ class MyEcuServer(UdsServer):
+ async def handle_read_data_by_identifier(self, req: UdsRequest):
+ if req.data_identifier == 0xF190:
+ # положительный ответ: 62 F1 90 + данные
+ return UdsPositiveResponse(b"\x62\xF1\x90DEMO-VIN-1234567")
+ # всё остальное обрабатывается базовой реализацией
+ return await super().handle_read_data_by_identifier(req)
+
+ async def main():
+ dev = await CarBusDevice.open("COM6", baudrate=115200)
+ await dev.open_can_channel(channel=1, nominal_bitrate=500_000, fd=False)
+
+ isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E8, rx_id=0x7E0)
+ server = MyEcuServer(isotp_channel=isotp)
+ await server.serve_forever()
+
+ asyncio.run(main())
+````
+
+## 6. Удалённая работа через TCP (tcp_bridge)
+
+### 6.1. Сервер (рядом с адаптером)
+
+На машине, где физически подключён CAN-адаптер:
+
+ python -m carbus_async.tcp_bridge --serial COM6 --port 7000
+
+Адаптер открывается локально, а поверх него поднимается TCP-мост.
+
+### 6.2. Клиент (удалённая машина)
+
+На другой машине можно использовать тот же API, как с локальным COM, но через `open_tcp`:
+````python
+ import asyncio
+ from carbus_async.device import CarBusDevice
+ from carbus_async.messages import CanMessage
+
+ async def main():
+ dev = await CarBusDevice.open_tcp("192.168.1.10", 7000)
+
+ await dev.open_can_channel(
+ channel=1,
+ nominal_bitrate=500_000,
+ fd=False,
+ )
+
+ msg = CanMessage(can_id=0x321, data=b"\x01\x02\x03\x04")
+ await dev.send_can(msg, channel=1)
+
+ ch, rx = await dev.receive_can()
+ print("REMOTE RX:", ch, rx)
+
+ await dev.close()
+
+ asyncio.run(main())
+````
+
+## 7. Логирование
+
+Для отладки удобно включить подробное логирование:
+````python
+ import logging
+
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+ )
+````
+Логгеры:
+
+- `carbus_async.wire.*` — сырые кадры по USB/TCP (TX/RX)
+- `carbus_async.device.*` — высокоуровневые события, ошибки, BUS_ERROR
+- дополнительные логгеры в isotp_async / uds_async
+
+---
+
+## 8. Структура проекта
+
+ carbus_async/
+ device.py — асинхронный интерфейс к адаптеру (CAN/CAN-FD)
+ protocol.py — описания команд, флагов и структур протокола
+ messages.py — модель CanMessage и вспомогательные типы
+ tcp_bridge.py — TCP-мост (сервер для удалённой работы)
+
+ isotp_async/
+ __init__.py — IsoTpChannel и вспомогательные сущности
+
+ uds_async/
+ client.py — UdsClient (клиент UDS)
+ server.py — UdsServer (сервер / эмулятор ЭБУ)
+ types.py — структуры запросов/ответов
+
+---
+
+## 9. Лицензия
+
+MIT (можно поменять под нужды проекта).
+
+Pull Requests и предложения по улучшению приветствуются 🚗
+
+
diff --git a/carbus_async/__init__.py b/carbus_async/__init__.py
new file mode 100644
index 0000000..a8ef2ad
--- /dev/null
+++ b/carbus_async/__init__.py
@@ -0,0 +1,12 @@
+from .device import CarBusDevice
+from .messages import CanMessage, MessageDirection
+from .exceptions import CarBusError, CommandError, SyncError
+
+__all__ = [
+ "CarBusDevice",
+ "CanMessage",
+ "MessageDirection",
+ "CarBusError",
+ "CommandError",
+ "SyncError",
+]
diff --git a/carbus_async/device.py b/carbus_async/device.py
new file mode 100644
index 0000000..781300d
--- /dev/null
+++ b/carbus_async/device.py
@@ -0,0 +1,1147 @@
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import logging
+import struct
+from dataclasses import dataclass, field
+from typing import Dict, Optional, Tuple, List
+
+import serial_asyncio
+
+from .exceptions import CarBusError, SyncError, CommandError
+from .messages import CanMessage
+
+
+from .protocol import (
+ Command,
+ CommandHeader,
+ MsgCommandHeader,
+ HeaderFlags,
+ BusMessageFlags,
+ CC_MULTIWORD,
+ is_ack,
+ base_command_from_ack,
+ need_extended_header,
+)
+
+
+NOMINAL_BITRATE_INDEX: Dict[int, int] = {
+ 10_000: 0,
+ 20_000: 1,
+ 33_300: 2,
+ 50_000: 3,
+ 62_500: 4,
+ 83_300: 5,
+ 95_200: 6,
+ 100_000: 7,
+ 125_000: 8,
+ 250_000: 9,
+ 400_000: 10,
+ 500_000: 11,
+ 800_000: 12,
+ 1_000_000: 13,
+ # 0xFF = detect,
+}
+
+
+DATA_BITRATE_INDEX: Dict[int, int] = {
+ 500_000: 0,
+ 1_000_000: 1,
+ 2_000_000: 2,
+ 4_000_000: 3,
+ 5_000_000: 4,
+}
+
+
+@dataclass
+class DeviceInfoParam:
+ header: int
+ data: List[int]
+
+from .protocol import (
+ Command,
+ CommandHeader,
+ MsgCommandHeader,
+ HeaderFlags,
+ BusMessageFlags,
+ CC_MULTIWORD,
+ DI_HARDWARE_ID,
+ DI_FIRMWARE_VERSION,
+ DI_DEVICE_SERIAL,
+ DI_FEATURES,
+ DI_CHANNEL_MAP,
+ DI_CHANNEL_FEATURES,
+ DI_FILTER,
+ DI_GATEWAY,
+ DI_CHANNEL_FREQUENCY,
+ DI_ISOTP,
+ DI_TX_BUFFER,
+ DI_TX_TASK,
+ is_ack,
+ base_command_from_ack,
+ need_extended_header,
+)
+
+HW_ID_NAMES: Dict[int, str] = {
+ 0xFF: "HW_CH30",
+ 0x02: "HW_ODB_OLD",
+ 0x01: "HW_CH32",
+ 0x04: "HW_ODB",
+ 0x03: "HW_CHP",
+ 0x11: "HW_CH33",
+ 0x13: "HW_CHPM03",
+ 0x14: "HW_ODB_FD",
+ 0x06: "HW_FDL2_M02",
+ 0x16: "HW_FDL2_M05",
+}
+
+# DI_FEATURES
+DI_FEATURE_GATEWAY = 0x00000001
+DI_FEATURE_ISOTP = 0x00000002
+DI_FEATURE_TX_BUFFER = 0x00000004
+DI_FEATURE_TX_TASK = 0x00000008
+
+FLAG_CONFIG_TERMINATOR = 0x05
+
+# ChannelType (DI_CHANNEL_MAP)
+CHANNEL_TYPE_MAP: Dict[int, str] = {
+ 0x00: "NONE", # CTE_None
+ 0x01: "CAN", # CTE_CAN
+ 0x02: "CANFD", # CTE_CANFD
+ 0x10: "LIN", # CTE_LIN
+}
+
+# DI_CHANNEL_FEATURES
+DI_CHANNEL_CHANNEL_MASK = 0x00FF0000
+DI_CHANNEL_CHANNEL_SHIFT = 16
+
+DI_CHANNEL_FEATURE_ALC = 0x00000001
+DI_CHANNEL_FEATURE_TERMINATOR = 0x00000002
+DI_CHANNEL_FEATURE_PULLUP = 0x00000004
+DI_CHANNEL_FEATURE_CSD = 0x00000008
+DI_CHANNEL_FEATURE_IDLE = 0x00000010
+DI_CHANNEL_FEATURE_DSD = 0x00000020
+DI_CHANNEL_FEATURE_NONISO = 0x00000040
+
+# DI_FILTER
+DI_FILTER_CHANNEL_MASK = 0x00FF0000
+DI_FILTER_CHANNEL_SHIFT = 16
+
+DI_FILTER_TYPE_MASK = 0x0000FF00
+DI_FILTER_TYPE_SHIFT = 8
+DI_FILTER_SIZE_MASK = 0x000000FF
+DI_FILTER_SIZE_SHIFT = 0
+
+DI_FILTER_TYPE_8BIT = 0x01
+DI_FILTER_TYPE_11BIT = 0x02
+DI_FILTER_TYPE_29BIT = 0x04
+
+# DI_GATEWAY
+DI_GATEWAY_SRC_MASK = 0x00FF0000
+DI_GATEWAY_SRC_SHIFT = 16
+DI_GATEWAY_DST_MASK = 0x0000FF00
+DI_GATEWAY_DST_SHIFT = 8
+DI_GATEWAY_FILTER_MASK = 0x000000FF
+DI_GATEWAY_FILTER_SHIFT = 0
+
+@dataclass
+class DeviceInfo:
+ params: List[DeviceInfoParam]
+ raw_payload: bytes
+
+ @classmethod
+ def from_payload(cls, payload: bytes) -> "DeviceInfo":
+ if len(payload) % 4 != 0:
+ raise ValueError(
+ f"DEVICE_INFO payload length {len(payload)} is not multiple of 4"
+ )
+
+ words = [
+ int.from_bytes(payload[i: i + 4], "little")
+ for i in range(0, len(payload), 4)
+ ]
+
+ params: List[DeviceInfoParam] = []
+ i = 0
+ n = len(words)
+
+ while i < n:
+ header = words[i]
+ i += 1
+
+ if header & CC_MULTIWORD:
+ length = (header >> 16) & 0xFF
+ data = words[i: i + length]
+ i += length
+ else:
+ data = []
+
+ params.append(DeviceInfoParam(header=header, data=data))
+
+ return cls(params=params, raw_payload=payload)
+
+ @staticmethod
+ def _param_code(value: int) -> int:
+ return value & 0x7F000000
+
+ def _find_first(self, base: int) -> Optional[DeviceInfoParam]:
+ base_code = self._param_code(base)
+ for p in self.params:
+ if self._param_code(p.header) == base_code:
+ return p
+ return None
+
+ def _find_all(self, base: int) -> List[DeviceInfoParam]:
+ base_code = self._param_code(base)
+ return [
+ p for p in self.params
+ if self._param_code(p.header) == base_code
+ ]
+
+ def find_by_prefix(self, prefix: int) -> List[DeviceInfoParam]:
+ base_code = self._param_code(prefix)
+ return [
+ p for p in self.params
+ if self._param_code(p.header) == base_code
+ ]
+
+ @property
+ def hardware_id(self) -> Optional[int]:
+ p = self._find_first(DI_HARDWARE_ID)
+ if not p:
+ return None
+ return p.header & 0xFF
+
+ @property
+ def hardware_name(self) -> Optional[str]:
+ hw_id = self.hardware_id
+ if hw_id is None:
+ return None
+ return HW_ID_NAMES.get(hw_id, f"0x{hw_id:02X}")
+
+ @property
+ def firmware_version(self) -> Optional[str]:
+ p = self._find_first(DI_FIRMWARE_VERSION)
+ if not p:
+ return None
+ if not p.data:
+ return ""
+
+ b = b"".join(w.to_bytes(4, "little") for w in p.data)
+ # Обрезаем по 0x00
+ if b"\x00" in b:
+ b = b.split(b"\x00", 1)[0]
+ try:
+ return b.decode("ascii", errors="ignore")
+ except Exception:
+ return b.hex()
+
+ @property
+ def serial_bytes(self) -> Optional[bytes]:
+ p = self._find_first(DI_DEVICE_SERIAL)
+ if not p or not p.data:
+ return None
+ return b"".join(w.to_bytes(4, "little") for w in p.data)
+
+ @property
+ def serial_int(self) -> Optional[int]:
+ b = self.serial_bytes
+ if b is None:
+ return None
+ return int.from_bytes(b, "big")
+
+ @property
+ def features_mask(self) -> int:
+ p = self._find_first(DI_FEATURES)
+ if not p:
+ return 0
+ return p.header & 0x00FFFFFF
+
+ @property
+ def feature_gateway(self) -> bool:
+ return bool(self.features_mask & DI_FEATURE_GATEWAY)
+
+ @property
+ def feature_isotp(self) -> bool:
+ return bool(self.features_mask & DI_FEATURE_ISOTP)
+
+ @property
+ def feature_tx_buffer(self) -> bool:
+ return bool(self.features_mask & DI_FEATURE_TX_BUFFER)
+
+ @property
+ def feature_tx_task(self) -> bool:
+ return bool(self.features_mask & DI_FEATURE_TX_TASK)
+
+ @property
+ def channel_types(self) -> Dict[int, str]:
+ p = self._find_first(DI_CHANNEL_MAP)
+ if not p:
+ return {}
+
+ value = p.header
+ b0 = (value >> 0) & 0xFF
+ b1 = (value >> 8) & 0xFF
+ b2 = (value >> 16) & 0xFF
+ out: Dict[int, str] = {}
+
+ for idx, v in enumerate((b0, b1, b2), start=1):
+ out[idx] = CHANNEL_TYPE_MAP.get(v, f"0x{v:02X}")
+
+ return out
+
+ @property
+ def channel_features(self) -> Dict[int, Dict[str, bool]]:
+ res: Dict[int, Dict[str, bool]] = {}
+ for p in self._find_all(DI_CHANNEL_FEATURES):
+ ch = (p.header & DI_CHANNEL_CHANNEL_MASK) >> DI_CHANNEL_CHANNEL_SHIFT
+ mask = p.header & 0x0000FFFF
+ if ch == 0:
+ continue
+ res[ch] = {
+ "raw_mask": mask,
+ "alc": bool(mask & DI_CHANNEL_FEATURE_ALC),
+ "terminator": bool(mask & DI_CHANNEL_FEATURE_TERMINATOR),
+ "pullup": bool(mask & DI_CHANNEL_FEATURE_PULLUP),
+ "csd": bool(mask & DI_CHANNEL_FEATURE_CSD),
+ "idle": bool(mask & DI_CHANNEL_FEATURE_IDLE),
+ "dsd": bool(mask & DI_CHANNEL_FEATURE_DSD),
+ "noniso": bool(mask & DI_CHANNEL_FEATURE_NONISO),
+ }
+ return res
+
+ @property
+ def filters_info(self) -> List[Dict[str, int]]:
+ out: List[Dict[str, int]] = []
+ for p in self._find_all(DI_FILTER):
+ h = p.header
+ ch = (h & DI_FILTER_CHANNEL_MASK) >> DI_FILTER_CHANNEL_SHIFT
+ type_mask = (h & DI_FILTER_TYPE_MASK) >> DI_FILTER_TYPE_SHIFT
+ size = (h & DI_FILTER_SIZE_MASK) >> DI_FILTER_SIZE_SHIFT
+
+ if ch == 0:
+ continue
+
+ info = {
+ "channel": ch,
+ "type_mask": type_mask,
+ "size": size,
+ "has_8bit": bool(type_mask & DI_FILTER_TYPE_8BIT),
+ "has_11bit": bool(type_mask & DI_FILTER_TYPE_11BIT),
+ "has_29bit": bool(type_mask & DI_FILTER_TYPE_29BIT),
+ }
+ out.append(info)
+ return out
+
+ @property
+ def gateway_info(self) -> List[Dict[str, int]]:
+ out: List[Dict[str, int]] = []
+ for p in self._find_all(DI_GATEWAY):
+ h = p.header
+ src = (h & DI_GATEWAY_SRC_MASK) >> DI_GATEWAY_SRC_SHIFT
+ dst = (h & DI_GATEWAY_DST_MASK) >> DI_GATEWAY_DST_SHIFT
+ flt = (h & DI_GATEWAY_FILTER_MASK) >> DI_GATEWAY_FILTER_SHIFT
+ if src == 0 or dst == 0:
+ continue
+ out.append({"src": src, "dst": dst, "filters": flt})
+ return out
+
+ @property
+ def channel_frequencies(self) -> Dict[int, int]:
+ out: Dict[int, int] = {}
+ for p in self._find_all(DI_CHANNEL_FREQUENCY):
+ h = p.header
+ ch = (h & DI_GATEWAY_SRC_MASK) >> DI_GATEWAY_SRC_SHIFT # по доке тот же формат
+ freq = h & 0x0000FFFF # в примере 0x15010078 → 0x0078 → 120 (МГц)
+ if ch == 0:
+ continue
+ # по доке пример: 0x78 (120МГц) — домножаем до Hz
+ out[ch] = freq * 1_000_000
+ return out
+
+ @property
+ def isotp_buffer_size(self) -> Optional[int]:
+ p = self._find_first(DI_ISOTP)
+ if not p:
+ return None
+ return p.header & 0x00FFFFFF
+
+ @property
+ def tx_buffer_size(self) -> Optional[int]:
+ p = self._find_first(DI_TX_BUFFER)
+ if not p:
+ return None
+ return p.header & 0x00FFFFFF
+
+ @property
+ def tx_task_count(self) -> Optional[int]:
+ p = self._find_first(DI_TX_TASK)
+ if not p:
+ return None
+ return p.header & 0x00FFFFFF
+
+ def find_by_prefix(self, prefix: int) -> List[DeviceInfoParam]:
+ return [
+ p for p in self.params
+ if (p.header & 0xFF000000) == (prefix & 0xFF000000)
+ ]
+
+ def find_by_prefix(self, prefix: int) -> List[DeviceInfoParam]:
+ """
+ Вернуть все параметры, у которых старший байт совпадает с prefix & 0xFF000000.
+ Например, prefix=0x01000000 вернёт все DI_*, начинающиеся с 0x01.
+ """
+ return [
+ p
+ for p in self.params
+ if (p.header & 0xFF000000) == (prefix & 0xFF000000)
+ ]
+
+
+@dataclass
+class _PendingRequest:
+ future: asyncio.Future
+ command: int
+
+
+@dataclass
+class CarBusDevice:
+ port: str
+ baudrate: int = 115200
+ loop: Optional[asyncio.AbstractEventLoop] = None
+
+ _reader: asyncio.StreamReader = field(init=False, repr=False)
+ _writer: asyncio.StreamWriter = field(init=False, repr=False)
+ _rx_queue: "asyncio.Queue[tuple[int, CanMessage]]" = field(init=False, repr=False)
+ _rx_channel_queues: Dict[int, "asyncio.Queue[CanMessage]"] = field(init=False, repr=False)
+ _pending: Dict[int, _PendingRequest] = field(init=False, repr=False)
+ _seq_counter: int = field(init=False, default=0, repr=False)
+ _reader_task: Optional[asyncio.Task] = field(init=False, default=None, repr=False)
+ _closed: bool = field(init=False, default=False, repr=False)
+
+ _log: logging.Logger = field(init=False, repr=False)
+ _wire_log: logging.Logger = field(init=False, repr=False)
+
+ def _ensure_channel_queue(self, channel: int) -> "asyncio.Queue[CanMessage]":
+ q = self._rx_channel_queues.get(channel)
+ if q is None:
+ q = asyncio.Queue()
+ self._rx_channel_queues[channel] = q
+ return q
+
+ @classmethod
+ async def open(
+ cls,
+ port: str,
+ baudrate: int = 115200,
+ *,
+ loop: Optional[asyncio.AbstractEventLoop] = None,
+ use_can: bool = True,
+ use_lin: bool = False,
+ ) -> "CarBusDevice":
+ self = cls(port=port, baudrate=baudrate, loop=loop)
+ await self._connect()
+ await self.sync()
+ self._start_reader()
+ await self.device_open(use_can=use_can, use_lin=use_lin)
+ return self
+
+ @classmethod
+ async def open_tcp(cls, host: str, port: int, **kwargs) -> "CarBusDevice":
+ return await cls.open(f"socket://{host}:{port}", **kwargs)
+
+ async def _connect(self) -> None:
+ loop = self.loop or asyncio.get_running_loop()
+
+ self._log = logging.getLogger(f"carbus_async.device.{self.port}")
+ self._wire_log = logging.getLogger(f"carbus_async.wire.{self.port}")
+
+ if self.port.startswith("socket://"):
+ addr = self.port[len("socket://") :]
+ if ":" not in addr:
+ raise ValueError(
+ f"Invalid socket URL '{self.port}'. Expected 'socket://host:port'"
+ )
+
+ host, port_str = addr.rsplit(":", 1)
+ try:
+ tcp_port = int(port_str)
+ except ValueError as e:
+ raise ValueError(
+ f"Invalid TCP port in '{self.port}': {port_str!r}"
+ ) from e
+
+ self._log.debug(
+ "Connecting via TCP to %s:%d (tcp_bridge)", host, tcp_port
+ )
+
+ self._reader, self._writer = await asyncio.open_connection(
+ host=host,
+ port=tcp_port,
+ )
+
+ self._log.debug(
+ "Connected to TCP %s:%d (logical port=%s)",
+ host,
+ tcp_port,
+ self.port,
+ )
+
+ else:
+ self._log.debug(
+ "Connecting to serial port %s @ %d using serial_asyncio",
+ self.port,
+ self.baudrate,
+ )
+ self._reader, self._writer = await serial_asyncio.open_serial_connection(
+ loop=loop,
+ url=self.port,
+ baudrate=self.baudrate,
+ )
+ self._log.debug("Connected to %s @ %d", self.port, self.baudrate)
+
+ self._rx_queue = asyncio.Queue()
+ self._rx_channel_queues = {}
+ self._pending = {}
+ self._seq_counter = 0
+ self._reader_task = None
+ self._closed = False
+
+ async def close(self) -> None:
+ if self._closed:
+ return
+
+ try:
+ try:
+ await self.device_close()
+ except Exception:
+ self._log.debug("DEVICE_CLOSE failed, ignoring", exc_info=True)
+
+ if self._reader_task:
+ self._reader_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError):
+ await self._reader_task
+
+ self._writer.close()
+ await self._writer.wait_closed()
+ finally:
+ self._closed = True
+
+ for pending in list(self._pending.values()):
+ if not pending.future.done():
+ pending.future.set_exception(
+ CarBusError("Device closed before response was received")
+ )
+ self._pending.clear()
+
+ def _start_reader(self) -> None:
+ if self._reader_task is None or self._reader_task.done():
+ self._reader_task = asyncio.create_task(
+ self._read_loop(),
+ name=f"carbus_read_loop_{self.port}",
+ )
+
+ def _next_seq(self) -> int:
+ self._seq_counter = (self._seq_counter + 1) & 0xFF
+ if self._seq_counter == 0:
+ self._seq_counter = 1
+ return self._seq_counter
+
+ async def sync(self) -> None:
+ frame = bytes((Command.SYNC, 0x00, Command.SYNC, 0x00))
+ self._wire_log.debug("TX SYNC: %s", frame.hex(" "))
+
+ self._writer.write(frame)
+ await self._writer.drain()
+
+ resp = await self._reader.readexactly(4)
+ self._wire_log.debug("RX SYNC: %s", resp.hex(" "))
+
+ if resp != bytes((0x5A, 0x00, 0x5A, 0x00)):
+ raise SyncError(f"Unexpected SYNC response: {resp!r}")
+
+ async def _send_raw(
+ self,
+ command: int,
+ *,
+ header_flags: int = 0,
+ payload: bytes = b"",
+ expect_response: bool = True,
+ ) -> Tuple[int, int, bytes]:
+ if self._closed:
+ raise CarBusError("Device is closed")
+
+ seq = self._next_seq()
+ dsize = len(payload)
+
+ if need_extended_header(command):
+ header = MsgCommandHeader(
+ command=command,
+ sequence=seq,
+ flags=header_flags,
+ dsize=dsize,
+ )
+ else:
+ if dsize > 0xFF:
+ raise ValueError(
+ f"dsize={dsize} too big for CommandHeader (max 255)"
+ )
+ header = CommandHeader(
+ command=command,
+ sequence=seq,
+ flags=header_flags,
+ dsize=dsize,
+ )
+
+ header_bytes = header.to_bytes()
+ frame = header_bytes + payload
+
+ fut: asyncio.Future = asyncio.get_running_loop().create_future()
+ if expect_response:
+ self._pending[seq] = _PendingRequest(future=fut, command=command)
+
+ self._wire_log.debug(
+ "TX cmd=0x%02X seq=%d flags=0x%04X dsize=%d :: %s",
+ command,
+ seq,
+ header_flags,
+ dsize,
+ frame.hex(" "),
+ )
+
+ self._writer.write(frame)
+ await self._writer.drain()
+
+ if not expect_response:
+ return 0, 0, b""
+
+ cmd_resp, flags_resp, payload_resp = await fut
+ return cmd_resp, flags_resp, payload_resp
+
+ async def get_device_info(self) -> DeviceInfo:
+ cmd, flags, payload = await self._send_raw(
+ Command.DEVICE_INFO,
+ header_flags=0,
+ payload=b"",
+ expect_response=True,
+ )
+
+ if cmd != Command.DEVICE_INFO:
+ raise CommandError(
+ f"Unexpected DEVICE_INFO response: cmd=0x{cmd:02X}, flags=0x{flags:04X}"
+ )
+
+ return DeviceInfo.from_payload(payload)
+
+ async def device_open(self, *, use_can: bool = True, use_lin: bool = False) -> None:
+ if use_can and use_lin:
+ mode_val = 0x00 # FULL
+ elif use_can and not use_lin:
+ mode_val = 0x01 # CAN only
+ elif not use_can and use_lin:
+ mode_val = 0x02 # LIN only
+ else:
+ mode_val = 0x00 # FULL по умолчанию
+
+ dc_mode = 0x01000000 | mode_val
+ payload = dc_mode.to_bytes(4, "little")
+
+ cmd, flags, resp_payload = await self._send_raw(
+ Command.DEVICE_OPEN,
+ header_flags=0,
+ payload=payload,
+ expect_response=True,
+ )
+
+ if not is_ack(cmd) or base_command_from_ack(cmd) != Command.DEVICE_OPEN:
+ raise CommandError(
+ f"Unexpected DEVICE_OPEN response: cmd=0x{cmd:02X}, flags=0x{flags:04X}"
+ )
+
+ async def device_close(self) -> None:
+ cmd, flags, _ = await self._send_raw(
+ Command.DEVICE_CLOSE,
+ header_flags=0,
+ payload=b"",
+ expect_response=True,
+ )
+
+ if not is_ack(cmd) or base_command_from_ack(cmd) != Command.DEVICE_CLOSE:
+ raise CommandError(
+ f"Unexpected DEVICE_CLOSE response: cmd=0x{cmd:02X}, flags=0x{flags:04X}"
+ )
+
+ async def open_can_channel(
+ self,
+ channel: int = 1,
+ *,
+ nominal_bitrate: int = 500_000,
+ fd: bool = False,
+ data_bitrate: Optional[int] = None,
+ brs: bool = False,
+ listen_only: bool = False,
+ loopback: bool = False,
+ auto_detect: bool = False,
+ retransmit: bool = False,
+ non_iso: bool = False,
+ nominal_index: Optional[int] = None,
+ data_index: Optional[int] = None,
+ ) -> None:
+
+ if loopback:
+ mode_val = 0x02
+ elif listen_only:
+ mode_val = 0x01
+ else:
+ mode_val = 0x00
+
+ cc_can_mode = 0x11000000 | mode_val
+
+ if not fd:
+ frame_mode = 0x00
+ else:
+ frame_mode = 0x02 if brs else 0x01
+
+ cc_can_frame = 0x12000000 | frame_mode
+
+ if auto_detect:
+ n_index = 0xFF
+ elif nominal_index is not None:
+ n_index = nominal_index & 0xFF
+ else:
+ try:
+ n_index = NOMINAL_BITRATE_INDEX[nominal_bitrate]
+ except KeyError:
+ raise ValueError(
+ f"Unsupported nominal bitrate {nominal_bitrate}. "
+ f"Known: {sorted(NOMINAL_BITRATE_INDEX.keys())}"
+ ) from None
+
+ cc_bus_speed_n = 0x01000000 | (n_index & 0xFF)
+
+ params: List[int] = [cc_can_mode, cc_can_frame, cc_bus_speed_n]
+
+ if fd:
+ if data_index is not None:
+ d_index = data_index & 0xFF
+ else:
+ if data_bitrate is None:
+ raise ValueError("fd=True требует указать data_bitrate или data_index")
+ try:
+ d_index = DATA_BITRATE_INDEX[data_bitrate]
+ except KeyError:
+ raise ValueError(
+ f"Unsupported data bitrate {data_bitrate}. "
+ f"Known: {sorted(DATA_BITRATE_INDEX.keys())}"
+ ) from None
+
+ cc_bus_speed_d = 0x02000000 | (d_index & 0xFF)
+ params.append(cc_bus_speed_d)
+
+ if retransmit:
+ params.append(0x13000001)
+
+ if fd and non_iso:
+ params.append(0x14000001)
+
+ payload = b"".join(p.to_bytes(4, "little") for p in params)
+
+ header_flags = (channel & 0x0F) * 0x20
+
+ cmd, flags, resp_payload = await self._send_raw(
+ Command.CHANNEL_OPEN,
+ header_flags=header_flags,
+ payload=payload,
+ expect_response=True,
+ )
+
+ if not is_ack(cmd) or base_command_from_ack(cmd) != Command.CHANNEL_OPEN:
+ raise CommandError(
+ f"Unexpected CHANNEL_OPEN response: cmd=0x{cmd:02X}, flags=0x{flags:04X}"
+ )
+
+ async def set_can_filter(
+ self,
+ channel: int,
+ index: int,
+ *,
+ can_id: int,
+ mask: int | None = None,
+ extended: bool = False,
+ ) -> None:
+
+ if index < 0:
+ raise ValueError("filter index must be >= 0")
+
+ filter_type = 0x01 if extended else 0x00
+
+ if mask is None:
+ mask = 0x1FFFFFFF if extended else 0x7FF
+
+ payload = struct.pack(" None:
+
+ if can_id < 0 or can_id > 0x7FF:
+ raise ValueError("can_id for standard ID must be in 0x000..0x7FF")
+
+ await self.set_can_filter(
+ channel=channel,
+ index=index,
+ can_id=can_id,
+ mask=mask,
+ extended=False,
+ )
+
+ async def set_ext_id_filter(
+ self,
+ channel: int,
+ index: int,
+ can_id: int,
+ mask: int = 0x1FFFFFFF,
+ ) -> None:
+
+ if can_id < 0 or can_id > 0x1FFFFFFF:
+ raise ValueError("can_id for extended ID must be in 0x00000000..0x1FFFFFFF")
+
+ await self.set_can_filter(
+ channel=channel,
+ index=index,
+ can_id=can_id,
+ mask=mask,
+ extended=True,
+ )
+
+ async def clear_all_filters(
+ self,
+ channel: int,
+ *,
+ max_filters: int = 64,
+ stop_on_error: bool = True,
+ ) -> int:
+
+ cleared = 0
+ for idx in range(max_filters):
+ try:
+ await self.clear_can_filter(channel=channel, index=idx)
+ cleared += 1
+ except CommandError as e:
+ if stop_on_error:
+ self._log.debug(
+ "Stop clearing filters on channel %d at index %d due to error: %s",
+ channel,
+ idx,
+ e,
+ )
+ break
+ else:
+ self._log.warning(
+ "Error while clearing filter %d on channel %d: %s",
+ idx,
+ channel,
+ e,
+ )
+ continue
+
+ return cleared
+
+ async def clear_can_filter(
+ self,
+ channel: int,
+ index: int,
+ ) -> None:
+
+ if index < 0:
+ raise ValueError("filter index must be >= 0")
+
+ payload = struct.pack(" None:
+
+ state = 0x01 if enabled else 0x00
+ channel_flag = (channel & 0x0F) * 0x20
+ header_flags = channel_flag | FLAG_CONFIG_TERMINATOR
+
+ payload = bytes((state,))
+
+ cmd, flags, resp_payload = await self._send_raw(
+ Command.CHANNEL_CONFIG,
+ header_flags=header_flags,
+ payload=payload,
+ expect_response=True,
+ )
+
+ if not is_ack(cmd) or base_command_from_ack(cmd) != Command.CHANNEL_CONFIG:
+ raise CommandError(
+ f"Unexpected TERMINATOR response: cmd=0x{cmd:02X}, flags=0x{flags:04X}"
+ )
+
+
+ async def send_can(
+ self,
+ msg: CanMessage,
+ *,
+ channel: int,
+ confirm: bool = False,
+ echo: bool = False,
+ ) -> None:
+
+ if channel == 1:
+ hflags = int(HeaderFlags.CHANNEL_1)
+ elif channel == 2:
+ hflags = int(HeaderFlags.CHANNEL_2)
+ elif channel == 3:
+ hflags = int(HeaderFlags.CHANNEL_3)
+ elif channel == 4:
+ hflags = int(HeaderFlags.CHANNEL_4)
+ else:
+ hflags = (channel & 0x0F) * 0x20
+
+ if confirm:
+ hflags |= int(HeaderFlags.CONFIRM_REQUIRED)
+
+ mflags = BusMessageFlags.NONE
+ if msg.extended:
+ mflags |= BusMessageFlags.EXTID
+ if msg.rtr:
+ mflags |= BusMessageFlags.RTR
+ if msg.fd:
+ mflags |= BusMessageFlags.FDF
+ if msg.brs:
+ mflags |= BusMessageFlags.BRS
+ if not echo:
+ mflags |= BusMessageFlags.BLOCK_TX
+
+ if msg.extended:
+ raw_id = msg.can_id & 0x1FFFFFFF
+ else:
+ raw_id = (msg.can_id & 0x7FF) #<< 16
+
+ timestamp = 0
+ dlc = len(msg.data) & 0xFF
+
+ header_struct = struct.pack(
+ " tuple[int, CanMessage]:
+ return await self._rx_queue.get()
+
+ async def receive_can_on(self, channel: int = 1) -> CanMessage:
+ q = self._ensure_channel_queue(channel)
+ return await q.get()
+
+ async def receive_can_on_timeout(
+ self,
+ channel: int = 1,
+ timeout: float = 1.0,
+ ) -> CanMessage | None:
+ try:
+ return await asyncio.wait_for(self.receive_can_on(channel), timeout=timeout)
+ except asyncio.TimeoutError:
+ return None
+
+ async def _read_loop(self) -> None:
+ try:
+ while not self._closed:
+ cmd_bytes = await self._reader.readexactly(1)
+ if not cmd_bytes:
+ break
+ cmd = cmd_bytes[0]
+
+ if need_extended_header(cmd):
+ header_rest = await self._reader.readexactly(5)
+ header = MsgCommandHeader.from_bytes(cmd_bytes + header_rest)
+ flags = header.flags
+ dsize = header.dsize
+ seq = header.sequence
+ else:
+ header_rest = await self._reader.readexactly(3)
+ header = CommandHeader.from_bytes(cmd_bytes + header_rest)
+ flags = header.flags
+ dsize = header.dsize
+ seq = header.sequence
+
+ payload = b""
+ if dsize:
+ payload = await self._reader.readexactly(dsize)
+
+ full_frame = cmd_bytes + header_rest + payload
+ self._wire_log.debug(
+ "RX cmd=0x%02X seq=%d flags=0x%04X dsize=%d :: %s",
+ cmd,
+ seq,
+ flags,
+ dsize,
+ full_frame.hex(" "),
+ )
+
+ if cmd == Command.ERROR:
+ pending = self._pending.pop(seq, None)
+ if pending is not None and not pending.future.done():
+ pending.future.set_exception(
+ CommandError(
+ f"Device ERROR for seq={seq}, "
+ f"flags=0x{flags:04X}, payload={payload.hex()}"
+ )
+ )
+ continue
+
+ if is_ack(cmd):
+ base_cmd = base_command_from_ack(cmd)
+ pending = self._pending.pop(seq, None)
+ if pending is not None and not pending.future.done():
+ pending.future.set_result((cmd, flags, payload))
+ continue
+
+ if seq in self._pending:
+ pending = self._pending.pop(seq)
+ if not pending.future.done():
+ pending.future.set_result((cmd, flags, payload))
+ continue
+
+ if cmd == Command.MESSAGE:
+ await self._handle_bus_message(flags, payload)
+ elif cmd == Command.BUS_ERROR:
+ await self._handle_bus_error(flags, payload)
+ else:
+ self._log.debug(
+ "Unhandled async command: cmd=0x%02X, flags=0x%04X, payload=%s",
+ cmd,
+ flags,
+ payload.hex(" "),
+ )
+ continue
+
+ except asyncio.IncompleteReadError:
+ self._closed = True
+ except Exception as e:
+ self._closed = True
+ self._log.exception("Read loop exception: %s", e)
+ finally:
+ for pending in list(self._pending.values()):
+ if not pending.future.done():
+ pending.future.set_exception(
+ CarBusError("Read loop terminated before response was received")
+ )
+ self._pending.clear()
+
+ async def _handle_bus_message(self, header_flags: int, payload: bytes) -> None:
+ if len(payload) < 16:
+ return
+
+ flags_val, timestamp_us, _reserved, id_raw, dlc = struct.unpack_from(" None:
+ self._log.warning(
+ "BUS_ERROR: flags=0x%04X, payload=%s", header_flags, payload.hex(" ")
+ )
+
+
+async def _example() -> None:
+ dev = await CarBusDevice.open("COM6", baudrate=115200)
+
+ info = await dev.get_device_info()
+ print("DEVICE_INFO raw:", info.raw_payload.hex(" "))
+ for p in info.params:
+ print(f"DI header=0x{p.header:08X}, data={[f'0x{w:08X}' for w in p.data]}")
+
+ await dev.open_can_channel(channel=1, nominal_bitrate=500_000, fd=False)
+
+ msg = CanMessage(can_id=0x123, data=b"\x01\x02\x03\x04", dlc=4, channel=1)
+ await dev.send_can(msg, confirm=False, echo=True)
+
+ rx = await dev.receive_can()
+ print("RX CAN:", rx)
+
+ await dev.close()
+
+
+if __name__ == "__main__":
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+ )
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.run(_example())
diff --git a/carbus_async/exceptions.py b/carbus_async/exceptions.py
new file mode 100644
index 0000000..1397146
--- /dev/null
+++ b/carbus_async/exceptions.py
@@ -0,0 +1,10 @@
+class CarBusError(Exception):
+ ...
+
+
+class SyncError(CarBusError):
+ ...
+
+
+class CommandError(CarBusError):
+ ...
\ No newline at end of file
diff --git a/carbus_async/messages.py b/carbus_async/messages.py
new file mode 100644
index 0000000..a16dc47
--- /dev/null
+++ b/carbus_async/messages.py
@@ -0,0 +1,55 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+from .protocol import BusMessageFlags
+
+
+class MessageDirection(str, Enum):
+ RX = "rx"
+ TX = "tx"
+ UNKNOWN = "unknown"
+
+
+@dataclass
+class CanMessage:
+ can_id: int
+ data: bytes = b""
+ extended: bool = False
+ rtr: bool = False
+ fd: bool = False
+ brs: bool = False
+ timestamp_us: int = 0
+
+ @property
+ def dlc(self) -> int:
+ return len(self.data)
+
+ @classmethod
+ def from_bus_payload(
+ cls,
+ *,
+ flags: BusMessageFlags,
+ timestamp_us: int,
+ can_id: int,
+ dlc: int,
+ data: bytes,
+ ) -> "CanMessage":
+ extended = bool(flags & BusMessageFlags.EXTID)
+ rtr = bool(flags & BusMessageFlags.RTR)
+ fd = bool(flags & BusMessageFlags.FDF)
+ brs = bool(flags & BusMessageFlags.BRS)
+
+ payload = data[:dlc]
+
+ return cls(
+ can_id=can_id,
+ data=payload,
+ extended=extended,
+ rtr=rtr,
+ fd=fd,
+ brs=brs,
+ timestamp_us=timestamp_us,
+ )
diff --git a/carbus_async/protocol.py b/carbus_async/protocol.py
new file mode 100644
index 0000000..f615349
--- /dev/null
+++ b/carbus_async/protocol.py
@@ -0,0 +1,144 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import IntEnum, IntFlag
+
+
+class Command(IntEnum):
+ SYNC = 0xA5
+ DEVICE_INFO = 0x06
+ DEVICE_OPEN = 0x08
+ DEVICE_CLOSE = 0x09
+ CHANNEL_CONFIG = 0x11
+ CHANNEL_OPEN = 0x18
+
+ FILTER_SET = 0x21 # COMMAND_FILTER_SET
+ FILTER_CLEAR = 0x22 # COMMAND_FILTER_CLEAR
+
+ MESSAGE = 0x40
+ BUS_ERROR = 0x48
+
+ ERROR = 0xFF
+
+
+class FilterType(IntEnum):
+ STD_11BIT = 0x00
+ EXT_29BIT = 0x01
+
+
+EXTENDED_HEADER_COMMANDS = {Command.MESSAGE, Command.BUS_ERROR}
+
+
+class HeaderFlags(IntFlag):
+ NONE = 0x0000
+
+ CHANNEL_1 = 0x2000
+ CHANNEL_2 = 0x4000
+ CHANNEL_3 = 0x6000
+ CHANNEL_4 = 0x8000
+
+ CONFIRM_REQUIRED = 0x0001
+
+
+class BusMessageFlags(IntFlag):
+ NONE = 0x00000000
+
+ EXTID = 0x00000001
+ RTR = 0x00000002
+ FDF = 0x00000004
+ BRS = 0x00000008
+ ESI = 0x00000010
+
+ ERROR_FRAME = 0x01000000
+ RX = 0x10000000
+ TX = 0x20000000
+ BLOCK_TX = 0x30000000
+
+
+CC_MULTIWORD = 0x80000000
+
+DI_MULTIWORD = CC_MULTIWORD
+
+DI_HARDWARE_ID = 0x01000000 # тип устройства (HWIdentifiers)
+DI_FIRMWARE_VERSION = 0x02000000 # строка версии прошивки (ASCII, MULTIWORD)
+DI_DEVICE_SERIAL = 0x03000000 # серийный номер (бинарно, MULTIWORD)
+DI_FEATURES = 0x11000000 # битовая маска общих фич
+DI_CHANNEL_MAP = 0x12000000 # карта каналов (тип канала)
+DI_CHANNEL_FEATURES = 0x13000000 # опции по каналам (ALC, TERMINATOR, ...)
+DI_FILTER = 0x14000000 # настройки фильтров по каналам
+DI_GATEWAY = 0x15000000 # возможные пробросы между каналами
+DI_CHANNEL_FREQUENCY = 0x16000000 # частота работы CAN/CAN-FD модуля
+DI_ISOTP = 0x21000000 # размер ISO-TP буфера
+DI_TX_BUFFER = 0x22000000 # размер буфера трейсера (в сообщениях)
+DI_TX_TASK = 0x23000000 # количество задач периодической отправки
+
+
+@dataclass
+class CommandHeader:
+ command: int
+ sequence: int
+ flags: int
+ dsize: int
+
+ @classmethod
+ def from_bytes(cls, data: bytes) -> "CommandHeader":
+ if len(data) != 4:
+ raise ValueError("CommandHeader требует ровно 4 байта")
+ cmd, seq, flg, size = data
+ return cls(cmd, seq, flg, size)
+
+ def to_bytes(self) -> bytes:
+ return bytes(
+ (
+ self.command & 0xFF,
+ self.sequence & 0xFF,
+ self.flags & 0xFF,
+ self.dsize & 0xFF,
+ )
+ )
+
+
+@dataclass
+class MsgCommandHeader:
+ command: int
+ sequence: int
+ flags: int
+ dsize: int
+
+ @classmethod
+ def from_bytes(cls, data: bytes) -> "MsgCommandHeader":
+ if len(data) != 6:
+ raise ValueError("MsgCommandHeader требует ровно 6 байт")
+ cmd = data[0]
+ seq = data[1]
+ flags = int.from_bytes(data[2:4], "little")
+ dsize = int.from_bytes(data[4:6], "little")
+ return cls(cmd, seq, flags, dsize)
+
+ def to_bytes(self) -> bytes:
+ return (
+ bytes(
+ (
+ self.command & 0xFF,
+ self.sequence & 0xFF,
+ )
+ )
+ + self.flags.to_bytes(2, "little")
+ + self.dsize.to_bytes(2, "little")
+ )
+
+
+def is_ack(cmd: int) -> bool:
+ return 0x80 <= cmd < 0xFF
+
+
+def base_command_from_ack(cmd: int) -> int:
+ return cmd & 0x7F
+
+
+def need_extended_header(command: int) -> bool:
+ try:
+ c = Command(command)
+ except ValueError:
+ return False
+ return c in EXTENDED_HEADER_COMMANDS
diff --git a/carbus_async/tcp_bridge.py b/carbus_async/tcp_bridge.py
new file mode 100644
index 0000000..5311e8d
--- /dev/null
+++ b/carbus_async/tcp_bridge.py
@@ -0,0 +1,135 @@
+from __future__ import annotations
+
+import asyncio
+import logging
+
+import serial_asyncio
+
+log = logging.getLogger("carbus_async.tcp_bridge")
+
+
+async def _pump(
+ src: asyncio.StreamReader,
+ dst: asyncio.StreamWriter,
+ direction: str,
+ chunk_size: int = 4096,
+) -> None:
+ try:
+ while True:
+ data = await src.read(chunk_size)
+ if not data:
+ log.debug("%s: EOF", direction)
+ break
+ dst.write(data)
+ await dst.drain()
+ except asyncio.CancelledError:
+ log.debug("%s: cancelled", direction)
+ raise
+ except Exception as e:
+ log.exception("%s: exception: %s", direction, e)
+ finally:
+ try:
+ dst.close()
+ except Exception:
+ pass
+
+
+async def handle_client(
+ tcp_reader: asyncio.StreamReader,
+ tcp_writer: asyncio.StreamWriter,
+ *,
+ serial_port: str,
+ baudrate: int = 115200,
+) -> None:
+ peer = tcp_writer.get_extra_info("peername")
+ log.info("Client connected: %s", peer)
+
+ try:
+ serial_reader, serial_writer = await serial_asyncio.open_serial_connection(
+ url=serial_port,
+ baudrate=baudrate,
+ )
+ log.info("Opened local serial %s @ %d for %s", serial_port, baudrate, peer)
+
+ pump_tcp_to_serial = asyncio.create_task(
+ _pump(tcp_reader, serial_writer, f"{peer} tcp->serial")
+ )
+ pump_serial_to_tcp = asyncio.create_task(
+ _pump(serial_reader, tcp_writer, f"{peer} serial->tcp")
+ )
+
+ done, pending = await asyncio.wait(
+ [pump_tcp_to_serial, pump_serial_to_tcp],
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ for task in pending:
+ task.cancel()
+ with asyncio.SuppressCancelledError if hasattr(asyncio, "SuppressCancelledError") else nullcontext():
+ try:
+ await task
+ except asyncio.CancelledError:
+ pass
+
+ except Exception as e:
+ log.exception("Error in handle_client %s: %s", peer, e)
+ finally:
+ try:
+ tcp_writer.close()
+ except Exception:
+ pass
+ log.info("Client disconnected: %s", peer)
+
+
+async def run_tcp_bridge(
+ *,
+ listen_host: str = "0.0.0.0",
+ listen_port: int = 7000,
+ serial_port: str = "COM6",
+ baudrate: int = 115200,
+) -> None:
+ server = await asyncio.start_server(
+ lambda r, w: handle_client(r, w, serial_port=serial_port, baudrate=baudrate),
+ listen_host,
+ listen_port,
+ )
+
+ addr = ", ".join(str(sock.getsockname()) for sock in server.sockets or [])
+ log.info(
+ "TCP bridge listening on %s -> serial %s @ %d",
+ addr,
+ serial_port,
+ baudrate,
+ )
+
+ async with server:
+ await server.serve_forever()
+
+
+if __name__ == "__main__":
+ import argparse
+
+ logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+ )
+
+ parser = argparse.ArgumentParser(
+ description="TCP bridge for CarBus device (raw byte forwarder)."
+ )
+ parser.add_argument("--host", default="0.0.0.0", help="Listen host (default 0.0.0.0)")
+ parser.add_argument("--port", type=int, default=7000, help="Listen TCP port (default 7000)")
+ parser.add_argument("--serial", required=True, help="Local serial port (e.g. COM6, /dev/ttyACM0)")
+ parser.add_argument("--baudrate", type=int, default=115200, help="Serial baudrate (default 115200)")
+
+ args = parser.parse_args()
+
+ async def _main() -> None:
+ await run_tcp_bridge(
+ listen_host=args.host,
+ listen_port=args.port,
+ serial_port=args.serial,
+ baudrate=args.baudrate,
+ )
+
+ asyncio.run(_main())
diff --git a/isotp_async/__init__.py b/isotp_async/__init__.py
new file mode 100644
index 0000000..f5d69eb
--- /dev/null
+++ b/isotp_async/__init__.py
@@ -0,0 +1,6 @@
+
+from .carbus_iface import CarBusCanTransport
+
+__all__ = [
+ "CarBusCanTransport",
+]
diff --git a/isotp_async/carbus_iface.py b/isotp_async/carbus_iface.py
new file mode 100644
index 0000000..c646118
--- /dev/null
+++ b/isotp_async/carbus_iface.py
@@ -0,0 +1,45 @@
+from __future__ import annotations
+
+import asyncio
+from typing import Optional
+
+from carbus_async.device import CarBusDevice
+from carbus_async.messages import CanMessage
+
+
+from .iface import CanTransport
+
+
+class CarBusCanTransport(CanTransport):
+ def __init__(self, dev: CarBusDevice, channel: int, rx_id: int) -> None:
+ self._dev = dev
+ self._channel = channel
+ self._rx_id = rx_id
+
+ async def send(self, msg: CanMessage) -> None:
+ await self._dev.send_can(
+ msg,
+ channel=self._channel,
+ confirm=False,
+ echo=False,
+ )
+
+ async def recv(self, timeout: Optional[float] = None) -> Optional[CanMessage]:
+ while True:
+ if timeout is None:
+ ch, msg = await self._dev.receive_can()
+ else:
+ try:
+ ch, msg = await asyncio.wait_for(
+ self._dev.receive_can(),
+ timeout=timeout,
+ )
+ except asyncio.TimeoutError:
+ return None
+
+ if ch != self._channel:
+ continue
+ if msg.can_id != self._rx_id:
+ continue
+
+ return msg
diff --git a/isotp_async/iface.py b/isotp_async/iface.py
new file mode 100644
index 0000000..ea0fdf4
--- /dev/null
+++ b/isotp_async/iface.py
@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+from typing import Protocol, Optional
+from carbus_async.messages import CanMessage
+
+
+class CanTransport(Protocol):
+
+ async def send(self, msg: CanMessage) -> None:
+ ...
+
+ async def recv(self, timeout: Optional[float] = None) -> Optional[CanMessage]:
+ ...
diff --git a/isotp_async/transport.py b/isotp_async/transport.py
new file mode 100644
index 0000000..efa681f
--- /dev/null
+++ b/isotp_async/transport.py
@@ -0,0 +1,223 @@
+from __future__ import annotations
+
+import asyncio
+from dataclasses import dataclass
+from typing import Optional
+
+from carbus_async.messages import CanMessage
+from .iface import CanTransport
+
+
+def _st_min_to_seconds(st_min: int) -> float:
+
+ if 0x00 <= st_min <= 0x7F:
+ return st_min / 1000.0
+ if 0xF1 <= st_min <= 0xF9:
+ return (st_min - 0xF0) * 100e-6 # 0xF1 -> 1*100us, ...
+ return 0.0
+
+
+@dataclass
+class IsoTpChannel:
+
+ can: CanTransport
+ tx_id: int
+ rx_id: int
+
+ block_size: int = 8
+ st_min_ms: int = 0
+ fc_timeout: float = 1.0
+ cf_timeout: float = 1.0
+
+ async def send_pdu(self, data: bytes) -> None:
+ length = len(data)
+ if length <= 7:
+ # Single Frame
+ pci = bytes([0x00 | (length & 0x0F)])
+
+ if len(data) < 7:
+ for _ in range(7-len(data)):
+ data = data + b"\xaa"
+
+ msg = CanMessage(
+ can_id=self.tx_id,
+ data=pci + data,
+ )
+ await self.can.send(msg)
+ return
+
+ # --- Multi-frame: First Frame ---
+ len_hi = (length >> 8) & 0x0F
+ len_lo = length & 0xFF
+ ff_pci0 = 0x10 | len_hi # high nibble = 0x1 (FF), low=high bits len
+ ff_pci1 = len_lo
+
+ first_data = data[:6]
+ ff_payload = bytes([ff_pci0, ff_pci1]) + first_data
+ msg = CanMessage(
+ can_id=self.tx_id,
+ data=ff_payload,
+ )
+ await self.can.send(msg)
+
+ offset = 6
+
+ # --- FlowControl от peer ---
+ fc_msg = await self.can.recv(timeout=self.fc_timeout)
+ if fc_msg is None:
+ raise asyncio.TimeoutError("ISO-TP: FlowControl timeout (N_Bs)")
+
+ if not fc_msg.data:
+ raise RuntimeError("ISO-TP: empty FlowControl frame")
+
+ fc_pci = fc_msg.data[0]
+ fc_type = fc_pci >> 4 # 3 = FC
+
+ if fc_type != 0x3:
+ raise RuntimeError(f"ISO-TP: expected FlowControl, got PCI=0x{fc_pci:02X}")
+
+ fs = fc_pci & 0x0F # FS (0=CTS, 1=WT, 2=OVFLW)
+ bs = fc_msg.data[1] if len(fc_msg.data) > 1 else 0
+ st_min_raw = fc_msg.data[2] if len(fc_msg.data) > 2 else self.st_min_ms
+
+ if fs == 0x2:
+ raise RuntimeError("ISO-TP: FlowControl OVFLW from peer")
+
+ # Для простоты: поддерживаем только FS=CTS (0x0)
+ if fs != 0x0:
+ raise RuntimeError(f"ISO-TP: unsupported FlowStatus=0x{fs:02X}")
+
+ if bs == 0x00:
+ # 0 => "unlimited"
+ bs = 0
+
+ st_min = _st_min_to_seconds(st_min_raw)
+
+ # --- Consecutive Frames ---
+ seq_num = 1
+ frames_in_block = 0
+
+ while offset < length:
+ if bs != 0 and frames_in_block >= bs:
+ # BS, FC
+ fc_msg = await self.can.recv(timeout=self.fc_timeout)
+ if fc_msg is None:
+ raise asyncio.TimeoutError("ISO-TP: second FlowControl timeout (N_Bs)")
+
+ fc_pci = fc_msg.data[0]
+ fc_type = fc_pci >> 4
+ if fc_type != 0x3:
+ raise RuntimeError(f"ISO-TP: expected FlowControl, got PCI=0x{fc_pci:02X}")
+ fs = fc_pci & 0x0F
+ bs = fc_msg.data[1] if len(fc_msg.data) > 1 else 0
+ st_min_raw = fc_msg.data[2] if len(fc_msg.data) > 2 else self.st_min_ms
+
+ if fs == 0x2:
+ raise RuntimeError("ISO-TP: FlowControl OVFLW from peer")
+ if fs != 0x0:
+ raise RuntimeError(f"ISO-TP: unsupported FlowStatus=0x{fs:02X}")
+
+ if bs == 0x00:
+ bs = 0
+ st_min = _st_min_to_seconds(st_min_raw)
+ frames_in_block = 0
+
+ chunk = data[offset:offset + 7]
+ offset += len(chunk)
+
+ cf_pci = 0x20 | (seq_num & 0x0F) # high nibble = 0x2 (CF)
+ cf_payload = bytes([cf_pci]) + chunk
+
+ if len(cf_payload) < 8:
+ for _ in range(8 - len(cf_payload)):
+ cf_payload = cf_payload + b"\xaa"
+
+ msg = CanMessage(
+ can_id=self.tx_id,
+ data=cf_payload,
+ )
+ await self.can.send(msg)
+
+ seq_num = (seq_num + 1) & 0x0F
+ if seq_num == 0:
+ seq_num = 0x1
+
+ frames_in_block += 1
+
+ if st_min > 0:
+ await asyncio.sleep(st_min)
+
+ async def recv_pdu(self, timeout: float = 1.0) -> Optional[bytes]:
+
+ first = await self.can.recv(timeout=timeout)
+ if first is None:
+ return None
+
+ data = first.data
+ if not data:
+ return None
+
+ pci = data[0]
+ frame_type = pci >> 4
+
+ # --- Single Frame ---
+ if frame_type == 0x0:
+ length = pci & 0x0F
+ return data[1:1 + length]
+
+ # --- First Frame ---
+ if frame_type != 0x1:
+ return None
+
+ # FF length
+ length_hi = pci & 0x0F
+ length_lo = data[1] if len(data) > 1 else 0
+ total_length = (length_hi << 8) | length_lo
+
+ payload = bytearray(data[2:])
+
+ bs = self.block_size
+ st_raw = self.st_min_ms
+ fc_pci = 0x30 # type=3 (FC), FS=0 (CTS)
+ fc_payload = bytes([fc_pci, bs & 0xFF, st_raw & 0xFF])
+
+ if len(fc_payload) < 8:
+ fc_payload = fc_payload + b"\x00" * (8 - len(fc_payload))
+
+ fc_frame = CanMessage(
+ can_id=self.tx_id,
+ data=fc_payload,
+ )
+ await self.can.send(fc_frame)
+
+ st_min = _st_min_to_seconds(st_raw)
+
+ expected_sn = 1
+ while len(payload) < total_length:
+ cf = await self.can.recv(timeout=self.cf_timeout)
+ if cf is None:
+ raise asyncio.TimeoutError("ISO-TP: CF timeout (N_Cr)")
+
+ if not cf.data:
+ continue
+
+ cf_pci = cf.data[0]
+ cf_type = cf_pci >> 4
+ if cf_type != 0x2:
+ continue
+
+ sn = cf_pci & 0x0F
+ if sn != (expected_sn & 0x0F):
+ raise RuntimeError(
+ f"ISO-TP: wrong sequence number: got {sn}, expected {expected_sn}"
+ )
+
+ payload.extend(cf.data[1:])
+ expected_sn = (expected_sn + 1) & 0x0F
+ if expected_sn == 0:
+ expected_sn = 1
+
+ if st_min > 0:
+ await asyncio.sleep(st_min)
+
+ return bytes(payload[:total_length])
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..c89fe5d
--- /dev/null
+++ b/main.py
@@ -0,0 +1,56 @@
+import asyncio
+
+from carbus_async.device import CarBusDevice
+from isotp_async.carbus_iface import CarBusCanTransport
+from isotp_async.transport import IsoTpChannel
+from uds_async.client import UdsClient
+
+# import logging
+#
+# logging.basicConfig(
+# level=logging.DEBUG,
+# format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+# )
+#
+# logging.getLogger("carbus_async.wire").setLevel(logging.DEBUG)
+
+
+async def main():
+
+ dev = await CarBusDevice.open("COM6")
+
+ await dev.open_can_channel(
+ channel=1,
+ nominal_bitrate=500_000,
+ )
+
+ await dev.set_terminator(channel=1, enabled=True)
+
+ await asyncio.sleep(0.5)
+
+ info = await dev.get_device_info()
+ print("HW:", info.hardware_id, info.hardware_name)
+ print("FW:", info.firmware_version)
+ print("Serial #", info.serial_int)
+
+ print("Features:",
+ "gateway" if info.feature_gateway else "",
+ "isotp" if info.feature_isotp else "",
+ "txbuf" if info.feature_tx_buffer else "",
+ "txtask" if info.feature_tx_task else "",
+ )
+
+ await dev.clear_all_filters(1)
+ await dev.set_std_id_filter(1, index=0, can_id=0x7E8, mask=0x7FF)
+
+ can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8)
+ isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8)
+ uds = UdsClient(isotp)
+
+ vin = await uds.read_data_by_identifier(0xF190)
+ print("VIN:", vin.decode(errors="ignore"))
+
+ await dev.close()
+
+
+asyncio.run(main())
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..ceaea1f
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,27 @@
+[build-system]
+requires = ["setuptools>=64", "wheel"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "carbus-lib"
+version = "0.1.0"
+description = "Async CAN / ISO-TP / UDS library for Car Bus Analyzer"
+readme = "README.md"
+requires-python = ">=3.10"
+authors = [{ name = "Mike" }]
+license = "MIT"
+
+dependencies = [
+ "pyserial>=3.5",
+ "pyserial-asyncio>=0.6",
+]
+
+[project.urls]
+Homepage = "https://github.com/controllerzz/car_bus_lib"
+
+[tool.setuptools]
+package-dir = {"" = "."}
+
+[tool.setuptools.packages.find]
+where = ["."]
+include = ["carbus_async", "isotp_async", "uds_async"]
diff --git a/uds_async/__init__.py b/uds_async/__init__.py
new file mode 100644
index 0000000..7111068
--- /dev/null
+++ b/uds_async/__init__.py
@@ -0,0 +1,22 @@
+from .client import UdsClient
+from .server import UdsServer
+
+try:
+ from .types import (
+ UdsRequest,
+ UdsResponse,
+ UdsPositiveResponse,
+ UdsNegativeResponse,
+ ResponseCode,
+ )
+
+ __all__ = [
+ "UdsClient",
+ "UdsServer",
+ ]
+except ImportError:
+ # если types.py нет или переименован — хотя бы клиент и сервер доступны
+ __all__ = [
+ "UdsClient",
+ "UdsServer",
+ ]
diff --git a/uds_async/client.py b/uds_async/client.py
new file mode 100644
index 0000000..491d552
--- /dev/null
+++ b/uds_async/client.py
@@ -0,0 +1,63 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Optional
+
+from isotp_async.transport import IsoTpChannel
+from .exceptions import UdsError, UdsNegativeResponse
+
+
+@dataclass
+class UdsClient:
+
+ isotp: IsoTpChannel
+ p2_timeout: float = 1.0
+
+ async def _request(self, payload: bytes) -> bytes:
+ await self.isotp.send_pdu(payload)
+ resp = await self.isotp.recv_pdu(timeout=self.p2_timeout)
+ if resp is None:
+ raise TimeoutError("UDS response timeout")
+
+ if resp[0] == 0x7F:
+ if len(resp) < 3:
+ raise UdsError("Malformed UDS negative response")
+ sid = resp[1]
+ nrc = resp[2]
+ raise UdsNegativeResponse(req_sid=sid, nrc=nrc)
+
+ return resp
+
+ async def diagnostic_session_control(self, session: int) -> bytes:
+ req = bytes([0x10, session & 0xFF])
+ resp = await self._request(req)
+ if resp[0] != 0x50:
+ raise UdsError(f"Unexpected SID 0x{resp[0]:02X} for DSC")
+ return resp
+
+ async def tester_present(self, suppress_response: bool = False) -> Optional[bytes]:
+
+ sub = 0x80 if suppress_response else 0x00
+ req = bytes([0x3E, sub])
+
+ if suppress_response:
+ try:
+ resp = await self._request(req)
+ except TimeoutError:
+ return None
+ return resp
+
+ resp = await self._request(req)
+ if resp[0] != 0x7E:
+ raise UdsError(f"Unexpected SID 0x{resp[0]:02X} for TesterPresent")
+ return resp
+
+ async def read_data_by_identifier(self, did: int) -> bytes:
+
+ req = bytes([0x22, (did >> 8) & 0xFF, did & 0xFF])
+ resp = await self._request(req)
+ if resp[0] != 0x62:
+ raise UdsError(f"Unexpected SID 0x{resp[0]:02X} for RDBI")
+ if len(resp) < 3:
+ raise UdsError("Malformed RDBI response")
+ return resp[3:]
diff --git a/uds_async/exceptions.py b/uds_async/exceptions.py
new file mode 100644
index 0000000..03a5968
--- /dev/null
+++ b/uds_async/exceptions.py
@@ -0,0 +1,16 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+
+class UdsError(Exception):
+ ...
+
+@dataclass
+class UdsNegativeResponse(UdsError):
+
+ req_sid: int
+ nrc: int
+
+ def __str__(self) -> str:
+ return f"UDS NRC 0x{self.nrc:02X} for SID 0x{self.req_sid:02X}"
diff --git a/uds_async/server.py b/uds_async/server.py
new file mode 100644
index 0000000..5cc87ad
--- /dev/null
+++ b/uds_async/server.py
@@ -0,0 +1,62 @@
+from __future__ import annotations
+
+import asyncio
+from dataclasses import dataclass, field
+from typing import Awaitable, Callable, Dict, Optional
+
+from isotp_async.transport import IsoTpChannel
+from .exceptions import UdsNegativeResponse
+
+UdsHandler = Callable[[bytes], Awaitable[Optional[bytes]]]
+
+
+@dataclass
+class UdsServer:
+
+ isotp: IsoTpChannel
+ p2_timeout: float = 1.0
+ handlers: Dict[int, UdsHandler] = field(default_factory=dict)
+
+ async def serve_forever(self) -> None:
+ while True:
+ try:
+ req = await self.isotp.recv_pdu(timeout=self.p2_timeout)
+ except asyncio.CancelledError:
+ break
+
+ if not req:
+ continue
+
+ sid = req[0]
+ handler = self.handlers.get(sid)
+ if handler is None:
+ # ServiceNotSupported
+ await self._send_negative_response(sid, 0x11)
+ continue
+
+ try:
+ resp = await handler(req)
+ if resp is None:
+ continue
+
+ await self.isotp.send_pdu(resp)
+
+ except UdsNegativeResponse as e:
+ await self._send_negative_response(e.req_sid, e.nrc)
+
+ except Exception:
+ # General programming failure (0x72)
+ await self._send_negative_response(sid, 0x72)
+
+ async def _send_negative_response(self, sid: int, nrc: int) -> None:
+ payload = bytes([0x7F, sid & 0xFF, nrc & 0xFF])
+ await self.isotp.send_pdu(payload)
+
+ def add_handler(self, sid: int, handler: UdsHandler) -> None:
+ self.handlers[sid & 0xFF] = handler
+
+ def service(self, sid: int):
+ def decorator(func: UdsHandler) -> UdsHandler:
+ self.add_handler(sid, func)
+ return func
+ return decorator