feat: 根据v1.2的文档完成代码迁移

This commit is contained in:
2026-05-28 12:20:10 +08:00
Unverified
parent 3991a8a2ac
commit 866814e94e
16 changed files with 245 additions and 40 deletions
+3 -3
View File
@@ -7,9 +7,9 @@
## 功能
- 基于 `pymodbus` 的 Modbus RTU 串口客户端
- 按协议实现 uint16、float32、XYZABC 位姿和 CRC16 编解码
- 按协议实现 uint16、uint32 时间戳、float32、XYZABC 位姿和 CRC16 编解码
- 提供无硬件内存模拟器用于测试
- 提供模式状态机校验和 20ms 轮询运行器
- 提供模式状态机校验和 50ms 轮询运行器
- 使用 TOML 作为运行配置入口
## 安装与测试
@@ -52,7 +52,7 @@ python -m uv run python examples/real_serial_read_status.py
python -m uv run line-laser-modbus read-status
```
配置文件包含 `[serial]``[polling]` 两段,其中 `[polling]` 用于配置 20ms 轮询周期和连续超时判定次数。
配置文件包含 `[serial]``[polling]` 两段,其中 `[polling]` 用于配置 50ms 轮询周期和连续超时判定次数。
## 类型标记
+1 -1
View File
@@ -6,7 +6,7 @@
- [ ] 增加 CLI 模拟模式测试,覆盖常用命令输出和参数校验。
- [ ] 增加轮询异常测试,覆盖连续超时、恢复计数和不可恢复错误。
- [ ] 增加真实串口硬件联调记录,确认状态字、当前位姿、模式写入、目标位姿和纠偏量读写。
- [ ] 验证 20ms 轮询稳定性,记录实际通信耗时、超时次数和重试表现。
- [ ] 验证 50ms 轮询稳定性,记录实际通信耗时、超时次数和重试表现。
- [ ] 完善发布元数据,包括 license、authors、classifiers 和 project URLs。
- [ ] 评估是否加入类型检查和覆盖率统计。
- [ ] 根据现场设备参数更新或补充配置示例。
+2 -2
View File
@@ -5,9 +5,9 @@ baudrate = 115200
bytesize = 8
parity = "N"
stopbits = 1
timeout = 0.05
timeout = 0.15
retries = 3
[polling]
interval_seconds = 0.02
interval_seconds = 0.05
max_timeouts = 3
+2 -1
View File
@@ -2,7 +2,7 @@
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import AppConfig, PollingConfig, SerialConfig
from line_laser_modbus.models import DeviceSnapshot, DeviceStatus, ModeCommand, Pose6D
from line_laser_modbus.models import DeviceSnapshot, DeviceStatus, ModeCommand, Pose6D, TimedPose6D
from line_laser_modbus.runner import PollingRunner
# 公开库调用时最常用的类型和入口。
@@ -16,4 +16,5 @@ __all__ = [
"PollingRunner",
"Pose6D",
"SerialConfig",
"TimedPose6D",
]
+34 -7
View File
@@ -8,7 +8,12 @@ from typing import Protocol, Self
from pymodbus import ModbusException
from pymodbus.client import ModbusSerialClient
from line_laser_modbus.codec import decode_pose, decode_u16, encode_pose, encode_u16
from line_laser_modbus.codec import (
decode_timed_pose,
decode_u16,
encode_timed_pose,
encode_u16,
)
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.constants import (
ADDR_CORRECTION,
@@ -23,6 +28,7 @@ from line_laser_modbus.models import (
DeviceStatus,
ModeCommand,
Pose6D,
TimedPose6D,
ensure_mode,
ensure_status,
validate_mode_switch,
@@ -120,24 +126,45 @@ class LineLaserClient:
def read_current_pose(self) -> Pose6D:
"""读取控制器当前 XYZABC 位姿"""
return self.read_current_timed_pose().pose
def read_current_timed_pose(self) -> TimedPose6D:
"""读取控制器当前时间戳和 XYZABC 位姿"""
registers = self._read_registers(ADDR_CURRENT_POSE, REGISTER_COUNT_POSE)
return decode_pose(registers)
return decode_timed_pose(registers)
def read_snapshot(self) -> DeviceSnapshot:
"""读取模式状态和当前位姿组成一次逻辑快照"""
# 快照按协议关键字段顺序读取避免上层重复拼装状态
return DeviceSnapshot(self.read_mode(), self.read_status(), self.read_current_pose())
timed_pose = self.read_current_timed_pose()
return DeviceSnapshot(
self.read_mode(),
self.read_status(),
timed_pose.pose,
timed_pose.timestamp,
)
def write_target_pose(self, pose: Pose6D) -> None:
def write_target_pose(self, pose: Pose6D, *, timestamp: int = 0) -> None:
"""写入示教目标 XYZABC 位姿"""
self._write_registers(ADDR_TARGET_POSE, encode_pose(pose))
self.write_target_timed_pose(TimedPose6D.from_pose(pose, timestamp))
def write_correction(self, pose: Pose6D) -> None:
def write_target_timed_pose(self, data: TimedPose6D) -> None:
"""写入示教目标时间戳和 XYZABC 位姿"""
self._write_registers(ADDR_TARGET_POSE, encode_timed_pose(data))
def write_correction(self, pose: Pose6D, *, timestamp: int = 0) -> None:
"""写入实时 XYZABC 纠偏量"""
self._write_registers(ADDR_CORRECTION, encode_pose(pose))
self.write_timed_correction(TimedPose6D.from_pose(pose, timestamp))
def write_timed_correction(self, data: TimedPose6D) -> None:
"""写入实时时间戳和 XYZABC 纠偏量"""
self._write_registers(ADDR_CORRECTION, encode_timed_pose(data))
def _read_word(self, address: int) -> int:
"""从绝对地址读取单个 uint16 寄存器"""
+57 -6
View File
@@ -8,10 +8,12 @@ from line_laser_modbus.constants import (
AXIS_NAMES,
FUNC_READ_HOLDING_REGISTERS,
FUNC_WRITE_MULTIPLE_REGISTERS,
REGISTER_COUNT_AXES,
REGISTER_COUNT_POSE,
REGISTER_COUNT_TIMESTAMP,
SLAVE_ID,
)
from line_laser_modbus.models import Pose6D
from line_laser_modbus.models import Pose6D, TimedPose6D
def encode_u16(value: int) -> int:
@@ -29,6 +31,26 @@ def decode_u16(register: int) -> int:
return encode_u16(register)
def encode_u32(value: int) -> list[int]:
"""将一个 uint32 编码为两个大端寄存器"""
if not 0 <= value <= 0xFFFFFFFF:
msg = f"Value out of uint32 range: {value}"
raise ValueError(msg)
high, low = struct.unpack(">HH", struct.pack(">I", value))
return [high, low]
def decode_u32(registers: list[int] | tuple[int, int]) -> int:
"""将两个大端寄存器解析为一个 uint32"""
if len(registers) != REGISTER_COUNT_TIMESTAMP:
msg = f"uint32 requires 2 registers, got {len(registers)}"
raise ValueError(msg)
raw = struct.pack(">HH", encode_u16(registers[0]), encode_u16(registers[1]))
return struct.unpack(">I", raw)[0]
def encode_f32(value: float) -> list[int]:
"""将一个 float32 编码为两个大端寄存器"""
@@ -49,7 +71,13 @@ def decode_f32(registers: list[int] | tuple[int, int]) -> float:
def encode_pose(pose: Pose6D) -> list[int]:
"""将 XYZABC 位姿编码为十二个保持寄存器"""
"""将 XYZABC 数据按 V1.2 编码为时间戳 0 加十二个寄存器"""
return encode_timed_pose(TimedPose6D.from_pose(pose))
def encode_axes(pose: Pose6D) -> list[int]:
"""将 XYZABC 六轴数据编码为十二个保持寄存器"""
registers: list[int] = []
for value in pose.as_tuple():
@@ -57,16 +85,39 @@ def encode_pose(pose: Pose6D) -> list[int]:
return registers
def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D:
"""将十二个保持寄存器解析为 XYZABC 位姿"""
def decode_axes(registers: list[int] | tuple[int, ...]) -> Pose6D:
"""将十二个寄存器解析为 XYZABC 数据"""
if len(registers) != REGISTER_COUNT_POSE:
msg = f"Pose requires {REGISTER_COUNT_POSE} registers, got {len(registers)}"
if len(registers) != REGISTER_COUNT_AXES:
msg = f"Pose axes require {REGISTER_COUNT_AXES} registers, got {len(registers)}"
raise ValueError(msg)
values = [decode_f32(registers[index : index + 2]) for index in range(0, len(registers), 2)]
return Pose6D(**dict(zip(AXIS_NAMES, values, strict=True)))
def encode_timed_pose(data: TimedPose6D) -> list[int]:
"""将时间戳和 XYZABC 编码为十四个保持寄存器"""
return [*encode_u32(data.timestamp), *encode_axes(data.pose)]
def decode_timed_pose(registers: list[int] | tuple[int, ...]) -> TimedPose6D:
"""将十四个保持寄存器解析为时间戳和 XYZABC 数据"""
if len(registers) != REGISTER_COUNT_POSE:
msg = f"Timed pose requires {REGISTER_COUNT_POSE} registers, got {len(registers)}"
raise ValueError(msg)
timestamp = decode_u32(registers[:REGISTER_COUNT_TIMESTAMP])
pose = decode_axes(registers[REGISTER_COUNT_TIMESTAMP:])
return TimedPose6D(timestamp, pose)
def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D:
"""将 V1.2 时间戳加六轴数据块解析为 XYZABC 数据"""
return decode_timed_pose(registers).pose
def crc16(data: bytes) -> int:
"""计算原始帧字节的 Modbus RTU CRC16"""
+2 -1
View File
@@ -10,6 +10,7 @@ from line_laser_modbus.constants import (
DEFAULT_BAUDRATE,
DEFAULT_BYTESIZE,
DEFAULT_PARITY,
DEFAULT_POLLING_INTERVAL_SECONDS,
DEFAULT_RETRIES,
DEFAULT_STOPBITS,
DEFAULT_TIMEOUT_SECONDS,
@@ -44,7 +45,7 @@ class SerialConfig:
class PollingConfig:
"""轮询运行配置"""
interval_seconds: float = 0.02
interval_seconds: float = DEFAULT_POLLING_INTERVAL_SECONDS
max_timeouts: int = 3
+18 -6
View File
@@ -9,15 +9,26 @@ FUNC_WRITE_MULTIPLE_REGISTERS = 0x10
# 保持寄存器绝对地址。
ADDR_MODE_COMMAND = 0xD000
ADDR_DEVICE_STATUS = 0xD001
ADDR_EXTENSION_RESERVED_1_START = 0xD002
ADDR_EXTENSION_RESERVED_1_END = 0xD009
ADDR_CURRENT_POSE = 0xD00A
ADDR_TARGET_POSE = 0xD016
ADDR_CORRECTION = 0xD022
ADDR_CALIBRATION_RESERVED_START = 0xD02E
ADDR_CALIBRATION_RESERVED_END = 0xD04D
ADDR_EXTENSION_RESERVED_2_START = 0xD018
ADDR_EXTENSION_RESERVED_2_END = 0xD01F
ADDR_TARGET_POSE = 0xD020
ADDR_EXTENSION_RESERVED_3_START = 0xD02E
ADDR_EXTENSION_RESERVED_3_END = 0xD035
ADDR_CORRECTION = 0xD036
ADDR_EXTENSION_RESERVED_4_START = 0xD044
ADDR_EXTENSION_RESERVED_4_END = 0xD04B
ADDR_CALIBRATION_RESERVED_START = 0xD04C
ADDR_CALIBRATION_RESERVED_END = 0xD06B
# 协议数据宽度和位姿字段顺序。
REGISTER_COUNT_WORD = 1
REGISTER_COUNT_POSE = 12
REGISTER_COUNT_TIMESTAMP = 2
REGISTER_COUNT_AXES = 12
REGISTER_COUNT_POSE = 14
REGISTER_COUNT_EXTENSION_RESERVED = 8
REGISTER_COUNT_CALIBRATION_RESERVED = 32
AXIS_NAMES = ("x", "y", "z", "a", "b", "c")
@@ -26,5 +37,6 @@ DEFAULT_BAUDRATE = 115200
DEFAULT_BYTESIZE = 8
DEFAULT_PARITY = "N"
DEFAULT_STOPBITS = 1
DEFAULT_TIMEOUT_SECONDS = 0.05
DEFAULT_TIMEOUT_SECONDS = 0.15
DEFAULT_RETRIES = 3
DEFAULT_POLLING_INTERVAL_SECONDS = 0.05
+16 -1
View File
@@ -37,6 +37,7 @@ class DeviceSnapshot:
mode: ModeCommand
status: DeviceStatus
pose: Pose6D
timestamp: int = 0
@dataclass(frozen=True, slots=True)
@@ -71,6 +72,20 @@ class Pose6D:
return (self.x, self.y, self.z, self.a, self.b, self.c)
@dataclass(frozen=True, slots=True)
class TimedPose6D:
"""V1.2 协议中的时间戳加 XYZABC 数据块"""
timestamp: int
pose: Pose6D
@classmethod
def from_pose(cls, pose: Pose6D, timestamp: int = 0) -> Self:
"""从普通六轴数据创建带时间戳数据块"""
return cls(timestamp, pose)
def ensure_mode(value: int | ModeCommand) -> ModeCommand:
"""校验并转换模式命令字"""
@@ -107,7 +122,7 @@ def can_switch_mode(
if current_mode is ModeCommand.EMERGENCY_STOP:
return False
if current_status is DeviceStatus.CALIBRATION_DONE:
# 标定完成后由控制器内部回待机,不直接进入运行模式。
# V1.2 状态机要求标定完成后回待机,不直接进入运行模式。
return False
if current_status is DeviceStatus.TEACHING_DONE:
# 示教完成后可进入依赖标准轨迹的运行模式。
+6 -3
View File
@@ -7,7 +7,7 @@ from collections.abc import Callable
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import PollingConfig
from line_laser_modbus.constants import DEFAULT_TIMEOUT_SECONDS
from line_laser_modbus.constants import DEFAULT_POLLING_INTERVAL_SECONDS, DEFAULT_TIMEOUT_SECONDS
from line_laser_modbus.models import DeviceSnapshot, ModeCommand, Pose6D
CorrectionProvider = Callable[[DeviceSnapshot], Pose6D]
@@ -82,5 +82,8 @@ class PollingRunner:
def default_polling_config() -> PollingConfig:
"""按协议默认超时时间生成轮询配置"""
max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / 0.02))
return PollingConfig(interval_seconds=0.02, max_timeouts=max_timeouts)
max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / DEFAULT_POLLING_INTERVAL_SECONDS))
return PollingConfig(
interval_seconds=DEFAULT_POLLING_INTERVAL_SECONDS,
max_timeouts=max_timeouts,
)
+2 -4
View File
@@ -4,7 +4,7 @@ from __future__ import annotations
from dataclasses import dataclass
from line_laser_modbus.codec import encode_pose
from line_laser_modbus.codec import decode_pose, encode_pose
from line_laser_modbus.constants import (
ADDR_CORRECTION,
ADDR_CURRENT_POSE,
@@ -105,9 +105,7 @@ class SimulatedModbusBackend:
def _read_pose(self, address: int) -> Pose6D:
"""从指定地址读取一组模拟位姿寄存器"""
from line_laser_modbus.codec import decode_pose
return decode_pose([self.registers.get(address + offset, 0) for offset in range(12)])
return decode_pose([self.registers.get(address + offset, 0) for offset in range(14)])
def _ensure_ready(self, device_id: int) -> None:
"""检查连接状态和从站地址"""
+32 -1
View File
@@ -2,7 +2,11 @@ import pytest
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D
from line_laser_modbus.constants import (
ADDR_CORRECTION,
ADDR_TARGET_POSE,
)
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, TimedPose6D
from line_laser_modbus.simulator import SimulatedModbusBackend
@@ -16,6 +20,9 @@ def test_client_reads_seeded_status_and_pose_from_simulator() -> None:
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
assert client.read_status() is DeviceStatus.TRACKING_OK
assert client.read_current_pose() == pose
timed_pose = client.read_current_timed_pose()
assert timed_pose.timestamp == 0
assert timed_pose.pose == pose
def test_client_writes_mode_and_correction_to_simulator() -> None:
@@ -30,6 +37,30 @@ def test_client_writes_mode_and_correction_to_simulator() -> None:
assert backend.correction() == correction
def test_client_writes_timed_target_pose_to_simulator() -> None:
backend = SimulatedModbusBackend()
target = TimedPose6D(1234, Pose6D(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
client.write_target_timed_pose(target)
assert backend.target_pose() == target.pose
assert backend.registers[ADDR_TARGET_POSE] == 0x0000
assert backend.registers[ADDR_TARGET_POSE + 1] == 0x04D2
def test_client_writes_timed_correction_to_simulator() -> None:
backend = SimulatedModbusBackend()
correction = TimedPose6D(1000, Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0))
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
client.write_timed_correction(correction)
assert backend.correction() == correction.pose
assert backend.registers[ADDR_CORRECTION] == 0x0000
assert backend.registers[ADDR_CORRECTION + 1] == 0x03E8
def test_simulator_rejects_wrong_slave_id() -> None:
backend = SimulatedModbusBackend()
+27 -1
View File
@@ -2,9 +2,11 @@ from line_laser_modbus.codec import (
build_read_frame,
build_write_frame,
decode_pose,
decode_timed_pose,
encode_pose,
encode_timed_pose,
)
from line_laser_modbus.models import Pose6D
from line_laser_modbus.models import Pose6D, TimedPose6D
def test_read_frame_matches_readme_example() -> None:
@@ -20,3 +22,27 @@ def test_mode_write_frame_matches_readme_example() -> None:
def test_pose_float_register_roundtrip() -> None:
pose = Pose6D(1.25, -2.5, 3.0, 0.0, 45.5, -90.0)
assert decode_pose(encode_pose(pose)) == pose
def test_timed_pose_register_roundtrip() -> None:
data = TimedPose6D(1000, Pose6D(1.25, -2.5, 3.0, 0.0, 45.5, -90.0))
registers = encode_timed_pose(data)
assert len(registers) == 14
assert decode_timed_pose(registers) == data
def test_current_pose_read_frame_matches_v12_example() -> None:
frame = build_read_frame(0xD00A, 14)
assert frame.hex(" ").upper() == "08 03 D0 0A 00 0E DC 55"
def test_correction_write_frame_matches_v12_docx_example() -> None:
data = TimedPose6D(1000, Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0))
frame = build_write_frame(0xD036, encode_timed_pose(data), slave_id=0x01)
assert (
frame.hex(" ").upper()
== "01 10 D0 36 00 0E 1C 00 00 03 E8 3F 80 00 00 40 00 00 00 "
"40 40 00 00 00 00 00 00 3F 80 00 00 40 00 00 00 D0 72"
)
+6
View File
@@ -23,3 +23,9 @@ max_timeouts = 5
assert config.serial.port == "COM9"
assert config.polling.interval_seconds == 0.01
assert config.polling.max_timeouts == 5
def test_default_config_matches_v12_timing() -> None:
config = AppConfig()
assert config.serial.timeout == 0.15
assert config.polling.interval_seconds == 0.05
+36 -3
View File
@@ -1,11 +1,44 @@
from line_laser_modbus.constants import (
ADDR_CALIBRATION_RESERVED_END,
ADDR_CALIBRATION_RESERVED_START,
ADDR_CORRECTION,
ADDR_CURRENT_POSE,
ADDR_EXTENSION_RESERVED_1_END,
ADDR_EXTENSION_RESERVED_1_START,
ADDR_EXTENSION_RESERVED_2_END,
ADDR_EXTENSION_RESERVED_2_START,
ADDR_EXTENSION_RESERVED_3_END,
ADDR_EXTENSION_RESERVED_3_START,
ADDR_EXTENSION_RESERVED_4_END,
ADDR_EXTENSION_RESERVED_4_START,
ADDR_TARGET_POSE,
REGISTER_COUNT_CALIBRATION_RESERVED,
REGISTER_COUNT_EXTENSION_RESERVED,
REGISTER_COUNT_POSE,
)
def test_calibration_reserved_range_matches_protocol() -> None:
assert ADDR_CALIBRATION_RESERVED_START == 0xD02E
assert ADDR_CALIBRATION_RESERVED_END == 0xD04D
assert ADDR_CALIBRATION_RESERVED_START + REGISTER_COUNT_CALIBRATION_RESERVED - 1 == 0xD04D
assert ADDR_CALIBRATION_RESERVED_START == 0xD04C
assert ADDR_CALIBRATION_RESERVED_END == 0xD06B
assert ADDR_CALIBRATION_RESERVED_START + REGISTER_COUNT_CALIBRATION_RESERVED - 1 == 0xD06B
def test_v12_pose_ranges_match_protocol() -> None:
assert ADDR_CURRENT_POSE == 0xD00A
assert ADDR_TARGET_POSE == 0xD020
assert ADDR_CORRECTION == 0xD036
assert REGISTER_COUNT_POSE == 14
def test_v12_extension_reserved_ranges_match_protocol() -> None:
ranges = [
(ADDR_EXTENSION_RESERVED_1_START, ADDR_EXTENSION_RESERVED_1_END, 0xD002, 0xD009),
(ADDR_EXTENSION_RESERVED_2_START, ADDR_EXTENSION_RESERVED_2_END, 0xD018, 0xD01F),
(ADDR_EXTENSION_RESERVED_3_START, ADDR_EXTENSION_RESERVED_3_END, 0xD02E, 0xD035),
(ADDR_EXTENSION_RESERVED_4_START, ADDR_EXTENSION_RESERVED_4_END, 0xD044, 0xD04B),
]
for start, end, expected_start, expected_end in ranges:
assert start == expected_start
assert end == expected_end
assert end - start + 1 == REGISTER_COUNT_EXTENSION_RESERVED
+1
View File
@@ -17,6 +17,7 @@ def test_polling_runner_writes_tracking_correction() -> None:
snapshot = PollingRunner(client, correction_provider=pose_delta(target)).run_once()
assert snapshot.status is DeviceStatus.TRACKING_OK
assert snapshot.timestamp == 0
assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0)