From 866814e94e8192f50b3e6a64df525114722eb194 Mon Sep 17 00:00:00 2001 From: chuan Date: Thu, 28 May 2026 12:20:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=A0=B9=E6=8D=AEv1.2=E7=9A=84?= =?UTF-8?q?=E6=96=87=E6=A1=A3=E5=AE=8C=E6=88=90=E4=BB=A3=E7=A0=81=E8=BF=81?= =?UTF-8?q?=E7=A7=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +-- TODO.md | 2 +- config.toml | 4 +- src/line_laser_modbus/__init__.py | 3 +- src/line_laser_modbus/client.py | 41 +++++++++++++++---- src/line_laser_modbus/codec.py | 63 +++++++++++++++++++++++++++--- src/line_laser_modbus/config.py | 3 +- src/line_laser_modbus/constants.py | 24 +++++++++--- src/line_laser_modbus/models.py | 17 +++++++- src/line_laser_modbus/runner.py | 9 +++-- src/line_laser_modbus/simulator.py | 6 +-- tests/test_client_simulator.py | 33 +++++++++++++++- tests/test_codec.py | 28 ++++++++++++- tests/test_config.py | 6 +++ tests/test_constants.py | 39 ++++++++++++++++-- tests/test_runner.py | 1 + 16 files changed, 245 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 92c0496..e756a3f 100644 --- a/README.md +++ b/README.md @@ -7,9 +7,9 @@ ## 功能 - 基于 `pymodbus` 的 Modbus RTU 串口客户端 -- 按协议实现 uint16、float32、XYZABC 位姿和 CRC16 编解码 +- 按协议实现 uint16、uint32 时间戳、float32、XYZABC 位姿和 CRC16 编解码 - 提供无硬件内存模拟器用于测试 -- 提供模式状态机校验和 20ms 轮询运行器 +- 提供模式状态机校验和 50ms 轮询运行器 - 使用 TOML 作为运行配置入口 ## 安装与测试 @@ -52,7 +52,7 @@ python -m uv run python examples/real_serial_read_status.py python -m uv run line-laser-modbus read-status ``` -配置文件包含 `[serial]` 和 `[polling]` 两段,其中 `[polling]` 用于配置 20ms 轮询周期和连续超时判定次数。 +配置文件包含 `[serial]` 和 `[polling]` 两段,其中 `[polling]` 用于配置 50ms 轮询周期和连续超时判定次数。 ## 类型标记 diff --git a/TODO.md b/TODO.md index ea1a2c7..a5bda5b 100644 --- a/TODO.md +++ b/TODO.md @@ -6,7 +6,7 @@ - [ ] 增加 CLI 模拟模式测试,覆盖常用命令输出和参数校验。 - [ ] 增加轮询异常测试,覆盖连续超时、恢复计数和不可恢复错误。 - [ ] 增加真实串口硬件联调记录,确认状态字、当前位姿、模式写入、目标位姿和纠偏量读写。 -- [ ] 验证 20ms 轮询稳定性,记录实际通信耗时、超时次数和重试表现。 +- [ ] 验证 50ms 轮询稳定性,记录实际通信耗时、超时次数和重试表现。 - [ ] 完善发布元数据,包括 license、authors、classifiers 和 project URLs。 - [ ] 评估是否加入类型检查和覆盖率统计。 - [ ] 根据现场设备参数更新或补充配置示例。 diff --git a/config.toml b/config.toml index bc8045e..1607c62 100644 --- a/config.toml +++ b/config.toml @@ -5,9 +5,9 @@ baudrate = 115200 bytesize = 8 parity = "N" stopbits = 1 -timeout = 0.05 +timeout = 0.15 retries = 3 [polling] -interval_seconds = 0.02 +interval_seconds = 0.05 max_timeouts = 3 diff --git a/src/line_laser_modbus/__init__.py b/src/line_laser_modbus/__init__.py index ce2595c..c1adac8 100644 --- a/src/line_laser_modbus/__init__.py +++ b/src/line_laser_modbus/__init__.py @@ -2,7 +2,7 @@ from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import AppConfig, PollingConfig, SerialConfig -from line_laser_modbus.models import DeviceSnapshot, DeviceStatus, ModeCommand, Pose6D +from line_laser_modbus.models import DeviceSnapshot, DeviceStatus, ModeCommand, Pose6D, TimedPose6D from line_laser_modbus.runner import PollingRunner # 公开库调用时最常用的类型和入口。 @@ -16,4 +16,5 @@ __all__ = [ "PollingRunner", "Pose6D", "SerialConfig", + "TimedPose6D", ] diff --git a/src/line_laser_modbus/client.py b/src/line_laser_modbus/client.py index 24d85b7..5df3d06 100644 --- a/src/line_laser_modbus/client.py +++ b/src/line_laser_modbus/client.py @@ -8,7 +8,12 @@ from typing import Protocol, Self from pymodbus import ModbusException from pymodbus.client import ModbusSerialClient -from line_laser_modbus.codec import decode_pose, decode_u16, encode_pose, encode_u16 +from line_laser_modbus.codec import ( + decode_timed_pose, + decode_u16, + encode_timed_pose, + encode_u16, +) from line_laser_modbus.config import SerialConfig from line_laser_modbus.constants import ( ADDR_CORRECTION, @@ -23,6 +28,7 @@ from line_laser_modbus.models import ( DeviceStatus, ModeCommand, Pose6D, + TimedPose6D, ensure_mode, ensure_status, validate_mode_switch, @@ -120,24 +126,45 @@ class LineLaserClient: def read_current_pose(self) -> Pose6D: """读取控制器当前 XYZABC 位姿""" + return self.read_current_timed_pose().pose + + def read_current_timed_pose(self) -> TimedPose6D: + """读取控制器当前时间戳和 XYZABC 位姿""" + registers = self._read_registers(ADDR_CURRENT_POSE, REGISTER_COUNT_POSE) - return decode_pose(registers) + return decode_timed_pose(registers) def read_snapshot(self) -> DeviceSnapshot: """读取模式状态和当前位姿组成一次逻辑快照""" # 快照按协议关键字段顺序读取避免上层重复拼装状态 - return DeviceSnapshot(self.read_mode(), self.read_status(), self.read_current_pose()) + timed_pose = self.read_current_timed_pose() + return DeviceSnapshot( + self.read_mode(), + self.read_status(), + timed_pose.pose, + timed_pose.timestamp, + ) - def write_target_pose(self, pose: Pose6D) -> None: + def write_target_pose(self, pose: Pose6D, *, timestamp: int = 0) -> None: """写入示教目标 XYZABC 位姿""" - self._write_registers(ADDR_TARGET_POSE, encode_pose(pose)) + self.write_target_timed_pose(TimedPose6D.from_pose(pose, timestamp)) - def write_correction(self, pose: Pose6D) -> None: + def write_target_timed_pose(self, data: TimedPose6D) -> None: + """写入示教目标时间戳和 XYZABC 位姿""" + + self._write_registers(ADDR_TARGET_POSE, encode_timed_pose(data)) + + def write_correction(self, pose: Pose6D, *, timestamp: int = 0) -> None: """写入实时 XYZABC 纠偏量""" - self._write_registers(ADDR_CORRECTION, encode_pose(pose)) + self.write_timed_correction(TimedPose6D.from_pose(pose, timestamp)) + + def write_timed_correction(self, data: TimedPose6D) -> None: + """写入实时时间戳和 XYZABC 纠偏量""" + + self._write_registers(ADDR_CORRECTION, encode_timed_pose(data)) def _read_word(self, address: int) -> int: """从绝对地址读取单个 uint16 寄存器""" diff --git a/src/line_laser_modbus/codec.py b/src/line_laser_modbus/codec.py index 86924b9..e708eb4 100644 --- a/src/line_laser_modbus/codec.py +++ b/src/line_laser_modbus/codec.py @@ -8,10 +8,12 @@ from line_laser_modbus.constants import ( AXIS_NAMES, FUNC_READ_HOLDING_REGISTERS, FUNC_WRITE_MULTIPLE_REGISTERS, + REGISTER_COUNT_AXES, REGISTER_COUNT_POSE, + REGISTER_COUNT_TIMESTAMP, SLAVE_ID, ) -from line_laser_modbus.models import Pose6D +from line_laser_modbus.models import Pose6D, TimedPose6D def encode_u16(value: int) -> int: @@ -29,6 +31,26 @@ def decode_u16(register: int) -> int: return encode_u16(register) +def encode_u32(value: int) -> list[int]: + """将一个 uint32 编码为两个大端寄存器""" + + if not 0 <= value <= 0xFFFFFFFF: + msg = f"Value out of uint32 range: {value}" + raise ValueError(msg) + high, low = struct.unpack(">HH", struct.pack(">I", value)) + return [high, low] + + +def decode_u32(registers: list[int] | tuple[int, int]) -> int: + """将两个大端寄存器解析为一个 uint32""" + + if len(registers) != REGISTER_COUNT_TIMESTAMP: + msg = f"uint32 requires 2 registers, got {len(registers)}" + raise ValueError(msg) + raw = struct.pack(">HH", encode_u16(registers[0]), encode_u16(registers[1])) + return struct.unpack(">I", raw)[0] + + def encode_f32(value: float) -> list[int]: """将一个 float32 编码为两个大端寄存器""" @@ -49,7 +71,13 @@ def decode_f32(registers: list[int] | tuple[int, int]) -> float: def encode_pose(pose: Pose6D) -> list[int]: - """将 XYZABC 位姿编码为十二个保持寄存器""" + """将 XYZABC 数据按 V1.2 编码为时间戳 0 加十二个轴寄存器""" + + return encode_timed_pose(TimedPose6D.from_pose(pose)) + + +def encode_axes(pose: Pose6D) -> list[int]: + """将 XYZABC 六轴数据编码为十二个保持寄存器""" registers: list[int] = [] for value in pose.as_tuple(): @@ -57,16 +85,39 @@ def encode_pose(pose: Pose6D) -> list[int]: return registers -def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D: - """将十二个保持寄存器解析为 XYZABC 位姿""" +def decode_axes(registers: list[int] | tuple[int, ...]) -> Pose6D: + """将十二个轴寄存器解析为 XYZABC 数据""" - if len(registers) != REGISTER_COUNT_POSE: - msg = f"Pose requires {REGISTER_COUNT_POSE} registers, got {len(registers)}" + if len(registers) != REGISTER_COUNT_AXES: + msg = f"Pose axes require {REGISTER_COUNT_AXES} registers, got {len(registers)}" raise ValueError(msg) values = [decode_f32(registers[index : index + 2]) for index in range(0, len(registers), 2)] return Pose6D(**dict(zip(AXIS_NAMES, values, strict=True))) +def encode_timed_pose(data: TimedPose6D) -> list[int]: + """将时间戳和 XYZABC 编码为十四个保持寄存器""" + + return [*encode_u32(data.timestamp), *encode_axes(data.pose)] + + +def decode_timed_pose(registers: list[int] | tuple[int, ...]) -> TimedPose6D: + """将十四个保持寄存器解析为时间戳和 XYZABC 数据""" + + if len(registers) != REGISTER_COUNT_POSE: + msg = f"Timed pose requires {REGISTER_COUNT_POSE} registers, got {len(registers)}" + raise ValueError(msg) + timestamp = decode_u32(registers[:REGISTER_COUNT_TIMESTAMP]) + pose = decode_axes(registers[REGISTER_COUNT_TIMESTAMP:]) + return TimedPose6D(timestamp, pose) + + +def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D: + """将 V1.2 时间戳加六轴数据块解析为 XYZABC 数据""" + + return decode_timed_pose(registers).pose + + def crc16(data: bytes) -> int: """计算原始帧字节的 Modbus RTU CRC16""" diff --git a/src/line_laser_modbus/config.py b/src/line_laser_modbus/config.py index 0e07481..0d006ac 100644 --- a/src/line_laser_modbus/config.py +++ b/src/line_laser_modbus/config.py @@ -10,6 +10,7 @@ from line_laser_modbus.constants import ( DEFAULT_BAUDRATE, DEFAULT_BYTESIZE, DEFAULT_PARITY, + DEFAULT_POLLING_INTERVAL_SECONDS, DEFAULT_RETRIES, DEFAULT_STOPBITS, DEFAULT_TIMEOUT_SECONDS, @@ -44,7 +45,7 @@ class SerialConfig: class PollingConfig: """轮询运行配置""" - interval_seconds: float = 0.02 + interval_seconds: float = DEFAULT_POLLING_INTERVAL_SECONDS max_timeouts: int = 3 diff --git a/src/line_laser_modbus/constants.py b/src/line_laser_modbus/constants.py index e1ba588..88e2668 100644 --- a/src/line_laser_modbus/constants.py +++ b/src/line_laser_modbus/constants.py @@ -9,15 +9,26 @@ FUNC_WRITE_MULTIPLE_REGISTERS = 0x10 # 保持寄存器绝对地址。 ADDR_MODE_COMMAND = 0xD000 ADDR_DEVICE_STATUS = 0xD001 +ADDR_EXTENSION_RESERVED_1_START = 0xD002 +ADDR_EXTENSION_RESERVED_1_END = 0xD009 ADDR_CURRENT_POSE = 0xD00A -ADDR_TARGET_POSE = 0xD016 -ADDR_CORRECTION = 0xD022 -ADDR_CALIBRATION_RESERVED_START = 0xD02E -ADDR_CALIBRATION_RESERVED_END = 0xD04D +ADDR_EXTENSION_RESERVED_2_START = 0xD018 +ADDR_EXTENSION_RESERVED_2_END = 0xD01F +ADDR_TARGET_POSE = 0xD020 +ADDR_EXTENSION_RESERVED_3_START = 0xD02E +ADDR_EXTENSION_RESERVED_3_END = 0xD035 +ADDR_CORRECTION = 0xD036 +ADDR_EXTENSION_RESERVED_4_START = 0xD044 +ADDR_EXTENSION_RESERVED_4_END = 0xD04B +ADDR_CALIBRATION_RESERVED_START = 0xD04C +ADDR_CALIBRATION_RESERVED_END = 0xD06B # 协议数据宽度和位姿字段顺序。 REGISTER_COUNT_WORD = 1 -REGISTER_COUNT_POSE = 12 +REGISTER_COUNT_TIMESTAMP = 2 +REGISTER_COUNT_AXES = 12 +REGISTER_COUNT_POSE = 14 +REGISTER_COUNT_EXTENSION_RESERVED = 8 REGISTER_COUNT_CALIBRATION_RESERVED = 32 AXIS_NAMES = ("x", "y", "z", "a", "b", "c") @@ -26,5 +37,6 @@ DEFAULT_BAUDRATE = 115200 DEFAULT_BYTESIZE = 8 DEFAULT_PARITY = "N" DEFAULT_STOPBITS = 1 -DEFAULT_TIMEOUT_SECONDS = 0.05 +DEFAULT_TIMEOUT_SECONDS = 0.15 DEFAULT_RETRIES = 3 +DEFAULT_POLLING_INTERVAL_SECONDS = 0.05 diff --git a/src/line_laser_modbus/models.py b/src/line_laser_modbus/models.py index 33964bd..42b995e 100644 --- a/src/line_laser_modbus/models.py +++ b/src/line_laser_modbus/models.py @@ -37,6 +37,7 @@ class DeviceSnapshot: mode: ModeCommand status: DeviceStatus pose: Pose6D + timestamp: int = 0 @dataclass(frozen=True, slots=True) @@ -71,6 +72,20 @@ class Pose6D: return (self.x, self.y, self.z, self.a, self.b, self.c) +@dataclass(frozen=True, slots=True) +class TimedPose6D: + """V1.2 协议中的时间戳加 XYZABC 数据块""" + + timestamp: int + pose: Pose6D + + @classmethod + def from_pose(cls, pose: Pose6D, timestamp: int = 0) -> Self: + """从普通六轴数据创建带时间戳数据块""" + + return cls(timestamp, pose) + + def ensure_mode(value: int | ModeCommand) -> ModeCommand: """校验并转换模式命令字""" @@ -107,7 +122,7 @@ def can_switch_mode( if current_mode is ModeCommand.EMERGENCY_STOP: return False if current_status is DeviceStatus.CALIBRATION_DONE: - # 标定完成后由控制器内部回待机,不直接进入运行模式。 + # V1.2 状态机要求标定完成后回待机,不直接进入运行模式。 return False if current_status is DeviceStatus.TEACHING_DONE: # 示教完成后可进入依赖标准轨迹的运行模式。 diff --git a/src/line_laser_modbus/runner.py b/src/line_laser_modbus/runner.py index a8eb684..6eb9658 100644 --- a/src/line_laser_modbus/runner.py +++ b/src/line_laser_modbus/runner.py @@ -7,7 +7,7 @@ from collections.abc import Callable from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import PollingConfig -from line_laser_modbus.constants import DEFAULT_TIMEOUT_SECONDS +from line_laser_modbus.constants import DEFAULT_POLLING_INTERVAL_SECONDS, DEFAULT_TIMEOUT_SECONDS from line_laser_modbus.models import DeviceSnapshot, ModeCommand, Pose6D CorrectionProvider = Callable[[DeviceSnapshot], Pose6D] @@ -82,5 +82,8 @@ class PollingRunner: def default_polling_config() -> PollingConfig: """按协议默认超时时间生成轮询配置""" - max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / 0.02)) - return PollingConfig(interval_seconds=0.02, max_timeouts=max_timeouts) + max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / DEFAULT_POLLING_INTERVAL_SECONDS)) + return PollingConfig( + interval_seconds=DEFAULT_POLLING_INTERVAL_SECONDS, + max_timeouts=max_timeouts, + ) diff --git a/src/line_laser_modbus/simulator.py b/src/line_laser_modbus/simulator.py index 0d155c9..92f203c 100644 --- a/src/line_laser_modbus/simulator.py +++ b/src/line_laser_modbus/simulator.py @@ -4,7 +4,7 @@ from __future__ import annotations from dataclasses import dataclass -from line_laser_modbus.codec import encode_pose +from line_laser_modbus.codec import decode_pose, encode_pose from line_laser_modbus.constants import ( ADDR_CORRECTION, ADDR_CURRENT_POSE, @@ -105,9 +105,7 @@ class SimulatedModbusBackend: def _read_pose(self, address: int) -> Pose6D: """从指定地址读取一组模拟位姿寄存器""" - from line_laser_modbus.codec import decode_pose - - return decode_pose([self.registers.get(address + offset, 0) for offset in range(12)]) + return decode_pose([self.registers.get(address + offset, 0) for offset in range(14)]) def _ensure_ready(self, device_id: int) -> None: """检查连接状态和从站地址""" diff --git a/tests/test_client_simulator.py b/tests/test_client_simulator.py index 0b1cd9f..ad9ff4f 100644 --- a/tests/test_client_simulator.py +++ b/tests/test_client_simulator.py @@ -2,7 +2,11 @@ import pytest from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import SerialConfig -from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D +from line_laser_modbus.constants import ( + ADDR_CORRECTION, + ADDR_TARGET_POSE, +) +from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, TimedPose6D from line_laser_modbus.simulator import SimulatedModbusBackend @@ -16,6 +20,9 @@ def test_client_reads_seeded_status_and_pose_from_simulator() -> None: with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: assert client.read_status() is DeviceStatus.TRACKING_OK assert client.read_current_pose() == pose + timed_pose = client.read_current_timed_pose() + assert timed_pose.timestamp == 0 + assert timed_pose.pose == pose def test_client_writes_mode_and_correction_to_simulator() -> None: @@ -30,6 +37,30 @@ def test_client_writes_mode_and_correction_to_simulator() -> None: assert backend.correction() == correction +def test_client_writes_timed_target_pose_to_simulator() -> None: + backend = SimulatedModbusBackend() + target = TimedPose6D(1234, Pose6D(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)) + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + client.write_target_timed_pose(target) + + assert backend.target_pose() == target.pose + assert backend.registers[ADDR_TARGET_POSE] == 0x0000 + assert backend.registers[ADDR_TARGET_POSE + 1] == 0x04D2 + + +def test_client_writes_timed_correction_to_simulator() -> None: + backend = SimulatedModbusBackend() + correction = TimedPose6D(1000, Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + client.write_timed_correction(correction) + + assert backend.correction() == correction.pose + assert backend.registers[ADDR_CORRECTION] == 0x0000 + assert backend.registers[ADDR_CORRECTION + 1] == 0x03E8 + + def test_simulator_rejects_wrong_slave_id() -> None: backend = SimulatedModbusBackend() diff --git a/tests/test_codec.py b/tests/test_codec.py index ccd88dc..78122ed 100644 --- a/tests/test_codec.py +++ b/tests/test_codec.py @@ -2,9 +2,11 @@ from line_laser_modbus.codec import ( build_read_frame, build_write_frame, decode_pose, + decode_timed_pose, encode_pose, + encode_timed_pose, ) -from line_laser_modbus.models import Pose6D +from line_laser_modbus.models import Pose6D, TimedPose6D def test_read_frame_matches_readme_example() -> None: @@ -20,3 +22,27 @@ def test_mode_write_frame_matches_readme_example() -> None: def test_pose_float_register_roundtrip() -> None: pose = Pose6D(1.25, -2.5, 3.0, 0.0, 45.5, -90.0) assert decode_pose(encode_pose(pose)) == pose + + +def test_timed_pose_register_roundtrip() -> None: + data = TimedPose6D(1000, Pose6D(1.25, -2.5, 3.0, 0.0, 45.5, -90.0)) + registers = encode_timed_pose(data) + + assert len(registers) == 14 + assert decode_timed_pose(registers) == data + + +def test_current_pose_read_frame_matches_v12_example() -> None: + frame = build_read_frame(0xD00A, 14) + assert frame.hex(" ").upper() == "08 03 D0 0A 00 0E DC 55" + + +def test_correction_write_frame_matches_v12_docx_example() -> None: + data = TimedPose6D(1000, Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) + frame = build_write_frame(0xD036, encode_timed_pose(data), slave_id=0x01) + + assert ( + frame.hex(" ").upper() + == "01 10 D0 36 00 0E 1C 00 00 03 E8 3F 80 00 00 40 00 00 00 " + "40 40 00 00 00 00 00 00 3F 80 00 00 40 00 00 00 D0 72" + ) diff --git a/tests/test_config.py b/tests/test_config.py index 2acb98c..4b2f14d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -23,3 +23,9 @@ max_timeouts = 5 assert config.serial.port == "COM9" assert config.polling.interval_seconds == 0.01 assert config.polling.max_timeouts == 5 + + +def test_default_config_matches_v12_timing() -> None: + config = AppConfig() + assert config.serial.timeout == 0.15 + assert config.polling.interval_seconds == 0.05 diff --git a/tests/test_constants.py b/tests/test_constants.py index 8239d27..6c23c13 100644 --- a/tests/test_constants.py +++ b/tests/test_constants.py @@ -1,11 +1,44 @@ from line_laser_modbus.constants import ( ADDR_CALIBRATION_RESERVED_END, ADDR_CALIBRATION_RESERVED_START, + ADDR_CORRECTION, + ADDR_CURRENT_POSE, + ADDR_EXTENSION_RESERVED_1_END, + ADDR_EXTENSION_RESERVED_1_START, + ADDR_EXTENSION_RESERVED_2_END, + ADDR_EXTENSION_RESERVED_2_START, + ADDR_EXTENSION_RESERVED_3_END, + ADDR_EXTENSION_RESERVED_3_START, + ADDR_EXTENSION_RESERVED_4_END, + ADDR_EXTENSION_RESERVED_4_START, + ADDR_TARGET_POSE, REGISTER_COUNT_CALIBRATION_RESERVED, + REGISTER_COUNT_EXTENSION_RESERVED, + REGISTER_COUNT_POSE, ) def test_calibration_reserved_range_matches_protocol() -> None: - assert ADDR_CALIBRATION_RESERVED_START == 0xD02E - assert ADDR_CALIBRATION_RESERVED_END == 0xD04D - assert ADDR_CALIBRATION_RESERVED_START + REGISTER_COUNT_CALIBRATION_RESERVED - 1 == 0xD04D + assert ADDR_CALIBRATION_RESERVED_START == 0xD04C + assert ADDR_CALIBRATION_RESERVED_END == 0xD06B + assert ADDR_CALIBRATION_RESERVED_START + REGISTER_COUNT_CALIBRATION_RESERVED - 1 == 0xD06B + + +def test_v12_pose_ranges_match_protocol() -> None: + assert ADDR_CURRENT_POSE == 0xD00A + assert ADDR_TARGET_POSE == 0xD020 + assert ADDR_CORRECTION == 0xD036 + assert REGISTER_COUNT_POSE == 14 + + +def test_v12_extension_reserved_ranges_match_protocol() -> None: + ranges = [ + (ADDR_EXTENSION_RESERVED_1_START, ADDR_EXTENSION_RESERVED_1_END, 0xD002, 0xD009), + (ADDR_EXTENSION_RESERVED_2_START, ADDR_EXTENSION_RESERVED_2_END, 0xD018, 0xD01F), + (ADDR_EXTENSION_RESERVED_3_START, ADDR_EXTENSION_RESERVED_3_END, 0xD02E, 0xD035), + (ADDR_EXTENSION_RESERVED_4_START, ADDR_EXTENSION_RESERVED_4_END, 0xD044, 0xD04B), + ] + for start, end, expected_start, expected_end in ranges: + assert start == expected_start + assert end == expected_end + assert end - start + 1 == REGISTER_COUNT_EXTENSION_RESERVED diff --git a/tests/test_runner.py b/tests/test_runner.py index 3360b43..3658f1d 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -17,6 +17,7 @@ def test_polling_runner_writes_tracking_correction() -> None: snapshot = PollingRunner(client, correction_provider=pose_delta(target)).run_once() assert snapshot.status is DeviceStatus.TRACKING_OK + assert snapshot.timestamp == 0 assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0)