From da3d41a7b77eb811404630936c0e504bec973c55 Mon Sep 17 00:00:00 2001 From: chuan Date: Wed, 13 May 2026 13:48:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E6=88=90=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E4=B8=8E=E6=A8=A1=E6=8B=9F=E5=99=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/line_laser_modbus/__init__.py | 3 +- src/line_laser_modbus/cli.py | 42 ++++++++++ src/line_laser_modbus/client.py | 123 +++++++++++++++++++++++++++++ src/line_laser_modbus/simulator.py | 91 +++++++++++++++++++++ 4 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 src/line_laser_modbus/cli.py create mode 100644 src/line_laser_modbus/client.py create mode 100644 src/line_laser_modbus/simulator.py diff --git a/src/line_laser_modbus/__init__.py b/src/line_laser_modbus/__init__.py index 6d2e706..f19f8a5 100644 --- a/src/line_laser_modbus/__init__.py +++ b/src/line_laser_modbus/__init__.py @@ -1,6 +1,7 @@ """Line laser Modbus protocol package.""" +from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import SerialConfig from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D -__all__ = ["DeviceStatus", "ModeCommand", "Pose6D", "SerialConfig"] +__all__ = ["DeviceStatus", "LineLaserClient", "ModeCommand", "Pose6D", "SerialConfig"] diff --git a/src/line_laser_modbus/cli.py b/src/line_laser_modbus/cli.py new file mode 100644 index 0000000..bb0fafe --- /dev/null +++ b/src/line_laser_modbus/cli.py @@ -0,0 +1,42 @@ +"""Command line helpers.""" + +from __future__ import annotations + +import argparse + +from line_laser_modbus.client import LineLaserClient +from line_laser_modbus.config import SerialConfig +from line_laser_modbus.models import ModeCommand, Pose6D +from line_laser_modbus.simulator import SimulatedModbusBackend + + +def main() -> None: + parser = argparse.ArgumentParser(prog="line-laser-modbus") + parser.add_argument("--config", help="TOML config path") + parser.add_argument("--port", default="COM1", help="Serial port when no config is provided") + parser.add_argument("--simulate", action="store_true", help="Use in-memory simulator") + subparsers = parser.add_subparsers(dest="command", required=True) + + subparsers.add_parser("read-status") + write_mode = subparsers.add_parser("write-mode") + write_mode.add_argument("mode", type=int, choices=[mode.value for mode in ModeCommand]) + subparsers.add_parser("demo") + + args = parser.parse_args() + config = SerialConfig.from_toml(args.config) if args.config else SerialConfig(port=args.port) + backend = SimulatedModbusBackend() if args.simulate else None + + with LineLaserClient(config, backend=backend) as client: + if args.command == "read-status": + print(client.read_status().name) + elif args.command == "write-mode": + client.write_mode(args.mode) + print(client.read_mode().name) + elif args.command == "demo": + client.write_mode(ModeCommand.ONLINE_TRACKING) + client.write_correction(Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) + print(client.read_mode().name) + + +if __name__ == "__main__": + main() diff --git a/src/line_laser_modbus/client.py b/src/line_laser_modbus/client.py new file mode 100644 index 0000000..ea44a0b --- /dev/null +++ b/src/line_laser_modbus/client.py @@ -0,0 +1,123 @@ +"""High-level Modbus client for the line laser protocol.""" + +from __future__ import annotations + +from types import TracebackType +from typing import Protocol, Self + +from pymodbus import ModbusException +from pymodbus.client import ModbusSerialClient + +from line_laser_modbus.codec import decode_pose, decode_u16, encode_pose, encode_u16 +from line_laser_modbus.config import SerialConfig +from line_laser_modbus.constants import ( + ADDR_CORRECTION, + ADDR_CURRENT_POSE, + ADDR_DEVICE_STATUS, + ADDR_MODE_COMMAND, + ADDR_TARGET_POSE, + REGISTER_COUNT_POSE, +) +from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, ensure_mode, ensure_status + + +class ModbusBackend(Protocol): + def connect(self) -> bool: ... + + def close(self) -> None: ... + + def read_holding_registers(self, address: int, *, count: int, device_id: int): ... + + def write_registers(self, address: int, values: list[int], *, device_id: int): ... + + +class LineLaserClient: + """Protocol-specific client with typed read/write helpers.""" + + def __init__(self, config: SerialConfig, backend: ModbusBackend | None = None) -> None: + self.config = config + self._backend = backend or ModbusSerialClient( + port=config.port, + baudrate=config.baudrate, + bytesize=config.bytesize, + parity=config.parity, + stopbits=config.stopbits, + timeout=config.timeout, + retries=config.retries, + ) + + def __enter__(self) -> Self: + self.connect() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + traceback: TracebackType | None, + ) -> None: + self.close() + + def connect(self) -> None: + if not self._backend.connect(): + msg = f"Unable to connect Modbus device on {self.config.port}" + raise ConnectionError(msg) + + def close(self) -> None: + self._backend.close() + + def read_mode(self) -> ModeCommand: + return ensure_mode(self._read_word(ADDR_MODE_COMMAND)) + + def write_mode(self, mode: int | ModeCommand) -> None: + self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ensure_mode(mode).value)]) + + def read_status(self) -> DeviceStatus: + return ensure_status(self._read_word(ADDR_DEVICE_STATUS)) + + def read_current_pose(self) -> Pose6D: + registers = self._read_registers(ADDR_CURRENT_POSE, REGISTER_COUNT_POSE) + return decode_pose(registers) + + def write_target_pose(self, pose: Pose6D) -> None: + self._write_registers(ADDR_TARGET_POSE, encode_pose(pose)) + + def write_correction(self, pose: Pose6D) -> None: + self._write_registers(ADDR_CORRECTION, encode_pose(pose)) + + def _read_word(self, address: int) -> int: + return decode_u16(self._read_registers(address, 1)[0]) + + def _read_registers(self, address: int, count: int) -> list[int]: + try: + response = self._backend.read_holding_registers( + address, + count=count, + device_id=self.config.slave_id, + ) + except ModbusException as exc: + msg = f"Modbus read failed at 0x{address:04X}" + raise RuntimeError(msg) from exc + + self._raise_on_error(response, f"Modbus read error at 0x{address:04X}") + return [decode_u16(register) for register in response.registers] + + def _write_registers(self, address: int, registers: list[int]) -> None: + safe_registers = [encode_u16(register) for register in registers] + try: + response = self._backend.write_registers( + address, + safe_registers, + device_id=self.config.slave_id, + ) + except ModbusException as exc: + msg = f"Modbus write failed at 0x{address:04X}" + raise RuntimeError(msg) from exc + + self._raise_on_error(response, f"Modbus write error at 0x{address:04X}") + + @staticmethod + def _raise_on_error(response, message: str) -> None: + # pymodbus 的异常响应不是 Python 异常,需要显式判断 + if response is None or response.isError(): + raise RuntimeError(message) diff --git a/src/line_laser_modbus/simulator.py b/src/line_laser_modbus/simulator.py new file mode 100644 index 0000000..b45cf97 --- /dev/null +++ b/src/line_laser_modbus/simulator.py @@ -0,0 +1,91 @@ +"""In-memory Modbus simulator used by tests and demos.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from line_laser_modbus.codec import encode_pose +from line_laser_modbus.constants import ( + ADDR_CORRECTION, + ADDR_CURRENT_POSE, + ADDR_DEVICE_STATUS, + ADDR_MODE_COMMAND, + ADDR_TARGET_POSE, + SLAVE_ID, +) +from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D + + +@dataclass(slots=True) +class _ReadResponse: + registers: list[int] + + def isError(self) -> bool: + return False + + +@dataclass(slots=True) +class _WriteResponse: + address: int + count: int + + def isError(self) -> bool: + return False + + +class SimulatedModbusBackend: + """Small pymodbus-compatible backend without hardware.""" + + def __init__( + self, + *, + slave_id: int = SLAVE_ID, + mode: ModeCommand = ModeCommand.STANDBY_RESET, + status: DeviceStatus = DeviceStatus.STANDBY_READY, + current_pose: Pose6D | None = None, + ) -> None: + self.slave_id = slave_id + self.connected = False + self.registers: dict[int, int] = {} + self._seed(mode, status, current_pose or Pose6D.zeros()) + + def connect(self) -> bool: + self.connected = True + return True + + def close(self) -> None: + self.connected = False + + def read_holding_registers(self, address: int, *, count: int, device_id: int) -> _ReadResponse: + self._ensure_ready(device_id) + return _ReadResponse([self.registers.get(address + offset, 0) for offset in range(count)]) + + def write_registers(self, address: int, values: list[int], *, device_id: int) -> _WriteResponse: + self._ensure_ready(device_id) + for offset, value in enumerate(values): + self.registers[address + offset] = value + return _WriteResponse(address, len(values)) + + def target_pose(self) -> Pose6D: + return self._read_pose(ADDR_TARGET_POSE) + + def correction(self) -> Pose6D: + return self._read_pose(ADDR_CORRECTION) + + def _seed(self, mode: ModeCommand, status: DeviceStatus, pose: Pose6D) -> None: + self.registers[ADDR_MODE_COMMAND] = mode.value + self.registers[ADDR_DEVICE_STATUS] = status.value + for offset, value in enumerate(encode_pose(pose)): + self.registers[ADDR_CURRENT_POSE + offset] = value + + def _read_pose(self, address: int) -> Pose6D: + from line_laser_modbus.codec import decode_pose + + return decode_pose([self.registers.get(address + offset, 0) for offset in range(12)]) + + def _ensure_ready(self, device_id: int) -> None: + # 模拟器也校验从站地址,避免测试漏掉 Unit ID 配置 + if not self.connected: + raise ConnectionError("Simulated backend is not connected") + if device_id != self.slave_id: + raise ConnectionError(f"Unexpected slave id: {device_id}")