impress_sig_mesh_hacs/custom_components/sigmesh_gateway/protocol_parser.py
impressionyang 6a66c9b474 初始提交:SigMesh Gateway HACS 集成
项目结构:
- custom_components/sigmesh_gateway/ - Home Assistant 集成
  - serial_reader.py - 串口读取器
  - protocol_parser.py - 协议解析器
  - coordinator.py - 数据协调器
  - platforms/ - 传感器/开关/灯光/设备追踪实体

文档:
- PRD.md - 产品需求文档
- README.md - 用户使用指南
- 可行性分析.md - 技术可行性分析
- 参数配置表.md - 配置参数记录
- 调试检查清单.md - 问题排查指南

功能特性:
- 串口通信 (115200 波特率)
- Bluetooth Mesh 协议解析
- 支持 200+ 设备接入
- UI 配置界面
- 多平台实体支持

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-15 18:20:48 +08:00

320 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SigMesh Gateway 协议解析器模块."""
from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from enum import IntEnum
from typing import Any

from .const import MeshOpcode, MeshPropertyId, SensorUnit
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
@dataclass
class ParsedMeshMessage:
    """One mesh message after decoding by ``ProtocolParser.parse_message``."""

    # Opcode exactly as it was handed to the parser.
    opcode: MeshOpcode | int
    # Enum member name for the opcode, or an "UNKNOWN_0x...." placeholder.
    opcode_name: str
    # Sender address string, passed through unchanged.
    source_address: str
    # Destination address string, passed through unchanged.
    destination_address: str
    # Model ID looked up from the opcode; None when the opcode is not mapped.
    model_id: int | None
    # Decoded payload fields; the keys depend on the opcode.
    data: dict[str, Any]
    # The undecoded payload bytes.
    raw_payload: bytes
@dataclass
class DeviceState:
    """Accumulated state of one mesh device as tracked by ``DeviceManager``."""

    # Key the device is stored under (DeviceManager uses the source address).
    mac_address: str
    # Element index; every instance created in this module uses 0.
    element_index: int
    # Model ID taken from a parsed message, or None when none is known.
    model_id: int | None
    # Decoded fields merged from received messages.
    states: dict[str, Any]
    # Time of the most recent update; 0 until the first one arrives.
    last_update: float
class ProtocolParser:
    """Decode SigMesh message payloads into plain dictionaries.

    ``parse_message`` is the entry point; each ``_parse_*`` helper decodes the
    payload of one status opcode. Multi-byte fields are read little-endian.
    A payload that is too short for its opcode never raises: the helper
    returns a dictionary holding a single ``None`` placeholder instead.
    """

    # Opcode -> model ID, used to tag every parsed message with its model.
    # NOTE(review): the SIG model list assigns 0x100C to Generic Battery
    # Server and 0x1303 to Light CTL Server, while this table uses 0x1000 for
    # BATTERY_STATUS and 0x130D for LIGHT_CTL_*. The values are left as they
    # were because the consumers of ``model_id`` are not visible from this
    # module -- confirm against the assigned numbers before changing them.
    OPCODE_MODEL_MAP = {
        MeshOpcode.ONOFF_GET: 0x1000,
        MeshOpcode.ONOFF_SET: 0x1000,
        MeshOpcode.ONOFF_STATUS: 0x1000,
        MeshOpcode.LIGHT_LIGHTNESS_GET: 0x1300,
        MeshOpcode.LIGHT_LIGHTNESS_SET: 0x1300,
        MeshOpcode.LIGHT_LIGHTNESS_STATUS: 0x1300,
        MeshOpcode.LIGHT_HSL_SET: 0x1307,
        MeshOpcode.LIGHT_HSL_STATUS: 0x1307,
        MeshOpcode.LIGHT_CTL_SET: 0x130D,
        MeshOpcode.LIGHT_CTL_STATUS: 0x130D,
        MeshOpcode.LIGHT_COLOR_SET: 0x130C,
        MeshOpcode.LIGHT_COLOR_STATUS: 0x130C,
        MeshOpcode.SENSOR_GET: 0x1100,
        MeshOpcode.SENSOR_STATUS: 0x1100,
        MeshOpcode.BATTERY_STATUS: 0x1000,
    }

    def parse_message(
        self,
        src_address: str,
        dst_address: str,
        opcode: int,
        payload: bytes,
    ) -> ParsedMeshMessage:
        """Decode one mesh message.

        Args:
            src_address: Sender address, stored unchanged.
            dst_address: Destination address, stored unchanged.
            opcode: Message opcode.
            payload: Message parameters following the opcode.

        Returns:
            A ``ParsedMeshMessage`` carrying the opcode name, the mapped model
            ID (``None`` for unmapped opcodes), the decoded fields and the raw
            payload.
        """
        return ParsedMeshMessage(
            opcode=opcode,
            opcode_name=self._get_opcode_name(opcode),
            source_address=src_address,
            destination_address=dst_address,
            model_id=self.OPCODE_MODEL_MAP.get(opcode),
            data=self._parse_payload(opcode, payload),
            raw_payload=payload,
        )

    def _get_opcode_name(self, opcode: int) -> str:
        """Return the ``MeshOpcode`` member name, or ``UNKNOWN_0x....``."""
        try:
            return MeshOpcode(opcode).name
        except ValueError:
            return f"UNKNOWN_0x{opcode:04X}"

    def _parse_payload(self, opcode: int, payload: bytes) -> dict[str, Any]:
        """Route *payload* to the decoder registered for *opcode*.

        Opcodes without a decoder yield ``{"raw": <hex string>}``.
        """
        decoders = {
            MeshOpcode.ONOFF_STATUS: self._parse_onoff_status,
            MeshOpcode.LIGHT_LIGHTNESS_STATUS: self._parse_light_lightness_status,
            MeshOpcode.LIGHT_HSL_STATUS: self._parse_light_hsl_status,
            MeshOpcode.LIGHT_CTL_STATUS: self._parse_light_ctl_status,
            MeshOpcode.SENSOR_STATUS: self._parse_sensor_status,
            MeshOpcode.BATTERY_STATUS: self._parse_battery_status,
        }
        decoder = decoders.get(opcode)
        if decoder is None:
            return {"raw": payload.hex()}
        return decoder(payload)

    @staticmethod
    def _to_percent(raw: int) -> float:
        """Scale a 0-65535 value to 0-100, rounded to one decimal place."""
        return round(raw / 655.35, 1)

    def _parse_onoff_status(self, payload: bytes) -> dict[str, Any]:
        """Decode an OnOff status; only the first byte is inspected."""
        if not payload:
            return {"onoff": None}
        present = payload[0]
        return {
            # Only the exact value 0x01 counts as "on".
            "onoff": present == 0x01,
            "onoff_raw": present,
        }

    def _parse_light_lightness_status(self, payload: bytes) -> dict[str, Any]:
        """Decode a lightness status (uint16, little-endian)."""
        if len(payload) < 2:
            return {"lightness": None}
        lightness = int.from_bytes(payload[0:2], byteorder="little")
        return {
            "lightness": lightness,
            "lightness_percent": self._to_percent(lightness),
        }

    def _parse_light_hsl_status(self, payload: bytes) -> dict[str, Any]:
        """Decode a Light HSL status (three little-endian uint16 fields).

        The Mesh Model specification orders the fields as HSL Lightness,
        HSL Hue, HSL Saturation; any trailing bytes are ignored.
        """
        if len(payload) < 6:
            return {"hsl": None}
        # Lightness comes first on the wire, then hue, then saturation.
        lightness = int.from_bytes(payload[0:2], byteorder="little")
        hue = int.from_bytes(payload[2:4], byteorder="little")
        saturation = int.from_bytes(payload[4:6], byteorder="little")
        return {
            "hue": hue,
            "hue_percent": self._to_percent(hue),
            "saturation": saturation,
            "saturation_percent": self._to_percent(saturation),
            "lightness": lightness,
            "lightness_percent": self._to_percent(lightness),
        }

    def _parse_light_ctl_status(self, payload: bytes) -> dict[str, Any]:
        """Decode a CTL (colour temperature) status.

        Reads an unsigned 16-bit colour temperature followed by a signed
        16-bit delta UV.

        NOTE(review): this layout matches the Light CTL Temperature Status
        message; the plain Light CTL Status message starts with a lightness
        field instead. Confirm which message ``MeshOpcode.LIGHT_CTL_STATUS``
        actually carries.
        """
        if len(payload) < 4:
            return {"ctl": None}
        # The raw value is reported as the colour temperature without scaling.
        color_temp = int.from_bytes(payload[0:2], byteorder="little")
        delta_uv = int.from_bytes(payload[2:4], byteorder="little", signed=True)
        return {
            "color_temp": color_temp,
            "delta_uv": delta_uv,
        }

    def _parse_sensor_status(self, payload: bytes) -> dict[str, Any]:
        """Decode the first property of a sensor status.

        Bytes 0-1 are read as the property ID and bytes 2-3 as a signed
        16-bit value; anything after that is ignored. A payload of two or
        three bytes is returned as ``{"raw": <hex string>}``.

        NOTE(review): the Mesh Sensor Status message encodes property IDs in
        a marshalled (MPID) form. This code assumes the gateway delivers a
        plain 16-bit property ID followed by a 16-bit value -- confirm.
        """
        if len(payload) < 2:
            return {"sensor_data": None}
        if len(payload) < 4:
            # Property ID present but no complete value to decode.
            return {"raw": payload.hex()}
        property_id = int.from_bytes(payload[0:2], byteorder="little")
        value = int.from_bytes(payload[2:4], byteorder="little", signed=True)
        unit, formatted_value = self._format_sensor_value(property_id, value)
        return {
            "property_id": property_id,
            "property_name": self._get_property_name(property_id),
            "value": value,
            "unit": unit,
            "formatted": formatted_value,
        }

    def _parse_battery_status(self, payload: bytes) -> dict[str, Any]:
        """Decode a battery status; only the first byte is inspected."""
        if not payload:
            return {"battery": None}
        battery_level = payload[0]
        return {
            # Values above 100 are not a percentage and are reported as None.
            "battery_level": battery_level if battery_level <= 100 else None,
            "battery_raw": battery_level,
        }

    def _get_property_name(self, property_id: int) -> str:
        """Return the ``MeshPropertyId`` member name, or ``UNKNOWN_0x....``."""
        try:
            return MeshPropertyId(property_id).name
        except ValueError:
            return f"UNKNOWN_0x{property_id:04X}"

    def _format_sensor_value(
        self, property_id: int, value: int
    ) -> tuple[SensorUnit, str]:
        """Return ``(unit, display string)`` for a raw sensor value.

        Properties without a dedicated rule are rendered with ``str(value)``
        and ``SensorUnit.NONE``.
        """
        if property_id == MeshPropertyId.AMBIENT_TEMPERATURE:
            # Raw value is in hundredths of a degree Celsius.
            return SensorUnit.CELSIUS, f"{value / 100:.1f}°C"
        if property_id == MeshPropertyId.AMBIENT_HUMIDITY:
            # Raw value is in hundredths of a percent.
            return SensorUnit.PERCENTAGE, f"{value / 100:.1f}%"
        if property_id == MeshPropertyId.LIGHT_INTENSITY:
            return SensorUnit.LUX, f"{value} lx"
        if property_id == MeshPropertyId.BATTERY_LEVEL:
            return SensorUnit.PERCENTAGE, f"{value}%"
        if property_id == MeshPropertyId.CO2_CONCENTRATION:
            return SensorUnit.PPM, f"{value} ppm"
        if property_id == MeshPropertyId.PM2_5_CONCENTRATION:
            return SensorUnit.UG_M3, f"{value} μg/m³"
        if property_id in (
            MeshPropertyId.PRESENCE_DETECTED,
            MeshPropertyId.MOTION_DETECTED,
        ):
            # Any non-zero value means "detected"; these carry no unit.
            return SensorUnit.NONE, "检测到" if value != 0 else "未检测到"
        return SensorUnit.NONE, str(value)
class DeviceManager:
    """In-memory registry of mesh devices, keyed by address string."""

    def __init__(self) -> None:
        """Create an empty registry together with its own parser."""
        self._devices: dict[str, DeviceState] = {}
        self._parser = ProtocolParser()

    def update_device_state(
        self, src_address: str, parsed_message: ParsedMeshMessage
    ) -> DeviceState | None:
        """Merge a parsed message into the state of its source device.

        The device is created on first sight, using the source address as
        its key.

        Args:
            src_address: Address of the device that sent the message.
            parsed_message: Decoded message whose ``data`` is merged in.

        Returns:
            The updated ``DeviceState``.
        """
        device = self.add_device(src_address)

        # Honour a "timestamp" supplied in the data; otherwise stamp the
        # update with the current time so ``last_update`` does not stay at 0.
        timestamp = parsed_message.data.get("timestamp")
        device.last_update = time.time() if timestamp is None else timestamp

        device.states.update(parsed_message.data)

        # A message whose opcode has no model mapping must not erase a model
        # ID learned from an earlier message.
        if parsed_message.model_id is not None:
            device.model_id = parsed_message.model_id
        return device

    def add_device(self, mac_address: str, element_count: int = 1) -> DeviceState:
        """Register a device if it is unknown and return its state.

        Args:
            mac_address: Key under which the device is stored.
            element_count: Accepted for interface compatibility; currently
                unused.

        Returns:
            The existing or newly created ``DeviceState``.
        """
        device = self._devices.get(mac_address)
        if device is None:
            device = DeviceState(
                mac_address=mac_address,
                element_index=0,
                model_id=None,
                states={},
                last_update=0,
            )
            self._devices[mac_address] = device
        return device

    def remove_device(self, mac_address: str) -> None:
        """Forget a device; unknown addresses are ignored."""
        self._devices.pop(mac_address, None)

    def get_device(self, mac_address: str) -> DeviceState | None:
        """Return the state stored for *mac_address*, or ``None``."""
        return self._devices.get(mac_address)

    def get_all_devices(self) -> list[DeviceState]:
        """Return a new list containing every tracked device state."""
        return list(self._devices.values())