feat: 完成客户端与模拟器实现

This commit is contained in:
2026-05-13 13:48:37 +08:00
Unverified
parent 7d779095a9
commit da3d41a7b7
4 changed files with 258 additions and 1 deletions
+2 -1
View File
@@ -1,6 +1,7 @@
"""Line laser Modbus protocol package."""
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D
__all__ = ["DeviceStatus", "ModeCommand", "Pose6D", "SerialConfig"]
__all__ = ["DeviceStatus", "LineLaserClient", "ModeCommand", "Pose6D", "SerialConfig"]
+42
View File
@@ -0,0 +1,42 @@
"""Command line helpers."""
from __future__ import annotations
import argparse
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.models import ModeCommand, Pose6D
from line_laser_modbus.simulator import SimulatedModbusBackend
def main() -> None:
parser = argparse.ArgumentParser(prog="line-laser-modbus")
parser.add_argument("--config", help="TOML config path")
parser.add_argument("--port", default="COM1", help="Serial port when no config is provided")
parser.add_argument("--simulate", action="store_true", help="Use in-memory simulator")
subparsers = parser.add_subparsers(dest="command", required=True)
subparsers.add_parser("read-status")
write_mode = subparsers.add_parser("write-mode")
write_mode.add_argument("mode", type=int, choices=[mode.value for mode in ModeCommand])
subparsers.add_parser("demo")
args = parser.parse_args()
config = SerialConfig.from_toml(args.config) if args.config else SerialConfig(port=args.port)
backend = SimulatedModbusBackend() if args.simulate else None
with LineLaserClient(config, backend=backend) as client:
if args.command == "read-status":
print(client.read_status().name)
elif args.command == "write-mode":
client.write_mode(args.mode)
print(client.read_mode().name)
elif args.command == "demo":
client.write_mode(ModeCommand.ONLINE_TRACKING)
client.write_correction(Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0))
print(client.read_mode().name)
if __name__ == "__main__":
main()
+123
View File
@@ -0,0 +1,123 @@
"""High-level Modbus client for the line laser protocol."""
from __future__ import annotations
from types import TracebackType
from typing import Protocol, Self
from pymodbus import ModbusException
from pymodbus.client import ModbusSerialClient
from line_laser_modbus.codec import decode_pose, decode_u16, encode_pose, encode_u16
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.constants import (
ADDR_CORRECTION,
ADDR_CURRENT_POSE,
ADDR_DEVICE_STATUS,
ADDR_MODE_COMMAND,
ADDR_TARGET_POSE,
REGISTER_COUNT_POSE,
)
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, ensure_mode, ensure_status
class ModbusBackend(Protocol):
def connect(self) -> bool: ...
def close(self) -> None: ...
def read_holding_registers(self, address: int, *, count: int, device_id: int): ...
def write_registers(self, address: int, values: list[int], *, device_id: int): ...
class LineLaserClient:
"""Protocol-specific client with typed read/write helpers."""
def __init__(self, config: SerialConfig, backend: ModbusBackend | None = None) -> None:
self.config = config
self._backend = backend or ModbusSerialClient(
port=config.port,
baudrate=config.baudrate,
bytesize=config.bytesize,
parity=config.parity,
stopbits=config.stopbits,
timeout=config.timeout,
retries=config.retries,
)
def __enter__(self) -> Self:
self.connect()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.close()
def connect(self) -> None:
if not self._backend.connect():
msg = f"Unable to connect Modbus device on {self.config.port}"
raise ConnectionError(msg)
def close(self) -> None:
self._backend.close()
def read_mode(self) -> ModeCommand:
return ensure_mode(self._read_word(ADDR_MODE_COMMAND))
def write_mode(self, mode: int | ModeCommand) -> None:
self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ensure_mode(mode).value)])
def read_status(self) -> DeviceStatus:
return ensure_status(self._read_word(ADDR_DEVICE_STATUS))
def read_current_pose(self) -> Pose6D:
registers = self._read_registers(ADDR_CURRENT_POSE, REGISTER_COUNT_POSE)
return decode_pose(registers)
def write_target_pose(self, pose: Pose6D) -> None:
self._write_registers(ADDR_TARGET_POSE, encode_pose(pose))
def write_correction(self, pose: Pose6D) -> None:
self._write_registers(ADDR_CORRECTION, encode_pose(pose))
def _read_word(self, address: int) -> int:
return decode_u16(self._read_registers(address, 1)[0])
def _read_registers(self, address: int, count: int) -> list[int]:
try:
response = self._backend.read_holding_registers(
address,
count=count,
device_id=self.config.slave_id,
)
except ModbusException as exc:
msg = f"Modbus read failed at 0x{address:04X}"
raise RuntimeError(msg) from exc
self._raise_on_error(response, f"Modbus read error at 0x{address:04X}")
return [decode_u16(register) for register in response.registers]
def _write_registers(self, address: int, registers: list[int]) -> None:
safe_registers = [encode_u16(register) for register in registers]
try:
response = self._backend.write_registers(
address,
safe_registers,
device_id=self.config.slave_id,
)
except ModbusException as exc:
msg = f"Modbus write failed at 0x{address:04X}"
raise RuntimeError(msg) from exc
self._raise_on_error(response, f"Modbus write error at 0x{address:04X}")
@staticmethod
def _raise_on_error(response, message: str) -> None:
# pymodbus 的异常响应不是 Python 异常,需要显式判断
if response is None or response.isError():
raise RuntimeError(message)
+91
View File
@@ -0,0 +1,91 @@
"""In-memory Modbus simulator used by tests and demos."""
from __future__ import annotations
from dataclasses import dataclass
from line_laser_modbus.codec import encode_pose
from line_laser_modbus.constants import (
ADDR_CORRECTION,
ADDR_CURRENT_POSE,
ADDR_DEVICE_STATUS,
ADDR_MODE_COMMAND,
ADDR_TARGET_POSE,
SLAVE_ID,
)
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D
@dataclass(slots=True)
class _ReadResponse:
registers: list[int]
def isError(self) -> bool:
return False
@dataclass(slots=True)
class _WriteResponse:
address: int
count: int
def isError(self) -> bool:
return False
class SimulatedModbusBackend:
"""Small pymodbus-compatible backend without hardware."""
def __init__(
self,
*,
slave_id: int = SLAVE_ID,
mode: ModeCommand = ModeCommand.STANDBY_RESET,
status: DeviceStatus = DeviceStatus.STANDBY_READY,
current_pose: Pose6D | None = None,
) -> None:
self.slave_id = slave_id
self.connected = False
self.registers: dict[int, int] = {}
self._seed(mode, status, current_pose or Pose6D.zeros())
def connect(self) -> bool:
self.connected = True
return True
def close(self) -> None:
self.connected = False
def read_holding_registers(self, address: int, *, count: int, device_id: int) -> _ReadResponse:
self._ensure_ready(device_id)
return _ReadResponse([self.registers.get(address + offset, 0) for offset in range(count)])
def write_registers(self, address: int, values: list[int], *, device_id: int) -> _WriteResponse:
self._ensure_ready(device_id)
for offset, value in enumerate(values):
self.registers[address + offset] = value
return _WriteResponse(address, len(values))
def target_pose(self) -> Pose6D:
return self._read_pose(ADDR_TARGET_POSE)
def correction(self) -> Pose6D:
return self._read_pose(ADDR_CORRECTION)
def _seed(self, mode: ModeCommand, status: DeviceStatus, pose: Pose6D) -> None:
self.registers[ADDR_MODE_COMMAND] = mode.value
self.registers[ADDR_DEVICE_STATUS] = status.value
for offset, value in enumerate(encode_pose(pose)):
self.registers[ADDR_CURRENT_POSE + offset] = value
def _read_pose(self, address: int) -> Pose6D:
from line_laser_modbus.codec import decode_pose
return decode_pose([self.registers.get(address + offset, 0) for offset in range(12)])
def _ensure_ready(self, device_id: int) -> None:
# 模拟器也校验从站地址,避免测试漏掉 Unit ID 配置
if not self.connected:
raise ConnectionError("Simulated backend is not connected")
if device_id != self.slave_id:
raise ConnectionError(f"Unexpected slave id: {device_id}")