carbus_lib/isotp_async/api.py

63 lines
1.9 KiB
Python

# isotp_async/api.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
# from carbus_async import CanIdRouter, RoutedCarBusCanTransport
from isotp_async import IsoTpChannel, CarBusCanTransport, IsoTpConnection
@dataclass(frozen=True)
class IsoTpEndpoint:
tx_id: int
rx_id: int
channel: int = 1
async def open_isotp(
dev: Any,
*,
endpoint: IsoTpEndpoint | None = None,
channel: int = 1,
tx_id: int | None = None,
rx_id: int | None = None,
router: Any | None = None,
**channel_kwargs,
) -> IsoTpChannel:
if endpoint is not None:
channel = endpoint.channel
tx_id = endpoint.tx_id
rx_id = endpoint.rx_id
if tx_id is None or rx_id is None:
raise ValueError("tx_id and rx_id are required (or pass endpoint=...)")
# ЛЕНИВЫЕ ИМПОРТЫ, чтобы не было circular import:
if router is None:
from isotp_async.carbus_iface import CarBusCanTransport # <-- подстрой путь под твой проект
can_tr = CarBusCanTransport(dev, channel=channel, rx_id=rx_id)
else:
from carbus_async.can_router import RoutedCarBusCanTransport # <-- подстрой путь
can_tr = RoutedCarBusCanTransport(dev, channel=channel, rx_id=rx_id, router=router)
return IsoTpChannel(can_tr, tx_id=tx_id, rx_id=rx_id, **channel_kwargs)
@dataclass(frozen=True)
class IsoTpCanEndpoint:
rx_id: int # что слушаем (request -> ECU)
tx_id: int # куда отвечаем (ECU -> tester)
class IsoTpNetwork:
def __init__(self, base_send, router: CanIdRouter):
self._send = base_send
self._router = router
def endpoint(self, ep: IsoTpCanEndpoint) -> IsoTpConnection:
transport = RoutedCarBusCanTransport(
dev=..., channel=..., rx_id=ep.rx_id, router=self._router
)
return IsoTpConnection(transport=transport, tx_id=ep.tx_id)