Align HMI with protocol v1.4

This commit is contained in:
2026-06-22 14:39:30 +08:00
Unverified
parent 009ea92e5f
commit 1483d89f49
15 changed files with 344 additions and 50 deletions
+4 -4
View File
@@ -7,11 +7,11 @@
## 功能
- 串口连接配置与自动枚举 COM 口,**模拟模式**无需硬件即可运行
- 6 种工作模式切换(按协议状态机校验)与独立急停按钮
- 设备状态字、时间戳、通信超时实时监控
- 5普通工作模式切换(按协议状态机校验)与独立急停按钮
- 设备状态字、时间戳、可用缓存数量(0xD002)、通信超时实时监控
- 当前 6 轴位姿数码显示与实时趋势曲线
- 手动下发目标示教位姿 / 纠偏量,设置在线跟踪自动纠偏目标
- 50ms 后台轮询(独立线程,不阻塞界面),连接成功后自动启动
- 手动下发目标示教位姿 / 纠偏量,设置在线跟踪自动纠偏目标;手动下发按当前模式限制,目标示教位姿写入前检查 0xD002 可用缓存数量
- 50ms 后台轮询、150ms 单帧超时(独立线程,不阻塞界面),连接成功后自动启动
- 快照录制为 CSV,通信日志独立窗口(菜单「视图 → 日志窗口」,Ctrl+L)可导出
## 安装与运行
+11 -5
View File
@@ -31,11 +31,11 @@ GUI 线程 --请求信号(队列)--> ModbusWorker(子线程) --结果信号(
| 面板 | 模块 | 对应协议库能力 |
| ---- | ---- | -------------- |
| 连接 | `panels/connection_panel.py` | `SerialConfig`、模拟后端开关 |
| 模式控制 | `panels/mode_panel.py` | `switch_mode`(状态机校验)、急停 `write_mode(5)` |
| 状态监控 | `panels/status_panel.py` | `read_status`、时间戳、超时计数 |
| 模式控制 | `panels/mode_panel.py` | `switch_mode`(状态机校验)、急停 `trigger_emergency_stop` |
| 状态监控 | `panels/status_panel.py` | `read_status``read_available_cache_count`时间戳、超时计数 |
| 当前位姿 | `panels/pose_panel.py` | `read_current_pose` |
| 轮询与录制 | `panels/polling_panel.py` | `PollingRunner`、CSV 录制 |
| 手动下发 | `panels/manual_panel.py` | `write_target_pose``write_correction`、跟踪目标 |
| 手动下发 | `panels/manual_panel.py` | `read_available_cache_count``write_target_pose``write_correction`、跟踪目标 |
| 实时曲线 | `panels/chart_panel.py` | 6 轴位姿趋势 |
| 日志窗口 | `panels/log_panel.py``LogWindow` | 通信/错误记录、导出;由菜单「视图 → 日志窗口」(Ctrl+L) 打开 |
@@ -46,14 +46,20 @@ GUI 线程 --请求信号(队列)--> ModbusWorker(子线程) --结果信号(
为满足无硬件演示,`backend.py` 提供 `DynamicSimulatedBackend`
- 读取当前位姿时按正弦规律围绕基准位姿摆动,使数码显示与曲线动起来;
- 写入模式命令时联动刷新设备状态字(如进入在线跟踪后状态变为 `TRACKING_OK`
- 写入模式命令时联动刷新设备状态字(如进入在线跟踪后状态变为 `TRACKING_OK`
- 默认提供 `0xD002` 可用缓存数量为 `1`,便于验证目标示教位姿下发。
勾选连接面板的「模拟模式」即可在没有串口和控制器的机器上完整体验所有功能。
## 5. 状态机与安全
- 普通模式切换只包含 `0~4`,模式 `0` 在界面显示为「手动示教」。
- 普通模式切换走 `switch_mode`,非法切换由协议库抛错并在日志提示。
- 急停按钮走 `force_mode``write_mode`,不做状态机校验),保证任意状态下可立即下发急停。
- 急停按钮走 `force_mode`内部调用 `trigger_emergency_stop`,不做状态机校验),保证任意状态下可立即下发急停。
- 手动下发按钮按当前模式启用:模式 `2` 允许目标示教位姿,模式 `3/4` 允许 6 轴纠偏量,模式 `0/1` 不允许下发控制指令。
- worker 写入前会再次读取当前模式校验,避免按钮状态滞后或信号绕过。
- 模式 `2` 下刷新 `0xD002` 可用缓存数量;目标示教位姿写入前再次读取,数量为 `0` 时界面提示并拒绝下发。
- UI 固定使用协议轮询周期 `50ms` 和单帧超时 `150ms`;轮询连续失败达到阈值后自动断开连接。
## 6. 数据录制
+9 -2
View File
@@ -8,13 +8,18 @@ import time
from line_laser_modbus.client import LineLaserClient, ModbusBackend
from line_laser_modbus.codec import encode_timed_pose
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.constants import ADDR_CURRENT_POSE, ADDR_DEVICE_STATUS, ADDR_MODE_COMMAND
from line_laser_modbus.constants import (
ADDR_AVAILABLE_CACHE_COUNT,
ADDR_CURRENT_POSE,
ADDR_DEVICE_STATUS,
ADDR_MODE_COMMAND,
)
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D, TimedPose6D
from line_laser_modbus.simulator import SimulatedModbusBackend
# 模式写入后模拟控制器反馈的状态映射,仅用于无硬件演示。
_MODE_TO_STATUS = {
ModeCommand.STANDBY_RESET: DeviceStatus.STANDBY_READY,
ModeCommand.MANUAL_TEACHING: DeviceStatus.IDLE,
ModeCommand.CALIBRATION: DeviceStatus.RUNNING,
ModeCommand.PRE_WELD_TEACHING: DeviceStatus.RUNNING,
ModeCommand.ONLINE_TRACKING: DeviceStatus.TRACKING_OK,
@@ -44,6 +49,8 @@ class DynamicSimulatedBackend(SimulatedModbusBackend):
"""写入模式命令时联动刷新模拟设备状态字"""
response = super().write_registers(address, values, device_id=device_id)
if address == ADDR_AVAILABLE_CACHE_COUNT and values:
self.registers[ADDR_AVAILABLE_CACHE_COUNT] = max(0, values[0])
if address == ADDR_MODE_COMMAND and values:
try:
status = _MODE_TO_STATUS[ModeCommand(values[0])]
+36
View File
@@ -0,0 +1,36 @@
"""Display labels for protocol enum values."""
from __future__ import annotations
from line_laser_modbus.models import DeviceStatus, ModeCommand
MODE_LABELS = {
ModeCommand.MANUAL_TEACHING: "手动示教",
ModeCommand.CALIBRATION: "系统标定",
ModeCommand.PRE_WELD_TEACHING: "焊前扫描示教",
ModeCommand.ONLINE_TRACKING: "在线全轴跟踪",
ModeCommand.TRAJECTORY_REPLAY: "轨迹批量复现",
ModeCommand.EMERGENCY_STOP: "紧急停止",
}
STATUS_LABELS = {
DeviceStatus.IDLE: "空闲",
DeviceStatus.RUNNING: "运动运行中",
DeviceStatus.TEACHING_DONE: "示教完成",
DeviceStatus.TRACKING_OK: "在线跟踪正常",
DeviceStatus.ALARM: "设备报警",
DeviceStatus.CALIBRATION_DONE: "标定完成",
DeviceStatus.EMERGENCY_TRIGGERED: "急停已触发",
}
def mode_label(mode: ModeCommand) -> str:
"""Return the user-facing Chinese label for a mode command."""
return MODE_LABELS.get(mode, mode.name)
def status_label(status: DeviceStatus) -> str:
"""Return the user-facing Chinese label for a device status."""
return STATUS_LABELS.get(status, status.name)
+3
View File
@@ -149,6 +149,7 @@ class MainWindow(QMainWindow):
self._worker.connected.connect(self._on_connected)
self._worker.disconnected.connect(self._on_disconnected)
self._worker.snapshotReady.connect(self._on_snapshot)
self._worker.cacheCountReady.connect(self._status.set_cache_count)
self._worker.writeAck.connect(lambda msg: self._log.append(msg))
self._worker.errorOccurred.connect(lambda msg: self._log.append(msg, "错误"))
self._worker.timeoutCount.connect(self._status.set_timeout)
@@ -171,6 +172,7 @@ class MainWindow(QMainWindow):
self._connection.set_connected(False, "未连接")
self._status.set_connection(False, "未连接")
self._status.set_cache_count(None)
self._set_controls_enabled(False)
self._on_stop_record()
self._log.append("已断开连接")
@@ -181,6 +183,7 @@ class MainWindow(QMainWindow):
self._status.update_snapshot(snapshot)
self._pose.update_pose(snapshot.pose)
self._mode.set_current_mode(snapshot.mode)
self._manual.set_mode(snapshot.mode)
self._chart.append(snapshot.pose)
if self._recorder.is_recording:
self._recorder.write(snapshot)
+62 -1
View File
@@ -6,7 +6,8 @@ from collections import deque
import pyqtgraph as pg
from line_laser_modbus.models import Pose6D
from PySide6.QtWidgets import QGroupBox, QVBoxLayout, QWidget
from PySide6.QtCore import QSignalBlocker
from PySide6.QtWidgets import QCheckBox, QGroupBox, QHBoxLayout, QPushButton, QVBoxLayout, QWidget
# 曲线缓冲长度,按 50ms 周期约保留最近 30 秒数据。
_BUFFER = 600
@@ -27,11 +28,27 @@ class ChartPanel(QGroupBox):
self._samples = 0
self._x: deque[int] = deque(maxlen=_BUFFER)
self._curves: list[tuple[deque[float], pg.PlotDataItem]] = []
self._plots: list[pg.PlotWidget] = []
self._follow_latest = True
self._applying_range = False
self._follow = QCheckBox("跟随最新")
self._follow.setChecked(True)
self._follow.toggled.connect(self._on_follow_toggled)
self._reset = QPushButton("重置视图")
self._reset.clicked.connect(self._restore_follow_latest)
position_plot = self._make_plot("位置 (mm)", _POSITION)
attitude_plot = self._make_plot("姿态 (°)", _ATTITUDE)
controls = QHBoxLayout()
controls.addWidget(self._follow)
controls.addWidget(self._reset)
controls.addStretch(1)
layout = QVBoxLayout(self)
layout.addLayout(controls)
layout.addWidget(position_plot)
layout.addWidget(attitude_plot)
@@ -44,6 +61,8 @@ class ChartPanel(QGroupBox):
for value, (buffer, curve) in zip(pose.as_tuple(), self._curves, strict=True):
buffer.append(value)
curve.setData(x, list(buffer))
if self._follow_latest:
self._apply_follow_range()
def clear(self) -> None:
"""清空所有曲线缓冲"""
@@ -53,6 +72,7 @@ class ChartPanel(QGroupBox):
for buffer, curve in self._curves:
buffer.clear()
curve.setData([], [])
self._restore_follow_latest()
def _make_plot(self, title: str, axes: tuple[tuple[str, str], ...]) -> pg.PlotWidget:
"""创建一个带图例的曲线图并登记其曲线"""
@@ -61,7 +81,48 @@ class ChartPanel(QGroupBox):
plot.addLegend()
plot.showGrid(x=True, y=True, alpha=0.3)
plot.setMinimumHeight(160)
plot.getViewBox().sigRangeChangedManually.connect(self._on_manual_range_changed)
self._plots.append(plot)
for name, color in axes:
curve = plot.plot(pen=pg.mkPen(color, width=2), name=name)
self._curves.append((deque(maxlen=_BUFFER), curve))
return plot
def _on_follow_toggled(self, checked: bool) -> None:
"""切换是否让视图自动跟随最新数据"""
self._follow_latest = checked
if checked:
self._apply_follow_range()
def _restore_follow_latest(self) -> None:
"""恢复默认的跟随最新视图"""
self._follow_latest = True
with QSignalBlocker(self._follow):
self._follow.setChecked(True)
self._apply_follow_range()
def _on_manual_range_changed(self, *_args) -> None:
"""用户拖动或缩放后暂停自动跟随"""
if self._applying_range:
return
self._follow_latest = False
with QSignalBlocker(self._follow):
self._follow.setChecked(False)
def _apply_follow_range(self) -> None:
"""将 X 轴滚动到最新采样并让 Y 轴自动适配"""
if not self._plots:
return
start = max(0, self._samples - _BUFFER)
stop = max(start + 1, self._samples - 1)
self._applying_range = True
try:
for plot in self._plots:
plot.setXRange(start, stop, padding=0)
plot.enableAutoRange(axis=pg.ViewBox.YAxis, enable=True)
finally:
self._applying_range = False
@@ -3,6 +3,7 @@
from __future__ import annotations
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.constants import DEFAULT_TIMEOUT_SECONDS
from PySide6.QtCore import Signal
from PySide6.QtWidgets import (
QCheckBox,
@@ -50,10 +51,9 @@ class ConnectionPanel(QGroupBox):
self._slave_id.setValue(8)
self._timeout = QDoubleSpinBox()
self._timeout.setRange(0.01, 5.0)
self._timeout.setSingleStep(0.05)
self._timeout.setRange(DEFAULT_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS)
self._timeout.setDecimals(2)
self._timeout.setValue(0.15)
self._timeout.setValue(DEFAULT_TIMEOUT_SECONDS)
self._retries = QSpinBox()
self._retries.setRange(0, 10)
@@ -94,7 +94,7 @@ class ConnectionPanel(QGroupBox):
port=self._port.currentText().strip() or "COM1",
slave_id=self._slave_id.value(),
baudrate=int(self._baudrate.currentText()),
timeout=self._timeout.value(),
timeout=DEFAULT_TIMEOUT_SECONDS,
retries=self._retries.value(),
)
@@ -112,7 +112,7 @@ class ConnectionPanel(QGroupBox):
self._port.setCurrentText(serial.port)
self._baudrate.setCurrentText(str(serial.baudrate))
self._slave_id.setValue(serial.slave_id)
self._timeout.setValue(serial.timeout)
self._timeout.setValue(DEFAULT_TIMEOUT_SECONDS)
self._retries.setValue(serial.retries)
self._simulate.setChecked(config.ui.simulate)
+25 -4
View File
@@ -2,7 +2,7 @@
from __future__ import annotations
from line_laser_modbus.models import Pose6D
from line_laser_modbus.models import ModeCommand, Pose6D
from PySide6.QtCore import Signal
from PySide6.QtWidgets import (
QDoubleSpinBox,
@@ -32,6 +32,8 @@ class ManualPanel(QGroupBox):
super().__init__("手动下发", parent)
self._inputs: list[QDoubleSpinBox] = []
self._connected = False
self._mode: ModeCommand | None = None
grid = QGridLayout()
for index, (name, unit) in enumerate(_AXES):
@@ -78,9 +80,16 @@ class ManualPanel(QGroupBox):
def set_controls_enabled(self, enabled: bool) -> None:
"""连接状态变化时启用或禁用下发按钮"""
self._target_button.setEnabled(enabled)
self._correction_button.setEnabled(enabled)
self._track_button.setEnabled(enabled)
self._connected = enabled
if not enabled:
self._mode = None
self._apply_protocol_mode()
def set_mode(self, mode: ModeCommand) -> None:
"""按当前协议模式启用允许的手动下发动作"""
self._mode = mode
self._apply_protocol_mode()
def _emit_target(self) -> None:
"""发出目标示教位姿下发请求"""
@@ -96,3 +105,15 @@ class ManualPanel(QGroupBox):
"""发出设置自动跟踪目标请求"""
self.setTrackingTarget.emit(self.pose())
def _apply_protocol_mode(self) -> None:
"""按 V1.4 通信行为限制手动控制按钮"""
target_enabled = self._connected and self._mode is ModeCommand.PRE_WELD_TEACHING
correction_enabled = self._connected and self._mode in {
ModeCommand.ONLINE_TRACKING,
ModeCommand.TRAJECTORY_REPLAY,
}
self._target_button.setEnabled(target_enabled)
self._correction_button.setEnabled(correction_enabled)
self._track_button.setEnabled(self._connected)
+7 -5
View File
@@ -13,13 +13,15 @@ from PySide6.QtWidgets import (
QWidget,
)
from line_laser_hmi.labels import mode_label
# 可由界面主动切换的运行模式(急停单独用醒目按钮处理)。
_MODE_BUTTONS = (
(ModeCommand.STANDBY_RESET, "待机复位"),
(ModeCommand.MANUAL_TEACHING, "手动示教"),
(ModeCommand.CALIBRATION, "系统标定"),
(ModeCommand.PRE_WELD_TEACHING, "焊前示教"),
(ModeCommand.ONLINE_TRACKING, "在线跟踪"),
(ModeCommand.TRAJECTORY_REPLAY, "轨迹复现"),
(ModeCommand.PRE_WELD_TEACHING, "焊前扫描示教"),
(ModeCommand.ONLINE_TRACKING, "在线全轴跟踪"),
(ModeCommand.TRAJECTORY_REPLAY, "轨迹批量复现"),
)
@@ -61,7 +63,7 @@ class ModePanel(QGroupBox):
def set_current_mode(self, mode: ModeCommand) -> None:
"""高亮当前模式按钮并更新文字"""
self._current.setText(f"当前模式:{mode.name}")
self._current.setText(f"当前模式:{mode_label(mode)}")
for value, button in self._buttons.items():
highlight = value is mode
button.setStyleSheet(
+11 -8
View File
@@ -2,6 +2,7 @@
from __future__ import annotations
from line_laser_modbus.constants import DEFAULT_POLLING_INTERVAL_SECONDS
from PySide6.QtCore import Signal
from PySide6.QtWidgets import (
QFormLayout,
@@ -14,9 +15,11 @@ from PySide6.QtWidgets import (
QWidget,
)
_PROTOCOL_POLLING_INTERVAL_MS = round(DEFAULT_POLLING_INTERVAL_SECONDS * 1000)
class PollingPanel(QGroupBox):
"""控制后台轮询周期与 CSV 录制"""
"""控制后台轮询与 CSV 录制"""
startPolling = Signal(int)
stopPolling = Signal()
@@ -31,10 +34,10 @@ class PollingPanel(QGroupBox):
self._recording = False
self._interval = QSpinBox()
self._interval.setRange(10, 5000)
self._interval.setSingleStep(10)
self._interval.setValue(50)
self._interval.setRange(_PROTOCOL_POLLING_INTERVAL_MS, _PROTOCOL_POLLING_INTERVAL_MS)
self._interval.setValue(_PROTOCOL_POLLING_INTERVAL_MS)
self._interval.setSuffix(" ms")
self._interval.setEnabled(False)
self._poll_button = QPushButton("开始轮询")
self._poll_button.clicked.connect(self._on_poll)
@@ -59,9 +62,9 @@ class PollingPanel(QGroupBox):
self.set_controls_enabled(False)
def interval(self) -> int:
"""返回当前设置的轮询周期(毫秒)"""
"""返回协议固定轮询周期(毫秒)"""
return self._interval.value()
return _PROTOCOL_POLLING_INTERVAL_MS
def set_controls_enabled(self, enabled: bool) -> None:
"""连接状态变化时启用或禁用轮询控件"""
@@ -77,7 +80,7 @@ class PollingPanel(QGroupBox):
self._polling = polling
self._poll_button.setText("停止轮询" if polling else "开始轮询")
self._interval.setEnabled(not polling)
self._interval.setEnabled(False)
def set_recording(self, recording: bool, message: str) -> None:
"""更新录制按钮和状态文字"""
@@ -98,7 +101,7 @@ class PollingPanel(QGroupBox):
if self._polling:
self.stopPolling.emit()
else:
self.startPolling.emit(self._interval.value())
self.startPolling.emit(_PROTOCOL_POLLING_INTERVAL_MS)
def _on_record(self) -> None:
"""录制按钮点击时发出开始或停止请求"""
+18 -4
View File
@@ -5,9 +5,11 @@ from __future__ import annotations
from line_laser_modbus.models import DeviceSnapshot, DeviceStatus
from PySide6.QtWidgets import QFormLayout, QGroupBox, QLabel, QWidget
from line_laser_hmi.labels import mode_label, status_label
# 异常类状态用红色,正常运行类用绿色,其余用中性色。
_STATUS_COLORS = {
DeviceStatus.STANDBY_READY: "#455a64",
DeviceStatus.IDLE: "#455a64",
DeviceStatus.RUNNING: "#2e7d32",
DeviceStatus.TEACHING_DONE: "#1565c0",
DeviceStatus.TRACKING_OK: "#2e7d32",
@@ -18,7 +20,7 @@ _STATUS_COLORS = {
class StatusPanel(QGroupBox):
"""显示模式、设备状态字、时间戳和通信超时计数"""
"""显示模式、设备状态字、时间戳、缓存数量和通信超时计数"""
def __init__(self, parent: QWidget | None = None) -> None:
"""构建状态监控标签"""
@@ -29,6 +31,7 @@ class StatusPanel(QGroupBox):
self._status = QLabel("")
self._status.setStyleSheet("color: white; background: #455a64; padding: 4px;")
self._timestamp = QLabel("")
self._cache = QLabel("")
self._timeout = QLabel("0")
self._connection = QLabel("未连接")
@@ -36,18 +39,29 @@ class StatusPanel(QGroupBox):
form.addRow("模式", self._mode)
form.addRow("设备状态", self._status)
form.addRow("时间戳(ms)", self._timestamp)
form.addRow("可用缓存数量", self._cache)
form.addRow("连续超时", self._timeout)
form.addRow("连接", self._connection)
def update_snapshot(self, snapshot: DeviceSnapshot) -> None:
"""用一次快照刷新模式、状态和时间戳"""
self._mode.setText(snapshot.mode.name)
self._mode.setText(mode_label(snapshot.mode))
color = _STATUS_COLORS.get(snapshot.status, "#455a64")
self._status.setText(snapshot.status.name)
self._status.setText(status_label(snapshot.status))
self._status.setStyleSheet(f"color: white; background: {color}; padding: 4px;")
self._timestamp.setText(str(snapshot.timestamp))
def set_cache_count(self, count: int | None) -> None:
"""刷新 0xD002 可用缓存数量,0 时标红提醒不可下发示教位姿"""
if count is None:
self._cache.setText("")
self._cache.setStyleSheet("")
return
self._cache.setText(str(count))
self._cache.setStyleSheet("color: #c62828; font-weight: bold;" if count <= 0 else "")
def set_timeout(self, count: int) -> None:
"""刷新连续超时计数,非零时标红"""
+53 -11
View File
@@ -6,11 +6,13 @@ import contextlib
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.constants import DEFAULT_POLLING_INTERVAL_SECONDS
from line_laser_modbus.models import ModeCommand, Pose6D, validate_mode_switch
from line_laser_modbus.runner import PollingRunner, pose_delta
from line_laser_modbus.runner import PollingRunner, default_polling_config, pose_delta
from PySide6.QtCore import QObject, QTimer, Signal, Slot
from line_laser_hmi.backend import build_client
from line_laser_hmi.labels import mode_label
class ModbusWorker(QObject):
@@ -23,6 +25,7 @@ class ModbusWorker(QObject):
connected = Signal(bool, str)
disconnected = Signal()
snapshotReady = Signal(object)
cacheCountReady = Signal(object)
writeAck = Signal(str)
errorOccurred = Signal(str)
timeoutCount = Signal(int)
@@ -36,6 +39,7 @@ class ModbusWorker(QObject):
self._runner: PollingRunner | None = None
self._target: Pose6D | None = None
self._timeouts = 0
self._max_timeouts = default_polling_config().max_timeouts
@Slot(object, bool)
def connect_device(self, serial: SerialConfig, simulate: bool) -> None:
@@ -53,7 +57,6 @@ class ModbusWorker(QObject):
self._runner = PollingRunner(
client,
correction_provider=self._provide_correction,
snapshot_handler=self.snapshotReady.emit,
)
self._timer = QTimer(self)
self._timer.timeout.connect(self._tick)
@@ -75,7 +78,7 @@ class ModbusWorker(QObject):
self.errorOccurred.emit("未连接,无法启动轮询")
return
self._timeouts = 0
self._timer.setInterval(max(1, interval_ms))
self._timer.setInterval(round(DEFAULT_POLLING_INTERVAL_SECONDS * 1000))
self._timer.start()
@Slot()
@@ -100,7 +103,7 @@ class ModbusWorker(QObject):
except Exception as exc: # noqa: BLE001 - 切换失败回传给界面提示
self.errorOccurred.emit(f"模式切换失败:{exc}")
return
self.writeAck.emit(f"切换模式 -> {target.name}")
self.writeAck.emit(f"切换模式 -> {mode_label(target)}")
@Slot(int)
def force_mode(self, mode_value: int) -> None:
@@ -110,11 +113,15 @@ class ModbusWorker(QObject):
self.errorOccurred.emit("未连接,无法下发模式")
return
try:
self._client.write_mode(mode_value)
mode = ModeCommand(mode_value)
if mode is ModeCommand.EMERGENCY_STOP:
self._client.trigger_emergency_stop()
else:
self._client.write_mode(mode)
except Exception as exc: # noqa: BLE001 - 写入失败回传给界面提示
self.errorOccurred.emit(f"模式下发失败:{exc}")
return
self.writeAck.emit(f"强制模式 -> {ModeCommand(mode_value).name}")
self.writeAck.emit(f"强制模式 -> {mode_label(mode)}")
@Slot(object, int)
def write_target(self, pose: Pose6D, timestamp: int) -> None:
@@ -124,6 +131,15 @@ class ModbusWorker(QObject):
self.errorOccurred.emit("未连接,无法下发示教位姿")
return
try:
mode = self._client.read_mode()
if mode is not ModeCommand.PRE_WELD_TEACHING:
self.errorOccurred.emit("当前模式不允许下发示教位姿,仅焊前扫描示教模式(2)允许")
return
cache_count = self._client.read_available_cache_count()
self.cacheCountReady.emit(cache_count)
if cache_count <= 0:
self.errorOccurred.emit("目标位姿缓存已满,0xD002 可用缓存数量为 0")
return
self._client.write_target_pose(pose, timestamp=timestamp)
except Exception as exc: # noqa: BLE001 - 写入失败回传给界面提示
self.errorOccurred.emit(f"示教位姿下发失败:{exc}")
@@ -138,6 +154,10 @@ class ModbusWorker(QObject):
self.errorOccurred.emit("未连接,无法下发纠偏量")
return
try:
mode = self._client.read_mode()
if mode not in {ModeCommand.ONLINE_TRACKING, ModeCommand.TRAJECTORY_REPLAY}:
self.errorOccurred.emit("当前模式不允许下发纠偏量,仅在线跟踪(3)和轨迹复现(4)允许")
return
self._client.write_correction(pose, timestamp=timestamp)
except Exception as exc: # noqa: BLE001 - 写入失败回传给界面提示
self.errorOccurred.emit(f"纠偏量下发失败:{exc}")
@@ -163,14 +183,14 @@ class ModbusWorker(QObject):
if self._runner is None:
return
try:
self._runner.run_once()
snapshot = self._runner.run_once()
self.snapshotReady.emit(snapshot)
self._emit_cache_count(snapshot.mode)
except TimeoutError:
self._timeouts += 1
self.timeoutCount.emit(self._timeouts)
self.errorOccurred.emit("通信超时")
self._handle_polling_failure("通信超时")
return
except Exception as exc: # noqa: BLE001 - 周期异常回传但不中断定时器
self.errorOccurred.emit(f"轮询异常:{exc}")
self._handle_polling_failure(f"轮询异常:{exc}")
return
if self._timeouts:
self._timeouts = 0
@@ -190,3 +210,25 @@ class ModbusWorker(QObject):
self._client = None
self._runner = None
self._timeouts = 0
def _emit_cache_count(self, mode: ModeCommand) -> None:
"""仅在示教模式读取并回传 0xD002 可用缓存数量"""
if self._client is None:
return
if mode is not ModeCommand.PRE_WELD_TEACHING:
self.cacheCountReady.emit(None)
return
self.cacheCountReady.emit(self._client.read_available_cache_count())
def _handle_polling_failure(self, message: str) -> None:
"""累计轮询通信失败,多次失败后断开连接"""
self._timeouts += 1
self.timeoutCount.emit(self._timeouts)
self.errorOccurred.emit(message)
if self._timeouts < self._max_timeouts:
return
self.errorOccurred.emit(f"连续通信失败 {self._timeouts} 次,已断开连接")
self._teardown()
self.disconnected.emit()
+10 -1
View File
@@ -36,6 +36,15 @@ def test_write_mode_updates_status():
client.connect()
client.write_mode(ModeCommand.ONLINE_TRACKING)
assert client.read_status() is DeviceStatus.TRACKING_OK
client.write_mode(ModeCommand.EMERGENCY_STOP)
client.trigger_emergency_stop()
assert client.read_status() is DeviceStatus.EMERGENCY_TRIGGERED
client.close()
def test_available_cache_count_is_readable():
"""模拟后端应暴露 V1.4 的 0xD002 可用缓存数量"""
client = build_client(SerialConfig(port="SIM"), simulate=True)
client.connect()
assert client.read_available_cache_count() == 1
client.close()
+15
View File
@@ -0,0 +1,15 @@
"""Protocol enum display labels."""
from __future__ import annotations
from line_laser_modbus.models import DeviceStatus, ModeCommand
from line_laser_hmi.labels import mode_label, status_label
def test_protocol_labels_follow_v14_names():
"""UI labels should use the V1.4 mode/status wording."""
assert mode_label(ModeCommand.MANUAL_TEACHING) == "手动示教"
assert mode_label(ModeCommand.EMERGENCY_STOP) == "紧急停止"
assert status_label(DeviceStatus.IDLE) == "空闲"
+75
View File
@@ -0,0 +1,75 @@
"""Worker-side protocol guards."""
from __future__ import annotations
from line_laser_modbus.models import ModeCommand, Pose6D
from line_laser_hmi.worker import ModbusWorker
class _FakeClient:
def __init__(self, mode: ModeCommand, *, cache_count: int = 1) -> None:
self.mode = mode
self.cache_count = cache_count
self.target_writes = 0
self.correction_writes = 0
def read_mode(self) -> ModeCommand:
return self.mode
def read_available_cache_count(self) -> int:
return self.cache_count
def write_target_pose(self, pose: Pose6D, *, timestamp: int = 0) -> None:
self.target_writes += 1
def write_correction(self, pose: Pose6D, *, timestamp: int = 0) -> None:
self.correction_writes += 1
def test_worker_rejects_target_pose_outside_teaching_mode():
"""Target pose writes are only allowed in pre-weld teaching mode."""
worker = ModbusWorker()
client = _FakeClient(ModeCommand.MANUAL_TEACHING)
worker._client = client
worker.write_target(Pose6D.zeros(), 0)
assert client.target_writes == 0
def test_worker_allows_target_pose_in_teaching_mode():
"""Target pose writes pass when mode 2 has available cache."""
worker = ModbusWorker()
client = _FakeClient(ModeCommand.PRE_WELD_TEACHING)
worker._client = client
worker.write_target(Pose6D.zeros(), 0)
assert client.target_writes == 1
def test_worker_rejects_correction_outside_tracking_modes():
"""Correction writes are only allowed in mode 3 or 4."""
worker = ModbusWorker()
client = _FakeClient(ModeCommand.PRE_WELD_TEACHING)
worker._client = client
worker.write_correction(Pose6D.zeros(), 0)
assert client.correction_writes == 0
def test_worker_allows_correction_in_replay_mode():
"""Trajectory replay is allowed to receive small correction writes."""
worker = ModbusWorker()
client = _FakeClient(ModeCommand.TRAJECTORY_REPLAY)
worker._client = client
worker.write_correction(Pose6D.zeros(), 0)
assert client.correction_writes == 1