diff --git a/README.md b/README.md index 6dffcac..357c7e0 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ await dev.open_can_channel_custom( ) ```` -## Полученияе информации об устройстве: +## Получение информации об устройстве: ````python info = await dev.get_device_info() diff --git a/example/uds22_params.pkl b/example/uds22_params.pkl new file mode 100644 index 0000000..01b1f7a Binary files /dev/null and b/example/uds22_params.pkl differ diff --git a/example/uds_ecu_emulated.py b/example/uds_ecu_emulated.py new file mode 100644 index 0000000..d60bf05 --- /dev/null +++ b/example/uds_ecu_emulated.py @@ -0,0 +1,190 @@ +import asyncio +import pickle +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict + +from carbus_async.device import CarBusDevice +from isotp_async import open_isotp +from uds_async import UdsServer +from uds_async.exceptions import UdsNegativeResponse, UdsError + +# import logging +# +# logging.basicConfig( +# level=logging.DEBUG, +# format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +# ) + +TESTER_ID = 0x740 +ECU_ID = 0x760 + +DEFAULT_PORT = "COM6" +DEFAULT_BAUD = 115200 +DEFAULT_CAN_CH = 1 +DEFAULT_CAN_BITRATE = 500_000 + +DID_RANGE = range(0x10000) +IN_FILE = Path("uds22_params.pkl") + + +def save_dict_pickle(path: str | Path, data: dict[int, bytes]) -> None: + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("wb") as f: + pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) + + +def load_dict_pickle(path: str | Path) -> dict[int, bytes]: + path = Path(path) + with path.open("rb") as f: + obj = pickle.load(f) + if not isinstance(obj, dict): + raise TypeError(f"Expected dict in pickle file, got {type(obj).__name__}") + return obj + + +async def setup_device( + port: str = DEFAULT_PORT, + baudrate: int = DEFAULT_BAUD, + can_channel: int = DEFAULT_CAN_CH, + nominal_bitrate: int = DEFAULT_CAN_BITRATE, +) -> CarBusDevice: + dev = await CarBusDevice.open(port, baudrate=baudrate) + + await dev.open_can_channel(channel=can_channel, nominal_bitrate=nominal_bitrate) + await dev.set_terminator(channel=can_channel, enabled=True) + + await dev.clear_all_filters(can_channel) + await dev.set_std_id_filter(channel=can_channel, index=0, can_id=0x700, mask=0x700) + + return dev + + +async def setup_uds(dev: CarBusDevice, can_channel: int = DEFAULT_CAN_CH) -> UdsServer: + ecu_isotp = await open_isotp(dev, channel=can_channel, tx_id=ECU_ID, rx_id=TESTER_ID) + ecu_uds = UdsServer(ecu_isotp) + return ecu_uds + + +NRC_INCORRECT_LENGTH = 0x13 +NRC_REQUEST_OUT_OF_RANGE = 0x31 + + +@dataclass +class UdsServiceState: + did_store: Dict[int, bytes] + + +def _require_len(req: bytes, n: int, sid: int) -> None: + if len(req) < n: + raise UdsNegativeResponse(sid, NRC_INCORRECT_LENGTH) + + +def _did_from_req(req: bytes, sid: int) -> int: + _require_len(req, 3, sid) + return (req[1] << 8) | req[2] + + +def _rdbi_positive(req: bytes, payload: bytes) -> bytes: + return bytes((0x62, req[1], req[2])) + payload + + +def _wdbi_positive(req: bytes) -> bytes: + return bytes((0x6E, req[1], req[2])) + + +async def services_init(uds, *, in_file: str | Path) -> UdsServiceState: + did_store: Dict[int, bytes] = load_dict_pickle(path=in_file) + state = UdsServiceState(did_store=did_store) + + @uds.service(0x10) + async def handle_session_control(req: bytes) -> bytes: + print(f"UDS OpenSession {hex(req[1])}") + _require_len(req, 2, 0x10) + session = req[1] + return bytes((0x50, session, 0x00, 0x32, 0x01, 0xF4)) + + @uds.service(0x11) + async def handle_session_control(req: bytes) -> bytes: + print(f"UDS ECU Reset {hex(req[1])}") + return bytes((0x51, req[1])) + + @uds.service(0x14) + async def handle_session_control(req: bytes) -> bytes: + return bytes((0x54, 0xFF, 0xFF, 0xFF, 0xFF)) + + @uds.service(0x19) + async def handle_session_control(req: bytes) -> bytes: + return bytes((0x59, 0x00)) + + @uds.service(0x22) + async def handle_rdbi(req: bytes) -> bytes: + did = _did_from_req(req, 0x22) + + payload = state.did_store.get(did) + + print(f"UDS Read Param {hex(did)}: {payload.hex()}") + + if payload is None: + raise UdsNegativeResponse(0x22, NRC_REQUEST_OUT_OF_RANGE) + + return _rdbi_positive(req, payload) + + @uds.service(0x27) + async def handle_security_access(req: bytes) -> bytes: + _require_len(req, 2, 0x27) + sub = req[1] + + is_seed_request = bool(sub & 0x01) + if is_seed_request: + return bytes((0x67, sub, 0x11, 0x22, 0x33, 0x44)) + + _require_len(req, 6, 0x27) + return bytes((0x67, sub)) + + @uds.service(0x2E) + async def handle_wdbi(req: bytes) -> bytes: + _require_len(req, 4, 0x2E) + did = (req[1] << 8) | req[2] + state.did_store[did] = bytes(req[3:]) + + print(f"UDS Write Param {hex(did)}: {bytes(req[3:]).hex()}") + + return _wdbi_positive(req) + + @uds.service(0x31) + async def handle_routine_control(req: bytes) -> bytes: + _require_len(req, 4, 0x31) + print(f"UDS Routine Control {hex(req[1])} {hex(req[2])} {hex(req[3])}") + return bytes((0x71, req[1], req[2], req[3])) + + @uds.service(0x3E) + async def handle_tester_present(req: bytes) -> bytes: + sub = req[1] if len(req) > 1 else 0x00 + return bytes((0x7E, sub)) + + return state + + +async def main() -> None: + dev: CarBusDevice | None = None + try: + dev = await setup_device() + uds_ecu= await setup_uds(dev) + await services_init(uds_ecu, in_file=IN_FILE) + + print("UDS server running") + + try: + await asyncio.gather(uds_ecu.serve_forever(),) + finally: + await dev.close() + + finally: + if dev is not None: + await dev.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/example/uds_id_scaner.py b/example/uds_id_scaner.py new file mode 100644 index 0000000..b1d9c86 --- /dev/null +++ b/example/uds_id_scaner.py @@ -0,0 +1,89 @@ +import asyncio +from dataclasses import dataclass +from typing import List, Tuple, Optional + +from tqdm import tqdm + +from carbus_async.device import CarBusDevice +from carbus_async.messages import CanMessage + + +@dataclass(frozen=True) +class CanScanConfig: + port: str = "COM6" + baudrate: int = 115200 + channel: int = 1 + nominal_bitrate: int = 500_000 + + base_id: int = 0x700 + count: int = 0x100 + timeout_s: float = 0.05 + + tester_present_sf: bytes = bytes((0x02, 0x3E, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00)) + + +async def setup_device(cfg: CanScanConfig) -> CarBusDevice: + dev = await CarBusDevice.open(cfg.port, baudrate=cfg.baudrate) + + await dev.open_can_channel(channel=cfg.channel, nominal_bitrate=cfg.nominal_bitrate) + await dev.set_terminator(channel=cfg.channel, enabled=True) + + await dev.clear_all_filters(cfg.channel) + await dev.set_std_id_filter( + channel=cfg.channel, + index=0, + can_id=0x700, + mask=0x700, + ) + + await dev.clear_receive_buffer() + return dev + + +def is_positive_tester_present_response(msg: CanMessage) -> bool: + return len(msg.data) >= 2 and msg.data[1] == 0x7E + + +async def scan_tester_present_ids( + dev: CarBusDevice, + cfg: CanScanConfig, +) -> List[Tuple[int, int]]: + msg_tx = CanMessage(can_id=0x00, data=cfg.tester_present_sf) + found: List[Tuple[int, int]] = [] + + for offset in tqdm(range(cfg.count), desc="Scan IDs"): + msg_tx.can_id = cfg.base_id + offset + + await dev.send_can(channel=cfg.channel, msg=msg_tx) + msg_rx: Optional[CanMessage] = await dev.receive_can_on_timeout(timeout=cfg.timeout_s) + + if msg_rx and is_positive_tester_present_response(msg_rx): + found.append((msg_tx.can_id, msg_rx.can_id)) + + return found + + +def print_found_pairs(pairs: List[Tuple[int, int]]) -> None: + if not pairs: + print("No responses found.") + return + + for tester_id, ecu_id in pairs: + print(f"TESTER ID: {tester_id:#05x} / ECU ID: {ecu_id:#05x}") + + +async def main() -> None: + cfg = CanScanConfig() + dev: CarBusDevice | None = None + + try: + dev = await setup_device(cfg) + pairs = await scan_tester_present_ids(dev, cfg) + print_found_pairs(pairs) + finally: + if dev is not None: + await dev.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/example/uds_service22_scaner.py b/example/uds_service22_scaner.py new file mode 100644 index 0000000..1786b7e --- /dev/null +++ b/example/uds_service22_scaner.py @@ -0,0 +1,138 @@ +import asyncio +import pickle +from pathlib import Path + +from tqdm import tqdm + +from carbus_async.device import CarBusDevice +from isotp_async import open_isotp +from uds_async import UdsClient +from uds_async.exceptions import UdsNegativeResponse, UdsError + + +TESTER_ID = 0x740 +ECU_ID = 0x760 + +DEFAULT_PORT = "COM6" +DEFAULT_BAUD = 115200 +DEFAULT_CAN_CH = 1 +DEFAULT_CAN_BITRATE = 500_000 + +DID_RANGE = range(0x10000) +OUT_FILE = Path("uds22_params.pkl") + + +def save_dict_pickle(path: str | Path, data: dict[int, bytes]) -> None: + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("wb") as f: + pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL) + + +def load_dict_pickle(path: str | Path) -> dict[int, bytes]: + path = Path(path) + with path.open("rb") as f: + obj = pickle.load(f) + if not isinstance(obj, dict): + raise TypeError(f"Expected dict in pickle file, got {type(obj).__name__}") + return obj + + +async def setup_device( + port: str = DEFAULT_PORT, + baudrate: int = DEFAULT_BAUD, + can_channel: int = DEFAULT_CAN_CH, + nominal_bitrate: int = DEFAULT_CAN_BITRATE, +) -> CarBusDevice: + dev = await CarBusDevice.open(port, baudrate=baudrate) + + await dev.open_can_channel(channel=can_channel, nominal_bitrate=nominal_bitrate) + await dev.set_terminator(channel=can_channel, enabled=True) + + await dev.clear_all_filters(can_channel) + await dev.set_std_id_filter(channel=can_channel, index=0, can_id=0x700, mask=0x700) + + return dev + + +async def setup_uds(dev: CarBusDevice, can_channel: int = DEFAULT_CAN_CH) -> UdsClient: + isotp = await open_isotp(dev, channel=can_channel, tx_id=TESTER_ID, rx_id=ECU_ID) + uds = UdsClient(isotp) + + await uds.diagnostic_session_control(session=0x03) + uds.p2_timeout = 0.05 + + return uds + + +async def read_did_safe( + uds, + did: int, + *, + retries: int = 5, + retry_delay: float = 0.05, +) -> bytes | None: + + for attempt in range(retries + 1): + try: + return await uds.read_data_by_identifier(did) + + except (UdsNegativeResponse, UdsError): + return None + + except TimeoutError: + if attempt >= retries: + return None + await asyncio.sleep(retry_delay * (2 ** attempt)) + + except asyncio.CancelledError: + raise + + +async def dump_dids(uds, dids, *, retries: int = 10) -> dict[int, bytes]: + results: dict[int, bytes] = {} + + for did in tqdm(dids, desc="UDS 0x22 scan"): + value = await read_did_safe(uds, did, retries=retries) + if value is not None: + results[did] = value + + return results + + +def bytes_to_ascii_preview(data: bytes, max_len: int = 64) -> str: + chunk = data[:max_len] + s = "".join(chr(b) if 32 <= b <= 126 else "." for b in chunk) + if len(data) > max_len: + s += "..." + return s + + +def print_results(results: dict[int, bytes]) -> None: + for did in sorted(results.keys()): + data = results[did] + print( + f"{did:#06x} | size={len(data):3d} | hex={data.hex()} | ascii='{bytes_to_ascii_preview(data)}'" + ) + + +async def main() -> None: + dev: CarBusDevice | None = None + try: + dev = await setup_device() + uds = await setup_uds(dev) + + results = await dump_dids(uds, DID_RANGE) + + save_dict_pickle(OUT_FILE, results) + print(f"\nSaved {len(results)} DIDs to: {OUT_FILE.resolve()}\n") + + print_results(results) + + finally: + if dev is not None: + await dev.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 38ed382..3fcbc67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "carbus-lib" -version = "0.1.1" +version = "0.1.2" description = "Async CAN / ISO-TP / UDS library for Car Bus Analyzer" readme = "README.md" requires-python = ">=3.10"