feat: 完成基础协议的添加

This commit is contained in:
2026-05-13 13:47:27 +08:00
Unverified
parent 18c331e7eb
commit 648c1f1920
11 changed files with 226 additions and 3 deletions
+1 -1
View File
@@ -19,7 +19,7 @@ dev = [
]
[build-system]
requires = ["uv_build>=0.9.5,<0.10.0"]
requires = ["uv_build>=0.11.14,<0.12.0"]
build-backend = "uv_build"
[tool.ruff]
+1 -2
View File
@@ -1,7 +1,6 @@
"""Line laser Modbus protocol package."""
from line_laser_modbus.client import LineLaserClient
from line_laser_modbus.config import SerialConfig
from line_laser_modbus.models import DeviceStatus, ModeCommand, Pose6D
__all__ = ["DeviceStatus", "LineLaserClient", "ModeCommand", "Pose6D", "SerialConfig"]
__all__ = ["DeviceStatus", "ModeCommand", "Pose6D", "SerialConfig"]
+92
View File
@@ -0,0 +1,92 @@
"""Register and RTU frame codec helpers."""
from __future__ import annotations
import struct
from line_laser_modbus.constants import (
AXIS_NAMES,
FUNC_READ_HOLDING_REGISTERS,
FUNC_WRITE_MULTIPLE_REGISTERS,
REGISTER_COUNT_POSE,
SLAVE_ID,
)
from line_laser_modbus.models import Pose6D
def encode_u16(value: int) -> int:
if not 0 <= value <= 0xFFFF:
msg = f"Value out of uint16 range: {value}"
raise ValueError(msg)
return value
def decode_u16(register: int) -> int:
return encode_u16(register)
def encode_f32(value: float) -> list[int]:
# 按协议要求使用大端字节序拆成两个保持寄存器
raw = struct.pack(">f", float(value))
high, low = struct.unpack(">HH", raw)
return [high, low]
def decode_f32(registers: list[int] | tuple[int, int]) -> float:
if len(registers) != 2:
msg = f"float32 requires 2 registers, got {len(registers)}"
raise ValueError(msg)
raw = struct.pack(">HH", encode_u16(registers[0]), encode_u16(registers[1]))
return struct.unpack(">f", raw)[0]
def encode_pose(pose: Pose6D) -> list[int]:
registers: list[int] = []
for value in pose.as_tuple():
registers.extend(encode_f32(value))
return registers
def decode_pose(registers: list[int] | tuple[int, ...]) -> Pose6D:
if len(registers) != REGISTER_COUNT_POSE:
msg = f"Pose requires {REGISTER_COUNT_POSE} registers, got {len(registers)}"
raise ValueError(msg)
values = [decode_f32(registers[index : index + 2]) for index in range(0, len(registers), 2)]
return Pose6D(**dict(zip(AXIS_NAMES, values, strict=True)))
def crc16(data: bytes) -> int:
# Modbus RTU CRC16 低字节先发但整数内部按正常高低位保存
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
def append_crc(data: bytes) -> bytes:
crc = crc16(data)
return data + bytes((crc & 0xFF, crc >> 8))
def build_read_frame(address: int, count: int, slave_id: int = SLAVE_ID) -> bytes:
payload = struct.pack(">BBHH", slave_id, FUNC_READ_HOLDING_REGISTERS, address, count)
return append_crc(payload)
def build_write_frame(address: int, registers: list[int], slave_id: int = SLAVE_ID) -> bytes:
count = len(registers)
body = b"".join(struct.pack(">H", encode_u16(register)) for register in registers)
payload = struct.pack(
">BBHHB",
slave_id,
FUNC_WRITE_MULTIPLE_REGISTERS,
address,
count,
len(body),
)
return append_crc(payload + body)
+37
View File
@@ -0,0 +1,37 @@
"""Runtime configuration."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import tomllib
from line_laser_modbus.constants import (
DEFAULT_BAUDRATE,
DEFAULT_BYTESIZE,
DEFAULT_PARITY,
DEFAULT_RETRIES,
DEFAULT_STOPBITS,
DEFAULT_TIMEOUT_SECONDS,
SLAVE_ID,
)
@dataclass(frozen=True, slots=True)
class SerialConfig:
port: str = "COM1"
slave_id: int = SLAVE_ID
baudrate: int = DEFAULT_BAUDRATE
bytesize: int = DEFAULT_BYTESIZE
parity: str = DEFAULT_PARITY
stopbits: int = DEFAULT_STOPBITS
timeout: float = DEFAULT_TIMEOUT_SECONDS
retries: int = DEFAULT_RETRIES
@classmethod
def from_toml(cls, path: str | Path) -> "SerialConfig":
with Path(path).open("rb") as file:
data = tomllib.load(file)
serial = data.get("serial", {})
return cls(**serial)
+23
View File
@@ -0,0 +1,23 @@
"""Protocol constants from README.md."""
SLAVE_ID = 0x08
FUNC_READ_HOLDING_REGISTERS = 0x03
FUNC_WRITE_MULTIPLE_REGISTERS = 0x10
ADDR_MODE_COMMAND = 0xD000
ADDR_DEVICE_STATUS = 0xD001
ADDR_CURRENT_POSE = 0xD00A
ADDR_TARGET_POSE = 0xD016
ADDR_CORRECTION = 0xD022
REGISTER_COUNT_WORD = 1
REGISTER_COUNT_POSE = 12
AXIS_NAMES = ("x", "y", "z", "a", "b", "c")
DEFAULT_BAUDRATE = 115200
DEFAULT_BYTESIZE = 8
DEFAULT_PARITY = "N"
DEFAULT_STOPBITS = 1
DEFAULT_TIMEOUT_SECONDS = 0.05
DEFAULT_RETRIES = 3
+72
View File
@@ -0,0 +1,72 @@
"""Typed protocol models."""
from __future__ import annotations
from dataclasses import dataclass
from enum import IntEnum
from typing import Self
class ModeCommand(IntEnum):
"""Mode command word at 0xD000."""
STANDBY_RESET = 0
CALIBRATION = 1
PRE_WELD_TEACHING = 2
ONLINE_TRACKING = 3
TRAJECTORY_REPLAY = 4
EMERGENCY_STOP = 5
class DeviceStatus(IntEnum):
"""Device status word at 0xD001."""
STANDBY_READY = 0
RUNNING = 1
TEACHING_DONE = 2
TRACKING_OK = 3
ALARM = 4
CALIBRATION_DONE = 5
EMERGENCY_TRIGGERED = 6
@dataclass(frozen=True, slots=True)
class Pose6D:
"""XYZABC pose or correction vector."""
x: float
y: float
z: float
a: float
b: float
c: float
@classmethod
def zeros(cls) -> Self:
return cls(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
@classmethod
def from_iterable(cls, values: list[float] | tuple[float, ...]) -> Self:
if len(values) != 6:
msg = f"Pose6D requires 6 values, got {len(values)}"
raise ValueError(msg)
return cls(*(float(value) for value in values))
def as_tuple(self) -> tuple[float, float, float, float, float, float]:
return (self.x, self.y, self.z, self.a, self.b, self.c)
def ensure_mode(value: int | ModeCommand) -> ModeCommand:
try:
return ModeCommand(value)
except ValueError as exc:
msg = f"Invalid mode command: {value}"
raise ValueError(msg) from exc
def ensure_status(value: int | DeviceStatus) -> DeviceStatus:
try:
return DeviceStatus(value)
except ValueError as exc:
msg = f"Invalid device status: {value}"
raise ValueError(msg) from exc