docs: 补充中文注释和使用示例

This commit is contained in:
2026-05-13 14:27:58 +08:00
Unverified
parent 0b39ef842e
commit 2f642b3cf7
14 changed files with 301 additions and 24 deletions
+15
View File
@@ -29,6 +29,21 @@ python -m uv run line-laser-modbus --simulate poll-once --target 1 2 3 0 1 2
python -m uv run line-laser-modbus --simulate demo
```
## 示例代码
示例代码在 `examples/` 目录下,优先阅读模拟示例了解库的调用方式:
```powershell
python -m uv run python examples/simulated_basic.py
python -m uv run python examples/simulated_polling.py
```
连接真实串口前先修改 `config.toml`,再运行:
```powershell
python -m uv run python examples/real_serial_read_status.py
```
## TOML 配置
运行前按实际串口修改 `config.toml`。默认会读取当前目录下的 `config.toml`
+22
View File
@@ -0,0 +1,22 @@
# 使用示例
这些示例用于说明 `line_laser_modbus` 作为库时的常见用法
运行前先同步依赖
```powershell
python -m uv sync --default-index https://mirrors.ustc.edu.cn/pypi/simple
```
无硬件环境优先运行模拟示例
```powershell
python -m uv run python examples/simulated_basic.py
python -m uv run python examples/simulated_polling.py
```
连接真实串口前先修改根目录 `config.toml`
```powershell
python -m uv run python examples/real_serial_read_status.py
```
+27
View File
@@ -0,0 +1,27 @@
"""真实串口读取状态示例"""
from pathlib import Path
from line_laser_modbus import AppConfig, LineLaserClient
def main() -> None:
"""从 config.toml 读取串口配置并读取设备状态"""
# 运行这个示例前先修改项目根目录的 config.toml
# port 需要改成现场电脑实际看到的串口名
# Windows 常见格式是 COM1 或 COM3
config = AppConfig.from_toml(Path("config.toml"))
# 真实串口路径不注入 backend
# LineLaserClient 会自动创建 pymodbus 串口客户端
with LineLaserClient(config.serial) as client:
status = client.read_status()
pose = client.read_current_pose()
print("设备状态", status.name)
print("当前位姿", pose)
if __name__ == "__main__":
main()
+43
View File
@@ -0,0 +1,43 @@
"""模拟环境中的基础读写示例"""
from line_laser_modbus import LineLaserClient, ModeCommand, Pose6D, SerialConfig
from line_laser_modbus.simulator import SimulatedModbusBackend
def main() -> None:
"""演示不连接硬件时如何使用客户端"""
# 模拟后端实现了客户端需要的最小 Modbus 接口
# 这让开发和测试可以在没有串口和控制器的机器上运行
backend = SimulatedModbusBackend()
# 端口名在模拟模式下不会真正打开
# 这里保留 SIM 只是让配置含义更清晰
config = SerialConfig(port="SIM")
# 客户端支持上下文管理器
# 进入 with 时连接后端退出 with 时关闭后端
with LineLaserClient(config, backend=backend) as client:
print("初始状态", client.read_status().name)
# write_mode 会直接写寄存器不做状态机校验
# 如果业务需要遵守协议切换规则可以使用 switch_mode
client.write_mode(ModeCommand.ONLINE_TRACKING)
print("当前模式", client.read_mode().name)
# 目标示教位姿和纠偏量都使用 Pose6D 表达
# 字段顺序固定为 X Y Z A B C
target_pose = Pose6D(100.0, 20.0, 30.0, 0.0, 15.0, 90.0)
correction = Pose6D(1.0, 0.0, -0.5, 0.0, 0.0, 2.0)
client.write_target_pose(target_pose)
client.write_correction(correction)
# 模拟后端提供便捷读取方法
# 真实设备运行时通常由控制器消费这些寄存器
print("模拟目标位姿", backend.target_pose())
print("模拟纠偏量", backend.correction())
if __name__ == "__main__":
main()
+38
View File
@@ -0,0 +1,38 @@
"""模拟环境中的单周期轮询示例"""
from line_laser_modbus import DeviceStatus, LineLaserClient, ModeCommand, Pose6D, SerialConfig
from line_laser_modbus.runner import PollingRunner, pose_delta
from line_laser_modbus.simulator import SimulatedModbusBackend
def main() -> None:
"""演示轮询运行器如何读取快照并写入纠偏量"""
# 先让模拟控制器处于在线跟踪模式
# 只有 ONLINE_TRACKING 模式下 PollingRunner 才会写入纠偏寄存器
backend = SimulatedModbusBackend(
mode=ModeCommand.ONLINE_TRACKING,
status=DeviceStatus.TRACKING_OK,
current_pose=Pose6D(10.0, 20.0, 30.0, 0.0, 1.0, 2.0),
)
# 这里假设目标轨迹点是下面这个位姿
# pose_delta 会生成一个函数用于计算 目标位姿减当前位姿
target_pose = Pose6D(11.0, 22.0, 33.0, 0.0, 1.5, 1.0)
correction_provider = pose_delta(target_pose)
with LineLaserClient(SerialConfig(port="SIM"), backend=backend) as client:
runner = PollingRunner(client, correction_provider=correction_provider)
# run_once 只执行一个周期
# 测试和无硬件演示建议用它避免进入无限循环
snapshot = runner.run_once()
print("读取模式", snapshot.mode.name)
print("读取状态", snapshot.status.name)
print("读取当前位姿", snapshot.pose)
print("写入纠偏量", backend.correction())
if __name__ == "__main__":
main()
+1 -1
View File
@@ -1,4 +1,4 @@
"""Line laser Modbus protocol package."""
"""线激光 Modbus 协议包"""
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import AppConfig, PollingConfig, SerialConfig
+3 -1
View File
@@ -1,4 +1,4 @@
"""Command line helpers."""
"""命令行入口"""
from __future__ import annotations
@@ -13,6 +13,8 @@ from line_laser_modbus.simulator import SimulatedModbusBackend
def main() -> None:
"""解析命令行参数并执行对应协议动作"""
parser = argparse.ArgumentParser(prog="line-laser-modbus")
parser.add_argument("--config", default="config.toml", help="TOML config path")
parser.add_argument("--simulate", action="store_true", help="Use in-memory simulator")
+52 -8
View File
@@ -1,4 +1,4 @@
"""High-level Modbus client for the line laser protocol."""
"""线激光 Modbus 协议客户端"""
from __future__ import annotations
@@ -30,19 +30,31 @@ from line_laser_modbus.models import (
class ModbusBackend(Protocol):
def connect(self) -> bool: ...
"""协议客户端需要的最小后端接口"""
def close(self) -> None: ...
def connect(self) -> bool:
"""打开后端连接"""
...
def read_holding_registers(self, address: int, *, count: int, device_id: int): ...
def close(self) -> None:
"""关闭后端连接"""
...
def write_registers(self, address: int, values: list[int], *, device_id: int): ...
def read_holding_registers(self, address: int, *, count: int, device_id: int):
"""从设备读取保持寄存器"""
...
def write_registers(self, address: int, values: list[int], *, device_id: int):
"""向设备写入保持寄存器"""
...
class LineLaserClient:
"""Protocol-specific client with typed read/write helpers."""
"""面向协议语义的类型化读写客户端"""
def __init__(self, config: SerialConfig, backend: ModbusBackend | None = None) -> None:
"""创建真实串口客户端或注入模拟后端"""
self.config = config
self._backend = backend or ModbusSerialClient(
port=config.port,
@@ -55,6 +67,8 @@ class LineLaserClient:
)
def __enter__(self) -> Self:
"""进入上下文管理器时连接后端"""
self.connect()
return self
@@ -64,49 +78,75 @@ class LineLaserClient:
exc: BaseException | None,
traceback: TracebackType | None,
) -> None:
"""退出上下文管理器时关闭后端"""
self.close()
def connect(self) -> None:
"""连接配置中的 Modbus 设备"""
if not self._backend.connect():
msg = f"Unable to connect Modbus device on {self.config.port}"
raise ConnectionError(msg)
def close(self) -> None:
"""关闭当前 Modbus 连接"""
self._backend.close()
def read_mode(self) -> ModeCommand:
"""读取 0xD000 模式命令字"""
return ensure_mode(self._read_word(ADDR_MODE_COMMAND))
def write_mode(self, mode: int | ModeCommand) -> None:
"""不做状态机校验直接写入模式命令字"""
self._write_registers(ADDR_MODE_COMMAND, [encode_u16(ensure_mode(mode).value)])
def switch_mode(self, mode: int | ModeCommand) -> None:
"""按协议状态机规则校验后切换模式"""
current = self.read_mode()
status = self.read_status()
target = validate_mode_switch(current, mode, status)
self.write_mode(target)
def read_status(self) -> DeviceStatus:
"""读取 0xD001 设备状态字"""
return ensure_status(self._read_word(ADDR_DEVICE_STATUS))
def read_current_pose(self) -> Pose6D:
"""读取控制器当前 XYZABC 位姿"""
registers = self._read_registers(ADDR_CURRENT_POSE, REGISTER_COUNT_POSE)
return decode_pose(registers)
def read_snapshot(self) -> DeviceSnapshot:
# 快照按协议关键字段顺序读取,避免上层重复拼装状态
"""读取模式状态和当前位姿组成一次逻辑快照"""
# 快照按协议关键字段顺序读取避免上层重复拼装状态
return DeviceSnapshot(self.read_mode(), self.read_status(), self.read_current_pose())
def write_target_pose(self, pose: Pose6D) -> None:
"""写入示教目标 XYZABC 位姿"""
self._write_registers(ADDR_TARGET_POSE, encode_pose(pose))
def write_correction(self, pose: Pose6D) -> None:
"""写入实时 XYZABC 纠偏量"""
self._write_registers(ADDR_CORRECTION, encode_pose(pose))
def _read_word(self, address: int) -> int:
"""从绝对地址读取单个 uint16 寄存器"""
return decode_u16(self._read_registers(address, 1)[0])
def _read_registers(self, address: int, count: int) -> list[int]:
"""读取并校验连续保持寄存器"""
try:
response = self._backend.read_holding_registers(
address,
@@ -121,6 +161,8 @@ class LineLaserClient:
return [decode_u16(register) for register in response.registers]
def _write_registers(self, address: int, registers: list[int]) -> None:
"""校验并写入连续保持寄存器"""
safe_registers = [encode_u16(register) for register in registers]
try:
response = self._backend.write_registers(
@@ -136,6 +178,8 @@ class LineLaserClient:
@staticmethod
def _raise_on_error(response, message: str) -> None:
# pymodbus 异常响应不是 Python 异常,需要显式判断
""" pymodbus 返回异常响应时抛出运行时错误"""
# pymodbus 的异常响应不是 Python 异常需要显式判断
if response is None or response.isError():
raise RuntimeError(message)
+21 -1
View File
@@ -1,4 +1,4 @@
"""Register and RTU frame codec helpers."""
"""寄存器和 RTU 帧编解码工具"""
from __future__ import annotations
@@ -15,6 +15,8 @@ from line_laser_modbus.models import Pose6D
def encode_u16(value: int) -> int:
"""校验并返回一个可放入单个 Modbus 寄存器的值"""
if not 0 <= value <= 0xFFFF:
msg = f"Value out of uint16 range: {value}"
raise ValueError(msg)
@@ -22,10 +24,14 @@ def encode_u16(value: int) -> int:
def decode_u16(register: int) -> int:
"""校验并解析单个无符号 16 位寄存器"""
return encode_u16(register)
def encode_f32(value: float) -> list[int]:
"""将一个 float32 编码为两个大端寄存器"""
# 按协议要求使用大端字节序拆成两个保持寄存器
raw = struct.pack(">f", float(value))
high, low = struct.unpack(">HH", raw)
@@ -33,6 +39,8 @@ def encode_f32(value: float) -> list[int]:
def decode_f32(registers: list[int] | tuple[int, int]) -> float:
"""将两个大端寄存器解析为一个 float32"""
if len(registers) != 2:
msg = f"float32 requires 2 registers, got {len(registers)}"
raise ValueError(msg)
@@ -41,6 +49,8 @@ def decode_f32(registers: list[int] | tuple[int, int]) -> float:
def encode_pose(pose: Pose6D) -> list[int]:
"""将 XYZABC 位姿编码为十二个保持寄存器"""
registers: list[int] = []
for value in pose.as_tuple():
registers.extend(encode_f32(value))
@@ -48,6 +58,8 @@ def encode_pose(pose: Pose6D) -> list[int]:
def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D:
"""将十二个保持寄存器解析为 XYZABC 位姿"""
if len(registers) != REGISTER_COUNT_POSE:
msg = f"Pose requires {REGISTER_COUNT_POSE} registers, got {len(registers)}"
raise ValueError(msg)
@@ -56,6 +68,8 @@ def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D:
def crc16(data: bytes) -> int:
"""计算原始帧字节的 Modbus RTU CRC16"""
# Modbus RTU CRC16 低字节先发但整数内部按正常高低位保存
crc = 0xFFFF
for byte in data:
@@ -69,16 +83,22 @@ def crc16(data: bytes) -> int:
def append_crc(data: bytes) -> bytes:
"""给帧载荷追加 Modbus RTU CRC16 字节"""
crc = crc16(data)
return data + bytes((crc & 0xFF, crc >> 8))
def build_read_frame(address: int, count: int, slave_id: int = SLAVE_ID) -> bytes:
"""构造功能码 0x03 的原始读保持寄存器请求"""
payload = struct.pack(">BBHH", slave_id, FUNC_READ_HOLDING_REGISTERS, address, count)
return append_crc(payload)
def build_write_frame(address: int, registers: list[int], slave_id: int = SLAVE_ID) -> bytes:
"""构造功能码 0x10 的原始写多个寄存器请求"""
count = len(registers)
body = b"".join(struct.pack(">H", encode_u16(register)) for register in registers)
payload = struct.pack(
+12 -2
View File
@@ -1,4 +1,4 @@
"""Runtime configuration."""
"""运行配置模型"""
from __future__ import annotations
@@ -19,6 +19,8 @@ from line_laser_modbus.constants import (
@dataclass(frozen=True, slots=True)
class SerialConfig:
"""串口通信配置"""
port: str = "COM1"
slave_id: int = SLAVE_ID
baudrate: int = DEFAULT_BAUDRATE
@@ -30,6 +32,8 @@ class SerialConfig:
@classmethod
def from_toml(cls, path: str | Path) -> SerialConfig:
"""从 TOML 文件读取串口配置"""
with Path(path).open("rb") as file:
data = tomllib.load(file)
serial = data.get("serial", {})
@@ -38,20 +42,26 @@ class SerialConfig:
@dataclass(frozen=True, slots=True)
class PollingConfig:
"""轮询运行配置"""
interval_seconds: float = 0.02
max_timeouts: int = 3
@dataclass(frozen=True, slots=True)
class AppConfig:
"""应用运行配置"""
serial: SerialConfig = SerialConfig()
polling: PollingConfig = PollingConfig()
@classmethod
def from_toml(cls, path: str | Path) -> AppConfig:
"""从 TOML 文件读取完整应用配置"""
with Path(path).open("rb") as file:
data = tomllib.load(file)
# 配置文件只暴露运行需要的最小入口避免协议常量被外部误改
# 配置文件只暴露运行需要的最小入口避免协议常量被外部误改
return cls(
serial=SerialConfig(**data.get("serial", {})),
polling=PollingConfig(**data.get("polling", {})),
+1 -1
View File
@@ -1,4 +1,4 @@
"""Protocol constants from docs/proto.md."""
"""来自 docs/proto.md 的协议常量"""
SLAVE_ID = 0x08
+19 -5
View File
@@ -1,4 +1,4 @@
"""Typed protocol models."""
"""协议类型模型"""
from __future__ import annotations
@@ -8,7 +8,7 @@ from typing import Self
class ModeCommand(IntEnum):
"""Mode command word at 0xD000."""
"""0xD000 模式命令字"""
STANDBY_RESET = 0
CALIBRATION = 1
@@ -19,7 +19,7 @@ class ModeCommand(IntEnum):
class DeviceStatus(IntEnum):
"""Device status word at 0xD001."""
"""0xD001 设备状态字"""
STANDBY_READY = 0
RUNNING = 1
@@ -32,7 +32,7 @@ class DeviceStatus(IntEnum):
@dataclass(frozen=True, slots=True)
class DeviceSnapshot:
"""Current controller state read in one logical cycle."""
"""单次逻辑周期读取到的控制器状态"""
mode: ModeCommand
status: DeviceStatus
@@ -41,7 +41,7 @@ class DeviceSnapshot:
@dataclass(frozen=True, slots=True)
class Pose6D:
"""XYZABC pose or correction vector."""
"""XYZABC 位姿或纠偏向量"""
x: float
y: float
@@ -52,20 +52,28 @@ class Pose6D:
@classmethod
def zeros(cls) -> Self:
"""创建全零位姿"""
return cls(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
@classmethod
def from_iterable(cls, values: list[float] | tuple[float, ...]) -> Self:
"""从六个数值创建 XYZABC 位姿"""
if len(values) != 6:
msg = f"Pose6D requires 6 values, got {len(values)}"
raise ValueError(msg)
return cls(*(float(value) for value in values))
def as_tuple(self) -> tuple[float, float, float, float, float, float]:
"""按协议顺序返回六轴元组"""
return (self.x, self.y, self.z, self.a, self.b, self.c)
def ensure_mode(value: int | ModeCommand) -> ModeCommand:
"""校验并转换模式命令字"""
try:
return ModeCommand(value)
except ValueError as exc:
@@ -74,6 +82,8 @@ def ensure_mode(value: int | ModeCommand) -> ModeCommand:
def ensure_status(value: int | DeviceStatus) -> DeviceStatus:
"""校验并转换设备状态字"""
try:
return DeviceStatus(value)
except ValueError as exc:
@@ -86,6 +96,8 @@ def can_switch_mode(
target: int | ModeCommand,
status: int | DeviceStatus | None = None,
) -> bool:
"""判断模式切换是否符合协议状态机规则"""
current_mode = ensure_mode(current)
target_mode = ensure_mode(target)
current_status = ensure_status(status) if status is not None else None
@@ -109,6 +121,8 @@ def validate_mode_switch(
target: int | ModeCommand,
status: int | DeviceStatus | None = None,
) -> ModeCommand:
"""校验模式切换并返回目标模式"""
target_mode = ensure_mode(target)
if not can_switch_mode(current, target_mode, status):
msg = f"Illegal mode switch: {ensure_mode(current).name} -> {target_mode.name}"
+16 -2
View File
@@ -1,4 +1,4 @@
"""Polling loop helpers for production and simulation."""
"""生产和模拟环境共用的轮询运行器"""
from __future__ import annotations
@@ -15,8 +15,12 @@ SnapshotHandler = Callable[[DeviceSnapshot], None]
def pose_delta(target: Pose6D) -> CorrectionProvider:
"""创建按目标位姿减当前位姿计算纠偏量的函数"""
def calculate(snapshot: DeviceSnapshot) -> Pose6D:
# 纠偏量按目标位姿减当前位姿计算,实际项目可替换为轨迹规划结果
"""根据状态快照计算纠偏量"""
# 纠偏量按目标位姿减当前位姿计算实际项目可替换为轨迹规划结果
return Pose6D.from_iterable(
[
target_value - current
@@ -32,6 +36,8 @@ def pose_delta(target: Pose6D) -> CorrectionProvider:
class PollingRunner:
"""按固定周期读取状态并在跟踪模式下写入纠偏量"""
def __init__(
self,
client: LineLaserClient,
@@ -39,6 +45,8 @@ class PollingRunner:
snapshot_handler: SnapshotHandler | None = None,
config: PollingConfig | None = None,
) -> None:
"""创建轮询运行器"""
self.client = client
self.correction_provider = correction_provider
self.snapshot_handler = snapshot_handler
@@ -46,6 +54,8 @@ class PollingRunner:
self.timeout_count = 0
def run_once(self) -> DeviceSnapshot:
"""执行一次读取和可选纠偏写入"""
snapshot = self.client.read_snapshot()
self.timeout_count = 0
if self.snapshot_handler:
@@ -55,6 +65,8 @@ class PollingRunner:
return snapshot
def run_forever(self) -> None:
"""按配置周期持续运行直到出现不可恢复超时"""
while True:
started = time.monotonic()
try:
@@ -68,5 +80,7 @@ class PollingRunner:
def default_polling_config() -> PollingConfig:
"""按协议默认超时时间生成轮询配置"""
max_timeouts = max(1, round(DEFAULT_TIMEOUT_SECONDS / 0.02))
return PollingConfig(interval_seconds=0.02, max_timeouts=max_timeouts)
+31 -3
View File
@@ -1,4 +1,4 @@
"""In-memory Modbus simulator used by tests and demos."""
"""测试和演示使用的内存 Modbus 模拟器"""
from __future__ import annotations
@@ -18,23 +18,31 @@ from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D
@dataclass(slots=True)
class _ReadResponse:
"""模拟 pymodbus 读响应"""
registers: list[int]
def isError(self) -> bool:
"""返回响应是否为异常响应"""
return False
@dataclass(slots=True)
class _WriteResponse:
"""模拟 pymodbus 写响应"""
address: int
count: int
def isError(self) -> bool:
"""返回响应是否为异常响应"""
return False
class SimulatedModbusBackend:
"""Small pymodbus-compatible backend without hardware."""
"""不依赖硬件的 pymodbus 兼容后端"""
def __init__(
self,
@@ -44,47 +52,67 @@ class SimulatedModbusBackend:
status: DeviceStatus = DeviceStatus.STANDBY_READY,
current_pose: Pose6D | None = None,
) -> None:
"""创建模拟后端并写入初始寄存器值"""
self.slave_id = slave_id
self.connected = False
self.registers: dict[int, int] = {}
self._seed(mode, status, current_pose or Pose6D.zeros())
def connect(self) -> bool:
"""标记模拟后端为已连接"""
self.connected = True
return True
def close(self) -> None:
"""标记模拟后端为已关闭"""
self.connected = False
def read_holding_registers(self, address: int, *, count: int, device_id: int) -> _ReadResponse:
"""从模拟寄存器表读取连续保持寄存器"""
self._ensure_ready(device_id)
return _ReadResponse([self.registers.get(address + offset, 0) for offset in range(count)])
def write_registers(self, address: int, values: list[int], *, device_id: int) -> _WriteResponse:
"""向模拟寄存器表写入连续保持寄存器"""
self._ensure_ready(device_id)
for offset, value in enumerate(values):
self.registers[address + offset] = value
return _WriteResponse(address, len(values))
def target_pose(self) -> Pose6D:
"""读取模拟器中保存的目标示教位姿"""
return self._read_pose(ADDR_TARGET_POSE)
def correction(self) -> Pose6D:
"""读取模拟器中保存的实时纠偏量"""
return self._read_pose(ADDR_CORRECTION)
def _seed(self, mode: ModeCommand, status: DeviceStatus, pose: Pose6D) -> None:
"""写入模拟器初始模式状态和当前位姿"""
self.registers[ADDR_MODE_COMMAND] = mode.value
self.registers[ADDR_DEVICE_STATUS] = status.value
for offset, value in enumerate(encode_pose(pose)):
self.registers[ADDR_CURRENT_POSE + offset] = value
def _read_pose(self, address: int) -> Pose6D:
"""从指定地址读取一组模拟位姿寄存器"""
from line_laser_modbus.codec import decode_pose
return decode_pose([self.registers.get(address + offset, 0) for offset in range(12)])
def _ensure_ready(self, device_id: int) -> None:
# 模拟器也校验从站地址,避免测试漏掉 Unit ID 配置
"""检查连接状态和从站地址"""
# 模拟器也校验从站地址避免测试漏掉 Unit ID 配置
if not self.connected:
raise ConnectionError("Simulated backend is not connected")
if device_id != self.slave_id: