carbus_lib/example/can_message_hook.py

49 lines
1.0 KiB
Python

import asyncio
from carbus_async import CarBusDevice, CanMessage
import signal
async def wait_forever() -> None:
stop = asyncio.Event()
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, stop.set)
except NotImplementedError:
pass
try:
await stop.wait()
finally:
return
async def main(is_debug=False):
dev = await CarBusDevice.open("COM6")
await dev.open_can_channel(
channel=1,
nominal_bitrate=500_000,
)
await dev.set_terminator(channel=1, enabled=True)
@dev.on_can_id(0x7E0)
async def hook(ch: int, msg: CanMessage):
print("RX", ch, hex(msg.can_id), bytes(msg.data).hex())
@dev.on_can_match(can_id=0x7E0, value=b"\x02\x3E\x00")
async def tp(ch: int, msg: CanMessage):
print("TesterPresent!")
print("Running. Press Ctrl+C to stop.")
try:
await wait_forever()
finally:
await dev.close()
asyncio.run(main())