From 5c7a96d215a3f6309be4f6cf71e87e10d9e141d3 Mon Sep 17 00:00:00 2001 From: controllerzz <79202101363@mail.ru> Date: Thu, 11 Dec 2025 10:27:04 +0300 Subject: [PATCH] to test --- README.md | 225 ++++++++++++++++------------------------ isotp_async/__init__.py | 3 +- main.py | 20 ++-- uds_async/__init__.py | 5 + 4 files changed, 107 insertions(+), 146 deletions(-) diff --git a/README.md b/README.md index 33f37a8..e3aa60c 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,11 @@ - 📡 **`carbus_async`** – низкоуровневая работа с железкой (CAN/LIN, фильтры, терминаторы и т.д.) - 📦 **`isotp_async`** – ISO-TP (ISO 15765-2) поверх CAN (single + multi-frame) - 🩺 **`uds_async`** – UDS (ISO 14229) клиент и сервер (диагностика, чтение VIN и т.п.) -- 🌐 **TCP-bridge** – удалённое подключение к адаптеру через сеть (как будто он воткнут локально) +- 🌐 **`TCP-bridge`** – удалённое подключение к адаптеру через сеть (как будто он воткнут локально) -> Минимальные примеры, никаких «магических» зависимостей — всё на `asyncio`. +> Python 3.11 и выше +> Никаких «магических» зависимостей — всё на `asyncio`. +> _*Тестировалось на устройствах с Протоколом Версии 22_ --- @@ -16,15 +18,15 @@ Пока проект в разработке, можно ставить его как editable-модуль из репозитория: ```bash -git clone https://github.com/your_name/carbus_lib.git +git clone https://github.com/controllerzz/carbus_lib.git cd car_bus_lib pip install -e . ``` -# car-bus-lib +# carbus-lib Асинхронная библиотека для работы с CAN / CAN-FD, ISO-TP и UDS. -Поддерживает локальное подключение через USB CDC и удалённую работу через TCP-bridge. +Поддерживает локальное подключение через USB CDC и удалённую работу через TCP-bridge. --- @@ -40,45 +42,38 @@ pip install -e . --- -## Установка -````bat - pip install pyserial pyserial-asyncio - - git clone https://github.com/your_name/carbus_lib.git - cd car-bus-lib - pip install -e . -```` - ## 1. Работа с CAN Простейший пример: открыть устройство, настроить канал и отправить / принять кадр. ````python - import asyncio - from carbus_async.device import CarBusDevice - from carbus_async.messages import CanMessage +import asyncio +from carbus_async.device import CarBusDevice +from carbus_async.messages import CanMessage - async def main(): - dev = await CarBusDevice.open("COM6", baudrate=115200) +async def main(): + dev = await CarBusDevice.open("COM6", baudrate=115200) - # классический CAN 500 kbit/s - await dev.open_can_channel( - channel=1, - nominal_bitrate=500_000, - fd=False, - ) + # классический CAN 500 kbit/s + await dev.open_can_channel( + channel=1, + nominal_bitrate=500_000, + ) - # отправка кадра 0x7E0 8 байт - msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00") - await dev.send_can(msg, channel=1) + # включаем терминатор 120 Ом на канале 1 + await dev.set_terminator(channel=1, enabled=True) - # приём любого сообщения - ch, rx = await dev.receive_can() - print("RX:", ch, rx) + # отправка кадра 0x7E0 8 байт + msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00") + await dev.send_can(msg, channel=1) - await dev.close() + # приём любого сообщения + ch, rx = await dev.receive_can() + print("RX:", ch, rx) - asyncio.run(main()) + await dev.close() + +asyncio.run(main()) ```` @@ -86,131 +81,107 @@ pip install -e . Пример запроса DEVICE_INFO и настройки фильтров: ````python - info = await dev.get_device_info() +info = await dev.get_device_info() - print("HW:", info.hardware_name) - print("FW:", info.firmware_version) - print("Serial:", info.serial_int) +print("HW:", info.hardware_name) +print("FW:", info.firmware_version) +print("Serial:", info.serial_int) - # очистить все фильтры на канале 1 - await dev.clear_all_filters(1) +# очистить все фильтры на канале 1 +await dev.clear_all_filters(1) - # разрешить только ответы с ID 0x7E8 (11-битный стандартный ID) - await dev.set_std_id_filter( - channel=1, - index=0, - can_id=0x7E8, - mask=0x7FF, - ) +# разрешить только ответы с ID 0x7E8 (11-битный стандартный ID) +await dev.set_std_id_filter( + channel=1, + index=0, + can_id=0x7E8, + mask=0x7FF, +) - # включить/выключить терминатор 120 Ω - await dev.set_terminator(channel=1, enabled=True) +# включить/выключить терминатор 120 Ω +await dev.set_terminator(channel=1, enabled=True) ```` ## 3. ISO-TP (isotp_async) ISO-TP канал строится поверх CarBusDevice: ````python - from isotp_async import IsoTpChannel +from isotp_async.transport import IsoTpChannel - # предполагается, что dev уже открыт и канал CAN настроен - isotp = IsoTpChannel( - device=dev, - channel=1, - tx_id=0x7E0, # наш запрос - rx_id=0x7E8, # ответ ЭБУ - ) +# предполагается, что dev уже открыт и канал CAN настроен +isotp = IsoTpChannel( + device=dev, + channel=1, + tx_id=0x7E0, # наш запрос + rx_id=0x7E8, # ответ ЭБУ +) - # отправить запрос ReadDataByIdentifier F190 (VIN) - await isotp.send(b"\x22\xF1\x90") +# отправить запрос ReadDataByIdentifier F190 (VIN) +await isotp.send(b"\x22\xF1\x90") - # получить полный ответ (single или multi-frame) - resp = await isotp.recv(timeout=1.0) - print("ISO-TP:", resp.hex()) +# получить полный ответ (single или multi-frame) +resp = await isotp.recv(timeout=1.0) +print("ISO-TP:", resp.hex()) ```` ## 4. UDS Client (uds_async.client) Клиент UDS использует IsoTpChannel: ````python - from uds_async.client import UdsClient - from isotp_async import IsoTpChannel +from uds_async.client import UdsClient +from isotp_async.transport import IsoTpChannel - isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) - uds = UdsClient(isotp_channel=isotp) +isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) +uds = UdsClient(isotp_channel=isotp) - # переход в расширенную диагностическую сессию - await uds.diagnostic_session_control(0x03) +# переход в расширенную диагностическую сессию +await uds.diagnostic_session_control(0x03) - # чтение VIN (DID F190) - vin_bytes = await uds.read_data_by_identifier(0xF190) - print("VIN:", vin_bytes.decode(errors="ignore")) +# чтение VIN (DID F190) +vin_bytes = await uds.read_data_by_identifier(0xF190) +print("VIN:", vin_bytes.decode(errors="ignore")) ```` -## 5. UDS Server (эмулятор ЭБУ) -Простой UDS-сервер, который отвечает на запрос VIN: -````python - from uds_async.server import UdsServer, UdsRequest, UdsPositiveResponse - from isotp_async import IsoTpChannel - class MyEcuServer(UdsServer): - async def handle_read_data_by_identifier(self, req: UdsRequest): - if req.data_identifier == 0xF190: - # положительный ответ: 62 F1 90 + данные - return UdsPositiveResponse(b"\x62\xF1\x90DEMO-VIN-1234567") - # всё остальное обрабатывается базовой реализацией - return await super().handle_read_data_by_identifier(req) +## 5. Удалённая работа через TCP (tcp_bridge) - async def main(): - dev = await CarBusDevice.open("COM6", baudrate=115200) - await dev.open_can_channel(channel=1, nominal_bitrate=500_000, fd=False) - - isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E8, rx_id=0x7E0) - server = MyEcuServer(isotp_channel=isotp) - await server.serve_forever() - - asyncio.run(main()) -```` - -## 6. Удалённая работа через TCP (tcp_bridge) - -### 6.1. Сервер (рядом с адаптером) +### 5.1. Сервер (рядом с адаптером) На машине, где физически подключён CAN-адаптер: - python -m carbus_async.tcp_bridge --serial COM6 --port 7000 + python.exe -m carbus_async.tcp_bridge --serial COM6 --port 7000 Адаптер открывается локально, а поверх него поднимается TCP-мост. -### 6.2. Клиент (удалённая машина) +### 5.2. Клиент (удалённая машина) На другой машине можно использовать тот же API, как с локальным COM, но через `open_tcp`: ````python - import asyncio - from carbus_async.device import CarBusDevice - from carbus_async.messages import CanMessage +import asyncio +from carbus_async.device import CarBusDevice +from carbus_async.messages import CanMessage - async def main(): - dev = await CarBusDevice.open_tcp("192.168.1.10", 7000) +async def main(): + dev = await CarBusDevice.open_tcp("192.168.1.10", 7000) - await dev.open_can_channel( - channel=1, - nominal_bitrate=500_000, - fd=False, - ) + await dev.open_can_channel( + channel=1, + nominal_bitrate=500_000, + fd=False, + ) - msg = CanMessage(can_id=0x321, data=b"\x01\x02\x03\x04") - await dev.send_can(msg, channel=1) + msg = CanMessage(can_id=0x321, data=b"\x01\x02\x03\x04") + await dev.send_can(msg, channel=1) - ch, rx = await dev.receive_can() - print("REMOTE RX:", ch, rx) + ch, rx = await dev.receive_can() + print("REMOTE RX:", ch, rx) - await dev.close() + await dev.close() - asyncio.run(main()) +asyncio.run(main()) ```` -## 7. Логирование +## 6. Логирование Для отладки удобно включить подробное логирование: ````python @@ -229,27 +200,13 @@ ISO-TP канал строится поверх CarBusDevice: --- -## 8. Структура проекта +## 7. Лицензия - carbus_async/ - device.py — асинхронный интерфейс к адаптеру (CAN/CAN-FD) - protocol.py — описания команд, флагов и структур протокола - messages.py — модель CanMessage и вспомогательные типы - tcp_bridge.py — TCP-мост (сервер для удалённой работы) +MIT. + + ДАННОЕ ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ ПРЕДОСТАВЛЯЕТСЯ «КАК ЕСТЬ», БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ, ЯВНО ВЫРАЖЕННЫХ ИЛИ ПОДРАЗУМЕВАЕМЫХ, ВКЛЮЧАЯ ГАРАНТИИ ТОВАРНОЙ ПРИГОДНОСТИ, СООТВЕТСТВИЯ ПО ЕГО КОНКРЕТНОМУ НАЗНАЧЕНИЮ И ОТСУТСТВИЯ НАРУШЕНИЙ, НО НЕ ОГРАНИЧИВАЯСЬ ИМИ. НИ В КАКОМ СЛУЧАЕ АВТОРЫ ИЛИ ПРАВООБЛАДАТЕЛИ НЕ НЕСУТ ОТВЕТСТВЕННОСТИ ПО КАКИМ-ЛИБО ИСКАМ, ЗА УЩЕРБ ИЛИ ПО ИНЫМ ТРЕБОВАНИЯМ, В ТОМ ЧИСЛЕ, ПРИ ДЕЙСТВИИ КОНТРАКТА, ДЕЛИКТЕ ИЛИ ИНОЙ СИТУАЦИИ, ВОЗНИКШИМ ИЗ-ЗА ИСПОЛЬЗОВАНИЯ ПРОГРАММНОГО ОБЕСПЕЧЕНИЯ ИЛИ ИНЫХ ДЕЙСТВИЙ С ПРОГРАММНЫМ ОБЕСПЕЧЕНИЕМ. + - isotp_async/ - __init__.py — IsoTpChannel и вспомогательные сущности - - uds_async/ - client.py — UdsClient (клиент UDS) - server.py — UdsServer (сервер / эмулятор ЭБУ) - types.py — структуры запросов/ответов - ---- - -## 9. Лицензия - -MIT (можно поменять под нужды проекта). Pull Requests и предложения по улучшению приветствуются 🚗 diff --git a/isotp_async/__init__.py b/isotp_async/__init__.py index f5d69eb..0c5a3c8 100644 --- a/isotp_async/__init__.py +++ b/isotp_async/__init__.py @@ -1,6 +1,5 @@ - from .carbus_iface import CarBusCanTransport __all__ = [ "CarBusCanTransport", -] +] \ No newline at end of file diff --git a/main.py b/main.py index c89fe5d..fa9aa66 100644 --- a/main.py +++ b/main.py @@ -5,17 +5,17 @@ from isotp_async.carbus_iface import CarBusCanTransport from isotp_async.transport import IsoTpChannel from uds_async.client import UdsClient -# import logging -# -# logging.basicConfig( -# level=logging.DEBUG, -# format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", -# ) -# -# logging.getLogger("carbus_async.wire").setLevel(logging.DEBUG) +import logging -async def main(): +async def main(is_debug=True): + + if is_debug: + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + logging.getLogger("carbus_async.wire").setLevel(logging.DEBUG) dev = await CarBusDevice.open("COM6") @@ -53,4 +53,4 @@ async def main(): await dev.close() -asyncio.run(main()) +asyncio.run(main(is_debug=False)) diff --git a/uds_async/__init__.py b/uds_async/__init__.py index 7111068..e5ebe76 100644 --- a/uds_async/__init__.py +++ b/uds_async/__init__.py @@ -13,6 +13,11 @@ try: __all__ = [ "UdsClient", "UdsServer", + "UdsRequest", + "UdsResponse", + "UdsPositiveResponse", + "UdsNegativeResponse", + "ResponseCode", ] except ImportError: # если types.py нет или переименован — хотя бы клиент и сервер доступны