From 1f86e6b573bd77c54d0e3d28940201c7820e3c71 Mon Sep 17 00:00:00 2001 From: chuan Date: Wed, 13 May 2026 14:05:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=8C=E6=88=90=E8=BD=AE=E8=AF=A2?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E5=99=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/line_laser_modbus/__init__.py | 3 ++ src/line_laser_modbus/cli.py | 12 ++++- src/line_laser_modbus/runner.py | 78 +++++++++++++++++++++++++++++++ tests/test_runner.py | 33 +++++++++++++ 4 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 src/line_laser_modbus/runner.py create mode 100644 tests/test_runner.py diff --git a/src/line_laser_modbus/__init__.py b/src/line_laser_modbus/__init__.py index aaa19e0..b19a6e2 100644 --- a/src/line_laser_modbus/__init__.py +++ b/src/line_laser_modbus/__init__.py @@ -3,12 +3,15 @@ from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import SerialConfig from line_laser_modbus.models import DeviceSnapshot, DeviceStatus, ModeCommand, Pose6D +from line_laser_modbus.runner import PollingConfig, PollingRunner __all__ = [ "DeviceSnapshot", "DeviceStatus", "LineLaserClient", "ModeCommand", + "PollingConfig", + "PollingRunner", "Pose6D", "SerialConfig", ] diff --git a/src/line_laser_modbus/cli.py b/src/line_laser_modbus/cli.py index bb0fafe..d8b7003 100644 --- a/src/line_laser_modbus/cli.py +++ b/src/line_laser_modbus/cli.py @@ -7,6 +7,7 @@ import argparse from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import SerialConfig from line_laser_modbus.models import ModeCommand, Pose6D +from line_laser_modbus.runner import PollingRunner, pose_delta from line_laser_modbus.simulator import SimulatedModbusBackend @@ -20,6 +21,8 @@ def main() -> None: subparsers.add_parser("read-status") write_mode = subparsers.add_parser("write-mode") write_mode.add_argument("mode", type=int, choices=[mode.value for mode in ModeCommand]) + poll = subparsers.add_parser("poll-once") + poll.add_argument("--target", nargs=6, type=float, metavar=("X", "Y", "Z", "A", "B", "C")) subparsers.add_parser("demo") args = parser.parse_args() @@ -32,9 +35,16 @@ def main() -> None: elif args.command == "write-mode": client.write_mode(args.mode) print(client.read_mode().name) + elif args.command == "poll-once": + target = Pose6D.from_iterable(args.target) if args.target else Pose6D.zeros() + snapshot = PollingRunner(client, correction_provider=pose_delta(target)).run_once() + print(f"{snapshot.mode.name} {snapshot.status.name}") elif args.command == "demo": client.write_mode(ModeCommand.ONLINE_TRACKING) - client.write_correction(Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) + PollingRunner( + client, + correction_provider=pose_delta(Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)), + ).run_once() print(client.read_mode().name) diff --git a/src/line_laser_modbus/runner.py b/src/line_laser_modbus/runner.py new file mode 100644 index 0000000..4ec59bf --- /dev/null +++ b/src/line_laser_modbus/runner.py @@ -0,0 +1,78 @@ +"""Polling loop helpers for production and simulation.""" + +from __future__ import annotations + +import time +from collections.abc import Callable +from dataclasses import dataclass + +from line_laser_modbus.client import LineLaserClient +from line_laser_modbus.constants import DEFAULT_TIMEOUT_SECONDS +from line_laser_modbus.models import DeviceSnapshot, ModeCommand, Pose6D + +CorrectionProvider = Callable[[DeviceSnapshot], Pose6D] +SnapshotHandler = Callable[[DeviceSnapshot], None] + + +@dataclass(frozen=True, slots=True) +class PollingConfig: + interval_seconds: float = 0.02 + max_timeouts: int = 3 + + +def pose_delta(target: Pose6D) -> CorrectionProvider: + def calculate(snapshot: DeviceSnapshot) -> Pose6D: + # 纠偏量按目标位姿减当前位姿计算,实际项目可替换为轨迹规划结果 + return Pose6D.from_iterable( + [ + target_value - current + for target_value, current in zip( + target.as_tuple(), + snapshot.pose.as_tuple(), + strict=True, + ) + ] + ) + + return calculate + + +class PollingRunner: + def __init__( + self, + client: LineLaserClient, + correction_provider: CorrectionProvider | None = None, + snapshot_handler: SnapshotHandler | None = None, + config: PollingConfig | None = None, + ) -> None: + self.client = client + self.correction_provider = correction_provider + self.snapshot_handler = snapshot_handler + self.config = config or PollingConfig() + self.timeout_count = 0 + + def run_once(self) -> DeviceSnapshot: + snapshot = self.client.read_snapshot() + self.timeout_count = 0 + if self.snapshot_handler: + self.snapshot_handler(snapshot) + if snapshot.mode is ModeCommand.ONLINE_TRACKING and self.correction_provider: + self.client.write_correction(self.correction_provider(snapshot)) + return snapshot + + def run_forever(self) -> None: + while True: + started = time.monotonic() + try: + self.run_once() + except TimeoutError: + self.timeout_count += 1 + if self.timeout_count >= self.config.max_timeouts: + raise + elapsed = time.monotonic() - started + time.sleep(max(0.0, self.config.interval_seconds - elapsed)) + + +def default_polling_config() -> PollingConfig: + max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / 0.02)) + return PollingConfig(interval_seconds=0.02, max_timeouts=max_timeouts) diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..3360b43 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,33 @@ +from line_laser_modbus.client import LineLaserClient +from line_laser_modbus.config import SerialConfig +from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D +from line_laser_modbus.runner import PollingRunner, pose_delta +from line_laser_modbus.simulator import SimulatedModbusBackend + + +def test_polling_runner_writes_tracking_correction() -> None: + backend = SimulatedModbusBackend( + mode=ModeCommand.ONLINE_TRACKING, + status=DeviceStatus.TRACKING_OK, + current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0), + ) + target = Pose6D(2.0, 4.0, 6.0, 0.0, 0.0, 5.0) + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + snapshot = PollingRunner(client, correction_provider=pose_delta(target)).run_once() + + assert snapshot.status is DeviceStatus.TRACKING_OK + assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0) + + +def test_polling_runner_does_not_write_correction_outside_tracking() -> None: + backend = SimulatedModbusBackend(current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)) + + with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client: + runner = PollingRunner( + client, + correction_provider=pose_delta(Pose6D(2.0, 2.0, 2.0, 2.0, 2.0, 2.0)), + ) + runner.run_once() + + assert backend.correction() == Pose6D.zeros()