add open_can_channel_custom

This commit is contained in:
controllerzz 2025-12-11 15:10:58 +03:00
parent 3f1efdf970
commit 0accfa2d3e
3 changed files with 116 additions and 11 deletions

View File

@ -42,7 +42,7 @@ pip install -e .
--- ---
## 1. Работа с CAN ## Работа с CAN
Простейший пример: открыть устройство, настроить канал и отправить / принять кадр. Простейший пример: открыть устройство, настроить канал и отправить / принять кадр.
@ -60,7 +60,7 @@ async def main():
nominal_bitrate=500_000, nominal_bitrate=500_000,
) )
# включаем терминатор 120 Ом на канале 1 # включаем терминатор 120 Ω на канале 1
await dev.set_terminator(channel=1, enabled=True) await dev.set_terminator(channel=1, enabled=True)
# отправка кадра 0x7E0 8 байт # отправка кадра 0x7E0 8 байт
@ -76,8 +76,19 @@ async def main():
asyncio.run(main()) asyncio.run(main())
```` ````
Возможность настройки канала через Bit Timing
````python
# CANFD+BRS 500/2000 kbit/s
await dev.open_can_channel_custom(
channel=1,
nominal=(15, 12, 3, 1), # Prescaler, tqSeg1, tqSeg2, SyncJW
data=(6, 7, 2, 1), # Prescaler, tqSeg1, tqSeg2, SyncJW
fd=True,
brs=True,
)
````
## 2. Информация об устройстве и фильтры ## Информация об устройстве и фильтры
Пример запроса DEVICE_INFO и настройки фильтров: Пример запроса DEVICE_INFO и настройки фильтров:
````python ````python
@ -102,7 +113,7 @@ await dev.set_std_id_filter(
await dev.set_terminator(channel=1, enabled=True) await dev.set_terminator(channel=1, enabled=True)
```` ````
## 3. ISO-TP (isotp_async) ## ISO-TP (isotp_async)
ISO-TP канал строится поверх CarBusDevice: ISO-TP канал строится поверх CarBusDevice:
````python ````python
from isotp_async.transport import IsoTpChannel from isotp_async.transport import IsoTpChannel
@ -118,7 +129,7 @@ resp = await isotp.recv_pdu(timeout=30.0)
print("ISO-TP:", resp.hex()) print("ISO-TP:", resp.hex())
```` ````
## 4. UDS Client (uds_async.client) ## UDS Client (uds_async.client)
Клиент UDS использует IsoTpChannel: Клиент UDS использует IsoTpChannel:
````python ````python
@ -136,9 +147,9 @@ print("VIN:", vin.decode(errors="ignore"))
## 5. Удалённая работа через TCP (tcp_bridge) ## Удалённая работа через TCP (tcp_bridge)
### 5.1. Сервер (рядом с адаптером) ### 1. Сервер (рядом с адаптером)
На машине, где физически подключён CAN-адаптер: На машине, где физически подключён CAN-адаптер:
@ -146,7 +157,7 @@ print("VIN:", vin.decode(errors="ignore"))
Адаптер открывается локально, а поверх него поднимается TCP-мост. Адаптер открывается локально, а поверх него поднимается TCP-мост.
### 5.2. Клиент (удалённая машина) ### 2. Клиент (удалённая машина)
На другой машине можно использовать тот же API, как с локальным COM, но через `open_tcp`: На другой машине можно использовать тот же API, как с локальным COM, но через `open_tcp`:
````python ````python
@ -174,7 +185,7 @@ async def main():
asyncio.run(main()) asyncio.run(main())
```` ````
## 6. Логирование ## Логирование
Для отладки удобно включить подробное логирование: Для отладки удобно включить подробное логирование:
````python ````python
@ -193,7 +204,7 @@ asyncio.run(main())
--- ---
## 7. Лицензия ## Лицензия
MIT. MIT.

View File

@ -761,6 +761,96 @@ class CarBusDevice:
f"Unexpected CHANNEL_OPEN response: cmd=0x{cmd:02X}, flags=0x{flags:04X}" f"Unexpected CHANNEL_OPEN response: cmd=0x{cmd:02X}, flags=0x{flags:04X}"
) )
async def open_can_channel_custom(
self,
channel: int = 1,
*,
nominal: tuple[int, int, int, int] | None,
data: tuple[int, int, int, int] | None = None,
fd: bool = False,
brs: bool = False,
listen_only: bool = False,
loopback: bool = False,
retransmit: bool = False,
non_iso: bool = False,
) -> None:
def _build_bus_custom_baudrate_words(
base_cc: int,
prescaler: int,
seg1: int,
seg2: int,
sjw: int,
) -> list[int]:
packed = struct.pack("<HHHH", prescaler, seg1, seg2, sjw)
word1 = int.from_bytes(packed[0:4], "little")
word2 = int.from_bytes(packed[4:8], "little")
length_words = 2
header = base_cc | CC_MULTIWORD | ((length_words & 0xFF) << 16)
return [header, word1, word2]
# 1) CAN mode
if loopback:
mode_val = 0x02
elif listen_only:
mode_val = 0x01
else:
mode_val = 0x00
cc_can_mode = 0x11000000 | mode_val
# 2) CAN frame (classic/FD/BRS)
if not fd:
frame_mode = 0x00 # classic
else:
frame_mode = 0x02 if brs else 0x01
cc_can_frame = 0x12000000 | frame_mode
params: list[int] = [cc_can_mode, cc_can_frame]
# 3) Nominal custom bitrate (CC_BUS_SPEED_N)
if nominal is not None:
presc, seg1, seg2, sjw = nominal
header_n = 0x01000000 | CC_MULTIWORD | (2 << 16)
params.append(header_n)
b = struct.pack("<HHHH", presc, seg1, seg2, sjw) # BusCustomBaudRate
params.append(int.from_bytes(b[0:4], "little"))
params.append(int.from_bytes(b[4:8], "little"))
# 4) Data custom bitrate (CC_BUS_SPEED_D)
if fd and data is not None:
presc, seg1, seg2, sjw = data
header_d = 0x02000000 | CC_MULTIWORD | (2 << 16)
params.append(header_d)
b = struct.pack("<HHHH", presc, seg1, seg2, sjw)
params.append(int.from_bytes(b[0:4], "little"))
params.append(int.from_bytes(b[4:8], "little"))
# доп. опции
if retransmit:
params.append(0x13000001)
if non_iso:
params.append(0x14000001)
payload = b"".join(p.to_bytes(4, "little") for p in params)
header_flags = (channel & 0x0F) * 0x20
cmd, flags, resp_payload = await self._send_raw(
Command.CHANNEL_OPEN,
header_flags=header_flags,
payload=payload,
expect_response=True,
)
if not is_ack(cmd) or base_command_from_ack(cmd) != Command.CHANNEL_OPEN:
raise CommandError(
f"Unexpected CHANNEL_OPEN response: cmd=0x{cmd:02X}, flags=0x{flags:04X}"
)
async def set_can_filter( async def set_can_filter(
self, self,
channel: int, channel: int,

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
from carbus_async import CanMessage
from carbus_async.device import CarBusDevice from carbus_async.device import CarBusDevice
from isotp_async.carbus_iface import CarBusCanTransport from isotp_async.carbus_iface import CarBusCanTransport
from isotp_async.transport import IsoTpChannel from isotp_async.transport import IsoTpChannel
@ -28,6 +29,9 @@ async def main(is_debug=True):
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00",fd=True, brs=True)
await dev.send_can(msg, channel=1)
info = await dev.get_device_info() info = await dev.get_device_info()
print("HW:", info.hardware_id, info.hardware_name) print("HW:", info.hardware_id, info.hardware_name)
print("FW:", info.firmware_version) print("FW:", info.firmware_version)
@ -53,4 +57,4 @@ async def main(is_debug=True):
await dev.close() await dev.close()
asyncio.run(main(is_debug=False)) asyncio.run(main(is_debug=True))