From 2f642b3cf709dc85aea862453b55032e5462ae67 Mon Sep 17 00:00:00 2001 From: chuan Date: Wed, 13 May 2026 14:27:58 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E8=A1=A5=E5=85=85=E4=B8=AD=E6=96=87?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=E5=92=8C=E4=BD=BF=E7=94=A8=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 15 ++++++++ examples/README.md | 22 +++++++++++ examples/real_serial_read_status.py | 27 +++++++++++++ examples/simulated_basic.py | 43 +++++++++++++++++++++ examples/simulated_polling.py | 38 ++++++++++++++++++ src/line_laser_modbus/__init__.py | 2 +- src/line_laser_modbus/cli.py | 4 +- src/line_laser_modbus/client.py | 60 +++++++++++++++++++++++++---- src/line_laser_modbus/codec.py | 22 ++++++++++- src/line_laser_modbus/config.py | 14 ++++++- src/line_laser_modbus/constants.py | 2 +- src/line_laser_modbus/models.py | 24 +++++++++--- src/line_laser_modbus/runner.py | 18 ++++++++- src/line_laser_modbus/simulator.py | 34 ++++++++++++++-- 14 files changed, 301 insertions(+), 24 deletions(-) create mode 100644 examples/README.md create mode 100644 examples/real_serial_read_status.py create mode 100644 examples/simulated_basic.py create mode 100644 examples/simulated_polling.py diff --git a/README.md b/README.md index d844687..92c0496 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,21 @@ python -m uv run line-laser-modbus --simulate poll-once --target 1 2 3 0 1 2 python -m uv run line-laser-modbus --simulate demo ``` +## 示例代码 + +示例代码在 `examples/` 目录下,优先阅读模拟示例了解库的调用方式: + +```powershell +python -m uv run python examples/simulated_basic.py +python -m uv run python examples/simulated_polling.py +``` + +连接真实串口前先修改 `config.toml`,再运行: + +```powershell +python -m uv run python examples/real_serial_read_status.py +``` + ## TOML 配置 运行前按实际串口修改 `config.toml`。默认会读取当前目录下的 `config.toml`: diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..c4b3245 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,22 @@ +# 使用示例 + +这些示例用于说明 `line_laser_modbus` 作为库时的常见用法 + +运行前先同步依赖 + +```powershell +python -m uv sync --default-index https://mirrors.ustc.edu.cn/pypi/simple +``` + +无硬件环境优先运行模拟示例 + +```powershell +python -m uv run python examples/simulated_basic.py +python -m uv run python examples/simulated_polling.py +``` + +连接真实串口前先修改根目录 `config.toml` + +```powershell +python -m uv run python examples/real_serial_read_status.py +``` diff --git a/examples/real_serial_read_status.py b/examples/real_serial_read_status.py new file mode 100644 index 0000000..54d1a95 --- /dev/null +++ b/examples/real_serial_read_status.py @@ -0,0 +1,27 @@ +"""真实串口读取状态示例""" + +from pathlib import Path + +from line_laser_modbus import AppConfig, LineLaserClient + + +def main() -> None: + """从 config.toml 读取串口配置并读取设备状态""" + + # 运行这个示例前先修改项目根目录的 config.toml + # port 需要改成现场电脑实际看到的串口名 + # Windows 常见格式是 COM1 或 COM3 + config = AppConfig.from_toml(Path("config.toml")) + + # 真实串口路径不注入 backend + # LineLaserClient 会自动创建 pymodbus 串口客户端 + with LineLaserClient(config.serial) as client: + status = client.read_status() + pose = client.read_current_pose() + + print("设备状态", status.name) + print("当前位姿", pose) + + +if __name__ == "__main__": + main() diff --git a/examples/simulated_basic.py b/examples/simulated_basic.py new file mode 100644 index 0000000..e5602c2 --- /dev/null +++ b/examples/simulated_basic.py @@ -0,0 +1,43 @@ +"""模拟环境中的基础读写示例""" + +from line_laser_modbus import LineLaserClient, ModeCommand, Pose6D, SerialConfig +from line_laser_modbus.simulator import SimulatedModbusBackend + + +def main() -> None: + """演示不连接硬件时如何使用客户端""" + + # 模拟后端实现了客户端需要的最小 Modbus 接口 + # 这让开发和测试可以在没有串口和控制器的机器上运行 + backend = SimulatedModbusBackend() + + # 端口名在模拟模式下不会真正打开 + # 这里保留 SIM 只是让配置含义更清晰 + config = SerialConfig(port="SIM") + + # 客户端支持上下文管理器 + # 进入 with 时连接后端退出 with 时关闭后端 + with LineLaserClient(config, backend=backend) as client: + print("初始状态", client.read_status().name) + + # write_mode 会直接写寄存器不做状态机校验 + # 如果业务需要遵守协议切换规则可以使用 switch_mode + client.write_mode(ModeCommand.ONLINE_TRACKING) + print("当前模式", client.read_mode().name) + + # 目标示教位姿和纠偏量都使用 Pose6D 表达 + # 字段顺序固定为 X Y Z A B C + target_pose = Pose6D(100.0, 20.0, 30.0, 0.0, 15.0, 90.0) + correction = Pose6D(1.0, 0.0, -0.5, 0.0, 0.0, 2.0) + + client.write_target_pose(target_pose) + client.write_correction(correction) + + # 模拟后端提供便捷读取方法 + # 真实设备运行时通常由控制器消费这些寄存器 + print("模拟目标位姿", backend.target_pose()) + print("模拟纠偏量", backend.correction()) + + +if __name__ == "__main__": + main() diff --git a/examples/simulated_polling.py b/examples/simulated_polling.py new file mode 100644 index 0000000..f27244b --- /dev/null +++ b/examples/simulated_polling.py @@ -0,0 +1,38 @@ +"""模拟环境中的单周期轮询示例""" + +from line_laser_modbus import DeviceStatus, LineLaserClient, ModeCommand, Pose6D, SerialConfig +from line_laser_modbus.runner import PollingRunner, pose_delta +from line_laser_modbus.simulator import SimulatedModbusBackend + + +def main() -> None: + """演示轮询运行器如何读取快照并写入纠偏量""" + + # 先让模拟控制器处于在线跟踪模式 + # 只有 ONLINE_TRACKING 模式下 PollingRunner 才会写入纠偏寄存器 + backend = SimulatedModbusBackend( + mode=ModeCommand.ONLINE_TRACKING, + status=DeviceStatus.TRACKING_OK, + current_pose=Pose6D(10.0, 20.0, 30.0, 0.0, 1.0, 2.0), + ) + + # 这里假设目标轨迹点是下面这个位姿 + # pose_delta 会生成一个函数用于计算 目标位姿减当前位姿 + target_pose = Pose6D(11.0, 22.0, 33.0, 0.0, 1.5, 1.0) + correction_provider = pose_delta(target_pose) + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + runner = PollingRunner(client, correction_provider=correction_provider) + + # run_once 只执行一个周期 + # 测试和无硬件演示建议用它避免进入无限循环 + snapshot = runner.run_once() + + print("读取模式", snapshot.mode.name) + print("读取状态", snapshot.status.name) + print("读取当前位姿", snapshot.pose) + print("写入纠偏量", backend.correction()) + + +if __name__ == "__main__": + main() diff --git a/src/line_laser_modbus/__init__.py b/src/line_laser_modbus/__init__.py index e65228f..cb7da1f 100644 --- a/src/line_laser_modbus/__init__.py +++ b/src/line_laser_modbus/__init__.py @@ -1,4 +1,4 @@ -"""Line laser Modbus protocol package.""" +"""线激光 Modbus 协议包""" from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import AppConfig, PollingConfig, SerialConfig diff --git a/src/line_laser_modbus/cli.py b/src/line_laser_modbus/cli.py index cd95589..0e4c96e 100644 --- a/src/line_laser_modbus/cli.py +++ b/src/line_laser_modbus/cli.py @@ -1,4 +1,4 @@ -"""Command line helpers.""" +"""命令行入口""" from __future__ import annotations @@ -13,6 +13,8 @@ from line_laser_modbus.simulator import SimulatedModbusBackend def main() -> None: + """解析命令行参数并执行对应协议动作""" + parser = argparse.ArgumentParser(prog="line-laser-modbus") parser.add_argument("--config", default="config.toml", help="TOML config path") parser.add_argument("--simulate", action="store_true", help="Use in-memory simulator") diff --git a/src/line_laser_modbus/client.py b/src/line_laser_modbus/client.py index 7e4320f..24d85b7 100644 --- a/src/line_laser_modbus/client.py +++ b/src/line_laser_modbus/client.py @@ -1,4 +1,4 @@ -"""High-level Modbus client for the line laser protocol.""" +"""线激光 Modbus 协议客户端""" from __future__ import annotations @@ -30,19 +30,31 @@ from line_laser_modbus.models import ( class ModbusBackend(Protocol): - def connect(self) -> bool: ... + """协议客户端需要的最小后端接口""" - def close(self) -> None: ... + def connect(self) -> bool: + """打开后端连接""" + ... - def read_holding_registers(self, address: int, *, count: int, device_id: int): ... + def close(self) -> None: + """关闭后端连接""" + ... - def write_registers(self, address: int, values: list[int], *, device_id: int): ... + def read_holding_registers(self, address: int, *, count: int, device_id: int): + """从设备读取保持寄存器""" + ... + + def write_registers(self, address: int, values: list[int], *, device_id: int): + """向设备写入保持寄存器""" + ... class LineLaserClient: - """Protocol-specific client with typed read/write helpers.""" + """面向协议语义的类型化读写客户端""" def __init__(self, config: SerialConfig, backend: ModbusBackend | None = None) -> None: + """创建真实串口客户端或注入模拟后端""" + self.config = config self._backend = backend or ModbusSerialClient( port=config.port, @@ -55,6 +67,8 @@ class LineLaserClient: ) def __enter__(self) -> Self: + """进入上下文管理器时连接后端""" + self.connect() return self @@ -64,49 +78,75 @@ class LineLaserClient: exc: BaseException | None, traceback: TracebackType | None, ) -> None: + """退出上下文管理器时关闭后端""" + self.close() def connect(self) -> None: + """连接配置中的 Modbus 设备""" + if not self._backend.connect(): msg = f"Unable to connect Modbus device on {self.config.port}" raise ConnectionError(msg) def close(self) -> None: + """关闭当前 Modbus 连接""" + self._backend.close() def read_mode(self) -> ModeCommand: + """读取 0xD000 模式命令字""" + return ensure_mode(self._read_word(ADDR_MODE_COMMAND)) def write_mode(self, mode: int | ModeCommand) -> None: + """不做状态机校验直接写入模式命令字""" + self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ensure_mode(mode).value)]) def switch_mode(self, mode: int | ModeCommand) -> None: + """按协议状态机规则校验后切换模式""" + current = self.read_mode() status = self.read_status() target = validate_mode_switch(current, mode, status) self.write_mode(target) def read_status(self) -> DeviceStatus: + """读取 0xD001 设备状态字""" + return ensure_status(self._read_word(ADDR_DEVICE_STATUS)) def read_current_pose(self) -> Pose6D: + """读取控制器当前 XYZABC 位姿""" + registers = self._read_registers(ADDR_CURRENT_POSE, REGISTER_COUNT_POSE) return decode_pose(registers) def read_snapshot(self) -> DeviceSnapshot: - # 快照按协议关键字段顺序读取,避免上层重复拼装状态 + """读取模式状态和当前位姿组成一次逻辑快照""" + + # 快照按协议关键字段顺序读取避免上层重复拼装状态 return DeviceSnapshot(self.read_mode(), self.read_status(), self.read_current_pose()) def write_target_pose(self, pose: Pose6D) -> None: + """写入示教目标 XYZABC 位姿""" + self._write_registers(ADDR_TARGET_POSE, encode_pose(pose)) def write_correction(self, pose: Pose6D) -> None: + """写入实时 XYZABC 纠偏量""" + self._write_registers(ADDR_CORRECTION, encode_pose(pose)) def _read_word(self, address: int) -> int: + """从绝对地址读取单个 uint16 寄存器""" + return decode_u16(self._read_registers(address, 1)[0]) def _read_registers(self, address: int, count: int) -> list[int]: + """读取并校验连续保持寄存器""" + try: response = self._backend.read_holding_registers( address, @@ -121,6 +161,8 @@ class LineLaserClient: return [decode_u16(register) for register in response.registers] def _write_registers(self, address: int, registers: list[int]) -> None: + """校验并写入连续保持寄存器""" + safe_registers = [encode_u16(register) for register in registers] try: response = self._backend.write_registers( @@ -136,6 +178,8 @@ class LineLaserClient: @staticmethod def _raise_on_error(response, message: str) -> None: - # pymodbus 的异常响应不是 Python 异常,需要显式判断 + """在 pymodbus 返回异常响应时抛出运行时错误""" + + # pymodbus 的异常响应不是 Python 异常需要显式判断 if response is None or response.isError(): raise RuntimeError(message) diff --git a/src/line_laser_modbus/codec.py b/src/line_laser_modbus/codec.py index bc9157e..86924b9 100644 --- a/src/line_laser_modbus/codec.py +++ b/src/line_laser_modbus/codec.py @@ -1,4 +1,4 @@ -"""Register and RTU frame codec helpers.""" +"""寄存器和 RTU 帧编解码工具""" from __future__ import annotations @@ -15,6 +15,8 @@ from line_laser_modbus.models import Pose6D def encode_u16(value: int) -> int: + """校验并返回一个可放入单个 Modbus 寄存器的值""" + if not 0 <= value <= 0xFFFF: msg = f"Value out of uint16 range: {value}" raise ValueError(msg) @@ -22,10 +24,14 @@ def encode_u16(value: int) -> int: def decode_u16(register: int) -> int: + """校验并解析单个无符号 16 位寄存器""" + return encode_u16(register) def encode_f32(value: float) -> list[int]: + """将一个 float32 编码为两个大端寄存器""" + # 按协议要求使用大端字节序拆成两个保持寄存器 raw = struct.pack(">f", float(value)) high, low = struct.unpack(">HH", raw) @@ -33,6 +39,8 @@ def encode_f32(value: float) -> list[int]: def decode_f32(registers: list[int] | tuple[int, int]) -> float: + """将两个大端寄存器解析为一个 float32""" + if len(registers) != 2: msg = f"float32 requires 2 registers, got {len(registers)}" raise ValueError(msg) @@ -41,6 +49,8 @@ def decode_f32(registers: list[int] | tuple[int, int]) -> float: def encode_pose(pose: Pose6D) -> list[int]: + """将 XYZABC 位姿编码为十二个保持寄存器""" + registers: list[int] = [] for value in pose.as_tuple(): registers.extend(encode_f32(value)) @@ -48,6 +58,8 @@ def encode_pose(pose: Pose6D) -> list[int]: def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D: + """将十二个保持寄存器解析为 XYZABC 位姿""" + if len(registers) != REGISTER_COUNT_POSE: msg = f"Pose requires {REGISTER_COUNT_POSE} registers, got {len(registers)}" raise ValueError(msg) @@ -56,6 +68,8 @@ def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D: def crc16(data: bytes) -> int: + """计算原始帧字节的 Modbus RTU CRC16""" + # Modbus RTU CRC16 低字节先发但整数内部按正常高低位保存 crc = 0xFFFF for byte in data: @@ -69,16 +83,22 @@ def crc16(data: bytes) -> int: def append_crc(data: bytes) -> bytes: + """给帧载荷追加 Modbus RTU CRC16 字节""" + crc = crc16(data) return data + bytes((crc & 0xFF, crc >> 8)) def build_read_frame(address: int, count: int, slave_id: int = SLAVE_ID) -> bytes: + """构造功能码 0x03 的原始读保持寄存器请求""" + payload = struct.pack(">BBHH", slave_id, FUNC_READ_HOLDING_REGISTERS, address, count) return append_crc(payload) def build_write_frame(address: int, registers: list[int], slave_id: int = SLAVE_ID) -> bytes: + """构造功能码 0x10 的原始写多个寄存器请求""" + count = len(registers) body = b"".join(struct.pack(">H", encode_u16(register)) for register in registers) payload = struct.pack( diff --git a/src/line_laser_modbus/config.py b/src/line_laser_modbus/config.py index 8f94b6d..0e07481 100644 --- a/src/line_laser_modbus/config.py +++ b/src/line_laser_modbus/config.py @@ -1,4 +1,4 @@ -"""Runtime configuration.""" +"""运行配置模型""" from __future__ import annotations @@ -19,6 +19,8 @@ from line_laser_modbus.constants import ( @dataclass(frozen=True, slots=True) class SerialConfig: + """串口通信配置""" + port: str = "COM1" slave_id: int = SLAVE_ID baudrate: int = DEFAULT_BAUDRATE @@ -30,6 +32,8 @@ class SerialConfig: @classmethod def from_toml(cls, path: str | Path) -> SerialConfig: + """从 TOML 文件读取串口配置""" + with Path(path).open("rb") as file: data = tomllib.load(file) serial = data.get("serial", {}) @@ -38,20 +42,26 @@ class SerialConfig: @dataclass(frozen=True, slots=True) class PollingConfig: + """轮询运行配置""" + interval_seconds: float = 0.02 max_timeouts: int = 3 @dataclass(frozen=True, slots=True) class AppConfig: + """应用运行配置""" + serial: SerialConfig = SerialConfig() polling: PollingConfig = PollingConfig() @classmethod def from_toml(cls, path: str | Path) -> AppConfig: + """从 TOML 文件读取完整应用配置""" + with Path(path).open("rb") as file: data = tomllib.load(file) - # 配置文件只暴露运行需要的最小入口,避免协议常量被外部误改 + # 配置文件只暴露运行需要的最小入口避免协议常量被外部误改 return cls( serial=SerialConfig(**data.get("serial", {})), polling=PollingConfig(**data.get("polling", {})), diff --git a/src/line_laser_modbus/constants.py b/src/line_laser_modbus/constants.py index 1742fe4..23bcc9b 100644 --- a/src/line_laser_modbus/constants.py +++ b/src/line_laser_modbus/constants.py @@ -1,4 +1,4 @@ -"""Protocol constants from docs/proto.md.""" +"""来自 docs/proto.md 的协议常量""" SLAVE_ID = 0x08 diff --git a/src/line_laser_modbus/models.py b/src/line_laser_modbus/models.py index 0e2358b..1ee8bbb 100644 --- a/src/line_laser_modbus/models.py +++ b/src/line_laser_modbus/models.py @@ -1,4 +1,4 @@ -"""Typed protocol models.""" +"""协议类型模型""" from __future__ import annotations @@ -8,7 +8,7 @@ from typing import Self class ModeCommand(IntEnum): - """Mode command word at 0xD000.""" + """0xD000 模式命令字""" STANDBY_RESET = 0 CALIBRATION = 1 @@ -19,7 +19,7 @@ class ModeCommand(IntEnum): class DeviceStatus(IntEnum): - """Device status word at 0xD001.""" + """0xD001 设备状态字""" STANDBY_READY = 0 RUNNING = 1 @@ -32,7 +32,7 @@ class DeviceStatus(IntEnum): @dataclass(frozen=True, slots=True) class DeviceSnapshot: - """Current controller state read in one logical cycle.""" + """单次逻辑周期读取到的控制器状态""" mode: ModeCommand status: DeviceStatus @@ -41,7 +41,7 @@ class DeviceSnapshot: @dataclass(frozen=True, slots=True) class Pose6D: - """XYZABC pose or correction vector.""" + """XYZABC 位姿或纠偏向量""" x: float y: float @@ -52,20 +52,28 @@ class Pose6D: @classmethod def zeros(cls) -> Self: + """创建全零位姿""" + return cls(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) @classmethod def from_iterable(cls, values: list[float] | tuple[float, ...]) -> Self: + """从六个数值创建 XYZABC 位姿""" + if len(values) != 6: msg = f"Pose6D requires 6 values, got {len(values)}" raise ValueError(msg) return cls(*(float(value) for value in values)) def as_tuple(self) -> tuple[float, float, float, float, float, float]: + """按协议顺序返回六轴元组""" + return (self.x, self.y, self.z, self.a, self.b, self.c) def ensure_mode(value: int | ModeCommand) -> ModeCommand: + """校验并转换模式命令字""" + try: return ModeCommand(value) except ValueError as exc: @@ -74,6 +82,8 @@ def ensure_mode(value: int | ModeCommand) -> ModeCommand: def ensure_status(value: int | DeviceStatus) -> DeviceStatus: + """校验并转换设备状态字""" + try: return DeviceStatus(value) except ValueError as exc: @@ -86,6 +96,8 @@ def can_switch_mode( target: int | ModeCommand, status: int | DeviceStatus | None = None, ) -> bool: + """判断模式切换是否符合协议状态机规则""" + current_mode = ensure_mode(current) target_mode = ensure_mode(target) current_status = ensure_status(status) if status is not None else None @@ -109,6 +121,8 @@ def validate_mode_switch( target: int | ModeCommand, status: int | DeviceStatus | None = None, ) -> ModeCommand: + """校验模式切换并返回目标模式""" + target_mode = ensure_mode(target) if not can_switch_mode(current, target_mode, status): msg = f"Illegal mode switch: {ensure_mode(current).name} -> {target_mode.name}" diff --git a/src/line_laser_modbus/runner.py b/src/line_laser_modbus/runner.py index 843149b..a8eb684 100644 --- a/src/line_laser_modbus/runner.py +++ b/src/line_laser_modbus/runner.py @@ -1,4 +1,4 @@ -"""Polling loop helpers for production and simulation.""" +"""生产和模拟环境共用的轮询运行器""" from __future__ import annotations @@ -15,8 +15,12 @@ SnapshotHandler = Callable[[DeviceSnapshot], None] def pose_delta(target: Pose6D) -> CorrectionProvider: + """创建按目标位姿减当前位姿计算纠偏量的函数""" + def calculate(snapshot: DeviceSnapshot) -> Pose6D: - # 纠偏量按目标位姿减当前位姿计算,实际项目可替换为轨迹规划结果 + """根据状态快照计算纠偏量""" + + # 纠偏量按目标位姿减当前位姿计算实际项目可替换为轨迹规划结果 return Pose6D.from_iterable( [ target_value - current @@ -32,6 +36,8 @@ def pose_delta(target: Pose6D) -> CorrectionProvider: class PollingRunner: + """按固定周期读取状态并在跟踪模式下写入纠偏量""" + def __init__( self, client: LineLaserClient, @@ -39,6 +45,8 @@ class PollingRunner: snapshot_handler: SnapshotHandler | None = None, config: PollingConfig | None = None, ) -> None: + """创建轮询运行器""" + self.client = client self.correction_provider = correction_provider self.snapshot_handler = snapshot_handler @@ -46,6 +54,8 @@ class PollingRunner: self.timeout_count = 0 def run_once(self) -> DeviceSnapshot: + """执行一次读取和可选纠偏写入""" + snapshot = self.client.read_snapshot() self.timeout_count = 0 if self.snapshot_handler: @@ -55,6 +65,8 @@ class PollingRunner: return snapshot def run_forever(self) -> None: + """按配置周期持续运行直到出现不可恢复超时""" + while True: started = time.monotonic() try: @@ -68,5 +80,7 @@ class PollingRunner: def default_polling_config() -> PollingConfig: + """按协议默认超时时间生成轮询配置""" + max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / 0.02)) return PollingConfig(interval_seconds=0.02, max_timeouts=max_timeouts) diff --git a/src/line_laser_modbus/simulator.py b/src/line_laser_modbus/simulator.py index b45cf97..0d155c9 100644 --- a/src/line_laser_modbus/simulator.py +++ b/src/line_laser_modbus/simulator.py @@ -1,4 +1,4 @@ -"""In-memory Modbus simulator used by tests and demos.""" +"""测试和演示使用的内存 Modbus 模拟器""" from __future__ import annotations @@ -18,23 +18,31 @@ from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D @dataclass(slots=True) class _ReadResponse: + """模拟 pymodbus 读响应""" + registers: list[int] def isError(self) -> bool: + """返回响应是否为异常响应""" + return False @dataclass(slots=True) class _WriteResponse: + """模拟 pymodbus 写响应""" + address: int count: int def isError(self) -> bool: + """返回响应是否为异常响应""" + return False class SimulatedModbusBackend: - """Small pymodbus-compatible backend without hardware.""" + """不依赖硬件的 pymodbus 兼容后端""" def __init__( self, @@ -44,47 +52,67 @@ class SimulatedModbusBackend: status: DeviceStatus = DeviceStatus.STANDBY_READY, current_pose: Pose6D | None = None, ) -> None: + """创建模拟后端并写入初始寄存器值""" + self.slave_id = slave_id self.connected = False self.registers: dict[int, int] = {} self._seed(mode, status, current_pose or Pose6D.zeros()) def connect(self) -> bool: + """标记模拟后端为已连接""" + self.connected = True return True def close(self) -> None: + """标记模拟后端为已关闭""" + self.connected = False def read_holding_registers(self, address: int, *, count: int, device_id: int) -> _ReadResponse: + """从模拟寄存器表读取连续保持寄存器""" + self._ensure_ready(device_id) return _ReadResponse([self.registers.get(address + offset, 0) for offset in range(count)]) def write_registers(self, address: int, values: list[int], *, device_id: int) -> _WriteResponse: + """向模拟寄存器表写入连续保持寄存器""" + self._ensure_ready(device_id) for offset, value in enumerate(values): self.registers[address + offset] = value return _WriteResponse(address, len(values)) def target_pose(self) -> Pose6D: + """读取模拟器中保存的目标示教位姿""" + return self._read_pose(ADDR_TARGET_POSE) def correction(self) -> Pose6D: + """读取模拟器中保存的实时纠偏量""" + return self._read_pose(ADDR_CORRECTION) def _seed(self, mode: ModeCommand, status: DeviceStatus, pose: Pose6D) -> None: + """写入模拟器初始模式状态和当前位姿""" + self.registers[ADDR_MODE_COMMAND] = mode.value self.registers[ADDR_DEVICE_STATUS] = status.value for offset, value in enumerate(encode_pose(pose)): self.registers[ADDR_CURRENT_POSE + offset] = value def _read_pose(self, address: int) -> Pose6D: + """从指定地址读取一组模拟位姿寄存器""" + from line_laser_modbus.codec import decode_pose return decode_pose([self.registers.get(address + offset, 0) for offset in range(12)]) def _ensure_ready(self, device_id: int) -> None: - # 模拟器也校验从站地址,避免测试漏掉 Unit ID 配置 + """检查连接状态和从站地址""" + + # 模拟器也校验从站地址避免测试漏掉 Unit ID 配置 if not self.connected: raise ConnectionError("Simulated backend is not connected") if device_id != self.slave_id: