From 010b2cc1a249ce6e4724f149a0c5f54bfd773cfb Mon Sep 17 00:00:00 2001 From: controllerzz <79202101363@mail.ru> Date: Fri, 12 Dec 2025 18:59:57 +0300 Subject: [PATCH] add CanTiming --- README.md | 47 ++++++++++++++++++++++++++++++----------- carbus_analyser.py | 0 carbus_async/device.py | 39 +++++++++++++++++----------------- isotp_async/__init__.py | 2 ++ main.py | 22 +++++++++++++++++-- 5 files changed, 77 insertions(+), 33 deletions(-) delete mode 100644 carbus_analyser.py diff --git a/README.md b/README.md index 736268e..28073f2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# car-bus-lib (async CAN / ISO-TP / UDS stack) +# carbus-lib (async CAN / ISO-TP / UDS stack) Асинхронная библиотека на Python для работы с CAN-адаптером **CAN-Hacker / Car Bus Analyzer**: @@ -7,8 +7,9 @@ - 🩺 **`uds_async`** – UDS (ISO 14229) клиент и сервер (диагностика, чтение VIN и т.п.) - 🌐 **`TCP-bridge`** – удалённое подключение к адаптеру через сеть (как будто он воткнут локально) -> Python 3.11 и выше +> Python 3.10 и выше > Никаких «магических» зависимостей — всё на `asyncio`. +> Поддерживаемые интерфейсы: https://canhacker.ru/shop/ > _*Тестировалось на устройствах с Протоколом Версии 22_ --- @@ -81,21 +82,29 @@ async def main(): asyncio.run(main()) ```` -Возможность настройки канала через Bit Timing +## Настройка канала через Bit Timing ````python # CANFD+BRS 500/2000 kbit/s await dev.open_can_channel_custom( channel=1, - nominal=(15, 12, 3, 1), # Prescaler, tqSeg1, tqSeg2, SyncJW - data=(6, 7, 2, 1), # Prescaler, tqSeg1, tqSeg2, SyncJW + nominal_timing=CanTiming( + prescaler=15, + tq_seg1=12, + tq_seg2=3, + sjw=1 + ), + data_timing=CanTiming( + prescaler=6, + tq_seg1=7, + tq_seg2=2, + sjw=1 + ), fd=True, brs=True, ) ```` -## Информация об устройстве и фильтры - -Пример запроса DEVICE_INFO и настройки фильтров: +## Полученияе информации об устройстве: ````python info = await dev.get_device_info() @@ -103,6 +112,16 @@ print("HW:", info.hardware_name) print("FW:", info.firmware_version) print("Serial:", info.serial_int) +print("Features:", + "gateway" if info.feature_gateway else "", + "isotp" if info.feature_isotp else "", + "txbuf" if info.feature_tx_buffer else "", + "txtask" if info.feature_tx_task else "", + ) +```` + +## Пример настройки фильтров: +````python # очистить все фильтры на канале 1 await dev.clear_all_filters(1) @@ -113,15 +132,19 @@ await dev.set_std_id_filter( can_id=0x7E8, mask=0x7FF, ) +```` -# включить/выключить терминатор 120 Ω +## Управление терминатором 120 Ω: +````python await dev.set_terminator(channel=1, enabled=True) +await dev.set_terminator(channel=2, enabled=False) + ```` ## ISO-TP (isotp_async) ISO-TP канал строится поверх CarBusDevice: ````python -from isotp_async.transport import IsoTpChannel +from isotp_async import IsoTpChannel can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8) isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8) @@ -130,7 +153,7 @@ isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8) await isotp.send_pdu(b"\x22\xF1\x90") # получить полный ответ (single или multi-frame) -resp = await isotp.recv_pdu(timeout=30.0) +resp = await isotp.recv_pdu(timeout=5.0) print("ISO-TP:", resp.hex()) ```` @@ -139,7 +162,7 @@ print("ISO-TP:", resp.hex()) Клиент UDS использует IsoTpChannel: ````python from isotp_async import CarBusCanTransport -from isotp_async.transport import IsoTpChannel +from isotp_async import IsoTpChannel from uds_async import UdsClient can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8) diff --git a/carbus_analyser.py b/carbus_analyser.py deleted file mode 100644 index e69de29..0000000 diff --git a/carbus_async/device.py b/carbus_async/device.py index ec8d6fb..9b138c7 100644 --- a/carbus_async/device.py +++ b/carbus_async/device.py @@ -13,19 +13,6 @@ from .exceptions import CarBusError, SyncError, CommandError from .messages import CanMessage -from .protocol import ( - Command, - CommandHeader, - MsgCommandHeader, - HeaderFlags, - BusMessageFlags, - CC_MULTIWORD, - is_ack, - base_command_from_ack, - need_extended_header, -) - - NOMINAL_BITRATE_INDEX: Dict[int, int] = { 10_000: 0, 20_000: 1, @@ -54,6 +41,14 @@ DATA_BITRATE_INDEX: Dict[int, int] = { } +@dataclass(frozen=True) +class CanTiming: + prescaler: int + tq_seg1: int + tq_seg2: int + sjw: int + + @dataclass class DeviceInfoParam: header: int @@ -765,8 +760,8 @@ class CarBusDevice: self, channel: int = 1, *, - nominal: tuple[int, int, int, int] | None, - data: tuple[int, int, int, int] | None = None, + nominal_timing: CanTiming | None, + data_timing: CanTiming | None = None, fd: bool = False, brs: bool = False, listen_only: bool = False, @@ -810,8 +805,11 @@ class CarBusDevice: params: list[int] = [cc_can_mode, cc_can_frame] # 3) Nominal custom bitrate (CC_BUS_SPEED_N) - if nominal is not None: - presc, seg1, seg2, sjw = nominal + if nominal_timing is not None: + presc=nominal_timing.prescaler + seg1=nominal_timing.tq_seg1 + seg2=nominal_timing.tq_seg2 + sjw=nominal_timing.sjw header_n = 0x01000000 | CC_MULTIWORD | (2 << 16) params.append(header_n) @@ -820,8 +818,11 @@ class CarBusDevice: params.append(int.from_bytes(b[4:8], "little")) # 4) Data custom bitrate (CC_BUS_SPEED_D) - if fd and data is not None: - presc, seg1, seg2, sjw = data + if fd and data_timing is not None: + presc=data_timing.prescaler + seg1=data_timing.tq_seg1 + seg2=data_timing.tq_seg2 + sjw=data_timing.sjw header_d = 0x02000000 | CC_MULTIWORD | (2 << 16) params.append(header_d) diff --git a/isotp_async/__init__.py b/isotp_async/__init__.py index 0c5a3c8..4dfa22e 100644 --- a/isotp_async/__init__.py +++ b/isotp_async/__init__.py @@ -1,5 +1,7 @@ from .carbus_iface import CarBusCanTransport +from .transport import IsoTpChannel __all__ = [ "CarBusCanTransport", + "IsoTpChannel", ] \ No newline at end of file diff --git a/main.py b/main.py index 414716c..31a0c10 100644 --- a/main.py +++ b/main.py @@ -1,9 +1,9 @@ import asyncio from carbus_async import CanMessage -from carbus_async.device import CarBusDevice +from carbus_async.device import CarBusDevice, CanTiming from isotp_async.carbus_iface import CarBusCanTransport -from isotp_async.transport import IsoTpChannel +from isotp_async import IsoTpChannel from uds_async.client import UdsClient import logging @@ -25,6 +25,24 @@ async def main(is_debug=False): nominal_bitrate=500_000, ) + # await dev.open_can_channel_custom( + # channel=1, + # nominal_timing=CanTiming( + # prescaler=15, + # tq_seg1=12, + # tq_seg2=3, + # sjw=1 + # ), + # data_timing=CanTiming( + # prescaler=6, + # tq_seg1=7, + # tq_seg2=2, + # sjw=1 + # ), + # fd=True, + # brs=True, + # ) + await dev.set_terminator(channel=1, enabled=True) await asyncio.sleep(0.5)