diff --git a/README.md b/README.md index 657af05..4e7b3e0 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ print("Features:", ## Пример настройки фильтров: 11 bit фильтры имеют index от 0 до 27 включительно, -29 bit фитры имеют index от 28 до 35 включительно +29 bit фильтры имеют index от 28 до 35 включительно ````python # очистить все фильтры на канале 1 await dev.clear_all_filters(1) @@ -143,11 +143,24 @@ await dev.set_std_id_filter( ````python await dev.set_terminator(channel=1, enabled=True) await dev.set_terminator(channel=2, enabled=False) +```` +Проверка наличия внутреннего терминатора у девайса +````python +if await dev.has_terminator(): + await dev.set_terminator(channel=1, enabled=True) + print("Device has an internal terminator") +else: + print("Device does not have an internal terminator") +```` + +Включаем терминатор на канале 1, если это поддерживает девайс +````python +await dev.ensure_terminator(channel=1, enabled=True) ```` ## Отправка периодичных сообщений: -С константными данными +Отправка сообщений с статичными данными и периодом 100мс ````python from carbus_async import PeriodicCanSender @@ -161,7 +174,7 @@ sender.add( ) ```` -С модификацией данных +Отправка сообщений с модификацией данных и периодом 500мс ````python from carbus_async import PeriodicCanSender diff --git a/carbus_async/__init__.py b/carbus_async/__init__.py index 9d434fe..2bb3e29 100644 --- a/carbus_async/__init__.py +++ b/carbus_async/__init__.py @@ -3,6 +3,7 @@ from .messages import CanMessage, MessageDirection from .exceptions import CarBusError, CommandError, SyncError from .can_router import CanIdRouter, RoutedCarBusCanTransport from .periodic import PeriodicCanSender, PeriodicJob +from .remote.client import open_remote_device __all__ = [ "CarBusDevice", @@ -15,4 +16,5 @@ __all__ = [ "RoutedCarBusCanTransport", "PeriodicCanSender", "PeriodicJob", + "open_remote_device", ] diff --git a/carbus_async/device.py b/carbus_async/device.py index 65097d8..2f1b889 100644 --- a/carbus_async/device.py +++ b/carbus_async/device.py @@ -472,6 +472,39 @@ class CarBusDevice: async def open_tcp(cls, host: str, port: int, **kwargs) -> "CarBusDevice": return await cls.open(f"socket://{host}:{port}", **kwargs) + @classmethod + async def open_stream( + cls, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + *, + logical_port: str = "stream://remote", + baudrate: int = 115200, + loop: Optional[asyncio.AbstractEventLoop] = None, + use_can: bool = True, + use_lin: bool = False, + ) -> "CarBusDevice": + self = cls(port=logical_port, baudrate=baudrate, loop=loop) + + self._log = logging.getLogger(f"carbus_async.device.{logical_port}") + self._wire_log = logging.getLogger(f"carbus_async.wire.{logical_port}") + + self._reader = reader + self._writer = writer + + self._rx_queue = asyncio.Queue() + self._rx_channel_queues = {} + self._pending = {} + self._seq_counter = 0 + self._reader_task = None + self._closed = False + + await self.sync() + self._start_reader() + await self.device_open(use_can=use_can, use_lin=use_lin) + return self + + async def _connect(self) -> None: loop = self.loop or asyncio.get_running_loop() @@ -721,6 +754,22 @@ class CarBusDevice: return DeviceInfo.from_payload(payload) + async def has_terminator(self, channel=1) -> bool: + info = await self.get_device_info() + feat = info.channel_features.get(channel) + return bool(feat and feat.get("terminator", False)) + + async def ensure_terminator(self, channel: int = 1, enabled: bool = True): + if not await self.has_terminator(channel): + return False + await self.set_terminator(channel, enabled=enabled) + + async def get_serial(self) -> bool: + info = await self.get_device_info() + return info.serial_int; + + + async def device_open(self, *, use_can: bool = True, use_lin: bool = False) -> None: if use_can and use_lin: mode_val = 0x00 # FULL diff --git a/carbus_async/remote/__init__.py b/carbus_async/remote/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/carbus_async/remote/agent.py b/carbus_async/remote/agent.py new file mode 100644 index 0000000..fe6ca26 --- /dev/null +++ b/carbus_async/remote/agent.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import argparse +import asyncio +import json +import logging +from typing import Optional + +import serial_asyncio + +log = logging.getLogger("carbus_remote.agent") + + +def parse_hostport(s: str) -> tuple[str, int]: + if ":" not in s: + raise ValueError("server must be host:port") + host, p = s.rsplit(":", 1) + return host, int(p) + + +async def pipe_bidirectional_streams( + a_reader: asyncio.StreamReader, + a_writer: asyncio.StreamWriter, + b_reader: asyncio.StreamReader, + b_writer: asyncio.StreamWriter, + *, + bufsize: int = 4096, +) -> None: + + async def pump(src: asyncio.StreamReader, dst: asyncio.StreamWriter) -> None: + try: + while True: + data = await src.read(bufsize) + if not data: + break + dst.write(data) + await dst.drain() + except Exception: + pass + finally: + try: + dst.close() + except Exception: + pass + + t1 = asyncio.create_task(pump(a_reader, b_writer), name="pipe_a_to_b") + t2 = asyncio.create_task(pump(b_reader, a_writer), name="pipe_b_to_a") + + await asyncio.wait({t1, t2}, return_when=asyncio.FIRST_COMPLETED) + + for w in (a_writer, b_writer): + try: + w.close() + except Exception: + pass + + await asyncio.gather(t1, t2, return_exceptions=True) + for w in (a_writer, b_writer): + try: + await w.wait_closed() + except Exception: + pass + + +async def open_serial_with_retry(port: str, baudrate: int, *, attempts: int = 3) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]: + last: Optional[BaseException] = None + for i in range(attempts): + try: + return await serial_asyncio.open_serial_connection(url=port, baudrate=baudrate) + except Exception as e: + last = e + await asyncio.sleep(0.25 + 0.25 * i) + assert last is not None + raise last + + +async def agent_run( + *, + port: str, + baudrate: int, + server: str, + serial: str, + password: str, +) -> None: + server_host, server_port = parse_hostport(server) + + session_lock = asyncio.Lock() + + async def open_data_session(session: str) -> None: + async with session_lock: + net_r: Optional[asyncio.StreamReader] = None + net_w: Optional[asyncio.StreamWriter] = None + dev_r: Optional[asyncio.StreamReader] = None + dev_w: Optional[asyncio.StreamWriter] = None + + try: + log.info("Opening data session %s", session) + + net_r, net_w = await asyncio.wait_for( + asyncio.open_connection(server_host, server_port), + timeout=5.0, + ) + + net_w.write((json.dumps({"role": "agent_data", "session": session}) + "\n").encode("utf-8")) + await net_w.drain() + + line = await asyncio.wait_for(net_r.readline(), timeout=5.0) + resp = json.loads(line.decode("utf-8", errors="ignore") or "{}") + if not resp.get("ok"): + log.error("Data session refused %s: %s", session, resp) + return + + log.info("Opening COM %s @ %d for session %s", port, baudrate, session) + dev_r, dev_w = await open_serial_with_retry(port, baudrate, attempts=3) + + log.info("Session %s accepted. Piping bytes (COM <-> relay).", session) + await pipe_bidirectional_streams(dev_r, dev_w, net_r, net_w) + + log.info("Session %s finished.", session) + + except Exception: + log.exception("Data session %s crashed", session) + + finally: + if net_w is not None: + try: + net_w.close() + await net_w.wait_closed() + except Exception: + pass + + if dev_w is not None: + try: + dev_w.close() + await dev_w.wait_closed() + except Exception: + pass + + log.info("Connecting to relay %s:%d (control)", server_host, server_port) + ctrl_reader, ctrl_writer = await asyncio.open_connection(server_host, server_port) + + ctrl_writer.write((json.dumps({"role": "agent", "serial": serial, "password": password}) + "\n").encode("utf-8")) + await ctrl_writer.drain() + + line = await asyncio.wait_for(ctrl_reader.readline(), timeout=10.0) + resp = json.loads(line.decode("utf-8", errors="ignore") or "{}") + if not resp.get("ok"): + ctrl_writer.close() + try: + await ctrl_writer.wait_closed() + except Exception: + pass + raise RuntimeError(f"relay refused agent: {resp}") + + log.info("Agent registered OK. Waiting for sessions... (serial=%s)", serial) + + try: + while True: + line = await ctrl_reader.readline() + if not line: + log.warning("Control connection closed by relay.") + break + + try: + msg = json.loads(line.decode("utf-8", errors="ignore") or "{}") + except Exception: + continue + + if msg.get("cmd") == "open_session": + session = str(msg.get("session", "")).strip() + if session: + asyncio.create_task(open_data_session(session), name=f"agent_data_session_{session}") + + finally: + try: + ctrl_writer.close() + await ctrl_writer.wait_closed() + except Exception: + pass + + log.info("Agent stopped.") + + +async def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--port", required=True, help="COM port, e.g. COM6") + ap.add_argument("--baudrate", type=int, default=115200) + ap.add_argument("--server", required=True, help="relay host:port, e.g. 1.2.3.4:9000") + ap.add_argument("--serial", required=True, help="device serial number (string/int)") + ap.add_argument("--password", required=True, help="shared password for this serial") + args = ap.parse_args() + + await agent_run( + port=args.port, + baudrate=args.baudrate, + server=args.server, + serial=str(args.serial), + password=str(args.password), + ) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + asyncio.run(main()) diff --git a/carbus_async/remote/client.py b/carbus_async/remote/client.py new file mode 100644 index 0000000..3a7bdcb --- /dev/null +++ b/carbus_async/remote/client.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import asyncio +import json +from typing import Optional + +from carbus_async.device import CarBusDevice + + +async def open_remote_device( + host: str, + port: int, + *, + serial: str, + password: str, + use_can: bool = True, + use_lin: bool = False, +) -> CarBusDevice: + reader, writer = await asyncio.open_connection(host, port) + + hello = {"role": "client", "serial": str(serial), "password": str(password)} + writer.write((json.dumps(hello) + "\n").encode("utf-8")) + await writer.drain() + + line = await asyncio.wait_for(reader.readline(), timeout=10.0) + resp = json.loads(line.decode("utf-8", errors="ignore") or "{}") + if not resp.get("ok"): + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + raise RuntimeError(f"relay refused client: {resp}") + + dev = await CarBusDevice.open_stream( + reader, + writer, + logical_port=f"remote://{host}:{port}/{serial}", + use_can=use_can, + use_lin=use_lin, + ) + return dev diff --git a/carbus_async/remote/server.py b/carbus_async/remote/server.py new file mode 100644 index 0000000..e145359 --- /dev/null +++ b/carbus_async/remote/server.py @@ -0,0 +1,237 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import secrets +from dataclasses import dataclass, field +from typing import Dict, Optional + +log = logging.getLogger("carbus_remote.server") + + +@dataclass +class AgentControl: + serial: str + password: str + reader: asyncio.StreamReader + writer: asyncio.StreamWriter + + +@dataclass +class PendingSession: + serial: str + client_reader: asyncio.StreamReader + client_writer: asyncio.StreamWriter + ready: asyncio.Event = field(default_factory=asyncio.Event) # agent_data подключился + done: asyncio.Future = field(default_factory=lambda: asyncio.get_event_loop().create_future()) # pipe завершён + + +class RelayServer: + def __init__(self) -> None: + self._agents: Dict[str, AgentControl] = {} + self._pending: Dict[str, PendingSession] = {} + self._lock = asyncio.Lock() + + async def handle_conn(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + peer = writer.get_extra_info("peername") + try: + line = await asyncio.wait_for(reader.readline(), timeout=10.0) + if not line: + return + + try: + hello = json.loads(line.decode("utf-8", errors="ignore").strip()) + except Exception: + await self._send_json(writer, {"ok": False, "error": "bad_handshake"}) + return + + role = hello.get("role") + + if role == "agent": + await self._handle_agent_control(reader, writer, hello, peer) + return + + if role == "client": + await self._handle_client(reader, writer, hello, peer) + return + + if role == "agent_data": + await self._handle_agent_data(reader, writer, hello, peer) + return + + await self._send_json(writer, {"ok": False, "error": "bad_role"}) + + except asyncio.TimeoutError: + await self._send_json(writer, {"ok": False, "error": "handshake_timeout"}) + except Exception: + log.exception("Connection error from %s", peer) + finally: + if getattr(writer, "_carbus_piped", False): + return + if not writer.is_closing(): + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + + async def _handle_agent_control(self, reader, writer, hello, peer) -> None: + serial = str(hello.get("serial", "")).strip() + password = str(hello.get("password", "")).strip() + if not serial or not password: + await self._send_json(writer, {"ok": False, "error": "bad_handshake"}) + return + + async with self._lock: + old = self._agents.get(serial) + if old: + try: + old.writer.close() + except Exception: + pass + self._agents[serial] = AgentControl(serial, password, reader, writer) + + await self._send_json(writer, {"ok": True}) + log.info("Agent online serial=%s from %s", serial, peer) + + try: + while True: + line = await reader.readline() + if not line: + break + finally: + async with self._lock: + cur = self._agents.get(serial) + if cur and cur.writer is writer: + self._agents.pop(serial, None) + log.info("Agent offline serial=%s", serial) + + async def _handle_client(self, reader, writer, hello, peer) -> None: + serial = str(hello.get("serial", "")).strip() + password = str(hello.get("password", "")).strip() + if not serial or not password: + await self._send_json(writer, {"ok": False, "error": "bad_handshake"}) + return + + async with self._lock: + agent = self._agents.get(serial) + if agent is None: + await self._send_json(writer, {"ok": False, "error": "agent_offline"}) + return + if agent.password != password: + await self._send_json(writer, {"ok": False, "error": "unauthorized"}) + return + + session = secrets.token_hex(8) + ps = PendingSession(serial=serial, client_reader=reader, client_writer=writer) + self._pending[session] = ps + + try: + agent.writer.write((json.dumps({"cmd": "open_session", "session": session}) + "\n").encode("utf-8")) + await agent.writer.drain() + except Exception: + self._pending.pop(session, None) + await self._send_json(writer, {"ok": False, "error": "agent_write_failed"}) + return + + await self._send_json(writer, {"ok": True, "session": session}) + log.info("Client accepted serial=%s session=%s from %s", serial, session, peer) + + try: + await asyncio.wait_for(ps.ready.wait(), timeout=10.0) + except asyncio.TimeoutError: + async with self._lock: + self._pending.pop(session, None) + await self._send_json(writer, {"ok": False, "error": "agent_data_timeout"}) + return + + try: + await ps.done + finally: + async with self._lock: + self._pending.pop(session, None) + + async def _handle_agent_data(self, reader, writer, hello, peer) -> None: + session = str(hello.get("session", "")).strip() + if not session: + await self._send_json(writer, {"ok": False, "error": "bad_handshake"}) + return + + async with self._lock: + ps = self._pending.get(session) + + if ps is None: + await self._send_json(writer, {"ok": False, "error": "unknown_session"}) + return + + await self._send_json(writer, {"ok": True}) + log.info("Agent data connected session=%s from %s (pairing)", session, peer) + + setattr(ps.client_writer, "_carbus_piped", True) + setattr(writer, "_carbus_piped", True) + + ps.ready.set() + + try: + await self._pipe(ps.client_reader, ps.client_writer, reader, writer) + finally: + if not ps.done.done(): + ps.done.set_result(None) + + async def _pipe(self, a_reader, a_writer, b_reader, b_writer, bufsize: int = 4096) -> None: + async def pump(src, dst): + try: + while True: + data = await src.read(bufsize) + if not data: + break + dst.write(data) + await dst.drain() + except Exception: + pass + finally: + try: + dst.close() + except Exception: + pass + + t1 = asyncio.create_task(pump(a_reader, b_writer)) + t2 = asyncio.create_task(pump(b_reader, a_writer)) + await asyncio.wait({t1, t2}, return_when=asyncio.FIRST_COMPLETED) + + for t in (t1, t2): + t.cancel() + try: + await t + except Exception: + pass + + for w in (a_writer, b_writer): + try: + w.close() + except Exception: + pass + for w in (a_writer, b_writer): + try: + await w.wait_closed() + except Exception: + pass + + @staticmethod + async def _send_json(writer, obj: dict) -> None: + writer.write((json.dumps(obj) + "\n").encode("utf-8")) + await writer.drain() + + +async def main(host: str = "0.0.0.0", port: int = 9000) -> None: + rs = RelayServer() + srv = await asyncio.start_server(rs.handle_conn, host, port) + log.info("Relay server listening on %s", ", ".join(str(s.getsockname()) for s in srv.sockets or [])) + async with srv: + await srv.serve_forever() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") + asyncio.run(main()) diff --git a/example/remote_relay.py b/example/remote_relay.py new file mode 100644 index 0000000..bf5c462 --- /dev/null +++ b/example/remote_relay.py @@ -0,0 +1,39 @@ +import asyncio + +from carbus_async import CarBusDevice, open_remote_device +from isotp_async import open_isotp +from uds_async import UdsClient + +import logging + + +async def main(is_debug=False): + + if is_debug: + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + logging.getLogger("carbus_async.wire").setLevel(logging.DEBUG) + + dev = await open_remote_device("127.0.0.1", 9000, serial=5957, password="1234") + + print(f"Devise SN: {await dev.get_serial()}") + + await dev.open_can_channel( + channel=1, + nominal_bitrate=500_000, + ) + + await dev.ensure_terminator(channel=1, enabled=True) + + isotp = await open_isotp(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) + uds = UdsClient(isotp) + + vin = await uds.read_data_by_identifier(0xF190) + print("VIN:", vin.decode(errors="ignore")) + + await dev.close() + + +asyncio.run(main(is_debug=True)) diff --git a/main.py b/main.py index c4faa6e..18ba8ce 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from uds_async import UdsClient import logging + async def main(is_debug=False): if is_debug: @@ -17,12 +18,14 @@ async def main(is_debug=False): dev = await CarBusDevice.open("COM6") + print(f"Devise SN: {await dev.get_serial()}") + await dev.open_can_channel( channel=1, nominal_bitrate=500_000, ) - await dev.set_terminator(channel=1, enabled=True) + await dev.ensure_terminator(channel=1, enabled=True) isotp = await open_isotp(dev, channel=1, tx_id=0x7E0, rx_id=0x7E8) uds = UdsClient(isotp) diff --git a/pyproject.toml b/pyproject.toml index eac3a0e..3005c2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "carbus-lib" -version = "0.1.5" +version = "0.1.6" description = "Async CAN / ISO-TP / UDS library for Car Bus Analyzer" readme = "README.md" requires-python = ">=3.10"