From 1483d89f4927d6e9154c87a86bb6df520e892510 Mon Sep 17 00:00:00 2001 From: chuan Date: Mon, 22 Jun 2026 14:39:30 +0800 Subject: [PATCH] Align HMI with protocol v1.4 --- README.md | 8 +- docs/design.md | 16 ++-- src/line_laser_hmi/backend.py | 11 ++- src/line_laser_hmi/labels.py | 36 +++++++++ src/line_laser_hmi/main_window.py | 3 + src/line_laser_hmi/panels/chart_panel.py | 63 +++++++++++++++- src/line_laser_hmi/panels/connection_panel.py | 10 +-- src/line_laser_hmi/panels/manual_panel.py | 29 ++++++- src/line_laser_hmi/panels/mode_panel.py | 12 +-- src/line_laser_hmi/panels/polling_panel.py | 19 +++-- src/line_laser_hmi/panels/status_panel.py | 22 +++++- src/line_laser_hmi/worker.py | 64 +++++++++++++--- tests/test_backend.py | 11 ++- tests/test_labels.py | 15 ++++ tests/test_worker.py | 75 +++++++++++++++++++ 15 files changed, 344 insertions(+), 50 deletions(-) create mode 100644 src/line_laser_hmi/labels.py create mode 100644 tests/test_labels.py create mode 100644 tests/test_worker.py diff --git a/README.md b/README.md index 2164e44..907e933 100644 --- a/README.md +++ b/README.md @@ -7,11 +7,11 @@ ## 功能 - 串口连接配置与自动枚举 COM 口,**模拟模式**无需硬件即可运行 -- 6 种工作模式切换(按协议状态机校验)与独立急停按钮 -- 设备状态字、时间戳、通信超时实时监控 +- 5 种普通工作模式切换(按协议状态机校验)与独立急停按钮 +- 设备状态字、时间戳、可用缓存数量(0xD002)、通信超时实时监控 - 当前 6 轴位姿数码显示与实时趋势曲线 -- 手动下发目标示教位姿 / 纠偏量,设置在线跟踪自动纠偏目标 -- 50ms 后台轮询(独立线程,不阻塞界面),连接成功后自动启动 +- 手动下发目标示教位姿 / 纠偏量,设置在线跟踪自动纠偏目标;手动下发按当前模式限制,目标示教位姿写入前检查 0xD002 可用缓存数量 +- 50ms 后台轮询、150ms 单帧超时(独立线程,不阻塞界面),连接成功后自动启动 - 快照录制为 CSV,通信日志独立窗口(菜单「视图 → 日志窗口」,Ctrl+L)可导出 ## 安装与运行 diff --git a/docs/design.md b/docs/design.md index 295d396..ddb7adf 100644 --- a/docs/design.md +++ b/docs/design.md @@ -31,11 +31,11 @@ GUI 线程 --请求信号(队列)--> ModbusWorker(子线程) --结果信号( | 面板 | 模块 | 对应协议库能力 | | ---- | ---- | -------------- | | 连接 | `panels/connection_panel.py` | `SerialConfig`、模拟后端开关 | -| 模式控制 | `panels/mode_panel.py` | `switch_mode`(状态机校验)、急停 `write_mode(5)` | -| 状态监控 | `panels/status_panel.py` | `read_status`、时间戳、超时计数 | +| 模式控制 | `panels/mode_panel.py` | `switch_mode`(状态机校验)、急停 `trigger_emergency_stop` | +| 状态监控 | `panels/status_panel.py` | `read_status`、`read_available_cache_count`、时间戳、超时计数 | | 当前位姿 | `panels/pose_panel.py` | `read_current_pose` | | 轮询与录制 | `panels/polling_panel.py` | `PollingRunner`、CSV 录制 | -| 手动下发 | `panels/manual_panel.py` | `write_target_pose`、`write_correction`、跟踪目标 | +| 手动下发 | `panels/manual_panel.py` | `read_available_cache_count`、`write_target_pose`、`write_correction`、跟踪目标 | | 实时曲线 | `panels/chart_panel.py` | 6 轴位姿趋势 | | 日志窗口 | `panels/log_panel.py` 的 `LogWindow` | 通信/错误记录、导出;由菜单「视图 → 日志窗口」(Ctrl+L) 打开 | @@ -46,14 +46,20 @@ GUI 线程 --请求信号(队列)--> ModbusWorker(子线程) --结果信号( 为满足无硬件演示,`backend.py` 提供 `DynamicSimulatedBackend`: - 读取当前位姿时按正弦规律围绕基准位姿摆动,使数码显示与曲线动起来; -- 写入模式命令时联动刷新设备状态字(如进入在线跟踪后状态变为 `TRACKING_OK`)。 +- 写入模式命令时联动刷新设备状态字(如进入在线跟踪后状态变为 `TRACKING_OK`); +- 默认提供 `0xD002` 可用缓存数量为 `1`,便于验证目标示教位姿下发。 勾选连接面板的「模拟模式」即可在没有串口和控制器的机器上完整体验所有功能。 ## 5. 状态机与安全 +- 普通模式切换只包含 `0~4`,模式 `0` 在界面显示为「手动示教」。 - 普通模式切换走 `switch_mode`,非法切换由协议库抛错并在日志提示。 -- 急停按钮走 `force_mode`(`write_mode`,不做状态机校验),保证任意状态下可立即下发急停。 +- 急停按钮走 `force_mode`(内部调用 `trigger_emergency_stop`,不做状态机校验),保证任意状态下可立即下发急停。 +- 手动下发按钮按当前模式启用:模式 `2` 允许目标示教位姿,模式 `3/4` 允许 6 轴纠偏量,模式 `0/1` 不允许下发控制指令。 +- worker 写入前会再次读取当前模式校验,避免按钮状态滞后或信号绕过。 +- 模式 `2` 下刷新 `0xD002` 可用缓存数量;目标示教位姿写入前再次读取,数量为 `0` 时界面提示并拒绝下发。 +- UI 固定使用协议轮询周期 `50ms` 和单帧超时 `150ms`;轮询连续失败达到阈值后自动断开连接。 ## 6. 数据录制 diff --git a/src/line_laser_hmi/backend.py b/src/line_laser_hmi/backend.py index 40aa621..d2c949a 100644 --- a/src/line_laser_hmi/backend.py +++ b/src/line_laser_hmi/backend.py @@ -8,13 +8,18 @@ import time from line_laser_modbus.client import LineLaserClient, ModbusBackend from line_laser_modbus.codec import encode_timed_pose from line_laser_modbus.config import SerialConfig -from line_laser_modbus.constants import ADDR_CURRENT_POSE, ADDR_DEVICE_STATUS, ADDR_MODE_COMMAND +from line_laser_modbus.constants import ( + ADDR_AVAILABLE_CACHE_COUNT, + ADDR_CURRENT_POSE, + ADDR_DEVICE_STATUS, + ADDR_MODE_COMMAND, +) from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, TimedPose6D from line_laser_modbus.simulator import SimulatedModbusBackend # 模式写入后模拟控制器反馈的状态映射,仅用于无硬件演示。 _MODE_TO_STATUS = { - ModeCommand.STANDBY_RESET: DeviceStatus.STANDBY_READY, + ModeCommand.MANUAL_TEACHING: DeviceStatus.IDLE, ModeCommand.CALIBRATION: DeviceStatus.RUNNING, ModeCommand.PRE_WELD_TEACHING: DeviceStatus.RUNNING, ModeCommand.ONLINE_TRACKING: DeviceStatus.TRACKING_OK, @@ -44,6 +49,8 @@ class DynamicSimulatedBackend(SimulatedModbusBackend): """写入模式命令时联动刷新模拟设备状态字""" response = super().write_registers(address, values, device_id=device_id) + if address == ADDR_AVAILABLE_CACHE_COUNT and values: + self.registers[ADDR_AVAILABLE_CACHE_COUNT] = max(0, values[0]) if address == ADDR_MODE_COMMAND and values: try: status = _MODE_TO_STATUS[ModeCommand(values[0])] diff --git a/src/line_laser_hmi/labels.py b/src/line_laser_hmi/labels.py new file mode 100644 index 0000000..f5fa1d9 --- /dev/null +++ b/src/line_laser_hmi/labels.py @@ -0,0 +1,36 @@ +"""Display labels for protocol enum values.""" + +from __future__ import annotations + +from line_laser_modbus.models import DeviceStatus, ModeCommand + +MODE_LABELS = { + ModeCommand.MANUAL_TEACHING: "手动示教", + ModeCommand.CALIBRATION: "系统标定", + ModeCommand.PRE_WELD_TEACHING: "焊前扫描示教", + ModeCommand.ONLINE_TRACKING: "在线全轴跟踪", + ModeCommand.TRAJECTORY_REPLAY: "轨迹批量复现", + ModeCommand.EMERGENCY_STOP: "紧急停止", +} + +STATUS_LABELS = { + DeviceStatus.IDLE: "空闲", + DeviceStatus.RUNNING: "运动运行中", + DeviceStatus.TEACHING_DONE: "示教完成", + DeviceStatus.TRACKING_OK: "在线跟踪正常", + DeviceStatus.ALARM: "设备报警", + DeviceStatus.CALIBRATION_DONE: "标定完成", + DeviceStatus.EMERGENCY_TRIGGERED: "急停已触发", +} + + +def mode_label(mode: ModeCommand) -> str: + """Return the user-facing Chinese label for a mode command.""" + + return MODE_LABELS.get(mode, mode.name) + + +def status_label(status: DeviceStatus) -> str: + """Return the user-facing Chinese label for a device status.""" + + return STATUS_LABELS.get(status, status.name) diff --git a/src/line_laser_hmi/main_window.py b/src/line_laser_hmi/main_window.py index e4f4c04..bcce4cc 100644 --- a/src/line_laser_hmi/main_window.py +++ b/src/line_laser_hmi/main_window.py @@ -149,6 +149,7 @@ class MainWindow(QMainWindow): self._worker.connected.connect(self._on_connected) self._worker.disconnected.connect(self._on_disconnected) self._worker.snapshotReady.connect(self._on_snapshot) + self._worker.cacheCountReady.connect(self._status.set_cache_count) self._worker.writeAck.connect(lambda msg: self._log.append(msg)) self._worker.errorOccurred.connect(lambda msg: self._log.append(msg, "错误")) self._worker.timeoutCount.connect(self._status.set_timeout) @@ -171,6 +172,7 @@ class MainWindow(QMainWindow): self._connection.set_connected(False, "未连接") self._status.set_connection(False, "未连接") + self._status.set_cache_count(None) self._set_controls_enabled(False) self._on_stop_record() self._log.append("已断开连接") @@ -181,6 +183,7 @@ class MainWindow(QMainWindow): self._status.update_snapshot(snapshot) self._pose.update_pose(snapshot.pose) self._mode.set_current_mode(snapshot.mode) + self._manual.set_mode(snapshot.mode) self._chart.append(snapshot.pose) if self._recorder.is_recording: self._recorder.write(snapshot) diff --git a/src/line_laser_hmi/panels/chart_panel.py b/src/line_laser_hmi/panels/chart_panel.py index ac822c0..e7f510f 100644 --- a/src/line_laser_hmi/panels/chart_panel.py +++ b/src/line_laser_hmi/panels/chart_panel.py @@ -6,7 +6,8 @@ from collections import deque import pyqtgraph as pg from line_laser_modbus.models import Pose6D -from PySide6.QtWidgets import QGroupBox, QVBoxLayout, QWidget +from PySide6.QtCore import QSignalBlocker +from PySide6.QtWidgets import QCheckBox, QGroupBox, QHBoxLayout, QPushButton, QVBoxLayout, QWidget # 曲线缓冲长度,按 50ms 周期约保留最近 30 秒数据。 _BUFFER = 600 @@ -27,11 +28,27 @@ class ChartPanel(QGroupBox): self._samples = 0 self._x: deque[int] = deque(maxlen=_BUFFER) self._curves: list[tuple[deque[float], pg.PlotDataItem]] = [] + self._plots: list[pg.PlotWidget] = [] + self._follow_latest = True + self._applying_range = False + + self._follow = QCheckBox("跟随最新") + self._follow.setChecked(True) + self._follow.toggled.connect(self._on_follow_toggled) + + self._reset = QPushButton("重置视图") + self._reset.clicked.connect(self._restore_follow_latest) position_plot = self._make_plot("位置 (mm)", _POSITION) attitude_plot = self._make_plot("姿态 (°)", _ATTITUDE) + controls = QHBoxLayout() + controls.addWidget(self._follow) + controls.addWidget(self._reset) + controls.addStretch(1) + layout = QVBoxLayout(self) + layout.addLayout(controls) layout.addWidget(position_plot) layout.addWidget(attitude_plot) @@ -44,6 +61,8 @@ class ChartPanel(QGroupBox): for value, (buffer, curve) in zip(pose.as_tuple(), self._curves, strict=True): buffer.append(value) curve.setData(x, list(buffer)) + if self._follow_latest: + self._apply_follow_range() def clear(self) -> None: """清空所有曲线缓冲""" @@ -53,6 +72,7 @@ class ChartPanel(QGroupBox): for buffer, curve in self._curves: buffer.clear() curve.setData([], []) + self._restore_follow_latest() def _make_plot(self, title: str, axes: tuple[tuple[str, str], ...]) -> pg.PlotWidget: """创建一个带图例的曲线图并登记其曲线""" @@ -61,7 +81,48 @@ class ChartPanel(QGroupBox): plot.addLegend() plot.showGrid(x=True, y=True, alpha=0.3) plot.setMinimumHeight(160) + plot.getViewBox().sigRangeChangedManually.connect(self._on_manual_range_changed) + self._plots.append(plot) for name, color in axes: curve = plot.plot(pen=pg.mkPen(color, width=2), name=name) self._curves.append((deque(maxlen=_BUFFER), curve)) return plot + + def _on_follow_toggled(self, checked: bool) -> None: + """切换是否让视图自动跟随最新数据""" + + self._follow_latest = checked + if checked: + self._apply_follow_range() + + def _restore_follow_latest(self) -> None: + """恢复默认的跟随最新视图""" + + self._follow_latest = True + with QSignalBlocker(self._follow): + self._follow.setChecked(True) + self._apply_follow_range() + + def _on_manual_range_changed(self, *_args) -> None: + """用户拖动或缩放后暂停自动跟随""" + + if self._applying_range: + return + self._follow_latest = False + with QSignalBlocker(self._follow): + self._follow.setChecked(False) + + def _apply_follow_range(self) -> None: + """将 X 轴滚动到最新采样并让 Y 轴自动适配""" + + if not self._plots: + return + start = max(0, self._samples - _BUFFER) + stop = max(start + 1, self._samples - 1) + self._applying_range = True + try: + for plot in self._plots: + plot.setXRange(start, stop, padding=0) + plot.enableAutoRange(axis=pg.ViewBox.YAxis, enable=True) + finally: + self._applying_range = False diff --git a/src/line_laser_hmi/panels/connection_panel.py b/src/line_laser_hmi/panels/connection_panel.py index 24095ff..6d7c9c8 100644 --- a/src/line_laser_hmi/panels/connection_panel.py +++ b/src/line_laser_hmi/panels/connection_panel.py @@ -3,6 +3,7 @@ from __future__ import annotations from line_laser_modbus.config import SerialConfig +from line_laser_modbus.constants import DEFAULT_TIMEOUT_SECONDS from PySide6.QtCore import Signal from PySide6.QtWidgets import ( QCheckBox, @@ -50,10 +51,9 @@ class ConnectionPanel(QGroupBox): self._slave_id.setValue(8) self._timeout = QDoubleSpinBox() - self._timeout.setRange(0.01, 5.0) - self._timeout.setSingleStep(0.05) + self._timeout.setRange(DEFAULT_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS) self._timeout.setDecimals(2) - self._timeout.setValue(0.15) + self._timeout.setValue(DEFAULT_TIMEOUT_SECONDS) self._retries = QSpinBox() self._retries.setRange(0, 10) @@ -94,7 +94,7 @@ class ConnectionPanel(QGroupBox): port=self._port.currentText().strip() or "COM1", slave_id=self._slave_id.value(), baudrate=int(self._baudrate.currentText()), - timeout=self._timeout.value(), + timeout=DEFAULT_TIMEOUT_SECONDS, retries=self._retries.value(), ) @@ -112,7 +112,7 @@ class ConnectionPanel(QGroupBox): self._port.setCurrentText(serial.port) self._baudrate.setCurrentText(str(serial.baudrate)) self._slave_id.setValue(serial.slave_id) - self._timeout.setValue(serial.timeout) + self._timeout.setValue(DEFAULT_TIMEOUT_SECONDS) self._retries.setValue(serial.retries) self._simulate.setChecked(config.ui.simulate) diff --git a/src/line_laser_hmi/panels/manual_panel.py b/src/line_laser_hmi/panels/manual_panel.py index 8bf298c..bfe0977 100644 --- a/src/line_laser_hmi/panels/manual_panel.py +++ b/src/line_laser_hmi/panels/manual_panel.py @@ -2,7 +2,7 @@ from __future__ import annotations -from line_laser_modbus.models import Pose6D +from line_laser_modbus.models import ModeCommand, Pose6D from PySide6.QtCore import Signal from PySide6.QtWidgets import ( QDoubleSpinBox, @@ -32,6 +32,8 @@ class ManualPanel(QGroupBox): super().__init__("手动下发", parent) self._inputs: list[QDoubleSpinBox] = [] + self._connected = False + self._mode: ModeCommand | None = None grid = QGridLayout() for index, (name, unit) in enumerate(_AXES): @@ -78,9 +80,16 @@ class ManualPanel(QGroupBox): def set_controls_enabled(self, enabled: bool) -> None: """连接状态变化时启用或禁用下发按钮""" - self._target_button.setEnabled(enabled) - self._correction_button.setEnabled(enabled) - self._track_button.setEnabled(enabled) + self._connected = enabled + if not enabled: + self._mode = None + self._apply_protocol_mode() + + def set_mode(self, mode: ModeCommand) -> None: + """按当前协议模式启用允许的手动下发动作""" + + self._mode = mode + self._apply_protocol_mode() def _emit_target(self) -> None: """发出目标示教位姿下发请求""" @@ -96,3 +105,15 @@ class ManualPanel(QGroupBox): """发出设置自动跟踪目标请求""" self.setTrackingTarget.emit(self.pose()) + + def _apply_protocol_mode(self) -> None: + """按 V1.4 通信行为限制手动控制按钮""" + + target_enabled = self._connected and self._mode is ModeCommand.PRE_WELD_TEACHING + correction_enabled = self._connected and self._mode in { + ModeCommand.ONLINE_TRACKING, + ModeCommand.TRAJECTORY_REPLAY, + } + self._target_button.setEnabled(target_enabled) + self._correction_button.setEnabled(correction_enabled) + self._track_button.setEnabled(self._connected) diff --git a/src/line_laser_hmi/panels/mode_panel.py b/src/line_laser_hmi/panels/mode_panel.py index ef8ed0d..4d6b955 100644 --- a/src/line_laser_hmi/panels/mode_panel.py +++ b/src/line_laser_hmi/panels/mode_panel.py @@ -13,13 +13,15 @@ from PySide6.QtWidgets import ( QWidget, ) +from line_laser_hmi.labels import mode_label + # 可由界面主动切换的运行模式(急停单独用醒目按钮处理)。 _MODE_BUTTONS = ( - (ModeCommand.STANDBY_RESET, "待机复位"), + (ModeCommand.MANUAL_TEACHING, "手动示教"), (ModeCommand.CALIBRATION, "系统标定"), - (ModeCommand.PRE_WELD_TEACHING, "焊前示教"), - (ModeCommand.ONLINE_TRACKING, "在线跟踪"), - (ModeCommand.TRAJECTORY_REPLAY, "轨迹复现"), + (ModeCommand.PRE_WELD_TEACHING, "焊前扫描示教"), + (ModeCommand.ONLINE_TRACKING, "在线全轴跟踪"), + (ModeCommand.TRAJECTORY_REPLAY, "轨迹批量复现"), ) @@ -61,7 +63,7 @@ class ModePanel(QGroupBox): def set_current_mode(self, mode: ModeCommand) -> None: """高亮当前模式按钮并更新文字""" - self._current.setText(f"当前模式:{mode.name}") + self._current.setText(f"当前模式:{mode_label(mode)}") for value, button in self._buttons.items(): highlight = value is mode button.setStyleSheet( diff --git a/src/line_laser_hmi/panels/polling_panel.py b/src/line_laser_hmi/panels/polling_panel.py index 9aa611f..aa4f37c 100644 --- a/src/line_laser_hmi/panels/polling_panel.py +++ b/src/line_laser_hmi/panels/polling_panel.py @@ -2,6 +2,7 @@ from __future__ import annotations +from line_laser_modbus.constants import DEFAULT_POLLING_INTERVAL_SECONDS from PySide6.QtCore import Signal from PySide6.QtWidgets import ( QFormLayout, @@ -14,9 +15,11 @@ from PySide6.QtWidgets import ( QWidget, ) +_PROTOCOL_POLLING_INTERVAL_MS = round(DEFAULT_POLLING_INTERVAL_SECONDS * 1000) + class PollingPanel(QGroupBox): - """控制后台轮询周期与 CSV 录制""" + """控制后台轮询与 CSV 录制""" startPolling = Signal(int) stopPolling = Signal() @@ -31,10 +34,10 @@ class PollingPanel(QGroupBox): self._recording = False self._interval = QSpinBox() - self._interval.setRange(10, 5000) - self._interval.setSingleStep(10) - self._interval.setValue(50) + self._interval.setRange(_PROTOCOL_POLLING_INTERVAL_MS, _PROTOCOL_POLLING_INTERVAL_MS) + self._interval.setValue(_PROTOCOL_POLLING_INTERVAL_MS) self._interval.setSuffix(" ms") + self._interval.setEnabled(False) self._poll_button = QPushButton("开始轮询") self._poll_button.clicked.connect(self._on_poll) @@ -59,9 +62,9 @@ class PollingPanel(QGroupBox): self.set_controls_enabled(False) def interval(self) -> int: - """返回当前设置的轮询周期(毫秒)""" + """返回协议固定轮询周期(毫秒)""" - return self._interval.value() + return _PROTOCOL_POLLING_INTERVAL_MS def set_controls_enabled(self, enabled: bool) -> None: """连接状态变化时启用或禁用轮询控件""" @@ -77,7 +80,7 @@ class PollingPanel(QGroupBox): self._polling = polling self._poll_button.setText("停止轮询" if polling else "开始轮询") - self._interval.setEnabled(not polling) + self._interval.setEnabled(False) def set_recording(self, recording: bool, message: str) -> None: """更新录制按钮和状态文字""" @@ -98,7 +101,7 @@ class PollingPanel(QGroupBox): if self._polling: self.stopPolling.emit() else: - self.startPolling.emit(self._interval.value()) + self.startPolling.emit(_PROTOCOL_POLLING_INTERVAL_MS) def _on_record(self) -> None: """录制按钮点击时发出开始或停止请求""" diff --git a/src/line_laser_hmi/panels/status_panel.py b/src/line_laser_hmi/panels/status_panel.py index 603af42..0cf7775 100644 --- a/src/line_laser_hmi/panels/status_panel.py +++ b/src/line_laser_hmi/panels/status_panel.py @@ -5,9 +5,11 @@ from __future__ import annotations from line_laser_modbus.models import DeviceSnapshot, DeviceStatus from PySide6.QtWidgets import QFormLayout, QGroupBox, QLabel, QWidget +from line_laser_hmi.labels import mode_label, status_label + # 异常类状态用红色,正常运行类用绿色,其余用中性色。 _STATUS_COLORS = { - DeviceStatus.STANDBY_READY: "#455a64", + DeviceStatus.IDLE: "#455a64", DeviceStatus.RUNNING: "#2e7d32", DeviceStatus.TEACHING_DONE: "#1565c0", DeviceStatus.TRACKING_OK: "#2e7d32", @@ -18,7 +20,7 @@ _STATUS_COLORS = { class StatusPanel(QGroupBox): - """显示模式、设备状态字、时间戳和通信超时计数""" + """显示模式、设备状态字、时间戳、缓存数量和通信超时计数""" def __init__(self, parent: QWidget | None = None) -> None: """构建状态监控标签""" @@ -29,6 +31,7 @@ class StatusPanel(QGroupBox): self._status = QLabel("—") self._status.setStyleSheet("color: white; background: #455a64; padding: 4px;") self._timestamp = QLabel("—") + self._cache = QLabel("—") self._timeout = QLabel("0") self._connection = QLabel("未连接") @@ -36,18 +39,29 @@ class StatusPanel(QGroupBox): form.addRow("模式", self._mode) form.addRow("设备状态", self._status) form.addRow("时间戳(ms)", self._timestamp) + form.addRow("可用缓存数量", self._cache) form.addRow("连续超时", self._timeout) form.addRow("连接", self._connection) def update_snapshot(self, snapshot: DeviceSnapshot) -> None: """用一次快照刷新模式、状态和时间戳""" - self._mode.setText(snapshot.mode.name) + self._mode.setText(mode_label(snapshot.mode)) color = _STATUS_COLORS.get(snapshot.status, "#455a64") - self._status.setText(snapshot.status.name) + self._status.setText(status_label(snapshot.status)) self._status.setStyleSheet(f"color: white; background: {color}; padding: 4px;") self._timestamp.setText(str(snapshot.timestamp)) + def set_cache_count(self, count: int | None) -> None: + """刷新 0xD002 可用缓存数量,0 时标红提醒不可下发示教位姿""" + + if count is None: + self._cache.setText("—") + self._cache.setStyleSheet("") + return + self._cache.setText(str(count)) + self._cache.setStyleSheet("color: #c62828; font-weight: bold;" if count <= 0 else "") + def set_timeout(self, count: int) -> None: """刷新连续超时计数,非零时标红""" diff --git a/src/line_laser_hmi/worker.py b/src/line_laser_hmi/worker.py index 34d4891..c946944 100644 --- a/src/line_laser_hmi/worker.py +++ b/src/line_laser_hmi/worker.py @@ -6,11 +6,13 @@ import contextlib from line_laser_modbus.client import LineLaserClient from line_laser_modbus.config import SerialConfig +from line_laser_modbus.constants import DEFAULT_POLLING_INTERVAL_SECONDS from line_laser_modbus.models import ModeCommand, Pose6D, validate_mode_switch -from line_laser_modbus.runner import PollingRunner, pose_delta +from line_laser_modbus.runner import PollingRunner, default_polling_config, pose_delta from PySide6.QtCore import QObject, QTimer, Signal, Slot from line_laser_hmi.backend import build_client +from line_laser_hmi.labels import mode_label class ModbusWorker(QObject): @@ -23,6 +25,7 @@ class ModbusWorker(QObject): connected = Signal(bool, str) disconnected = Signal() snapshotReady = Signal(object) + cacheCountReady = Signal(object) writeAck = Signal(str) errorOccurred = Signal(str) timeoutCount = Signal(int) @@ -36,6 +39,7 @@ class ModbusWorker(QObject): self._runner: PollingRunner | None = None self._target: Pose6D | None = None self._timeouts = 0 + self._max_timeouts = default_polling_config().max_timeouts @Slot(object, bool) def connect_device(self, serial: SerialConfig, simulate: bool) -> None: @@ -53,7 +57,6 @@ class ModbusWorker(QObject): self._runner = PollingRunner( client, correction_provider=self._provide_correction, - snapshot_handler=self.snapshotReady.emit, ) self._timer = QTimer(self) self._timer.timeout.connect(self._tick) @@ -75,7 +78,7 @@ class ModbusWorker(QObject): self.errorOccurred.emit("未连接,无法启动轮询") return self._timeouts = 0 - self._timer.setInterval(max(1, interval_ms)) + self._timer.setInterval(round(DEFAULT_POLLING_INTERVAL_SECONDS * 1000)) self._timer.start() @Slot() @@ -100,7 +103,7 @@ class ModbusWorker(QObject): except Exception as exc: # noqa: BLE001 - 切换失败回传给界面提示 self.errorOccurred.emit(f"模式切换失败:{exc}") return - self.writeAck.emit(f"切换模式 -> {target.name}") + self.writeAck.emit(f"切换模式 -> {mode_label(target)}") @Slot(int) def force_mode(self, mode_value: int) -> None: @@ -110,11 +113,15 @@ class ModbusWorker(QObject): self.errorOccurred.emit("未连接,无法下发模式") return try: - self._client.write_mode(mode_value) + mode = ModeCommand(mode_value) + if mode is ModeCommand.EMERGENCY_STOP: + self._client.trigger_emergency_stop() + else: + self._client.write_mode(mode) except Exception as exc: # noqa: BLE001 - 写入失败回传给界面提示 self.errorOccurred.emit(f"模式下发失败:{exc}") return - self.writeAck.emit(f"强制模式 -> {ModeCommand(mode_value).name}") + self.writeAck.emit(f"强制模式 -> {mode_label(mode)}") @Slot(object, int) def write_target(self, pose: Pose6D, timestamp: int) -> None: @@ -124,6 +131,15 @@ class ModbusWorker(QObject): self.errorOccurred.emit("未连接,无法下发示教位姿") return try: + mode = self._client.read_mode() + if mode is not ModeCommand.PRE_WELD_TEACHING: + self.errorOccurred.emit("当前模式不允许下发示教位姿,仅焊前扫描示教模式(2)允许") + return + cache_count = self._client.read_available_cache_count() + self.cacheCountReady.emit(cache_count) + if cache_count <= 0: + self.errorOccurred.emit("目标位姿缓存已满,0xD002 可用缓存数量为 0") + return self._client.write_target_pose(pose, timestamp=timestamp) except Exception as exc: # noqa: BLE001 - 写入失败回传给界面提示 self.errorOccurred.emit(f"示教位姿下发失败:{exc}") @@ -138,6 +154,10 @@ class ModbusWorker(QObject): self.errorOccurred.emit("未连接,无法下发纠偏量") return try: + mode = self._client.read_mode() + if mode not in {ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY}: + self.errorOccurred.emit("当前模式不允许下发纠偏量,仅在线跟踪(3)和轨迹复现(4)允许") + return self._client.write_correction(pose, timestamp=timestamp) except Exception as exc: # noqa: BLE001 - 写入失败回传给界面提示 self.errorOccurred.emit(f"纠偏量下发失败:{exc}") @@ -163,14 +183,14 @@ class ModbusWorker(QObject): if self._runner is None: return try: - self._runner.run_once() + snapshot = self._runner.run_once() + self.snapshotReady.emit(snapshot) + self._emit_cache_count(snapshot.mode) except TimeoutError: - self._timeouts += 1 - self.timeoutCount.emit(self._timeouts) - self.errorOccurred.emit("通信超时") + self._handle_polling_failure("通信超时") return except Exception as exc: # noqa: BLE001 - 周期异常回传但不中断定时器 - self.errorOccurred.emit(f"轮询异常:{exc}") + self._handle_polling_failure(f"轮询异常:{exc}") return if self._timeouts: self._timeouts = 0 @@ -190,3 +210,25 @@ class ModbusWorker(QObject): self._client = None self._runner = None self._timeouts = 0 + + def _emit_cache_count(self, mode: ModeCommand) -> None: + """仅在示教模式读取并回传 0xD002 可用缓存数量""" + + if self._client is None: + return + if mode is not ModeCommand.PRE_WELD_TEACHING: + self.cacheCountReady.emit(None) + return + self.cacheCountReady.emit(self._client.read_available_cache_count()) + + def _handle_polling_failure(self, message: str) -> None: + """累计轮询通信失败,多次失败后断开连接""" + + self._timeouts += 1 + self.timeoutCount.emit(self._timeouts) + self.errorOccurred.emit(message) + if self._timeouts < self._max_timeouts: + return + self.errorOccurred.emit(f"连续通信失败 {self._timeouts} 次,已断开连接") + self._teardown() + self.disconnected.emit() diff --git a/tests/test_backend.py b/tests/test_backend.py index 42386b1..3a4af9e 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -36,6 +36,15 @@ def test_write_mode_updates_status(): client.connect() client.write_mode(ModeCommand.ONLINE_TRACKING) assert client.read_status() is DeviceStatus.TRACKING_OK - client.write_mode(ModeCommand.EMERGENCY_STOP) + client.trigger_emergency_stop() assert client.read_status() is DeviceStatus.EMERGENCY_TRIGGERED client.close() + + +def test_available_cache_count_is_readable(): + """模拟后端应暴露 V1.4 的 0xD002 可用缓存数量""" + + client = build_client(SerialConfig(port="SIM"), simulate=True) + client.connect() + assert client.read_available_cache_count() == 1 + client.close() diff --git a/tests/test_labels.py b/tests/test_labels.py new file mode 100644 index 0000000..2a137bc --- /dev/null +++ b/tests/test_labels.py @@ -0,0 +1,15 @@ +"""Protocol enum display labels.""" + +from __future__ import annotations + +from line_laser_modbus.models import DeviceStatus, ModeCommand + +from line_laser_hmi.labels import mode_label, status_label + + +def test_protocol_labels_follow_v14_names(): + """UI labels should use the V1.4 mode/status wording.""" + + assert mode_label(ModeCommand.MANUAL_TEACHING) == "手动示教" + assert mode_label(ModeCommand.EMERGENCY_STOP) == "紧急停止" + assert status_label(DeviceStatus.IDLE) == "空闲" diff --git a/tests/test_worker.py b/tests/test_worker.py new file mode 100644 index 0000000..80ab1c8 --- /dev/null +++ b/tests/test_worker.py @@ -0,0 +1,75 @@ +"""Worker-side protocol guards.""" + +from __future__ import annotations + +from line_laser_modbus.models import ModeCommand, Pose6D + +from line_laser_hmi.worker import ModbusWorker + + +class _FakeClient: + def __init__(self, mode: ModeCommand, *, cache_count: int = 1) -> None: + self.mode = mode + self.cache_count = cache_count + self.target_writes = 0 + self.correction_writes = 0 + + def read_mode(self) -> ModeCommand: + return self.mode + + def read_available_cache_count(self) -> int: + return self.cache_count + + def write_target_pose(self, pose: Pose6D, *, timestamp: int = 0) -> None: + self.target_writes += 1 + + def write_correction(self, pose: Pose6D, *, timestamp: int = 0) -> None: + self.correction_writes += 1 + + +def test_worker_rejects_target_pose_outside_teaching_mode(): + """Target pose writes are only allowed in pre-weld teaching mode.""" + + worker = ModbusWorker() + client = _FakeClient(ModeCommand.MANUAL_TEACHING) + worker._client = client + + worker.write_target(Pose6D.zeros(), 0) + + assert client.target_writes == 0 + + +def test_worker_allows_target_pose_in_teaching_mode(): + """Target pose writes pass when mode 2 has available cache.""" + + worker = ModbusWorker() + client = _FakeClient(ModeCommand.PRE_WELD_TEACHING) + worker._client = client + + worker.write_target(Pose6D.zeros(), 0) + + assert client.target_writes == 1 + + +def test_worker_rejects_correction_outside_tracking_modes(): + """Correction writes are only allowed in mode 3 or 4.""" + + worker = ModbusWorker() + client = _FakeClient(ModeCommand.PRE_WELD_TEACHING) + worker._client = client + + worker.write_correction(Pose6D.zeros(), 0) + + assert client.correction_writes == 0 + + +def test_worker_allows_correction_in_replay_mode(): + """Trajectory replay is allowed to receive small correction writes.""" + + worker = ModbusWorker() + client = _FakeClient(ModeCommand.TRAJECTORY_REPLAY) + worker._client = client + + worker.write_correction(Pose6D.zeros(), 0) + + assert client.correction_writes == 1