carbus_lib/carbus_async/can_router.py

72 lines
2.3 KiB
Python

import asyncio
import contextlib
from typing import Dict, Optional, Tuple
from carbus_async import CanMessage
from isotp_async.iface import CanTransport
class CanIdRouter:
def __init__(self, dev, channel: int, queue_size: int = 256):
self._dev = dev
self._channel = channel
self._queues: Dict[int, asyncio.Queue] = {}
self._queue_size = queue_size
self._task: Optional[asyncio.Task] = None
self._stop = asyncio.Event()
def get_queue(self, can_id: int) -> asyncio.Queue:
q = self._queues.get(can_id)
if q is None:
q = asyncio.Queue(maxsize=self._queue_size)
self._queues[can_id] = q
return q
async def start(self):
if self._task is None:
self._task = asyncio.create_task(self._run())
async def stop(self):
self._stop.set()
if self._task:
self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task
self._task = None
async def _run(self):
while not self._stop.is_set():
ch, msg = await self._dev.receive_can()
if ch != self._channel:
continue
q = self._queues.get(msg.can_id)
if q is None:
# никто не подписан на этот CAN-ID — просто игнор
continue
# если очередь забита — можно дропать самый старый или новый
if q.full():
_ = q.get_nowait()
q.put_nowait(msg)
class RoutedCarBusCanTransport(CanTransport):
def __init__(self, dev, channel: int, rx_id: int, router: CanIdRouter) -> None:
self._dev = dev
self._channel = channel
self._rx_id = rx_id
self._router = router
self._queue = router.get_queue(rx_id)
async def send(self, msg: CanMessage) -> None:
await self._dev.send_can(msg, channel=self._channel, confirm=False, echo=False)
async def recv(self, timeout: Optional[float] = None) -> Optional[CanMessage]:
try:
if timeout is None:
return await self._queue.get()
return await asyncio.wait_for(self._queue.get(), timeout=timeout)
except asyncio.TimeoutError:
return None