# Source: carbus_lib/example/uds_id_scaner.py (89 lines, 2.3 KiB, Python)

import asyncio
from dataclasses import dataclass
from typing import List, Tuple, Optional
from tqdm import tqdm
from carbus_async.device import CarBusDevice
from carbus_async.messages import CanMessage
@dataclass(frozen=True)
class CanScanConfig:
    """Immutable settings for one TesterPresent CAN ID scan.

    Every field has a default, so ``CanScanConfig()`` yields a ready-to-use
    configuration; override individual fields by keyword as needed.
    """

    # Port name and baud rate handed to ``CarBusDevice.open``.
    port: str = "COM6"
    baudrate: int = 115200

    # CAN channel to open on the adapter and its nominal bit rate (bit/s).
    channel: int = 1
    nominal_bitrate: int = 500_000

    # Scan range: ``count`` consecutive request IDs starting at ``base_id``.
    base_id: int = 0x700
    count: int = 0x100

    # Seconds to wait for a reply after each request.
    timeout_s: float = 0.05

    # Request payload: 0x02 (single frame, 2 data bytes), 0x3E (TesterPresent),
    # 0x00 (sub-function), zero-padded to 8 bytes.
    tester_present_sf: bytes = bytes.fromhex("02 3E 00 00 00 00 00 00")
async def setup_device(cfg: CanScanConfig) -> CarBusDevice:
    """Open the adapter and prepare its CAN channel for scanning.

    Opens the device on ``cfg.port``, opens ``cfg.channel`` at
    ``cfg.nominal_bitrate``, enables the terminator, clears all filters on
    the channel and installs one standard-ID filter at index 0.

    Args:
        cfg: Scan configuration supplying port, baud rate, channel and bit rate.

    Returns:
        The opened and configured device. The caller is responsible for
        closing it.

    Raises:
        Whatever the underlying device calls raise. If configuration fails
        after the device was opened, the device is closed before the
        exception propagates.
    """
    dev = await CarBusDevice.open(cfg.port, baudrate=cfg.baudrate)
    try:
        await dev.open_can_channel(
            channel=cfg.channel,
            nominal_bitrate=cfg.nominal_bitrate,
        )
        await dev.set_terminator(channel=cfg.channel, enabled=True)
        await dev.clear_all_filters(cfg.channel)
        # NOTE(review): with id=0x700 / mask=0x700 this presumably accepts
        # standard IDs 0x700-0x7FF -- confirm the mask semantics against the
        # carbus_async filter documentation.
        await dev.set_std_id_filter(
            channel=cfg.channel,
            index=0,
            can_id=0x700,
            mask=0x700,
        )
    except BaseException:
        # The caller never receives the handle when setup fails, so it cannot
        # close it; release the device here. BaseException also covers
        # asyncio cancellation.
        await dev.close()
        raise
    return dev
def is_positive_tester_present_response(msg: CanMessage) -> bool:
    """Return True if *msg* is a single-frame positive TesterPresent reply.

    A frame qualifies when all of the following hold:

    * it carries at least two data bytes;
    * byte 0 is an ISO-TP single-frame PCI (upper nibble 0) with a non-zero
      length nibble;
    * byte 1 is 0x7E, the positive-response service ID for TesterPresent
      (0x3E + 0x40).

    Checking the PCI byte keeps multi-frame traffic from matching by accident,
    e.g. a First Frame ``10 7E ...`` announcing 126 bytes, or a Consecutive
    Frame whose first payload byte happens to be 0x7E.
    """
    data = msg.data
    if len(data) < 2:
        return False

    pci = data[0]
    is_single_frame = (pci & 0xF0) == 0x00 and (pci & 0x0F) >= 1
    return is_single_frame and data[1] == 0x7E
async def scan_tester_present_ids(
    dev: CarBusDevice,
    cfg: CanScanConfig,
) -> List[Tuple[int, int]]:
    """Probe a range of CAN IDs with TesterPresent and collect the responders.

    For each of ``cfg.count`` consecutive IDs starting at ``cfg.base_id`` the
    ``cfg.tester_present_sf`` payload is sent on ``cfg.channel``. Received
    frames are then examined until a positive TesterPresent response shows
    up, nothing more arrives, or ``cfg.timeout_s`` seconds have passed since
    the request was sent.

    Args:
        dev: Opened and configured device used for sending and receiving.
        cfg: Scan configuration (channel, ID range, timeout, payload).

    Returns:
        ``(tester_id, ecu_id)`` pairs in probe order, where ``tester_id`` is
        the request ID that was sent and ``ecu_id`` is the CAN ID of the
        frame that carried the positive response.
    """
    loop = asyncio.get_running_loop()
    # One message object is reused; only its CAN ID changes between probes.
    msg_tx = CanMessage(can_id=0x00, data=cfg.tester_present_sf)
    found: List[Tuple[int, int]] = []

    for offset in tqdm(range(cfg.count), desc="Scan IDs"):
        msg_tx.can_id = cfg.base_id + offset
        await dev.send_can(channel=cfg.channel, msg=msg_tx)

        # Keep reading inside the response window instead of looking at a
        # single frame: an unrelated frame that passes the filter must not
        # use up the only read and hide the real reply.
        deadline = loop.time() + cfg.timeout_s
        remaining = cfg.timeout_s
        while True:
            msg_rx: Optional[CanMessage] = await dev.receive_can_on_timeout(
                timeout=remaining
            )
            if not msg_rx:
                # Nothing (more) arrived within the window.
                break
            if is_positive_tester_present_response(msg_rx):
                found.append((msg_tx.can_id, msg_rx.can_id))
                break
            remaining = deadline - loop.time()
            if remaining <= 0:
                break

    return found
def print_found_pairs(pairs: List[Tuple[int, int]]) -> None:
    """Print one line per ``(tester_id, ecu_id)`` pair, or a notice if empty.

    IDs are rendered as ``0x``-prefixed lowercase hex, zero-padded to a total
    width of five characters (e.g. ``0x7e0``).
    """
    if pairs:
        for request_id, response_id in pairs:
            print(f"TESTER ID: {request_id:#05x} / ECU ID: {response_id:#05x}")
    else:
        print("No responses found.")
async def main() -> None:
    """Run a scan with the default configuration and print the results.

    The device is closed once it has been obtained, whether the scan
    succeeds or raises.
    """
    cfg = CanScanConfig()
    # If setup itself raises there is no device handle to close here, so the
    # try/finally only needs to cover the work done with an opened device.
    dev = await setup_device(cfg)
    try:
        print_found_pairs(await scan_tester_present_ids(dev, cfg))
    finally:
        await dev.close()
if __name__ == "__main__":
    # Script entry point: run the scan on a fresh event loop.
    asyncio.run(main())