diff --git a/README.md b/README.md index 28073f2..6dffcac 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ > Python 3.10 и выше > Никаких «магических» зависимостей — всё на `asyncio`. -> Поддерживаемые интерфейсы: https://canhacker.ru/shop/ +> Поддерживаемые интерфейсы: https://canhacker.ru/shop/ > _*Тестировалось на устройствах с Протоколом Версии 22_ --- @@ -144,10 +144,9 @@ await dev.set_terminator(channel=2, enabled=False) ## ISO-TP (isotp_async) ISO-TP канал строится поверх CarBusDevice: ````python -from isotp_async import IsoTpChannel +from isotp_async import open_isotp -can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8) -isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8) +isotp = await open_isotp(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) # отправить запрос ReadDataByIdentifier F190 (VIN) await isotp.send_pdu(b"\x22\xF1\x90") @@ -161,12 +160,10 @@ print("ISO-TP:", resp.hex()) Клиент UDS использует IsoTpChannel: ````python -from isotp_async import CarBusCanTransport -from isotp_async import IsoTpChannel +from isotp_async import open_isotp from uds_async import UdsClient -can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8) -isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8) +isotp = await open_isotp(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) uds = UdsClient(isotp) vin = await uds.read_data_by_identifier(0xF190) @@ -174,7 +171,6 @@ print("VIN:", vin.decode(errors="ignore")) ```` - ## Удалённая работа через TCP (tcp_bridge) ### 1. Сервер (рядом с адаптером) diff --git a/isotp_async/__init__.py b/isotp_async/__init__.py index 4dfa22e..a510d5a 100644 --- a/isotp_async/__init__.py +++ b/isotp_async/__init__.py @@ -1,7 +1,9 @@ from .carbus_iface import CarBusCanTransport from .transport import IsoTpChannel +from .api import open_isotp __all__ = [ "CarBusCanTransport", "IsoTpChannel", + "open_isotp", ] \ No newline at end of file diff --git a/isotp_async/api.py b/isotp_async/api.py new file mode 100644 index 0000000..4fdbf57 --- /dev/null +++ b/isotp_async/api.py @@ -0,0 +1,29 @@ +# isotp_async/api.py +from __future__ import annotations +from dataclasses import dataclass +from typing import Any + +from isotp_async import IsoTpChannel, CarBusCanTransport + + +@dataclass(frozen=True) +class IsoTpEndpoint: + tx_id: int + rx_id: int + channel: int = 1 + + +async def open_isotp(dev: Any, *, endpoint: IsoTpEndpoint | None = None, + channel: int = 1, tx_id: int | None = None, rx_id: int | None = None, + **channel_kwargs) -> IsoTpChannel: + + if endpoint is not None: + channel = endpoint.channel + tx_id = endpoint.tx_id + rx_id = endpoint.rx_id + + if tx_id is None or rx_id is None: + raise ValueError("tx_id and rx_id are required (or pass endpoint=...)") + + can_tr = CarBusCanTransport(dev, channel=channel, rx_id=rx_id) + return IsoTpChannel(can_tr, tx_id=tx_id, rx_id=rx_id, **channel_kwargs) diff --git a/main.py b/main.py index 31a0c10..24770cc 100644 --- a/main.py +++ b/main.py @@ -1,13 +1,15 @@ import asyncio -from carbus_async import CanMessage -from carbus_async.device import CarBusDevice, CanTiming -from isotp_async.carbus_iface import CarBusCanTransport -from isotp_async import IsoTpChannel -from uds_async.client import UdsClient +from carbus_async import CarBusDevice +from isotp_async import open_isotp +from uds_async import UdsClient import logging +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) async def main(is_debug=False): @@ -25,48 +27,9 @@ async def main(is_debug=False): nominal_bitrate=500_000, ) - # await dev.open_can_channel_custom( - # channel=1, - # nominal_timing=CanTiming( - # prescaler=15, - # tq_seg1=12, - # tq_seg2=3, - # sjw=1 - # ), - # data_timing=CanTiming( - # prescaler=6, - # tq_seg1=7, - # tq_seg2=2, - # sjw=1 - # ), - # fd=True, - # brs=True, - # ) - await dev.set_terminator(channel=1, enabled=True) - await asyncio.sleep(0.5) - - msg = CanMessage(can_id=0x7E0, data=b"\x02\x3E\x00\x00\x00\x00\x00\x00",fd=True, brs=True) - await dev.send_can(msg, channel=1) - - info = await dev.get_device_info() - print("HW:", info.hardware_id, info.hardware_name) - print("FW:", info.firmware_version) - print("Serial #", info.serial_int) - - print("Features:", - "gateway" if info.feature_gateway else "", - "isotp" if info.feature_isotp else "", - "txbuf" if info.feature_tx_buffer else "", - "txtask" if info.feature_tx_task else "", - ) - - await dev.clear_all_filters(1) - await dev.set_std_id_filter(1, index=0, can_id=0x7E8, mask=0x7FF) - - can_tr = CarBusCanTransport(dev, channel=1, rx_id=0x7E8) - isotp = IsoTpChannel(can_tr, tx_id=0x7E0, rx_id=0x7E8) + isotp = await open_isotp(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) uds = UdsClient(isotp) vin = await uds.read_data_by_identifier(0xF190)