feat: 完成轮询运行器实现

This commit is contained in:
2026-05-13 14:05:35 +08:00
Unverified
parent 9d99b763d6
commit 1f86e6b573
4 changed files with 125 additions and 1 deletions
+3
View File
@@ -3,12 +3,15 @@
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.models import DeviceSnapshot, DeviceStatus, ModeCommand, Pose6D
from line_laser_modbus.runner import PollingConfig, PollingRunner
__all__ = [
"DeviceSnapshot",
"DeviceStatus",
"LineLaserClient",
"ModeCommand",
"PollingConfig",
"PollingRunner",
"Pose6D",
"SerialConfig",
]
+11 -1
View File
@@ -7,6 +7,7 @@ import argparse
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.models import ModeCommand, Pose6D
from line_laser_modbus.runner import PollingRunner, pose_delta
from line_laser_modbus.simulator import SimulatedModbusBackend
@@ -20,6 +21,8 @@ def main() -> None:
subparsers.add_parser("read-status")
write_mode = subparsers.add_parser("write-mode")
write_mode.add_argument("mode", type=int, choices=[mode.value for mode in ModeCommand])
poll = subparsers.add_parser("poll-once")
poll.add_argument("--target", nargs=6, type=float, metavar=("X", "Y", "Z", "A", "B", "C"))
subparsers.add_parser("demo")
args = parser.parse_args()
@@ -32,9 +35,16 @@ def main() -> None:
elif args.command == "write-mode":
client.write_mode(args.mode)
print(client.read_mode().name)
elif args.command == "poll-once":
target = Pose6D.from_iterable(args.target) if args.target else Pose6D.zeros()
snapshot = PollingRunner(client, correction_provider=pose_delta(target)).run_once()
print(f"{snapshot.mode.name} {snapshot.status.name}")
elif args.command == "demo":
client.write_mode(ModeCommand.ONLINE_TRACKING)
client.write_correction(Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0))
PollingRunner(
client,
correction_provider=pose_delta(Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0)),
).run_once()
print(client.read_mode().name)
+78
View File
@@ -0,0 +1,78 @@
"""Polling loop helpers for production and simulation."""
from __future__ import annotations
import time
from collections.abc import Callable
from dataclasses import dataclass
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.constants import DEFAULT_TIMEOUT_SECONDS
from line_laser_modbus.models import DeviceSnapshot, ModeCommand, Pose6D
CorrectionProvider = Callable[[DeviceSnapshot], Pose6D]
SnapshotHandler = Callable[[DeviceSnapshot], None]
@dataclass(frozen=True, slots=True)
class PollingConfig:
interval_seconds: float = 0.02
max_timeouts: int = 3
def pose_delta(target: Pose6D) -> CorrectionProvider:
def calculate(snapshot: DeviceSnapshot) -> Pose6D:
# 纠偏量按目标位姿减当前位姿计算,实际项目可替换为轨迹规划结果
return Pose6D.from_iterable(
[
target_value - current
for target_value, current in zip(
target.as_tuple(),
snapshot.pose.as_tuple(),
strict=True,
)
]
)
return calculate
class PollingRunner:
def __init__(
self,
client: LineLaserClient,
correction_provider: CorrectionProvider | None = None,
snapshot_handler: SnapshotHandler | None = None,
config: PollingConfig | None = None,
) -> None:
self.client = client
self.correction_provider = correction_provider
self.snapshot_handler = snapshot_handler
self.config = config or PollingConfig()
self.timeout_count = 0
def run_once(self) -> DeviceSnapshot:
snapshot = self.client.read_snapshot()
self.timeout_count = 0
if self.snapshot_handler:
self.snapshot_handler(snapshot)
if snapshot.mode is ModeCommand.ONLINE_TRACKING and self.correction_provider:
self.client.write_correction(self.correction_provider(snapshot))
return snapshot
def run_forever(self) -> None:
while True:
started = time.monotonic()
try:
self.run_once()
except TimeoutError:
self.timeout_count += 1
if self.timeout_count >= self.config.max_timeouts:
raise
elapsed = time.monotonic() - started
time.sleep(max(0.0, self.config.interval_seconds - elapsed))
def default_polling_config() -> PollingConfig:
max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / 0.02))
return PollingConfig(interval_seconds=0.02, max_timeouts=max_timeouts)
+33
View File
@@ -0,0 +1,33 @@
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D
from line_laser_modbus.runner import PollingRunner, pose_delta
from line_laser_modbus.simulator import SimulatedModbusBackend
def test_polling_runner_writes_tracking_correction() -> None:
backend = SimulatedModbusBackend(
mode=ModeCommand.ONLINE_TRACKING,
status=DeviceStatus.TRACKING_OK,
current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0),
)
target = Pose6D(2.0, 4.0, 6.0, 0.0, 0.0, 5.0)
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
snapshot = PollingRunner(client, correction_provider=pose_delta(target)).run_once()
assert snapshot.status is DeviceStatus.TRACKING_OK
assert backend.correction() == Pose6D(1.0, 2.0, 3.0, 0.0, -1.0, 3.0)
def test_polling_runner_does_not_write_correction_outside_tracking() -> None:
backend = SimulatedModbusBackend(current_pose=Pose6D(1.0, 2.0, 3.0, 0.0, 1.0, 2.0))
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
runner = PollingRunner(
client,
correction_provider=pose_delta(Pose6D(2.0, 2.0, 2.0, 2.0, 2.0, 2.0)),
)
runner.run_once()
assert backend.correction() == Pose6D.zeros()