This commit is contained in:
controllerzz 2025-12-11 10:27:04 +03:00
parent 957d825a3b
commit 5c7a96d215
4 changed files with 107 additions and 146 deletions

225
README.md
View File

@ -5,9 +5,11 @@
- 📡 **`carbus_async`** низкоуровневая работа с железкой (CAN/LIN, фильтры, терминаторы и т.д.) - 📡 **`carbus_async`** низкоуровневая работа с железкой (CAN/LIN, фильтры, терминаторы и т.д.)
- 📦 **`isotp_async`** ISO-TP (ISO 15765-2) поверх CAN (single + multi-frame) - 📦 **`isotp_async`** ISO-TP (ISO 15765-2) поверх CAN (single + multi-frame)
- 🩺 **`uds_async`** UDS (ISO 14229) клиент и сервер (диагностика, чтение VIN и т.п.) - 🩺 **`uds_async`** UDS (ISO 14229) клиент и сервер (диагностика, чтение VIN и т.п.)
- 🌐 **TCP-bridge** удалённое подключение к адаптеру через сеть (как будто он воткнут локально) - 🌐 **`TCP-bridge`** удалённое подключение к адаптеру через сеть (как будто он воткнут локально)
> Минимальные примеры, никаких «магических» зависимостей — всё на `asyncio`. > Python 3.11 и выше
> Никаких «магических» зависимостей — всё на `asyncio`.
> _*Тестировалось на устройствах с Протоколом Версии 22_
--- ---
@ -16,15 +18,15 @@
Пока проект в разработке, можно ставить его как editable-модуль из репозитория: Пока проект в разработке, можно ставить его как editable-модуль из репозитория:
```bash ```bash
git clone https://github.com/your_name/carbus_lib.git git clone https://github.com/controllerzz/carbus_lib.git
cd car_bus_lib cd car_bus_lib
pip install -e . pip install -e .
``` ```
# car-bus-lib # carbus-lib
Асинхронная библиотека для работы с CAN / CAN-FD, ISO-TP и UDS. Асинхронная библиотека для работы с CAN / CAN-FD, ISO-TP и UDS.
Поддерживает локальное подключение через USB CDC и удалённую работу через TCP-bridge. Поддерживает локальное подключение через USB CDC и удалённую работу через TCP-bridge.
--- ---
@ -40,45 +42,38 @@ pip install -e .
--- ---
## Установка
````bat
pip install pyserial pyserial-asyncio
git clone https://github.com/your_name/carbus_lib.git
cd car-bus-lib
pip install -e .
````
## 1. Работа с CAN ## 1. Работа с CAN
Простейший пример: открыть устройство, настроить канал и отправить / принять кадр. Простейший пример: открыть устройство, настроить канал и отправить / принять кадр.
````python ````python
import asyncio import asyncio
from carbus_async.device import CarBusDevice from carbus_async.device import CarBusDevice
from carbus_async.messages import CanMessage from carbus_async.messages import CanMessage
async def main(): async def main():
dev = await CarBusDevice.open("COM6", baudrate=115200) dev = await CarBusDevice.open("COM6", baudrate=115200)
# классический CAN 500 kbit/s # классический CAN 500 kbit/s
await dev.open_can_channel( await dev.open_can_channel(
channel=1, channel=1,
nominal_bitrate=500_000, nominal_bitrate=500_000,
fd=False, )
)
# отправка кадра 0x7E0 8 байт # включаем терминатор 120 Ом на канале 1
msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00") await dev.set_terminator(channel=1, enabled=True)
await dev.send_can(msg, channel=1)
# приём любого сообщения # отправка кадра 0x7E0 8 байт
ch, rx = await dev.receive_can() msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00")
print("RX:", ch, rx) await dev.send_can(msg, channel=1)
await dev.close() # приём любого сообщения
ch, rx = await dev.receive_can()
print("RX:", ch, rx)
asyncio.run(main()) await dev.close()
asyncio.run(main())
```` ````
@ -86,131 +81,107 @@ pip install -e .
Пример запроса DEVICE_INFO и настройки фильтров: Пример запроса DEVICE_INFO и настройки фильтров:
````python ````python
info = await dev.get_device_info() info = await dev.get_device_info()
print("HW:", info.hardware_name) print("HW:", info.hardware_name)
print("FW:", info.firmware_version) print("FW:", info.firmware_version)
print("Serial:", info.serial_int) print("Serial:", info.serial_int)
# очистить все фильтры на канале 1 # очистить все фильтры на канале 1
await dev.clear_all_filters(1) await dev.clear_all_filters(1)
# разрешить только ответы с ID 0x7E8 (11-битный стандартный ID) # разрешить только ответы с ID 0x7E8 (11-битный стандартный ID)
await dev.set_std_id_filter( await dev.set_std_id_filter(
channel=1, channel=1,
index=0, index=0,
can_id=0x7E8, can_id=0x7E8,
mask=0x7FF, mask=0x7FF,
) )
# включить/выключить терминатор 120 Ω # включить/выключить терминатор 120 Ω
await dev.set_terminator(channel=1, enabled=True) await dev.set_terminator(channel=1, enabled=True)
```` ````
## 3. ISO-TP (isotp_async) ## 3. ISO-TP (isotp_async)
ISO-TP канал строится поверх CarBusDevice: ISO-TP канал строится поверх CarBusDevice:
````python ````python
from isotp_async import IsoTpChannel from isotp_async.transport import IsoTpChannel
# предполагается, что dev уже открыт и канал CAN настроен # предполагается, что dev уже открыт и канал CAN настроен
isotp = IsoTpChannel( isotp = IsoTpChannel(
device=dev, device=dev,
channel=1, channel=1,
tx_id=0x7E0, # наш запрос tx_id=0x7E0, # наш запрос
rx_id=0x7E8, # ответ ЭБУ rx_id=0x7E8, # ответ ЭБУ
) )
# отправить запрос ReadDataByIdentifier F190 (VIN) # отправить запрос ReadDataByIdentifier F190 (VIN)
await isotp.send(b"\x22\xF1\x90") await isotp.send(b"\x22\xF1\x90")
# получить полный ответ (single или multi-frame) # получить полный ответ (single или multi-frame)
resp = await isotp.recv(timeout=1.0) resp = await isotp.recv(timeout=1.0)
print("ISO-TP:", resp.hex()) print("ISO-TP:", resp.hex())
```` ````
## 4. UDS Client (uds_async.client) ## 4. UDS Client (uds_async.client)
Клиент UDS использует IsoTpChannel: Клиент UDS использует IsoTpChannel:
````python ````python
from uds_async.client import UdsClient from uds_async.client import UdsClient
from isotp_async import IsoTpChannel from isotp_async.transport import IsoTpChannel
isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8)
uds = UdsClient(isotp_channel=isotp) uds = UdsClient(isotp_channel=isotp)
# переход в расширенную диагностическую сессию # переход в расширенную диагностическую сессию
await uds.diagnostic_session_control(0x03) await uds.diagnostic_session_control(0x03)
# чтение VIN (DID F190) # чтение VIN (DID F190)
vin_bytes = await uds.read_data_by_identifier(0xF190) vin_bytes = await uds.read_data_by_identifier(0xF190)
print("VIN:", vin_bytes.decode(errors="ignore")) print("VIN:", vin_bytes.decode(errors="ignore"))
```` ````
## 5. UDS Server (эмулятор ЭБУ)
Простой UDS-сервер, который отвечает на запрос VIN:
````python
from uds_async.server import UdsServer, UdsRequest, UdsPositiveResponse
from isotp_async import IsoTpChannel
class MyEcuServer(UdsServer): ## 5. Удалённая работа через TCP (tcp_bridge)
async def handle_read_data_by_identifier(self, req: UdsRequest):
if req.data_identifier == 0xF190:
# положительный ответ: 62 F1 90 + данные
return UdsPositiveResponse(b"\x62\xF1\x90DEMO-VIN-1234567")
# всё остальное обрабатывается базовой реализацией
return await super().handle_read_data_by_identifier(req)
async def main(): ### 5.1. Сервер (рядом с адаптером)
dev = await CarBusDevice.open("COM6", baudrate=115200)
await dev.open_can_channel(channel=1, nominal_bitrate=500_000, fd=False)
isotp = IsoTpChannel(dev, channel=1, tx_id=0x7E8, rx_id=0x7E0)
server = MyEcuServer(isotp_channel=isotp)
await server.serve_forever()
asyncio.run(main())
````
## 6. Удалённая работа через TCP (tcp_bridge)
### 6.1. Сервер (рядом с адаптером)
На машине, где физически подключён CAN-адаптер: На машине, где физически подключён CAN-адаптер:
python -m carbus_async.tcp_bridge --serial COM6 --port 7000 python.exe -m carbus_async.tcp_bridge --serial COM6 --port 7000
Адаптер открывается локально, а поверх него поднимается TCP-мост. Адаптер открывается локально, а поверх него поднимается TCP-мост.
### 6.2. Клиент (удалённая машина) ### 5.2. Клиент (удалённая машина)
На другой машине можно использовать тот же API, как с локальным COM, но через `open_tcp`: На другой машине можно использовать тот же API, как с локальным COM, но через `open_tcp`:
````python ````python
import asyncio import asyncio
from carbus_async.device import CarBusDevice from carbus_async.device import CarBusDevice
from carbus_async.messages import CanMessage from carbus_async.messages import CanMessage
async def main(): async def main():
dev = await CarBusDevice.open_tcp("192.168.1.10", 7000) dev = await CarBusDevice.open_tcp("192.168.1.10", 7000)
await dev.open_can_channel( await dev.open_can_channel(
channel=1, channel=1,
nominal_bitrate=500_000, nominal_bitrate=500_000,
fd=False, fd=False,
) )
msg = CanMessage(can_id=0x321, data=b"\x01\x02\x03\x04") msg = CanMessage(can_id=0x321, data=b"\x01\x02\x03\x04")
await dev.send_can(msg, channel=1) await dev.send_can(msg, channel=1)
ch, rx = await dev.receive_can() ch, rx = await dev.receive_can()
print("REMOTE RX:", ch, rx) print("REMOTE RX:", ch, rx)
await dev.close() await dev.close()
asyncio.run(main()) asyncio.run(main())
```` ````
## 7. Логирование ## 6. Логирование
Для отладки удобно включить подробное логирование: Для отладки удобно включить подробное логирование:
````python ````python
@ -229,27 +200,13 @@ ISO-TP канал строится поверх CarBusDevice:
--- ---
## 8. Структура проекта ## 7. Лицензия
carbus_async/ MIT.
device.py — асинхронный интерфейс к адаптеру (CAN/CAN-FD)
protocol.py — описания команд, флагов и структур протокола ДАННОЕ ПРОГРАММНОЕ ОБЕСПЕЧЕНИЕ ПРЕДОСТАВЛЯЕТСЯ «КАК ЕСТЬ», БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ, ЯВНО ВЫРАЖЕННЫХ ИЛИ ПОДРАЗУМЕВАЕМЫХ, ВКЛЮЧАЯ ГАРАНТИИ ТОВАРНОЙ ПРИГОДНОСТИ, СООТВЕТСТВИЯ ПО ЕГО КОНКРЕТНОМУ НАЗНАЧЕНИЮ И ОТСУТСТВИЯ НАРУШЕНИЙ, НО НЕ ОГРАНИЧИВАЯСЬ ИМИ. НИ В КАКОМ СЛУЧАЕ АВТОРЫ ИЛИ ПРАВООБЛАДАТЕЛИ НЕ НЕСУТ ОТВЕТСТВЕННОСТИ ПО КАКИМ-ЛИБО ИСКАМ, ЗА УЩЕРБ ИЛИ ПО ИНЫМ ТРЕБОВАНИЯМ, В ТОМ ЧИСЛЕ, ПРИ ДЕЙСТВИИ КОНТРАКТА, ДЕЛИКТЕ ИЛИ ИНОЙ СИТУАЦИИ, ВОЗНИКШИМ ИЗ-ЗА ИСПОЛЬЗОВАНИЯ ПРОГРАММНОГО ОБЕСПЕЧЕНИЯ ИЛИ ИНЫХ ДЕЙСТВИЙ С ПРОГРАММНЫМ ОБЕСПЕЧЕНИЕМ.
messages.py — модель CanMessage и вспомогательные типы
tcp_bridge.py — TCP-мост (сервер для удалённой работы)
isotp_async/
__init__.py — IsoTpChannel и вспомогательные сущности
uds_async/
client.py — UdsClient (клиент UDS)
server.py — UdsServer (сервер / эмулятор ЭБУ)
types.py — структуры запросов/ответов
---
## 9. Лицензия
MIT (можно поменять под нужды проекта).
Pull Requests и предложения по улучшению приветствуются 🚗 Pull Requests и предложения по улучшению приветствуются 🚗

View File

@ -1,6 +1,5 @@
from .carbus_iface import CarBusCanTransport from .carbus_iface import CarBusCanTransport
__all__ = [ __all__ = [
"CarBusCanTransport", "CarBusCanTransport",
] ]

20
main.py
View File

@ -5,17 +5,17 @@ from isotp_async.carbus_iface import CarBusCanTransport
from isotp_async.transport import IsoTpChannel from isotp_async.transport import IsoTpChannel
from uds_async.client import UdsClient from uds_async.client import UdsClient
# import logging import logging
#
# logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
# )
#
# logging.getLogger("carbus_async.wire").setLevel(logging.DEBUG)
async def main(): async def main(is_debug=True):
if is_debug:
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logging.getLogger("carbus_async.wire").setLevel(logging.DEBUG)
dev = await CarBusDevice.open("COM6") dev = await CarBusDevice.open("COM6")
@ -53,4 +53,4 @@ async def main():
await dev.close() await dev.close()
asyncio.run(main()) asyncio.run(main(is_debug=False))

View File

@ -13,6 +13,11 @@ try:
__all__ = [ __all__ = [
"UdsClient", "UdsClient",
"UdsServer", "UdsServer",
"UdsRequest",
"UdsResponse",
"UdsPositiveResponse",
"UdsNegativeResponse",
"ResponseCode",
] ]
except ImportError: except ImportError:
# если types.py нет или переименован — хотя бы клиент и сервер доступны # если types.py нет или переименован — хотя бы клиент и сервер доступны