diff --git a/README.md b/README.md index e756a3f..43d3e77 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ python -m uv run ruff check ```powershell python -m uv run line-laser-modbus --simulate read-status python -m uv run line-laser-modbus --simulate write-mode 3 +python -m uv run line-laser-modbus --simulate emergency-stop python -m uv run line-laser-modbus --simulate poll-once --target 1 2 3 0 1 2 python -m uv run line-laser-modbus --simulate demo ``` diff --git a/docs/changes.md b/docs/changes.md index 1eaf9aa..d9dc3e4 100644 --- a/docs/changes.md +++ b/docs/changes.md @@ -33,6 +33,12 @@ - “示教完成可切待机、在线跟踪、批量复现”调整为“示教完成可切空闲、在线跟踪、批量复现”。 - 状态机规则序号由 6 条调整为 4 条。 +### 代码适配要点 + +- 普通模式切换按第 4.1 节限定为 `0~4`,急停值 `5` 仅作为专用急停写入命令保留。 +- 手动示教模式下轮询只读取模式命令字和设备状态字,不读取当前位姿寄存器。 +- 在线全轴跟踪模式和轨迹批量复现模式均允许由上层纠偏策略写入 6 轴纠偏量。 + ## V1.2 更新(2026-05-25) 基于 `docs/线激光视觉设备与运动控制器 Modbus RTU 通信协议_V1.2.docx` 更新 `docs/proto.md`。 diff --git a/src/line_laser_modbus/cli.py b/src/line_laser_modbus/cli.py index 0e4c96e..fe25abb 100644 --- a/src/line_laser_modbus/cli.py +++ b/src/line_laser_modbus/cli.py @@ -7,7 +7,7 @@ from pathlib import Path from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import AppConfig -from line_laser_modbus.models import ModeCommand, Pose6D +from line_laser_modbus.models import NORMAL_MODE_COMMANDS, ModeCommand, Pose6D from line_laser_modbus.runner import PollingRunner, pose_delta from line_laser_modbus.simulator import SimulatedModbusBackend @@ -22,7 +22,12 @@ def main() -> None: subparsers.add_parser("read-status") write_mode = subparsers.add_parser("write-mode") - write_mode.add_argument("mode", type=int, choices=[mode.value for mode in ModeCommand]) + write_mode.add_argument( + "mode", + type=int, + choices=sorted(mode.value for mode in NORMAL_MODE_COMMANDS), + ) + subparsers.add_parser("emergency-stop") poll = subparsers.add_parser("poll-once") poll.add_argument("--target", nargs=6, type=float, metavar=("X", "Y", "Z", "A", "B", "C")) subparsers.add_parser("demo") @@ -37,6 +42,9 @@ def main() -> None: elif args.command == "write-mode": client.write_mode(args.mode) print(client.read_mode().name) + elif args.command == "emergency-stop": + client.trigger_emergency_stop() + print("EMERGENCY_STOP") elif args.command == "poll-once": target = Pose6D.from_iterable(args.target) if args.target else Pose6D.zeros() snapshot = PollingRunner( diff --git a/src/line_laser_modbus/client.py b/src/line_laser_modbus/client.py index 5df3d06..03518e4 100644 --- a/src/line_laser_modbus/client.py +++ b/src/line_laser_modbus/client.py @@ -24,6 +24,7 @@ from line_laser_modbus.constants import ( REGISTER_COUNT_POSE, ) from line_laser_modbus.models import ( + NORMAL_MODE_COMMANDS, DeviceSnapshot, DeviceStatus, ModeCommand, @@ -108,7 +109,16 @@ class LineLaserClient: def write_mode(self, mode: int | ModeCommand) -> None: """不做状态机校验直接写入模式命令字""" - self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ensure_mode(mode).value)]) + mode_command = ensure_mode(mode) + if mode_command not in NORMAL_MODE_COMMANDS: + msg = "Mode command 5 is reserved for emergency stop; use trigger_emergency_stop()" + raise ValueError(msg) + self._write_registers(ADDR_MODE_COMMAND, [encode_u16(mode_command.value)]) + + def trigger_emergency_stop(self) -> None: + """按 V1.3 示例帧向模式命令字写入急停值 5""" + + self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ModeCommand.EMERGENCY_STOP.value)]) def switch_mode(self, mode: int | ModeCommand) -> None: """按协议状态机规则校验后切换模式""" @@ -137,11 +147,15 @@ class LineLaserClient: def read_snapshot(self) -> DeviceSnapshot: """读取模式状态和当前位姿组成一次逻辑快照""" - # 快照按协议关键字段顺序读取避免上层重复拼装状态 + mode = self.read_mode() + status = self.read_status() + if mode is ModeCommand.MANUAL_TEACHING or status is DeviceStatus.EMERGENCY_TRIGGERED: + return DeviceSnapshot(mode, status, Pose6D.zeros(), 0) + timed_pose = self.read_current_timed_pose() return DeviceSnapshot( - self.read_mode(), - self.read_status(), + mode, + status, timed_pose.pose, timed_pose.timestamp, ) diff --git a/src/line_laser_modbus/codec.py b/src/line_laser_modbus/codec.py index e708eb4..afcd93c 100644 --- a/src/line_laser_modbus/codec.py +++ b/src/line_laser_modbus/codec.py @@ -71,7 +71,7 @@ def decode_f32(registers: list[int] | tuple[int, int]) -> float: def encode_pose(pose: Pose6D) -> list[int]: - """将 XYZABC 数据按 V1.2 编码为时间戳 0 加十二个轴寄存器""" + """将 XYZABC 数据编码为时间戳 0 加十二个轴寄存器""" return encode_timed_pose(TimedPose6D.from_pose(pose)) @@ -113,7 +113,7 @@ def decode_timed_pose(registers: list[int] | tuple[int, ...]) -> TimedPose6D: def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D: - """将 V1.2 时间戳加六轴数据块解析为 XYZABC 数据""" + """将时间戳加六轴数据块解析为 XYZABC 数据""" return decode_timed_pose(registers).pose diff --git a/src/line_laser_modbus/models.py b/src/line_laser_modbus/models.py index 42b995e..5b40b25 100644 --- a/src/line_laser_modbus/models.py +++ b/src/line_laser_modbus/models.py @@ -10,6 +10,7 @@ from typing import Self class ModeCommand(IntEnum): """0xD000 模式命令字""" + MANUAL_TEACHING = 0 STANDBY_RESET = 0 CALIBRATION = 1 PRE_WELD_TEACHING = 2 @@ -21,6 +22,7 @@ class ModeCommand(IntEnum): class DeviceStatus(IntEnum): """0xD001 设备状态字""" + IDLE = 0 STANDBY_READY = 0 RUNNING = 1 TEACHING_DONE = 2 @@ -30,6 +32,17 @@ class DeviceStatus(IntEnum): EMERGENCY_TRIGGERED = 6 +NORMAL_MODE_COMMANDS = frozenset( + { + ModeCommand.MANUAL_TEACHING, + ModeCommand.CALIBRATION, + ModeCommand.PRE_WELD_TEACHING, + ModeCommand.ONLINE_TRACKING, + ModeCommand.TRAJECTORY_REPLAY, + } +) + + @dataclass(frozen=True, slots=True) class DeviceSnapshot: """单次逻辑周期读取到的控制器状态""" @@ -74,7 +87,7 @@ class Pose6D: @dataclass(frozen=True, slots=True) class TimedPose6D: - """V1.2 协议中的时间戳加 XYZABC 数据块""" + """协议中的时间戳加 XYZABC 数据块""" timestamp: int pose: Pose6D @@ -117,22 +130,19 @@ def can_switch_mode( target_mode = ensure_mode(target) current_status = ensure_status(status) if status is not None else None - if target_mode in {ModeCommand.STANDBY_RESET, ModeCommand.EMERGENCY_STOP}: - return True - if current_mode is ModeCommand.EMERGENCY_STOP: + if target_mode not in NORMAL_MODE_COMMANDS: return False if current_status is DeviceStatus.CALIBRATION_DONE: - # V1.2 状态机要求标定完成后回待机,不直接进入运行模式。 - return False + return target_mode is ModeCommand.MANUAL_TEACHING if current_status is DeviceStatus.TEACHING_DONE: - # 示教完成后可进入依赖标准轨迹的运行模式。 return target_mode in { + ModeCommand.MANUAL_TEACHING, ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY, } if current_mode in {ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY}: return target_mode in {ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY} - return target_mode in {current_mode, ModeCommand.CALIBRATION, ModeCommand.PRE_WELD_TEACHING} + return True def validate_mode_switch( diff --git a/src/line_laser_modbus/runner.py b/src/line_laser_modbus/runner.py index 6eb9658..d295b01 100644 --- a/src/line_laser_modbus/runner.py +++ b/src/line_laser_modbus/runner.py @@ -12,6 +12,7 @@ from line_laser_modbus.models import DeviceSnapshot, ModeCommand, Pose6D CorrectionProvider = Callable[[DeviceSnapshot], Pose6D] SnapshotHandler = Callable[[DeviceSnapshot], None] +CORRECTION_MODES = frozenset({ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY}) def pose_delta(target: Pose6D) -> CorrectionProvider: @@ -60,7 +61,7 @@ class PollingRunner: self.timeout_count = 0 if self.snapshot_handler: self.snapshot_handler(snapshot) - if snapshot.mode is ModeCommand.ONLINE_TRACKING and self.correction_provider: + if snapshot.mode in CORRECTION_MODES and self.correction_provider: self.client.write_correction(self.correction_provider(snapshot)) return snapshot diff --git a/src/line_laser_modbus/simulator.py b/src/line_laser_modbus/simulator.py index 92f203c..4a48039 100644 --- a/src/line_laser_modbus/simulator.py +++ b/src/line_laser_modbus/simulator.py @@ -48,8 +48,8 @@ class SimulatedModbusBackend: self, *, slave_id: int = SLAVE_ID, - mode: ModeCommand = ModeCommand.STANDBY_RESET, - status: DeviceStatus = DeviceStatus.STANDBY_READY, + mode: ModeCommand = ModeCommand.MANUAL_TEACHING, + status: DeviceStatus = DeviceStatus.IDLE, current_pose: Pose6D | None = None, ) -> None: """创建模拟后端并写入初始寄存器值""" diff --git a/tests/test_client_simulator.py b/tests/test_client_simulator.py index ad9ff4f..c0b049c 100644 --- a/tests/test_client_simulator.py +++ b/tests/test_client_simulator.py @@ -4,12 +4,24 @@ from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import SerialConfig from line_laser_modbus.constants import ( ADDR_CORRECTION, + ADDR_CURRENT_POSE, + ADDR_MODE_COMMAND, ADDR_TARGET_POSE, ) from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, TimedPose6D from line_laser_modbus.simulator import SimulatedModbusBackend +class CountingBackend(SimulatedModbusBackend): + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.read_addresses: list[int] = [] + + def read_holding_registers(self, address: int, *, count: int, device_id: int): + self.read_addresses.append(address) + return super().read_holding_registers(address, count=count, device_id=device_id) + + def test_client_reads_seeded_status_and_pose_from_simulator() -> None: pose = Pose6D(10.0, 20.0, 30.0, 1.0, 2.0, 3.0) backend = SimulatedModbusBackend( @@ -37,6 +49,37 @@ def test_client_writes_mode_and_correction_to_simulator() -> None: assert backend.correction() == correction +def test_client_rejects_emergency_stop_as_normal_mode_write() -> None: + backend = SimulatedModbusBackend() + + with ( + pytest.raises(ValueError, match="reserved for emergency stop"), + LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client, + ): + client.write_mode(ModeCommand.EMERGENCY_STOP) + + +def test_client_triggers_emergency_stop_as_special_command() -> None: + backend = SimulatedModbusBackend() + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + client.trigger_emergency_stop() + + assert backend.registers[ADDR_MODE_COMMAND] == ModeCommand.EMERGENCY_STOP.value + + +def test_snapshot_skips_pose_read_in_manual_teaching_mode() -> None: + backend = CountingBackend(mode=ModeCommand.MANUAL_TEACHING, status=DeviceStatus.IDLE) + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + snapshot = client.read_snapshot() + + assert snapshot.mode is ModeCommand.MANUAL_TEACHING + assert snapshot.status is DeviceStatus.IDLE + assert snapshot.pose == Pose6D.zeros() + assert ADDR_CURRENT_POSE not in backend.read_addresses + + def test_client_writes_timed_target_pose_to_simulator() -> None: backend = SimulatedModbusBackend() target = TimedPose6D(1234, Pose6D(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)) diff --git a/tests/test_codec.py b/tests/test_codec.py index 78122ed..72391c3 100644 --- a/tests/test_codec.py +++ b/tests/test_codec.py @@ -32,12 +32,12 @@ def test_timed_pose_register_roundtrip() -> None: assert decode_timed_pose(registers) == data -def test_current_pose_read_frame_matches_v12_example() -> None: +def test_current_pose_read_frame_matches_protocol_example() -> None: frame = build_read_frame(0xD00A, 14) assert frame.hex(" ").upper() == "08 03 D0 0A 00 0E DC 55" -def test_correction_write_frame_matches_v12_docx_example() -> None: +def test_correction_write_frame_matches_protocol_docx_example() -> None: data = TimedPose6D(1000, Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) frame = build_write_frame(0xD036, encode_timed_pose(data), slave_id=0x01) diff --git a/tests/test_config.py b/tests/test_config.py index 4b2f14d..fc490f7 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -25,7 +25,7 @@ max_timeouts = 5 assert config.polling.max_timeouts == 5 -def test_default_config_matches_v12_timing() -> None: +def test_default_config_matches_protocol_timing() -> None: config = AppConfig() assert config.serial.timeout == 0.15 assert config.polling.interval_seconds == 0.05 diff --git a/tests/test_constants.py b/tests/test_constants.py index 6c23c13..efc66a8 100644 --- a/tests/test_constants.py +++ b/tests/test_constants.py @@ -24,14 +24,14 @@ def test_calibration_reserved_range_matches_protocol() -> None: assert ADDR_CALIBRATION_RESERVED_START + REGISTER_COUNT_CALIBRATION_RESERVED - 1 == 0xD06B -def test_v12_pose_ranges_match_protocol() -> None: +def test_pose_ranges_match_protocol() -> None: assert ADDR_CURRENT_POSE == 0xD00A assert ADDR_TARGET_POSE == 0xD020 assert ADDR_CORRECTION == 0xD036 assert REGISTER_COUNT_POSE == 14 -def test_v12_extension_reserved_ranges_match_protocol() -> None: +def test_extension_reserved_ranges_match_protocol() -> None: ranges = [ (ADDR_EXTENSION_RESERVED_1_START, ADDR_EXTENSION_RESERVED_1_END, 0xD002, 0xD009), (ADDR_EXTENSION_RESERVED_2_START, ADDR_EXTENSION_RESERVED_2_END, 0xD018, 0xD01F), diff --git a/tests/test_runner.py b/tests/test_runner.py index 3658f1d..d12f776 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -21,7 +21,21 @@ def test_polling_runner_writes_tracking_correction() -> None: assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0) -def test_polling_runner_does_not_write_correction_outside_tracking() -> None: +def test_polling_runner_writes_replay_correction() -> None: + backend = SimulatedModbusBackend( + mode=ModeCommand.TRAJECTORY_REPLAY, + status=DeviceStatus.RUNNING, + current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0), + ) + target = Pose6D(2.0, 4.0, 6.0, 0.0, 0.0, 5.0) + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + PollingRunner(client, correction_provider=pose_delta(target)).run_once() + + assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0) + + +def test_polling_runner_does_not_write_correction_outside_correction_modes() -> None: backend = SimulatedModbusBackend(current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: diff --git a/tests/test_state_machine.py b/tests/test_state_machine.py index 8c2313c..718f46e 100644 --- a/tests/test_state_machine.py +++ b/tests/test_state_machine.py @@ -1,6 +1,7 @@ import pytest from line_laser_modbus.models import ( + NORMAL_MODE_COMMANDS, DeviceStatus, ModeCommand, can_switch_mode, @@ -8,13 +9,20 @@ from line_laser_modbus.models import ( ) -def test_emergency_can_only_return_to_standby_or_stay_emergency() -> None: - assert can_switch_mode(ModeCommand.EMERGENCY_STOP, ModeCommand.STANDBY_RESET) - assert can_switch_mode(ModeCommand.EMERGENCY_STOP, ModeCommand.EMERGENCY_STOP) - assert not can_switch_mode(ModeCommand.EMERGENCY_STOP, ModeCommand.ONLINE_TRACKING) +def test_normal_mode_commands_match_v13_range() -> None: + assert {mode.value for mode in NORMAL_MODE_COMMANDS} == {0, 1, 2, 3, 4} + + +def test_emergency_stop_is_not_a_normal_mode_switch_target() -> None: + assert not can_switch_mode(ModeCommand.MANUAL_TEACHING, ModeCommand.EMERGENCY_STOP) def test_teaching_done_can_enter_tracking_or_replay() -> None: + assert can_switch_mode( + ModeCommand.PRE_WELD_TEACHING, + ModeCommand.MANUAL_TEACHING, + DeviceStatus.TEACHING_DONE, + ) assert can_switch_mode( ModeCommand.PRE_WELD_TEACHING, ModeCommand.ONLINE_TRACKING, @@ -27,10 +35,10 @@ def test_teaching_done_can_enter_tracking_or_replay() -> None: ) -def test_calibration_done_waits_for_controller_to_return_standby() -> None: +def test_calibration_done_returns_to_idle_command() -> None: assert can_switch_mode( ModeCommand.CALIBRATION, - ModeCommand.STANDBY_RESET, + ModeCommand.MANUAL_TEACHING, DeviceStatus.CALIBRATION_DONE, ) assert not can_switch_mode( @@ -45,6 +53,12 @@ def test_calibration_done_waits_for_controller_to_return_standby() -> None: ) +def test_tracking_and_replay_only_switch_to_each_other() -> None: + assert can_switch_mode(ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY) + assert can_switch_mode(ModeCommand.TRAJECTORY_REPLAY, ModeCommand.ONLINE_TRACKING) + assert not can_switch_mode(ModeCommand.ONLINE_TRACKING, ModeCommand.MANUAL_TEACHING) + + def test_invalid_transition_raises() -> None: with pytest.raises(ValueError, match="Illegal mode switch"): - validate_mode_switch(ModeCommand.EMERGENCY_STOP, ModeCommand.ONLINE_TRACKING) + validate_mode_switch(ModeCommand.MANUAL_TEACHING, ModeCommand.EMERGENCY_STOP)