项目结构: - custom_components/sigmesh_gateway/ - Home Assistant 集成 - serial_reader.py - 串口读取器 - protocol_parser.py - 协议解析器 - coordinator.py - 数据协调器 - platforms/ - 传感器/开关/灯光/设备追踪实体 文档: - PRD.md - 产品需求文档 - README.md - 用户使用指南 - 可行性分析.md - 技术可行性分析 - 参数配置表.md - 配置参数记录 - 调试检查清单.md - 问题排查指南 功能特性: - 串口通信 (115200 波特率) - Bluetooth Mesh 协议解析 - 支持 200+ 设备接入 - UI 配置界面 - 多平台实体支持 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
320 lines
10 KiB
Python
320 lines
10 KiB
Python
"""SigMesh Gateway 协议解析器模块."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from dataclasses import dataclass
|
||
from enum import IntEnum
|
||
from typing import Any
|
||
|
||
from .const import MeshOpcode, MeshPropertyId, SensorUnit
|
||
|
||
_LOGGER = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
|
||
class ParsedMeshMessage:
|
||
"""解析后的 Mesh 消息."""
|
||
|
||
opcode: MeshOpcode | int
|
||
opcode_name: str
|
||
source_address: str
|
||
destination_address: str
|
||
model_id: int | None
|
||
data: dict[str, Any]
|
||
raw_payload: bytes
|
||
|
||
|
||
@dataclass
|
||
class DeviceState:
|
||
"""设备状态."""
|
||
|
||
mac_address: str
|
||
element_index: int
|
||
model_id: int | None
|
||
states: dict[str, Any]
|
||
last_update: float
|
||
|
||
|
||
class ProtocolParser:
|
||
"""SigMesh 协议解析器."""
|
||
|
||
# Opcode 到模型 ID 的映射
|
||
OPCODE_MODEL_MAP = {
|
||
MeshOpcode.ONOFF_GET: 0x1000,
|
||
MeshOpcode.ONOFF_SET: 0x1000,
|
||
MeshOpcode.ONOFF_STATUS: 0x1000,
|
||
MeshOpcode.LIGHT_LIGHTNESS_GET: 0x1300,
|
||
MeshOpcode.LIGHT_LIGHTNESS_SET: 0x1300,
|
||
MeshOpcode.LIGHT_LIGHTNESS_STATUS: 0x1300,
|
||
MeshOpcode.LIGHT_HSL_SET: 0x1307,
|
||
MeshOpcode.LIGHT_HSL_STATUS: 0x1307,
|
||
MeshOpcode.LIGHT_CTL_SET: 0x130D,
|
||
MeshOpcode.LIGHT_CTL_STATUS: 0x130D,
|
||
MeshOpcode.LIGHT_COLOR_SET: 0x130C,
|
||
MeshOpcode.LIGHT_COLOR_STATUS: 0x130C,
|
||
MeshOpcode.SENSOR_GET: 0x1100,
|
||
MeshOpcode.SENSOR_STATUS: 0x1100,
|
||
MeshOpcode.BATTERY_STATUS: 0x1000,
|
||
}
|
||
|
||
def parse_message(
|
||
self,
|
||
src_address: str,
|
||
dst_address: str,
|
||
opcode: int,
|
||
payload: bytes,
|
||
) -> ParsedMeshMessage:
|
||
"""解析 Mesh 消息."""
|
||
# 查找对应的模型 ID
|
||
model_id = self.OPCODE_MODEL_MAP.get(opcode)
|
||
|
||
# 获取 opcode 名称
|
||
opcode_name = self._get_opcode_name(opcode)
|
||
|
||
# 根据 opcode 解析 payload
|
||
data = self._parse_payload(opcode, payload)
|
||
|
||
return ParsedMeshMessage(
|
||
opcode=opcode,
|
||
opcode_name=opcode_name,
|
||
source_address=src_address,
|
||
destination_address=dst_address,
|
||
model_id=model_id,
|
||
data=data,
|
||
raw_payload=payload,
|
||
)
|
||
|
||
def _get_opcode_name(self, opcode: int) -> str:
|
||
"""获取 Opcode 名称."""
|
||
try:
|
||
return MeshOpcode(opcode).name
|
||
except ValueError:
|
||
return f"UNKNOWN_0x{opcode:04X}"
|
||
|
||
def _parse_payload(self, opcode: int, payload: bytes) -> dict[str, Any]:
|
||
"""根据 Opcode 解析 payload 数据."""
|
||
if opcode == MeshOpcode.ONOFF_STATUS:
|
||
return self._parse_onoff_status(payload)
|
||
elif opcode == MeshOpcode.LIGHT_LIGHTNESS_STATUS:
|
||
return self._parse_light_lightness_status(payload)
|
||
elif opcode == MeshOpcode.LIGHT_HSL_STATUS:
|
||
return self._parse_light_hsl_status(payload)
|
||
elif opcode == MeshOpcode.LIGHT_CTL_STATUS:
|
||
return self._parse_light_ctl_status(payload)
|
||
elif opcode == MeshOpcode.SENSOR_STATUS:
|
||
return self._parse_sensor_status(payload)
|
||
elif opcode == MeshOpcode.BATTERY_STATUS:
|
||
return self._parse_battery_status(payload)
|
||
else:
|
||
return {"raw": payload.hex()}
|
||
|
||
def _parse_onoff_status(self, payload: bytes) -> dict[str, Any]:
|
||
"""解析开关状态."""
|
||
if len(payload) < 1:
|
||
return {"onoff": None}
|
||
|
||
onoff_value = payload[0]
|
||
return {
|
||
"onoff": onoff_value == 0x01,
|
||
"onoff_raw": onoff_value,
|
||
}
|
||
|
||
def _parse_light_lightness_status(self, payload: bytes) -> dict[str, Any]:
|
||
"""解析灯光亮度状态."""
|
||
if len(payload) < 2:
|
||
return {"lightness": None}
|
||
|
||
# 亮度值 (uint16, 小端)
|
||
lightness = int.from_bytes(payload[0:2], byteorder="little")
|
||
return {
|
||
"lightness": lightness,
|
||
"lightness_percent": round(lightness / 655.35, 1), # 0-65535 -> 0-100%
|
||
}
|
||
|
||
def _parse_light_hsl_status(self, payload: bytes) -> dict[str, Any]:
|
||
"""解析 HSL 灯光状态."""
|
||
if len(payload) < 6:
|
||
return {"hsl": None}
|
||
|
||
hue = int.from_bytes(payload[0:2], byteorder="little")
|
||
saturation = int.from_bytes(payload[2:4], byteorder="little")
|
||
lightness = int.from_bytes(payload[4:6], byteorder="little")
|
||
|
||
return {
|
||
"hue": hue,
|
||
"hue_percent": round(hue / 655.35, 1),
|
||
"saturation": saturation,
|
||
"saturation_percent": round(saturation / 655.35, 1),
|
||
"lightness": lightness,
|
||
"lightness_percent": round(lightness / 655.35, 1),
|
||
}
|
||
|
||
def _parse_light_ctl_status(self, payload: bytes) -> dict[str, Any]:
|
||
"""解析 CTL (色温) 灯光状态."""
|
||
if len(payload) < 4:
|
||
return {"ctl": None}
|
||
|
||
ctl = int.from_bytes(payload[0:2], byteorder="little")
|
||
delta_uv = int.from_bytes(payload[2:4], byteorder="little", signed=True)
|
||
|
||
# 转换为色温 (K)
|
||
color_temp = ctl
|
||
|
||
return {
|
||
"color_temp": color_temp,
|
||
"delta_uv": delta_uv,
|
||
}
|
||
|
||
def _parse_sensor_status(self, payload: bytes) -> dict[str, Any]:
|
||
"""解析传感器状态."""
|
||
if len(payload) < 2:
|
||
return {"sensor_data": None}
|
||
|
||
# 解析传感器数据
|
||
result = {}
|
||
|
||
# 尝试解析为已知属性
|
||
property_id = int.from_bytes(payload[0:2], byteorder="little")
|
||
|
||
if len(payload) >= 4:
|
||
value = int.from_bytes(payload[2:4], byteorder="little", signed=True)
|
||
unit, formatted_value = self._format_sensor_value(property_id, value)
|
||
result.update(
|
||
{
|
||
"property_id": property_id,
|
||
"property_name": self._get_property_name(property_id),
|
||
"value": value,
|
||
"unit": unit,
|
||
"formatted": formatted_value,
|
||
}
|
||
)
|
||
else:
|
||
result["raw"] = payload.hex()
|
||
|
||
return result
|
||
|
||
def _parse_battery_status(self, payload: bytes) -> dict[str, Any]:
|
||
"""解析电池状态."""
|
||
if len(payload) < 1:
|
||
return {"battery": None}
|
||
|
||
battery_level = payload[0]
|
||
|
||
# 电池百分比 (0-100, 255=未知)
|
||
percentage = battery_level if battery_level <= 100 else None
|
||
|
||
return {
|
||
"battery_level": percentage,
|
||
"battery_raw": battery_level,
|
||
}
|
||
|
||
def _get_property_name(self, property_id: int) -> str:
|
||
"""获取属性名称."""
|
||
try:
|
||
return MeshPropertyId(property_id).name
|
||
except ValueError:
|
||
return f"UNKNOWN_0x{property_id:04X}"
|
||
|
||
def _format_sensor_value(
|
||
self, property_id: int, value: int
|
||
) -> tuple[SensorUnit, str]:
|
||
"""格式化传感器值."""
|
||
unit = SensorUnit.NONE
|
||
|
||
# 根据属性类型格式化
|
||
if property_id == MeshPropertyId.AMBIENT_TEMPERATURE:
|
||
# 温度:除以 100 得到摄氏度
|
||
formatted = f"{value / 100:.1f}°C"
|
||
unit = SensorUnit.CELSIUS
|
||
elif property_id == MeshPropertyId.AMBIENT_HUMIDITY:
|
||
# 湿度:除以 100 得到百分比
|
||
formatted = f"{value / 100:.1f}%"
|
||
unit = SensorUnit.PERCENTAGE
|
||
elif property_id == MeshPropertyId.LIGHT_INTENSITY:
|
||
# 光照:lux
|
||
formatted = f"{value} lx"
|
||
unit = SensorUnit.LUX
|
||
elif property_id == MeshPropertyId.BATTERY_LEVEL:
|
||
# 电池:百分比
|
||
formatted = f"{value}%"
|
||
unit = SensorUnit.PERCENTAGE
|
||
elif property_id == MeshPropertyId.CO2_CONCENTRATION:
|
||
# CO2: ppm
|
||
formatted = f"{value} ppm"
|
||
unit = SensorUnit.PPM
|
||
elif property_id == MeshPropertyId.PM2_5_CONCENTRATION:
|
||
# PM2.5: μg/m³
|
||
formatted = f"{value} μg/m³"
|
||
unit = SensorUnit.UG_M3
|
||
elif property_id in (
|
||
MeshPropertyId.PRESENCE_DETECTED,
|
||
MeshPropertyId.MOTION_DETECTED,
|
||
):
|
||
# 存在/运动检测
|
||
formatted = "检测到" if value != 0 else "未检测到"
|
||
else:
|
||
formatted = str(value)
|
||
|
||
return unit, formatted
|
||
|
||
|
||
class DeviceManager:
|
||
"""Mesh 设备管理器."""
|
||
|
||
def __init__(self) -> None:
|
||
"""初始化设备管理器."""
|
||
self._devices: dict[str, DeviceState] = {}
|
||
self._parser = ProtocolParser()
|
||
|
||
def update_device_state(
|
||
self, src_address: str, parsed_message: ParsedMeshMessage
|
||
) -> DeviceState | None:
|
||
"""更新设备状态."""
|
||
# 使用源地址作为设备标识
|
||
device_key = src_address
|
||
|
||
if device_key not in self._devices:
|
||
self._devices[device_key] = DeviceState(
|
||
mac_address=src_address,
|
||
element_index=0,
|
||
model_id=parsed_message.model_id,
|
||
states={},
|
||
last_update=0,
|
||
)
|
||
|
||
device = self._devices[device_key]
|
||
device.last_update = parsed_message.data.get("timestamp", 0)
|
||
device.states.update(parsed_message.data)
|
||
device.model_id = parsed_message.model_id
|
||
|
||
return device
|
||
|
||
def add_device(self, mac_address: str, element_count: int = 1) -> DeviceState:
|
||
"""添加设备."""
|
||
device_key = mac_address
|
||
|
||
if device_key not in self._devices:
|
||
self._devices[device_key] = DeviceState(
|
||
mac_address=mac_address,
|
||
element_index=0,
|
||
model_id=None,
|
||
states={},
|
||
last_update=0,
|
||
)
|
||
|
||
return self._devices[device_key]
|
||
|
||
def remove_device(self, mac_address: str) -> None:
|
||
"""移除设备."""
|
||
if mac_address in self._devices:
|
||
del self._devices[mac_address]
|
||
|
||
def get_device(self, mac_address: str) -> DeviceState | None:
|
||
"""获取设备状态."""
|
||
return self._devices.get(mac_address)
|
||
|
||
def get_all_devices(self) -> list[DeviceState]:
|
||
"""获取所有设备."""
|
||
return list(self._devices.values())
|