v0.1.5 add periodic messages

This commit is contained in:
controllerzz
2025-12-16 20:14:43 +03:00
parent e3ca999777
commit 8015b6b5f2
7 changed files with 234 additions and 11 deletions

View File

@@ -1,10 +1,7 @@
import asyncio
from carbus_async import CarBusDevice, CanMessage
from isotp_async import open_isotp
from uds_async import UdsClient
import signal
import logging
async def wait_forever() -> None:

View File

@@ -0,0 +1,48 @@
import asyncio
from carbus_async import CarBusDevice, PeriodicCanSender
async def main(is_debug=False):
dev = await CarBusDevice.open("COM6")
await dev.open_can_channel(
channel=1,
nominal_bitrate=500_000,
)
await dev.set_terminator(channel=1, enabled=True)
sender = PeriodicCanSender(dev)
def mod(tick, data):
b = bytearray(data)
b[0] = tick & 0xFF
return bytes(b)
sender.add(
"cnt",
channel=1,
can_id=0x100,
data=b"\x00" * 8,
period_s=0.5,
modify=mod)
sender.add(
"heartbeat",
channel=1,
can_id=0x123,
data=b"\x01\x02\x03\x04\x05\x06\x07\x08",
period_s=0.1,
)
try:
await asyncio.Event().wait()
finally:
await sender.stop_all()
await dev.close()
return
asyncio.run(main())

View File

@@ -4,18 +4,18 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict
from carbus_async import CanIdRouter, RoutedCarBusCanTransport
from carbus_async import CanIdRouter, RoutedCarBusCanTransport, PeriodicCanSender
from carbus_async.device import CarBusDevice
from isotp_async import open_isotp, IsoTpConnection
from uds_async import UdsServer
from uds_async.exceptions import UdsNegativeResponse, UdsError
# import logging
#
# logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
# )
import logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
TESTER_ID = 0x740
ECU_ID = 0x760