add CanTiming

This commit is contained in:
controllerzz 2025-12-12 18:59:57 +03:00
parent 0a592db55e
commit 010b2cc1a2
5 changed files with 77 additions and 33 deletions

View File

@ -1,4 +1,4 @@
# car-bus-lib (async CAN / ISO-TP / UDS stack)
# carbus-lib (async CAN / ISO-TP / UDS stack)
Асинхронная библиотека на Python для работы с CAN-адаптером **CAN-Hacker / Car Bus Analyzer**:
@ -7,8 +7,9 @@
- 🩺 **`uds_async`** UDS (ISO 14229) клиент и сервер (диагностика, чтение VIN и т.п.)
- 🌐 **`TCP-bridge`** удалённое подключение к адаптеру через сеть (как будто он воткнут локально)
> Python 3.11 и выше
> Python 3.10 и выше
> Никаких «магических» зависимостей — всё на `asyncio`.
> Поддерживаемые интерфейсы: https://canhacker.ru/shop/
> _*Тестировалось на устройствах с Протоколом Версии 22_
---
@ -81,21 +82,29 @@ async def main():
asyncio.run(main())
````
Возможность настройки канала через Bit Timing
## Настройка канала через Bit Timing
````python
# CANFD+BRS 500/2000 kbit/s
await dev.open_can_channel_custom(
channel=1,
nominal=(15, 12, 3, 1), # Prescaler, tqSeg1, tqSeg2, SyncJW
data=(6, 7, 2, 1), # Prescaler, tqSeg1, tqSeg2, SyncJW
nominal_timing=CanTiming(
prescaler=15,
tq_seg1=12,
tq_seg2=3,
sjw=1
),
data_timing=CanTiming(
prescaler=6,
tq_seg1=7,
tq_seg2=2,
sjw=1
),
fd=True,
brs=True,
)
````
## Информация об устройстве и фильтры
Пример запроса DEVICE_INFO и настройки фильтров:
## Полученияе информации об устройстве:
````python
info = await dev.get_device_info()
@ -103,6 +112,16 @@ print("HW:", info.hardware_name)
print("FW:", info.firmware_version)
print("Serial:", info.serial_int)
print("Features:",
"gateway" if info.feature_gateway else "",
"isotp" if info.feature_isotp else "",
"txbuf" if info.feature_tx_buffer else "",
"txtask" if info.feature_tx_task else "",
)
````
## Пример настройки фильтров:
````python
# очистить все фильтры на канале 1
await dev.clear_all_filters(1)
@ -113,15 +132,19 @@ await dev.set_std_id_filter(
can_id=0x7E8,
mask=0x7FF,
)
````
# включить/выключить терминатор 120 Ω
## Управление терминатором 120 Ω:
````python
await dev.set_terminator(channel=1, enabled=True)
await dev.set_terminator(channel=2, enabled=False)
````
## ISO-TP (isotp_async)
ISO-TP канал строится поверх CarBusDevice:
````python
from isotp_async.transport import IsoTpChannel
from isotp_async import IsoTpChannel
can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8)
isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8)
@ -130,7 +153,7 @@ isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8)
await isotp.send_pdu(b"\x22\xF1\x90")
# получить полный ответ (single или multi-frame)
resp = await isotp.recv_pdu(timeout=30.0)
resp = await isotp.recv_pdu(timeout=5.0)
print("ISO-TP:", resp.hex())
````
@ -139,7 +162,7 @@ print("ISO-TP:", resp.hex())
Клиент UDS использует IsoTpChannel:
````python
from isotp_async import CarBusCanTransport
from isotp_async.transport import IsoTpChannel
from isotp_async import IsoTpChannel
from uds_async import UdsClient
can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8)

View File

View File

@ -13,19 +13,6 @@ from .exceptions import CarBusError, SyncError, CommandError
from .messages import CanMessage
from .protocol import (
Command,
CommandHeader,
MsgCommandHeader,
HeaderFlags,
BusMessageFlags,
CC_MULTIWORD,
is_ack,
base_command_from_ack,
need_extended_header,
)
NOMINAL_BITRATE_INDEX: Dict[int, int] = {
10_000: 0,
20_000: 1,
@ -54,6 +41,14 @@ DATA_BITRATE_INDEX: Dict[int, int] = {
}
@dataclass(frozen=True)
class CanTiming:
prescaler: int
tq_seg1: int
tq_seg2: int
sjw: int
@dataclass
class DeviceInfoParam:
header: int
@ -765,8 +760,8 @@ class CarBusDevice:
self,
channel: int = 1,
*,
nominal: tuple[int, int, int, int] | None,
data: tuple[int, int, int, int] | None = None,
nominal_timing: CanTiming | None,
data_timing: CanTiming | None = None,
fd: bool = False,
brs: bool = False,
listen_only: bool = False,
@ -810,8 +805,11 @@ class CarBusDevice:
params: list[int] = [cc_can_mode, cc_can_frame]
# 3) Nominal custom bitrate (CC_BUS_SPEED_N)
if nominal is not None:
presc, seg1, seg2, sjw = nominal
if nominal_timing is not None:
presc=nominal_timing.prescaler
seg1=nominal_timing.tq_seg1
seg2=nominal_timing.tq_seg2
sjw=nominal_timing.sjw
header_n = 0x01000000 | CC_MULTIWORD | (2 << 16)
params.append(header_n)
@ -820,8 +818,11 @@ class CarBusDevice:
params.append(int.from_bytes(b[4:8], "little"))
# 4) Data custom bitrate (CC_BUS_SPEED_D)
if fd and data is not None:
presc, seg1, seg2, sjw = data
if fd and data_timing is not None:
presc=data_timing.prescaler
seg1=data_timing.tq_seg1
seg2=data_timing.tq_seg2
sjw=data_timing.sjw
header_d = 0x02000000 | CC_MULTIWORD | (2 << 16)
params.append(header_d)

View File

@ -1,5 +1,7 @@
from .carbus_iface import CarBusCanTransport
from .transport import IsoTpChannel
__all__ = [
"CarBusCanTransport",
"IsoTpChannel",
]

22
main.py
View File

@ -1,9 +1,9 @@
import asyncio
from carbus_async import CanMessage
from carbus_async.device import CarBusDevice
from carbus_async.device import CarBusDevice, CanTiming
from isotp_async.carbus_iface import CarBusCanTransport
from isotp_async.transport import IsoTpChannel
from isotp_async import IsoTpChannel
from uds_async.client import UdsClient
import logging
@ -25,6 +25,24 @@ async def main(is_debug=False):
nominal_bitrate=500_000,
)
# await dev.open_can_channel_custom(
# channel=1,
# nominal_timing=CanTiming(
# prescaler=15,
# tq_seg1=12,
# tq_seg2=3,
# sjw=1
# ),
# data_timing=CanTiming(
# prescaler=6,
# tq_seg1=7,
# tq_seg2=2,
# sjw=1
# ),
# fd=True,
# brs=True,
# )
await dev.set_terminator(channel=1, enabled=True)
await asyncio.sleep(0.5)