feat: 更新代码到 v1.3

This commit is contained in:
2026-06-18 17:24:59 +08:00
Unverified
parent 27237ee149
commit 6e80d5fa41
14 changed files with 143 additions and 32 deletions
+1
View File
@@ -25,6 +25,7 @@ python -m uv run ruff check
```powershell
python -m uv run line-laser-modbus --simulate read-status
python -m uv run line-laser-modbus --simulate write-mode 3
python -m uv run line-laser-modbus --simulate emergency-stop
python -m uv run line-laser-modbus --simulate poll-once --target 1 2 3 0 1 2
python -m uv run line-laser-modbus --simulate demo
```
+6
View File
@@ -33,6 +33,12 @@
- “示教完成可切待机、在线跟踪、批量复现”调整为“示教完成可切空闲、在线跟踪、批量复现”。
- 状态机规则序号由 6 条调整为 4 条。
### 代码适配要点
- 普通模式切换按第 4.1 节限定为 `0~4`,急停值 `5` 仅作为专用急停写入命令保留。
- 手动示教模式下轮询只读取模式命令字和设备状态字,不读取当前位姿寄存器。
- 在线全轴跟踪模式和轨迹批量复现模式均允许由上层纠偏策略写入 6 轴纠偏量。
## V1.2 更新(2026-05-25
基于 `docs/线激光视觉设备与运动控制器 Modbus RTU 通信协议_V1.2.docx` 更新 `docs/proto.md`
+10 -2
View File
@@ -7,7 +7,7 @@ from pathlib import Path
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import AppConfig
from line_laser_modbus.models import ModeCommand, Pose6D
from line_laser_modbus.models import NORMAL_MODE_COMMANDS, ModeCommand, Pose6D
from line_laser_modbus.runner import PollingRunner, pose_delta
from line_laser_modbus.simulator import SimulatedModbusBackend
@@ -22,7 +22,12 @@ def main() -> None:
subparsers.add_parser("read-status")
write_mode = subparsers.add_parser("write-mode")
write_mode.add_argument("mode", type=int, choices=[mode.value for mode in ModeCommand])
write_mode.add_argument(
"mode",
type=int,
choices=sorted(mode.value for mode in NORMAL_MODE_COMMANDS),
)
subparsers.add_parser("emergency-stop")
poll = subparsers.add_parser("poll-once")
poll.add_argument("--target", nargs=6, type=float, metavar=("X", "Y", "Z", "A", "B", "C"))
subparsers.add_parser("demo")
@@ -37,6 +42,9 @@ def main() -> None:
elif args.command == "write-mode":
client.write_mode(args.mode)
print(client.read_mode().name)
elif args.command == "emergency-stop":
client.trigger_emergency_stop()
print("EMERGENCY_STOP")
elif args.command == "poll-once":
target = Pose6D.from_iterable(args.target) if args.target else Pose6D.zeros()
snapshot = PollingRunner(
+18 -4
View File
@@ -24,6 +24,7 @@ from line_laser_modbus.constants import (
REGISTER_COUNT_POSE,
)
from line_laser_modbus.models import (
NORMAL_MODE_COMMANDS,
DeviceSnapshot,
DeviceStatus,
ModeCommand,
@@ -108,7 +109,16 @@ class LineLaserClient:
def write_mode(self, mode: int | ModeCommand) -> None:
"""不做状态机校验直接写入模式命令字"""
self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ensure_mode(mode).value)])
mode_command = ensure_mode(mode)
if mode_command not in NORMAL_MODE_COMMANDS:
msg = "Mode command 5 is reserved for emergency stop; use trigger_emergency_stop()"
raise ValueError(msg)
self._write_registers(ADDR_MODE_COMMAND, [encode_u16(mode_command.value)])
def trigger_emergency_stop(self) -> None:
"""按 V1.3 示例帧向模式命令字写入急停值 5"""
self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ModeCommand.EMERGENCY_STOP.value)])
def switch_mode(self, mode: int | ModeCommand) -> None:
"""按协议状态机规则校验后切换模式"""
@@ -137,11 +147,15 @@ class LineLaserClient:
def read_snapshot(self) -> DeviceSnapshot:
"""读取模式状态和当前位姿组成一次逻辑快照"""
# 快照按协议关键字段顺序读取避免上层重复拼装状态
mode = self.read_mode()
status = self.read_status()
if mode is ModeCommand.MANUAL_TEACHING or status is DeviceStatus.EMERGENCY_TRIGGERED:
return DeviceSnapshot(mode, status, Pose6D.zeros(), 0)
timed_pose = self.read_current_timed_pose()
return DeviceSnapshot(
self.read_mode(),
self.read_status(),
mode,
status,
timed_pose.pose,
timed_pose.timestamp,
)
+2 -2
View File
@@ -71,7 +71,7 @@ def decode_f32(registers: list[int] | tuple[int, int]) -> float:
def encode_pose(pose: Pose6D) -> list[int]:
"""将 XYZABC 数据按 V1.2 编码为时间戳 0 加十二个轴寄存器"""
"""将 XYZABC 数据编码为时间戳 0 加十二个轴寄存器"""
return encode_timed_pose(TimedPose6D.from_pose(pose))
@@ -113,7 +113,7 @@ def decode_timed_pose(registers: list[int] | tuple[int, ...]) -> TimedPose6D:
def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D:
""" V1.2 时间戳加六轴数据块解析为 XYZABC 数据"""
"""将时间戳加六轴数据块解析为 XYZABC 数据"""
return decode_timed_pose(registers).pose
+18 -8
View File
@@ -10,6 +10,7 @@ from typing import Self
class ModeCommand(IntEnum):
"""0xD000 模式命令字"""
MANUAL_TEACHING = 0
STANDBY_RESET = 0
CALIBRATION = 1
PRE_WELD_TEACHING = 2
@@ -21,6 +22,7 @@ class ModeCommand(IntEnum):
class DeviceStatus(IntEnum):
"""0xD001 设备状态字"""
IDLE = 0
STANDBY_READY = 0
RUNNING = 1
TEACHING_DONE = 2
@@ -30,6 +32,17 @@ class DeviceStatus(IntEnum):
EMERGENCY_TRIGGERED = 6
NORMAL_MODE_COMMANDS = frozenset(
{
ModeCommand.MANUAL_TEACHING,
ModeCommand.CALIBRATION,
ModeCommand.PRE_WELD_TEACHING,
ModeCommand.ONLINE_TRACKING,
ModeCommand.TRAJECTORY_REPLAY,
}
)
@dataclass(frozen=True, slots=True)
class DeviceSnapshot:
"""单次逻辑周期读取到的控制器状态"""
@@ -74,7 +87,7 @@ class Pose6D:
@dataclass(frozen=True, slots=True)
class TimedPose6D:
"""V1.2 协议中的时间戳加 XYZABC 数据块"""
"""协议中的时间戳加 XYZABC 数据块"""
timestamp: int
pose: Pose6D
@@ -117,22 +130,19 @@ def can_switch_mode(
target_mode = ensure_mode(target)
current_status = ensure_status(status) if status is not None else None
if target_mode in {ModeCommand.STANDBY_RESET, ModeCommand.EMERGENCY_STOP}:
return True
if current_mode is ModeCommand.EMERGENCY_STOP:
if target_mode not in NORMAL_MODE_COMMANDS:
return False
if current_status is DeviceStatus.CALIBRATION_DONE:
# V1.2 状态机要求标定完成后回待机,不直接进入运行模式。
return False
return target_mode is ModeCommand.MANUAL_TEACHING
if current_status is DeviceStatus.TEACHING_DONE:
# 示教完成后可进入依赖标准轨迹的运行模式。
return target_mode in {
ModeCommand.MANUAL_TEACHING,
ModeCommand.ONLINE_TRACKING,
ModeCommand.TRAJECTORY_REPLAY,
}
if current_mode in {ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY}:
return target_mode in {ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY}
return target_mode in {current_mode, ModeCommand.CALIBRATION, ModeCommand.PRE_WELD_TEACHING}
return True
def validate_mode_switch(
+2 -1
View File
@@ -12,6 +12,7 @@ from line_laser_modbus.models import DeviceSnapshot, ModeCommand, Pose6D
CorrectionProvider = Callable[[DeviceSnapshot], Pose6D]
SnapshotHandler = Callable[[DeviceSnapshot], None]
CORRECTION_MODES = frozenset({ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY})
def pose_delta(target: Pose6D) -> CorrectionProvider:
@@ -60,7 +61,7 @@ class PollingRunner:
self.timeout_count = 0
if self.snapshot_handler:
self.snapshot_handler(snapshot)
if snapshot.mode is ModeCommand.ONLINE_TRACKING and self.correction_provider:
if snapshot.mode in CORRECTION_MODES and self.correction_provider:
self.client.write_correction(self.correction_provider(snapshot))
return snapshot
+2 -2
View File
@@ -48,8 +48,8 @@ class SimulatedModbusBackend:
self,
*,
slave_id: int = SLAVE_ID,
mode: ModeCommand = ModeCommand.STANDBY_RESET,
status: DeviceStatus = DeviceStatus.STANDBY_READY,
mode: ModeCommand = ModeCommand.MANUAL_TEACHING,
status: DeviceStatus = DeviceStatus.IDLE,
current_pose: Pose6D | None = None,
) -> None:
"""创建模拟后端并写入初始寄存器值"""
+43
View File
@@ -4,12 +4,24 @@ from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.constants import (
ADDR_CORRECTION,
ADDR_CURRENT_POSE,
ADDR_MODE_COMMAND,
ADDR_TARGET_POSE,
)
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, TimedPose6D
from line_laser_modbus.simulator import SimulatedModbusBackend
class CountingBackend(SimulatedModbusBackend):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.read_addresses: list[int] = []
def read_holding_registers(self, address: int, *, count: int, device_id: int):
self.read_addresses.append(address)
return super().read_holding_registers(address, count=count, device_id=device_id)
def test_client_reads_seeded_status_and_pose_from_simulator() -> None:
pose = Pose6D(10.0, 20.0, 30.0, 1.0, 2.0, 3.0)
backend = SimulatedModbusBackend(
@@ -37,6 +49,37 @@ def test_client_writes_mode_and_correction_to_simulator() -> None:
assert backend.correction() == correction
def test_client_rejects_emergency_stop_as_normal_mode_write() -> None:
backend = SimulatedModbusBackend()
with (
pytest.raises(ValueError, match="reserved for emergency stop"),
LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client,
):
client.write_mode(ModeCommand.EMERGENCY_STOP)
def test_client_triggers_emergency_stop_as_special_command() -> None:
backend = SimulatedModbusBackend()
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
client.trigger_emergency_stop()
assert backend.registers[ADDR_MODE_COMMAND] == ModeCommand.EMERGENCY_STOP.value
def test_snapshot_skips_pose_read_in_manual_teaching_mode() -> None:
backend = CountingBackend(mode=ModeCommand.MANUAL_TEACHING, status=DeviceStatus.IDLE)
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
snapshot = client.read_snapshot()
assert snapshot.mode is ModeCommand.MANUAL_TEACHING
assert snapshot.status is DeviceStatus.IDLE
assert snapshot.pose == Pose6D.zeros()
assert ADDR_CURRENT_POSE not in backend.read_addresses
def test_client_writes_timed_target_pose_to_simulator() -> None:
backend = SimulatedModbusBackend()
target = TimedPose6D(1234, Pose6D(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
+2 -2
View File
@@ -32,12 +32,12 @@ def test_timed_pose_register_roundtrip() -> None:
assert decode_timed_pose(registers) == data
def test_current_pose_read_frame_matches_v12_example() -> None:
def test_current_pose_read_frame_matches_protocol_example() -> None:
frame = build_read_frame(0xD00A, 14)
assert frame.hex(" ").upper() == "08 03 D0 0A 00 0E DC 55"
def test_correction_write_frame_matches_v12_docx_example() -> None:
def test_correction_write_frame_matches_protocol_docx_example() -> None:
data = TimedPose6D(1000, Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0))
frame = build_write_frame(0xD036, encode_timed_pose(data), slave_id=0x01)
+1 -1
View File
@@ -25,7 +25,7 @@ max_timeouts = 5
assert config.polling.max_timeouts == 5
def test_default_config_matches_v12_timing() -> None:
def test_default_config_matches_protocol_timing() -> None:
config = AppConfig()
assert config.serial.timeout == 0.15
assert config.polling.interval_seconds == 0.05
+2 -2
View File
@@ -24,14 +24,14 @@ def test_calibration_reserved_range_matches_protocol() -> None:
assert ADDR_CALIBRATION_RESERVED_START + REGISTER_COUNT_CALIBRATION_RESERVED - 1 == 0xD06B
def test_v12_pose_ranges_match_protocol() -> None:
def test_pose_ranges_match_protocol() -> None:
assert ADDR_CURRENT_POSE == 0xD00A
assert ADDR_TARGET_POSE == 0xD020
assert ADDR_CORRECTION == 0xD036
assert REGISTER_COUNT_POSE == 14
def test_v12_extension_reserved_ranges_match_protocol() -> None:
def test_extension_reserved_ranges_match_protocol() -> None:
ranges = [
(ADDR_EXTENSION_RESERVED_1_START, ADDR_EXTENSION_RESERVED_1_END, 0xD002, 0xD009),
(ADDR_EXTENSION_RESERVED_2_START, ADDR_EXTENSION_RESERVED_2_END, 0xD018, 0xD01F),
+15 -1
View File
@@ -21,7 +21,21 @@ def test_polling_runner_writes_tracking_correction() -> None:
assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0)
def test_polling_runner_does_not_write_correction_outside_tracking() -> None:
def test_polling_runner_writes_replay_correction() -> None:
backend = SimulatedModbusBackend(
mode=ModeCommand.TRAJECTORY_REPLAY,
status=DeviceStatus.RUNNING,
current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0),
)
target = Pose6D(2.0, 4.0, 6.0, 0.0, 0.0, 5.0)
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
PollingRunner(client, correction_provider=pose_delta(target)).run_once()
assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0)
def test_polling_runner_does_not_write_correction_outside_correction_modes() -> None:
backend = SimulatedModbusBackend(current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0))
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
+21 -7
View File
@@ -1,6 +1,7 @@
import pytest
from line_laser_modbus.models import (
NORMAL_MODE_COMMANDS,
DeviceStatus,
ModeCommand,
can_switch_mode,
@@ -8,13 +9,20 @@ from line_laser_modbus.models import (
)
def test_emergency_can_only_return_to_standby_or_stay_emergency() -> None:
assert can_switch_mode(ModeCommand.EMERGENCY_STOP, ModeCommand.STANDBY_RESET)
assert can_switch_mode(ModeCommand.EMERGENCY_STOP, ModeCommand.EMERGENCY_STOP)
assert not can_switch_mode(ModeCommand.EMERGENCY_STOP, ModeCommand.ONLINE_TRACKING)
def test_normal_mode_commands_match_v13_range() -> None:
assert {mode.value for mode in NORMAL_MODE_COMMANDS} == {0, 1, 2, 3, 4}
def test_emergency_stop_is_not_a_normal_mode_switch_target() -> None:
assert not can_switch_mode(ModeCommand.MANUAL_TEACHING, ModeCommand.EMERGENCY_STOP)
def test_teaching_done_can_enter_tracking_or_replay() -> None:
assert can_switch_mode(
ModeCommand.PRE_WELD_TEACHING,
ModeCommand.MANUAL_TEACHING,
DeviceStatus.TEACHING_DONE,
)
assert can_switch_mode(
ModeCommand.PRE_WELD_TEACHING,
ModeCommand.ONLINE_TRACKING,
@@ -27,10 +35,10 @@ def test_teaching_done_can_enter_tracking_or_replay() -> None:
)
def test_calibration_done_waits_for_controller_to_return_standby() -> None:
def test_calibration_done_returns_to_idle_command() -> None:
assert can_switch_mode(
ModeCommand.CALIBRATION,
ModeCommand.STANDBY_RESET,
ModeCommand.MANUAL_TEACHING,
DeviceStatus.CALIBRATION_DONE,
)
assert not can_switch_mode(
@@ -45,6 +53,12 @@ def test_calibration_done_waits_for_controller_to_return_standby() -> None:
)
def test_tracking_and_replay_only_switch_to_each_other() -> None:
assert can_switch_mode(ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY)
assert can_switch_mode(ModeCommand.TRAJECTORY_REPLAY, ModeCommand.ONLINE_TRACKING)
assert not can_switch_mode(ModeCommand.ONLINE_TRACKING, ModeCommand.MANUAL_TEACHING)
def test_invalid_transition_raises() -> None:
with pytest.raises(ValueError, match="Illegal mode switch"):
validate_mode_switch(ModeCommand.EMERGENCY_STOP, ModeCommand.ONLINE_TRACKING)
validate_mode_switch(ModeCommand.MANUAL_TEACHING, ModeCommand.EMERGENCY_STOP)