Files
line-laser-modbus/src/line_laser_modbus/client.py
T
2026-06-22 09:52:30 +08:00

236 lines
7.5 KiB
Python

"""线激光 Modbus 协议客户端"""
from __future__ import annotations
from types import TracebackType
from typing import Protocol, Self
from pymodbus import ModbusException
from pymodbus.client import ModbusSerialClient
from line_laser_modbus.codec import (
decode_timed_pose,
decode_u16,
encode_timed_pose,
encode_u16,
)
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.constants import (
ADDR_AVAILABLE_CACHE_COUNT,
ADDR_CORRECTION,
ADDR_CURRENT_POSE,
ADDR_DEVICE_STATUS,
ADDR_MODE_COMMAND,
ADDR_TARGET_POSE,
REGISTER_COUNT_POSE,
)
from line_laser_modbus.models import (
NORMAL_MODE_COMMANDS,
DeviceSnapshot,
DeviceStatus,
ModeCommand,
Pose6D,
TimedPose6D,
ensure_mode,
ensure_status,
validate_mode_switch,
)
class ModbusBackend(Protocol):
    """Minimal backend interface the protocol client depends on.

    Any object providing these four methods with compatible signatures can be
    injected into ``LineLaserClient`` in place of a real serial client.
    """

    def connect(self) -> bool:
        """Open the backend connection and report whether it succeeded."""

    def close(self) -> None:
        """Close the backend connection."""

    def read_holding_registers(self, address: int, *, count: int, device_id: int):
        """Read ``count`` holding registers starting at ``address`` from the device."""

    def write_registers(self, address: int, values: list[int], *, device_id: int):
        """Write ``values`` to holding registers starting at ``address`` on the device."""
class LineLaserClient:
"""面向协议语义的类型化读写客户端"""
def __init__(self, config: SerialConfig, backend: ModbusBackend | None = None) -> None:
"""创建真实串口客户端或注入模拟后端"""
self.config = config
self._backend = backend or ModbusSerialClient(
port=config.port,
baudrate=config.baudrate,
bytesize=config.bytesize,
parity=config.parity,
stopbits=config.stopbits,
timeout=config.timeout,
retries=config.retries,
)
def __enter__(self) -> Self:
"""进入上下文管理器时连接后端"""
self.connect()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
traceback: TracebackType | None,
) -> None:
"""退出上下文管理器时关闭后端"""
self.close()
def connect(self) -> None:
"""连接配置中的 Modbus 设备"""
if not self._backend.connect():
msg = f"Unable to connect Modbus device on {self.config.port}"
raise ConnectionError(msg)
def close(self) -> None:
"""关闭当前 Modbus 连接"""
self._backend.close()
def read_mode(self) -> ModeCommand:
"""读取 0xD000 模式命令字"""
return ensure_mode(self._read_word(ADDR_MODE_COMMAND))
def write_mode(self, mode: int | ModeCommand) -> None:
"""不做状态机校验直接写入模式命令字"""
mode_command = ensure_mode(mode)
if mode_command not in NORMAL_MODE_COMMANDS:
msg = "Mode command 5 is reserved for emergency stop; use trigger_emergency_stop()"
raise ValueError(msg)
self._write_registers(ADDR_MODE_COMMAND, [encode_u16(mode_command.value)])
def trigger_emergency_stop(self) -> None:
"""按 V1.3 示例帧向模式命令字写入急停值 5"""
self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ModeCommand.EMERGENCY_STOP.value)])
def switch_mode(self, mode: int | ModeCommand) -> None:
"""按协议状态机规则校验后切换模式"""
current = self.read_mode()
status = self.read_status()
target = validate_mode_switch(current, mode, status)
self.write_mode(target)
def read_status(self) -> DeviceStatus:
"""读取 0xD001 设备状态字"""
return ensure_status(self._read_word(ADDR_DEVICE_STATUS))
def read_available_cache_count(self) -> int:
"""读取 0xD002 目标位姿可用缓存数量"""
return self._read_word(ADDR_AVAILABLE_CACHE_COUNT)
def read_current_pose(self) -> Pose6D:
"""读取控制器当前 XYZABC 位姿"""
return self.read_current_timed_pose().pose
def read_current_timed_pose(self) -> TimedPose6D:
"""读取控制器当前时间戳和 XYZABC 位姿"""
registers = self._read_registers(ADDR_CURRENT_POSE, REGISTER_COUNT_POSE)
return decode_timed_pose(registers)
def read_snapshot(self) -> DeviceSnapshot:
"""读取模式状态和当前位姿组成一次逻辑快照"""
mode = self.read_mode()
status = self.read_status()
if mode is ModeCommand.MANUAL_TEACHING or status is DeviceStatus.EMERGENCY_TRIGGERED:
return DeviceSnapshot(mode, status, Pose6D.zeros(), 0)
timed_pose = self.read_current_timed_pose()
return DeviceSnapshot(
mode,
status,
timed_pose.pose,
timed_pose.timestamp,
)
def write_target_pose(self, pose: Pose6D, *, timestamp: int = 0) -> None:
"""写入示教目标 XYZABC 位姿"""
self.write_target_timed_pose(TimedPose6D.from_pose(pose, timestamp))
def write_target_timed_pose(self, data: TimedPose6D) -> None:
"""写入示教目标时间戳和 XYZABC 位姿"""
if self.read_available_cache_count() <= 0:
msg = "Target pose cache is full; available cache count at 0xD002 is 0"
raise RuntimeError(msg)
self._write_registers(ADDR_TARGET_POSE, encode_timed_pose(data))
def write_correction(self, pose: Pose6D, *, timestamp: int = 0) -> None:
"""写入实时 XYZABC 纠偏量"""
self.write_timed_correction(TimedPose6D.from_pose(pose, timestamp))
def write_timed_correction(self, data: TimedPose6D) -> None:
"""写入实时时间戳和 XYZABC 纠偏量"""
self._write_registers(ADDR_CORRECTION, encode_timed_pose(data))
def _read_word(self, address: int) -> int:
"""从绝对地址读取单个 uint16 寄存器"""
return decode_u16(self._read_registers(address, 1)[0])
def _read_registers(self, address: int, count: int) -> list[int]:
"""读取并校验连续保持寄存器"""
try:
response = self._backend.read_holding_registers(
address,
count=count,
device_id=self.config.slave_id,
)
except ModbusException as exc:
msg = f"Modbus read failed at 0x{address:04X}"
raise RuntimeError(msg) from exc
self._raise_on_error(response, f"Modbus read error at 0x{address:04X}")
return [decode_u16(register) for register in response.registers]
def _write_registers(self, address: int, registers: list[int]) -> None:
"""校验并写入连续保持寄存器"""
safe_registers = [encode_u16(register) for register in registers]
try:
response = self._backend.write_registers(
address,
safe_registers,
device_id=self.config.slave_id,
)
except ModbusException as exc:
msg = f"Modbus write failed at 0x{address:04X}"
raise RuntimeError(msg) from exc
self._raise_on_error(response, f"Modbus write error at 0x{address:04X}")
@staticmethod
def _raise_on_error(response, message: str) -> None:
"""在 pymodbus 返回异常响应时抛出运行时错误"""
# pymodbus 的异常响应不是 Python 异常需要显式判断
if response is None or response.isError():
raise RuntimeError(message)