"""SigMesh Gateway 协议解析器模块.""" from __future__ import annotations import logging from dataclasses import dataclass from enum import IntEnum from typing import Any from .const import MeshOpcode, MeshPropertyId, SensorUnit _LOGGER = logging.getLogger(__name__) @dataclass class ParsedMeshMessage: """解析后的 Mesh 消息.""" opcode: MeshOpcode | int opcode_name: str source_address: str destination_address: str model_id: int | None data: dict[str, Any] raw_payload: bytes @dataclass class DeviceState: """设备状态.""" mac_address: str element_index: int model_id: int | None states: dict[str, Any] last_update: float class ProtocolParser: """SigMesh 协议解析器.""" # Opcode 到模型 ID 的映射 OPCODE_MODEL_MAP = { MeshOpcode.ONOFF_GET: 0x1000, MeshOpcode.ONOFF_SET: 0x1000, MeshOpcode.ONOFF_STATUS: 0x1000, MeshOpcode.LIGHT_LIGHTNESS_GET: 0x1300, MeshOpcode.LIGHT_LIGHTNESS_SET: 0x1300, MeshOpcode.LIGHT_LIGHTNESS_STATUS: 0x1300, MeshOpcode.LIGHT_HSL_SET: 0x1307, MeshOpcode.LIGHT_HSL_STATUS: 0x1307, MeshOpcode.LIGHT_CTL_SET: 0x130D, MeshOpcode.LIGHT_CTL_STATUS: 0x130D, MeshOpcode.LIGHT_COLOR_SET: 0x130C, MeshOpcode.LIGHT_COLOR_STATUS: 0x130C, MeshOpcode.SENSOR_GET: 0x1100, MeshOpcode.SENSOR_STATUS: 0x1100, MeshOpcode.BATTERY_STATUS: 0x1000, } def parse_message( self, src_address: str, dst_address: str, opcode: int, payload: bytes, ) -> ParsedMeshMessage: """解析 Mesh 消息.""" # 查找对应的模型 ID model_id = self.OPCODE_MODEL_MAP.get(opcode) # 获取 opcode 名称 opcode_name = self._get_opcode_name(opcode) # 根据 opcode 解析 payload data = self._parse_payload(opcode, payload) return ParsedMeshMessage( opcode=opcode, opcode_name=opcode_name, source_address=src_address, destination_address=dst_address, model_id=model_id, data=data, raw_payload=payload, ) def _get_opcode_name(self, opcode: int) -> str: """获取 Opcode 名称.""" try: return MeshOpcode(opcode).name except ValueError: return f"UNKNOWN_0x{opcode:04X}" def _parse_payload(self, opcode: int, payload: bytes) -> dict[str, Any]: """根据 Opcode 解析 payload 数据.""" if opcode == MeshOpcode.ONOFF_STATUS: return self._parse_onoff_status(payload) elif opcode == MeshOpcode.LIGHT_LIGHTNESS_STATUS: return self._parse_light_lightness_status(payload) elif opcode == MeshOpcode.LIGHT_HSL_STATUS: return self._parse_light_hsl_status(payload) elif opcode == MeshOpcode.LIGHT_CTL_STATUS: return self._parse_light_ctl_status(payload) elif opcode == MeshOpcode.SENSOR_STATUS: return self._parse_sensor_status(payload) elif opcode == MeshOpcode.BATTERY_STATUS: return self._parse_battery_status(payload) else: return {"raw": payload.hex()} def _parse_onoff_status(self, payload: bytes) -> dict[str, Any]: """解析开关状态.""" if len(payload) < 1: return {"onoff": None} onoff_value = payload[0] return { "onoff": onoff_value == 0x01, "onoff_raw": onoff_value, } def _parse_light_lightness_status(self, payload: bytes) -> dict[str, Any]: """解析灯光亮度状态.""" if len(payload) < 2: return {"lightness": None} # 亮度值 (uint16, 小端) lightness = int.from_bytes(payload[0:2], byteorder="little") return { "lightness": lightness, "lightness_percent": round(lightness / 655.35, 1), # 0-65535 -> 0-100% } def _parse_light_hsl_status(self, payload: bytes) -> dict[str, Any]: """解析 HSL 灯光状态.""" if len(payload) < 6: return {"hsl": None} hue = int.from_bytes(payload[0:2], byteorder="little") saturation = int.from_bytes(payload[2:4], byteorder="little") lightness = int.from_bytes(payload[4:6], byteorder="little") return { "hue": hue, "hue_percent": round(hue / 655.35, 1), "saturation": saturation, "saturation_percent": round(saturation / 655.35, 1), "lightness": lightness, "lightness_percent": round(lightness / 655.35, 1), } def _parse_light_ctl_status(self, payload: bytes) -> dict[str, Any]: """解析 CTL (色温) 灯光状态.""" if len(payload) < 4: return {"ctl": None} ctl = int.from_bytes(payload[0:2], byteorder="little") delta_uv = int.from_bytes(payload[2:4], byteorder="little", signed=True) # 转换为色温 (K) color_temp = ctl return { "color_temp": color_temp, "delta_uv": delta_uv, } def _parse_sensor_status(self, payload: bytes) -> dict[str, Any]: """解析传感器状态.""" if len(payload) < 2: return {"sensor_data": None} # 解析传感器数据 result = {} # 尝试解析为已知属性 property_id = int.from_bytes(payload[0:2], byteorder="little") if len(payload) >= 4: value = int.from_bytes(payload[2:4], byteorder="little", signed=True) unit, formatted_value = self._format_sensor_value(property_id, value) result.update( { "property_id": property_id, "property_name": self._get_property_name(property_id), "value": value, "unit": unit, "formatted": formatted_value, } ) else: result["raw"] = payload.hex() return result def _parse_battery_status(self, payload: bytes) -> dict[str, Any]: """解析电池状态.""" if len(payload) < 1: return {"battery": None} battery_level = payload[0] # 电池百分比 (0-100, 255=未知) percentage = battery_level if battery_level <= 100 else None return { "battery_level": percentage, "battery_raw": battery_level, } def _get_property_name(self, property_id: int) -> str: """获取属性名称.""" try: return MeshPropertyId(property_id).name except ValueError: return f"UNKNOWN_0x{property_id:04X}" def _format_sensor_value( self, property_id: int, value: int ) -> tuple[SensorUnit, str]: """格式化传感器值.""" unit = SensorUnit.NONE # 根据属性类型格式化 if property_id == MeshPropertyId.AMBIENT_TEMPERATURE: # 温度:除以 100 得到摄氏度 formatted = f"{value / 100:.1f}°C" unit = SensorUnit.CELSIUS elif property_id == MeshPropertyId.AMBIENT_HUMIDITY: # 湿度:除以 100 得到百分比 formatted = f"{value / 100:.1f}%" unit = SensorUnit.PERCENTAGE elif property_id == MeshPropertyId.LIGHT_INTENSITY: # 光照:lux formatted = f"{value} lx" unit = SensorUnit.LUX elif property_id == MeshPropertyId.BATTERY_LEVEL: # 电池:百分比 formatted = f"{value}%" unit = SensorUnit.PERCENTAGE elif property_id == MeshPropertyId.CO2_CONCENTRATION: # CO2: ppm formatted = f"{value} ppm" unit = SensorUnit.PPM elif property_id == MeshPropertyId.PM2_5_CONCENTRATION: # PM2.5: μg/m³ formatted = f"{value} μg/m³" unit = SensorUnit.UG_M3 elif property_id in ( MeshPropertyId.PRESENCE_DETECTED, MeshPropertyId.MOTION_DETECTED, ): # 存在/运动检测 formatted = "检测到" if value != 0 else "未检测到" else: formatted = str(value) return unit, formatted class DeviceManager: """Mesh 设备管理器.""" def __init__(self) -> None: """初始化设备管理器.""" self._devices: dict[str, DeviceState] = {} self._parser = ProtocolParser() def update_device_state( self, src_address: str, parsed_message: ParsedMeshMessage ) -> DeviceState | None: """更新设备状态.""" # 使用源地址作为设备标识 device_key = src_address if device_key not in self._devices: self._devices[device_key] = DeviceState( mac_address=src_address, element_index=0, model_id=parsed_message.model_id, states={}, last_update=0, ) device = self._devices[device_key] device.last_update = parsed_message.data.get("timestamp", 0) device.states.update(parsed_message.data) device.model_id = parsed_message.model_id return device def add_device(self, mac_address: str, element_count: int = 1) -> DeviceState: """添加设备.""" device_key = mac_address if device_key not in self._devices: self._devices[device_key] = DeviceState( mac_address=mac_address, element_index=0, model_id=None, states={}, last_update=0, ) return self._devices[device_key] def remove_device(self, mac_address: str) -> None: """移除设备.""" if mac_address in self._devices: del self._devices[mac_address] def get_device(self, mac_address: str) -> DeviceState | None: """获取设备状态.""" return self._devices.get(mac_address) def get_all_devices(self) -> list[DeviceState]: """获取所有设备.""" return list(self._devices.values())