impress_sig_mesh_hacs/custom_components/sigmesh_gateway/serial_reader.py
impressionyang 93778ef861 fix: 改进串口写入错误日志
1. 移除 write_command 中的 try-catch,让错误向上抛出
2. 在 write 方法中添加更详细的错误日志
3. 直接调用 _serial.write() 而不是先检查 is_connected
2026-04-16 17:14:22 +08:00

328 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SigMesh Gateway 串口读取器模块."""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Callable
import serial
import serial.tools.list_ports
from homeassistant.core import HomeAssistant
from .const import (
DEFAULT_BAUDRATE,
SERIAL_EVENT_PREFIX,
SERIAL_MESH_RECV,
SERIAL_PROV_DEVICE_JOINED,
SERIAL_PROV_DEVICE_LEFT,
)
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
@dataclass
class SerialDataEvent:
    """One event line received from the serial port.

    Field order is part of the positional constructor signature and must
    not be changed.
    """

    # Moment the event object was created.
    timestamp: datetime
    # Byte form of the line.
    raw_data: bytes
    # Text form of the line.
    decoded_line: str
@dataclass
class MeshMessageEvent:
    """A mesh message reported by the gateway.

    Field order is part of the positional constructor signature and must
    not be changed.
    """

    # Moment the event object was created.
    timestamp: datetime
    # Source address, kept as the text token taken from the event line.
    src_address: str
    # Destination address, kept as the text token taken from the event line.
    dst_address: str
    # Message opcode as an integer.
    opcode: int
    # Message payload bytes (may be empty).
    payload: bytes
@dataclass
class ProvDeviceEvent:
"""配网设备事件."""
timestamp: datetime
event_type: str # "joined" or "left"
mac_address: str
element_count: int | None = None
class SerialReader:
    """Asynchronous, line-oriented reader/writer for a serial port.

    The port is opened with ``serial.serial_for_url``. Once reading has been
    started, a background asyncio task polls the port, splits the incoming
    byte stream on CRLF, decodes every line as UTF-8 and dispatches lines
    that start with ``SERIAL_EVENT_PREFIX`` to the registered callbacks.
    """

    def __init__(
        self,
        device: str,
        baudrate: int = DEFAULT_BAUDRATE,
        bytesize: int = 8,
        parity: str = "N",
        stopbits: int = 1,
        timeout: float = 0.1,
    ) -> None:
        """Store the port settings; the port is not opened here.

        Args:
            device: Port name or URL handed to ``serial.serial_for_url``.
            baudrate: Forwarded to ``serial.serial_for_url``.
            bytesize: Forwarded to ``serial.serial_for_url``.
            parity: Forwarded to ``serial.serial_for_url``.
            stopbits: Forwarded to ``serial.serial_for_url``.
            timeout: Forwarded to ``serial.serial_for_url``.
        """
        self.device = device
        self.baudrate = baudrate
        self.bytesize = bytesize
        self.parity = parity
        self.stopbits = stopbits
        self.timeout = timeout

        self._serial: serial.Serial | None = None
        self._running = False
        self._read_task: asyncio.Task | None = None
        # Bytes received so far that do not yet form a complete CRLF line.
        self._buffer = bytearray()

        # Callbacks; each one stays None until set_callbacks() provides it.
        self._on_data_callback: Callable[[SerialDataEvent], None] | None = None
        self._on_mesh_message_callback: Callable[[MeshMessageEvent], None] | None = None
        self._on_prov_device_callback: Callable[[ProvDeviceEvent], None] | None = None
        self._on_disconnect_callback: Callable[[], None] | None = None

    @property
    def is_connected(self) -> bool:
        """Return True when a port object exists and reports itself open."""
        return self._serial is not None and self._serial.is_open

    def set_callbacks(
        self,
        on_data: Callable[[SerialDataEvent], None] | None = None,
        on_mesh_message: Callable[[MeshMessageEvent], None] | None = None,
        on_prov_device: Callable[[ProvDeviceEvent], None] | None = None,
        on_disconnect: Callable[[], None] | None = None,
    ) -> None:
        """Replace all four callbacks at once.

        Every call overwrites every slot: a callback that is omitted is
        reset to None rather than left as it was.

        Args:
            on_data: Called with a SerialDataEvent for every event line.
            on_mesh_message: Called with a MeshMessageEvent for mesh lines.
            on_prov_device: Called with a ProvDeviceEvent for join/leave lines.
            on_disconnect: Called without arguments when the read loop stops
                because of an error.
        """
        self._on_data_callback = on_data
        self._on_mesh_message_callback = on_mesh_message
        self._on_prov_device_callback = on_prov_device
        self._on_disconnect_callback = on_disconnect

    def _parse_event_line(self, line: str) -> None:
        """Dispatch one decoded line to the matching callbacks.

        Lines that do not start with ``SERIAL_EVENT_PREFIX`` are ignored.
        Every event line is first handed to the generic data callback and is
        then routed by its prefix to the specialised parser.
        """
        if not line.startswith(SERIAL_EVENT_PREFIX):
            return

        _LOGGER.debug("解析事件:%s", line)

        # Generic data callback.
        if self._on_data_callback:
            self._on_data_callback(
                SerialDataEvent(
                    timestamp=datetime.now(),
                    # Re-encoded from the decoded, stripped text.
                    raw_data=line.encode(),
                    decoded_line=line,
                )
            )

        if line.startswith(SERIAL_MESH_RECV):
            # Mesh message received.
            self._parse_mesh_message(line)
        elif line.startswith(SERIAL_PROV_DEVICE_JOINED):
            # Device joined.
            self._parse_prov_device_joined(line)
        elif line.startswith(SERIAL_PROV_DEVICE_LEFT):
            # Device left.
            self._parse_prov_device_left(line)

    def _parse_mesh_message(self, line: str) -> None:
        """Parse a mesh-receive line and invoke the mesh message callback.

        Expected format:
        ``+EVENT=MESH,recv,<src>,<dst>,<opcode>,<hex_payload>``

        The opcode is read as hexadecimal when it carries a ``0x``/``0X``
        prefix and as decimal otherwise. The payload field is optional.
        Malformed lines are logged and dropped.
        """
        parts = line.split(",")
        if len(parts) < 5:
            _LOGGER.warning("无效的 Mesh 消息格式:%s", line)
            return

        # Only the conversions sit in the try body, so an error raised by the
        # callback is never misreported as a parse failure.
        try:
            opcode_text = parts[4].strip()
            is_hex = opcode_text.lower().startswith("0x")
            opcode = int(opcode_text, 16 if is_hex else 10)
            # Payload is a hexadecimal string; an absent field means empty.
            payload = bytes.fromhex(parts[5]) if len(parts) > 5 else b""
        except ValueError as e:
            _LOGGER.error("解析 Mesh 消息失败:%s, 错误:%s", line, e)
            return

        if self._on_mesh_message_callback:
            self._on_mesh_message_callback(
                MeshMessageEvent(
                    timestamp=datetime.now(),
                    src_address=parts[2],
                    dst_address=parts[3],
                    opcode=opcode,
                    payload=payload,
                )
            )

    def _parse_prov_device_joined(self, line: str) -> None:
        """Parse a device-joined line and invoke the provisioning callback.

        Expected format: ``+EVENT=PROV,device_joined,<mac>,<element_count>``
        Malformed lines are logged and dropped.
        """
        parts = line.split(",")
        if len(parts) < 4:
            _LOGGER.warning("无效的设备加入格式:%s", line)
            return

        try:
            element_count = int(parts[3])
        except ValueError as e:
            _LOGGER.error("解析设备加入事件失败:%s, 错误:%s", line, e)
            return

        if self._on_prov_device_callback:
            self._on_prov_device_callback(
                ProvDeviceEvent(
                    timestamp=datetime.now(),
                    event_type="joined",
                    mac_address=parts[2],
                    element_count=element_count,
                )
            )

    def _parse_prov_device_left(self, line: str) -> None:
        """Parse a device-left line and invoke the provisioning callback.

        Expected format: ``+EVENT=PROV,device_left,<mac>``
        Malformed lines are logged and dropped.
        """
        parts = line.split(",")
        if len(parts) < 3:
            _LOGGER.warning("无效的设备离开格式:%s", line)
            return

        if self._on_prov_device_callback:
            self._on_prov_device_callback(
                ProvDeviceEvent(
                    timestamp=datetime.now(),
                    event_type="left",
                    mac_address=parts[2],
                    element_count=None,
                )
            )

    async def connect(self) -> None:
        """Open the serial port.

        Does nothing except log a warning when the port is already open.

        Raises:
            serial.SerialException: The port could not be opened; the error
                is logged and re-raised.
        """
        if self.is_connected:
            _LOGGER.warning("串口已连接")
            return

        try:
            self._serial = serial.serial_for_url(
                self.device,
                baudrate=self.baudrate,
                bytesize=self.bytesize,
                parity=self.parity,
                stopbits=self.stopbits,
                timeout=self.timeout,
                exclusive=True,
            )
        except serial.SerialException as e:
            _LOGGER.error("串口连接失败:%s, 错误:%s", self.device, e)
            raise

        self._running = True
        _LOGGER.info(
            "串口已连接:%s, 波特率:%d",
            self.device,
            self.baudrate,
        )

    async def disconnect(self) -> None:
        """Stop the read task, close the port and discard buffered bytes."""
        self._running = False

        task, self._read_task = self._read_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # Drop the reference before closing so that a failing close() cannot
        # leave a half-closed port object behind.
        port, self._serial = self._serial, None
        if port is not None:
            port.close()
            _LOGGER.info("串口已断开:%s", self.device)

        # A partial line from this session must not be glued onto the first
        # line received after a later reconnect.
        self._buffer.clear()

    async def start_reading(self) -> None:
        """Start the background read task.

        A second call while the task is still running is ignored, so that
        only one loop ever polls the port.

        Raises:
            RuntimeError: The port is not connected.
        """
        if not self.is_connected:
            raise RuntimeError("串口未连接")

        if self._read_task is not None and not self._read_task.done():
            _LOGGER.warning("串口读取任务已在运行")
            return

        self._read_task = asyncio.create_task(self._read_loop())

    async def _read_loop(self) -> None:
        """Poll the port until stopped, cancelled or an I/O error occurs.

        On an unexpected error the disconnect callback is invoked and the
        loop ends; the port itself is not closed here.
        """
        _LOGGER.debug("开始串口读取循环")

        while self._running:
            try:
                port = self._serial
                if port is not None:
                    pending = port.in_waiting
                    if pending:
                        self._buffer.extend(port.read(pending))
                        self._process_buffered_lines()

                # Yield to the event loop and avoid spinning the CPU.
                await asyncio.sleep(0.01)
            except asyncio.CancelledError:
                _LOGGER.debug("读取循环被取消")
                break
            except Exception as e:
                _LOGGER.error("读取循环错误:%s", e)
                self._notify_disconnect()
                break

        _LOGGER.debug("读取循环结束")

    def _process_buffered_lines(self) -> None:
        """Pop every complete CRLF-terminated line off the buffer and handle it."""
        while True:
            line_bytes, separator, remainder = self._buffer.partition(b"\r\n")
            if not separator:
                # No complete line left; keep the partial data for later.
                return
            self._buffer = remainder
            self._handle_line(line_bytes)

    def _handle_line(self, line_bytes: bytes | bytearray) -> None:
        """Decode one raw line and dispatch it without letting errors escape.

        A failure while handling a single line (bad encoding, or an exception
        raised by a callback) is logged and must not stop the read loop.
        """
        try:
            line = line_bytes.decode("utf-8").strip()
        except UnicodeDecodeError as e:
            _LOGGER.warning("解码失败:%s, 错误:%s", line_bytes, e)
            return

        _LOGGER.debug("串口接收:%s", line)
        _LOGGER.debug("串口接收原始数据:%s", line_bytes.hex().upper())

        try:
            self._parse_event_line(line)
        except Exception:
            _LOGGER.exception("处理串口数据行失败:%s", line)

    def _notify_disconnect(self) -> None:
        """Invoke the disconnect callback, logging any error it raises."""
        callback = self._on_disconnect_callback
        if callback is None:
            return
        try:
            callback()
        except Exception:
            _LOGGER.exception("断开回调执行失败")

    async def write(self, data: bytes) -> int:
        """Write raw bytes to the port.

        The write is attempted directly on the port object rather than being
        gated on ``is_connected``, so the underlying error surfaces as-is.

        Args:
            data: Bytes to send.

        Returns:
            The value returned by the port's ``write`` call.

        Raises:
            RuntimeError: No port object exists.
            Exception: Whatever the port's ``write`` raised, after logging.
        """
        _LOGGER.debug("write 方法is_connected=%s, _serial=%s", self.is_connected, self._serial)

        port = self._serial
        if port is None:
            _LOGGER.error("串口对象未初始化")
            raise RuntimeError("串口未连接")

        try:
            written = port.write(data)
        except Exception as e:
            _LOGGER.error("串口写入失败:%s (is_open=%s)", e, port.is_open)
            raise

        _LOGGER.debug("串口写入成功:%d 字节", written)
        return written

    async def write_command(self, command: str) -> int:
        """Send a command string terminated with CRLF.

        Args:
            command: Command text without the line terminator.

        Returns:
            The value returned by ``write``.

        Raises:
            Exception: Whatever ``write`` raised, after logging.
        """
        cmd_bytes = f"{command}\r\n".encode()
        _LOGGER.debug("发送命令:%s (原始数据:%s)", command, cmd_bytes.hex())

        try:
            sent = await self.write(cmd_bytes)
        except Exception as e:
            _LOGGER.error("命令发送失败:%s", e)
            raise

        _LOGGER.debug("命令发送成功,发送了 %d 字节", sent)
        return sent
def list_serial_ports() -> list[dict[str, str]]:
    """Enumerate the serial ports reported by pyserial.

    Returns:
        One dict per port with the keys "device", "description" and "hwid",
        taken from the attributes of the same names on each port entry.
    """
    return [
        {
            "device": info.device,
            "description": info.description,
            "hwid": info.hwid,
        }
        for info in serial.tools.list_ports.comports()
    ]