63 lines
1.8 KiB
Python
63 lines
1.8 KiB
Python
from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, Optional

from isotp_async.transport import IsoTpChannel

from .exceptions import UdsNegativeResponse
|
|
|
|
# Signature of a UDS service handler: an async callable that receives the full
# request PDU (service identifier byte included) and returns the response bytes
# to transmit, or None when no response should be sent.
UdsHandler = Callable[[bytes], Awaitable[Optional[bytes]]]
|
|
|
|
|
|
@dataclass
class UdsServer:
    """UDS server that dispatches incoming ISO-TP requests to per-service handlers.

    Every received PDU is routed by its first byte (the service identifier,
    SID) to the coroutine registered for that SID. The handler's return value
    is sent back as the response; ``None`` means "send nothing".

    Attributes:
        isotp: Channel used to receive request PDUs and send response PDUs.
        p2_timeout: Timeout, in seconds, passed to ``isotp.recv_pdu`` for each
            wait on the next request.
        handlers: Mapping from SID to the handler coroutine for that service.
    """

    # Deliberately un-annotated so the dataclass machinery does not turn these
    # protocol constants into constructor fields.
    _NEGATIVE_RESPONSE_SID = 0x7F
    _NRC_SERVICE_NOT_SUPPORTED = 0x11
    _NRC_GENERAL_PROGRAMMING_FAILURE = 0x72

    isotp: IsoTpChannel
    p2_timeout: float = 1.0
    handlers: Dict[int, UdsHandler] = field(default_factory=dict)

    async def serve_forever(self) -> None:
        """Receive requests and answer them until cancelled.

        Returns normally when the task is cancelled while waiting for the next
        request. Empty reads and receive timeouts are treated as "no request
        yet" and the loop keeps waiting.

        Per request:
          * no handler registered for the SID -> negative response 0x11;
          * handler returns ``None``          -> nothing is sent;
          * handler returns bytes             -> sent as the response;
          * handler raises UdsNegativeResponse -> negative response built from
            the exception's ``req_sid`` and ``nrc``;
          * any other exception (from the handler or from sending its
            response) -> logged, then negative response 0x72.
        """
        while True:
            try:
                req = await self.isotp.recv_pdu(timeout=self.p2_timeout)
            except asyncio.CancelledError:
                # Cancellation while idle ends the server loop quietly.
                break
            except asyncio.TimeoutError:
                # NOTE(review): whether recv_pdu reports an idle timeout by
                # raising or by returning an empty value is not visible here;
                # either way an idle bus must not stop a "forever" loop.
                continue

            if not req:
                continue

            sid = req[0]
            handler = self.handlers.get(sid)
            if handler is None:
                await self._send_negative_response(
                    sid, self._NRC_SERVICE_NOT_SUPPORTED
                )
                continue

            try:
                resp = await handler(req)
                if resp is None:
                    continue
                await self.isotp.send_pdu(resp)
            except UdsNegativeResponse as e:
                await self._send_negative_response(e.req_sid, e.nrc)
            except Exception:
                # Keep the traceback: the tester only ever sees the NRC, so
                # without this the root cause of the failure would be lost.
                logging.getLogger(__name__).exception(
                    "Unhandled error in UDS handler for SID 0x%02X", sid
                )
                await self._send_negative_response(
                    sid, self._NRC_GENERAL_PROGRAMMING_FAILURE
                )

    async def _send_negative_response(self, sid: int, nrc: int) -> None:
        """Send the three-byte negative response ``7F <sid> <nrc>``.

        Both ``sid`` and ``nrc`` are truncated to their low 8 bits.
        """
        payload = bytes([self._NEGATIVE_RESPONSE_SID, sid & 0xFF, nrc & 0xFF])
        await self.isotp.send_pdu(payload)

    def add_handler(self, sid: int, handler: UdsHandler) -> None:
        """Register ``handler`` for ``sid`` (low 8 bits), replacing any previous one."""
        self.handlers[sid & 0xFF] = handler

    def service(self, sid: int) -> Callable[[UdsHandler], UdsHandler]:
        """Return a decorator that registers the decorated coroutine for ``sid``.

        The decorated function is returned unchanged, so it remains directly
        callable.
        """

        def decorator(func: UdsHandler) -> UdsHandler:
            self.add_handler(sid, func)
            return func

        return decorator
|