diff --git a/src/line_laser_modbus/client.py b/src/line_laser_modbus/client.py index 03518e4..46ec9cd 100644 --- a/src/line_laser_modbus/client.py +++ b/src/line_laser_modbus/client.py @@ -16,6 +16,7 @@ from line_laser_modbus.codec import ( ) from line_laser_modbus.config import SerialConfig from line_laser_modbus.constants import ( + ADDR_AVAILABLE_CACHE_COUNT, ADDR_CORRECTION, ADDR_CURRENT_POSE, ADDR_DEVICE_STATUS, @@ -133,6 +134,11 @@ class LineLaserClient: return ensure_status(self._read_word(ADDR_DEVICE_STATUS)) + def read_available_cache_count(self) -> int: + """读取 0xD002 目标位姿可用缓存数量""" + + return self._read_word(ADDR_AVAILABLE_CACHE_COUNT) + def read_current_pose(self) -> Pose6D: """读取控制器当前 XYZABC 位姿""" @@ -168,6 +174,9 @@ class LineLaserClient: def write_target_timed_pose(self, data: TimedPose6D) -> None: """写入示教目标时间戳和 XYZABC 位姿""" + if self.read_available_cache_count() <= 0: + msg = "Target pose cache is full; available cache count at 0xD002 is 0" + raise RuntimeError(msg) self._write_registers(ADDR_TARGET_POSE, encode_timed_pose(data)) def write_correction(self, pose: Pose6D, *, timestamp: int = 0) -> None: diff --git a/src/line_laser_modbus/constants.py b/src/line_laser_modbus/constants.py index 88e2668..5fe16e3 100644 --- a/src/line_laser_modbus/constants.py +++ b/src/line_laser_modbus/constants.py @@ -9,7 +9,8 @@ FUNC_WRITE_MULTIPLE_REGISTERS = 0x10 # 保持寄存器绝对地址。 ADDR_MODE_COMMAND = 0xD000 ADDR_DEVICE_STATUS = 0xD001 -ADDR_EXTENSION_RESERVED_1_START = 0xD002 +ADDR_AVAILABLE_CACHE_COUNT = 0xD002 +ADDR_EXTENSION_RESERVED_1_START = 0xD003 ADDR_EXTENSION_RESERVED_1_END = 0xD009 ADDR_CURRENT_POSE = 0xD00A ADDR_EXTENSION_RESERVED_2_START = 0xD018 @@ -28,6 +29,7 @@ REGISTER_COUNT_WORD = 1 REGISTER_COUNT_TIMESTAMP = 2 REGISTER_COUNT_AXES = 12 REGISTER_COUNT_POSE = 14 +REGISTER_COUNT_EXTENSION_RESERVED_1 = 7 REGISTER_COUNT_EXTENSION_RESERVED = 8 REGISTER_COUNT_CALIBRATION_RESERVED = 32 AXIS_NAMES = ("x", "y", "z", "a", "b", "c") diff --git a/src/line_laser_modbus/simulator.py b/src/line_laser_modbus/simulator.py index 4a48039..3f0f069 100644 --- a/src/line_laser_modbus/simulator.py +++ b/src/line_laser_modbus/simulator.py @@ -4,8 +4,9 @@ from __future__ import annotations from dataclasses import dataclass -from line_laser_modbus.codec import decode_pose, encode_pose +from line_laser_modbus.codec import decode_pose, encode_pose, encode_u16 from line_laser_modbus.constants import ( + ADDR_AVAILABLE_CACHE_COUNT, ADDR_CORRECTION, ADDR_CURRENT_POSE, ADDR_DEVICE_STATUS, @@ -50,6 +51,7 @@ class SimulatedModbusBackend: slave_id: int = SLAVE_ID, mode: ModeCommand = ModeCommand.MANUAL_TEACHING, status: DeviceStatus = DeviceStatus.IDLE, + available_cache_count: int = 1, current_pose: Pose6D | None = None, ) -> None: """创建模拟后端并写入初始寄存器值""" @@ -57,7 +59,7 @@ class SimulatedModbusBackend: self.slave_id = slave_id self.connected = False self.registers: dict[int, int] = {} - self._seed(mode, status, current_pose or Pose6D.zeros()) + self._seed(mode, status, available_cache_count, current_pose or Pose6D.zeros()) def connect(self) -> bool: """标记模拟后端为已连接""" @@ -94,11 +96,18 @@ class SimulatedModbusBackend: return self._read_pose(ADDR_CORRECTION) - def _seed(self, mode: ModeCommand, status: DeviceStatus, pose: Pose6D) -> None: + def _seed( + self, + mode: ModeCommand, + status: DeviceStatus, + available_cache_count: int, + pose: Pose6D, + ) -> None: """写入模拟器初始模式状态和当前位姿""" self.registers[ADDR_MODE_COMMAND] = mode.value self.registers[ADDR_DEVICE_STATUS] = status.value + self.registers[ADDR_AVAILABLE_CACHE_COUNT] = encode_u16(available_cache_count) for offset, value in enumerate(encode_pose(pose)): self.registers[ADDR_CURRENT_POSE + offset] = value diff --git a/tests/test_client_simulator.py b/tests/test_client_simulator.py index c0b049c..2e1cc86 100644 --- a/tests/test_client_simulator.py +++ b/tests/test_client_simulator.py @@ -3,6 +3,7 @@ import pytest from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import SerialConfig from line_laser_modbus.constants import ( + ADDR_AVAILABLE_CACHE_COUNT, ADDR_CORRECTION, ADDR_CURRENT_POSE, ADDR_MODE_COMMAND, @@ -31,6 +32,7 @@ def test_client_reads_seeded_status_and_pose_from_simulator() -> None: with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: assert client.read_status() is DeviceStatus.TRACKING_OK + assert client.read_available_cache_count() == 1 assert client.read_current_pose() == pose timed_pose = client.read_current_timed_pose() assert timed_pose.timestamp == 0 @@ -92,6 +94,20 @@ def test_client_writes_timed_target_pose_to_simulator() -> None: assert backend.registers[ADDR_TARGET_POSE + 1] == 0x04D2 +def test_client_rejects_target_pose_when_cache_is_full() -> None: + backend = SimulatedModbusBackend(available_cache_count=0) + target = TimedPose6D(1234, Pose6D(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)) + + with ( + pytest.raises(RuntimeError, match="Target pose cache is full"), + LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client, + ): + client.write_target_timed_pose(target) + + assert ADDR_TARGET_POSE not in backend.registers + assert backend.registers[ADDR_AVAILABLE_CACHE_COUNT] == 0 + + def test_client_writes_timed_correction_to_simulator() -> None: backend = SimulatedModbusBackend() correction = TimedPose6D(1000, Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) diff --git a/tests/test_constants.py b/tests/test_constants.py index efc66a8..afcc188 100644 --- a/tests/test_constants.py +++ b/tests/test_constants.py @@ -1,4 +1,5 @@ from line_laser_modbus.constants import ( + ADDR_AVAILABLE_CACHE_COUNT, ADDR_CALIBRATION_RESERVED_END, ADDR_CALIBRATION_RESERVED_START, ADDR_CORRECTION, @@ -14,6 +15,7 @@ from line_laser_modbus.constants import ( ADDR_TARGET_POSE, REGISTER_COUNT_CALIBRATION_RESERVED, REGISTER_COUNT_EXTENSION_RESERVED, + REGISTER_COUNT_EXTENSION_RESERVED_1, REGISTER_COUNT_POSE, ) @@ -25,6 +27,7 @@ def test_calibration_reserved_range_matches_protocol() -> None: def test_pose_ranges_match_protocol() -> None: + assert ADDR_AVAILABLE_CACHE_COUNT == 0xD002 assert ADDR_CURRENT_POSE == 0xD00A assert ADDR_TARGET_POSE == 0xD020 assert ADDR_CORRECTION == 0xD036 @@ -33,11 +36,15 @@ def test_pose_ranges_match_protocol() -> None: def test_extension_reserved_ranges_match_protocol() -> None: ranges = [ - (ADDR_EXTENSION_RESERVED_1_START, ADDR_EXTENSION_RESERVED_1_END, 0xD002, 0xD009), (ADDR_EXTENSION_RESERVED_2_START, ADDR_EXTENSION_RESERVED_2_END, 0xD018, 0xD01F), (ADDR_EXTENSION_RESERVED_3_START, ADDR_EXTENSION_RESERVED_3_END, 0xD02E, 0xD035), (ADDR_EXTENSION_RESERVED_4_START, ADDR_EXTENSION_RESERVED_4_END, 0xD044, 0xD04B), ] + assert ADDR_EXTENSION_RESERVED_1_START == 0xD003 + assert ADDR_EXTENSION_RESERVED_1_END == 0xD009 + assert ADDR_EXTENSION_RESERVED_1_END - ADDR_EXTENSION_RESERVED_1_START + 1 == ( + REGISTER_COUNT_EXTENSION_RESERVED_1 + ) for start, end, expected_start, expected_end in ranges: assert start == expected_start assert end == expected_end