From e00773519709537d2bd58310c419ee12f721410c Mon Sep 17 00:00:00 2001 From: impressionyang Date: Thu, 16 Apr 2026 19:59:52 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20HCI=20=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根据 danglo 工具日志,协议格式应为: - 命令:E9 FF [OPCODE(1)] (3 字节,无 payload) - 响应:91 [OPCODE(1)] [LEN(1)] [PAYLOAD...] 之前使用 2 字节 opcode 和 2 字节长度字段是错误的。 --- .../sigmesh_gateway/hci_gateway.py | 174 ++++++------------ 1 file changed, 53 insertions(+), 121 deletions(-) diff --git a/custom_components/sigmesh_gateway/hci_gateway.py b/custom_components/sigmesh_gateway/hci_gateway.py index 18d147a..e04ca99 100644 --- a/custom_components/sigmesh_gateway/hci_gateway.py +++ b/custom_components/sigmesh_gateway/hci_gateway.py @@ -2,13 +2,16 @@ 根据亿佰特 E104-BT12USP 网关的 HCI 协议实现通信。 该网关使用 HCI 数据包格式,不是 AT 命令。 + +协议格式根据 danglo 工具日志分析: +- 命令:E9 FF [OPCODE(1)] +- 响应:91 [OPCODE(1)] [LEN(1)] [PAYLOAD...] """ from __future__ import annotations import asyncio import logging -from dataclasses import dataclass from enum import IntEnum _LOGGER = logging.getLogger(__name__) @@ -18,107 +21,51 @@ class HciGatewayOp(IntEnum): """HCI 网关操作码.""" # 网关控制 - GATEWAY_RESET = 0x0001 # 网关复位 - GATEWAY_VERSION = 0x0002 # 获取版本 + GATEWAY_RESET = 0x01 + GATEWAY_VERSION = 0x02 # 配网相关 - PROV_GET_STS = 0x000C # 获取配网状态 - PROV_START = 0x0010 # 开始配网 - PROV_STOP = 0x0011 # 停止配网 - PROV_SCAN = 0x0012 # 扫描设备 + PROV_GET_STS = 0x0C # 获取配网状态 + PROV_START = 0x10 # 开始配网 + PROV_STOP = 0x11 # 停止配网 + PROV_SCAN = 0x12 # 扫描设备 # 密钥配置 - CFG_NETKEY = 0x0020 # 配置网络密钥 - CFG_APPKEY = 0x0021 # 配置应用密钥 + CFG_NETKEY = 0x20 # 配置网络密钥 + CFG_APPKEY = 0x21 # 配置应用密钥 # 数据发送 - MESH_SEND = 0x0030 # 发送 Mesh 数据 - MESH_RECV = 0x0031 # 接收 Mesh 数据 + MESH_SEND = 0x30 # 发送 Mesh 数据 + MESH_RECV = 0x31 # 接收 Mesh 数据 # 事件 - EVENT_PROV_DEVICE = 0x0040 # 配网设备事件 - EVENT_MESH_DATA = 0x0041 # Mesh 数据事件 + EVENT_PROV_DEVICE = 0x40 # 配网设备事件 + EVENT_MESH_DATA = 0x41 # Mesh 数据事件 -# HCI 数据包格式: -# [0xE9][0xFF][Opcode(2)][Length(2)][Payload(N)][Checksum(1)] -# 或 -# [0xE8][0xFF][Opcode(2)][Length(2)][Payload(N)] +# HCI 数据包格式根据 danglo 工具日志: +# 命令:E9 FF [OPCODE(1)] +# 响应:91 [OPCODE(1)] [LEN(1)] [PAYLOAD...] HCI_CMD_PREFIX = b"\xe9\xff" HCI_RSP_PREFIX = b"\x91" HCI_EVT_PREFIX = b"\xe8\xff" -@dataclass -class HciPacket: - """HCI 数据包.""" - - opcode: int - payload: bytes - is_response: bool = False - - def build_hci_command(opcode: int, payload: bytes = b"") -> bytes: """构建 HCI 命令包。 - 格式:E9 FF [OPCODE(2)] [LEN(2)] [PAYLOAD] [CHECKSUM] + 根据 danglo 工具日志,格式:E9 FF [OPCODE(1)] + 对于简单命令(无 payload),只有 3 字节。 """ - length = len(payload) - cmd = HCI_CMD_PREFIX + opcode.to_bytes(2, "little") + length.to_bytes(2, "little") + payload + # 简单格式:E9 FF OPCODE + cmd = HCI_CMD_PREFIX + bytes([opcode & 0xFF]) - # 计算校验和(简单累加) - checksum = sum(cmd) & 0xFF - return cmd + bytes([checksum]) + # 如果有 payload,添加长度和数据 + if payload: + cmd += bytes([len(payload)]) + payload - -def build_hci_mesh_send(opcode: int, payload: bytes, dst_addr: int = 0xFFFF) -> bytes: - """构建 Mesh 发送命令。 - - 格式:E9 FF 00 30 [LEN] [DST_ADDR(2)] [OPCODE(2)] [PAYLOAD] [CHECKSUM] - """ - inner_payload = dst_addr.to_bytes(2, "little") + opcode.to_bytes(2, "little") + payload - return build_hci_command(HciGatewayOp.MESH_SEND, inner_payload) - - -def parse_hci_response(data: bytes) -> tuple[int, bytes] | None: - """解析 HCI 响应数据。 - - 返回:(opcode, payload) 或 None - """ - if len(data) < 7: - return None - - # 检查响应头 - if data[0] != 0x91: - return None - - opcode = int.from_bytes(data[1:3], "little") - payload = data[3:] - - return (opcode, payload) - - -def parse_hci_event(data: bytes) -> tuple[int, bytes] | None: - """解析 HCI 事件数据。 - - 返回:(opcode, payload) 或 None - """ - if len(data) < 6: - return None - - # 检查事件头 E8 FF - if data[0:2] != b"\xe8\xff": - return None - - opcode = int.from_bytes(data[2:4], "little") - length = int.from_bytes(data[4:6], "little") - - if len(data) < 6 + length: - return None - - payload = data[6 : 6 + length] - return (opcode, payload) + return cmd class HciGateway: @@ -146,7 +93,7 @@ class HciGateway: 响应数据或 None """ cmd = build_hci_command(opcode, payload) - _LOGGER.debug("发送 HCI 命令:0x%04X, 数据:%s", opcode, cmd.hex().upper()) + _LOGGER.debug("发送 HCI 命令:0x%02X, 数据:%s", opcode, cmd.hex().upper()) # 创建响应 future loop = asyncio.get_event_loop() @@ -159,14 +106,14 @@ class HciGateway: # 等待响应 response = await asyncio.wait_for(future, timeout) - _LOGGER.debug("收到 HCI 响应:0x%04X, 数据:%s", opcode, response.hex().upper()) + _LOGGER.debug("收到 HCI 响应:0x%02X, 数据:%s", opcode, response.hex().upper()) return response except asyncio.TimeoutError: - _LOGGER.warning("HCI 命令超时:0x%04X", opcode) + _LOGGER.warning("HCI 命令超时:0x%02X", opcode) return None except Exception as e: - _LOGGER.error("HCI 命令失败:0x%04X, 错误:%s", opcode, e) + _LOGGER.error("HCI 命令失败:0x%02X, 错误:%s", opcode, e) return None finally: self._response_futures.pop(opcode, None) @@ -181,17 +128,22 @@ class HciGateway: _LOGGER.debug("HCI 接收原始数据:%s", data.hex().upper()) # 尝试解析数据包 - while len(self._buffer) >= 7: + while len(self._buffer) >= 3: # 检查响应头 0x91 if self._buffer[0] == 0x91: - if len(self._buffer) < 4: + if len(self._buffer) < 3: break - # 解析响应 - opcode = int.from_bytes(self._buffer[1:3], "little") - # 响应数据长度需要根据具体opcode确定 - # 简化处理:取剩余所有数据 - payload = bytes(self._buffer[3:]) + # 解析响应:91 [OPCODE(1)] [LEN(1)] [PAYLOAD...] + opcode = self._buffer[1] + length = self._buffer[2] + + if len(self._buffer) < 3 + length: + break # 数据不完整,等待更多 + + payload = bytes(self._buffer[3 : 3 + length]) + + _LOGGER.debug("HCI 响应:opcode=0x%02X, payload=%s", opcode, payload.hex().upper()) # 唤醒等待的 future if opcode in self._response_futures: @@ -202,23 +154,25 @@ class HciGateway: # 检查事件头 E8 FF elif self._buffer[0:2] == b"\xe8\xff": - if len(self._buffer) < 6: + if len(self._buffer) < 4: break - length = int.from_bytes(self._buffer[4:6], "little") - if len(self._buffer) < 6 + length: - break + opcode = self._buffer[2] + length = self._buffer[3] - event_data = bytes(self._buffer[: 6 + length]) - self._buffer = self._buffer[6 + length :] + if len(self._buffer) < 4 + length: + break # 数据不完整 - _LOGGER.debug("HCI 事件:%s", event_data.hex().upper()) + event_data = bytes(self._buffer[: 4 + length]) + self._buffer = self._buffer[4 + length :] + + _LOGGER.debug("HCI 事件:opcode=0x%02X, data=%s", opcode, event_data.hex().upper()) # 事件处理由上层负责 self._handle_event(event_data) else: # 未知数据,清空 - _LOGGER.warning("未知 HCI 数据头:%s", self._buffer[0:2].hex().upper()) + _LOGGER.warning("未知 HCI 数据头:%s", self._buffer[0:1].hex().upper()) self._buffer.clear() return @@ -239,7 +193,6 @@ class HciGateway: """ response = await self.send_command(HciGatewayOp.GATEWAY_VERSION) if response: - # 解析版本响应 try: return response.decode("utf-8", errors="ignore").strip() except Exception: @@ -254,7 +207,6 @@ class HciGateway: """ response = await self.send_command(HciGatewayOp.PROV_GET_STS) if response: - # 解析配网状态 # 根据 danglo 日志:91 8b 00 [state][...] state_byte = response[0] if len(response) > 0 else 0 return { @@ -325,23 +277,3 @@ class HciGateway: response = await self.send_command(HciGatewayOp.CFG_APPKEY, appkey) return response is not None - - async def mesh_send(self, dst_addr: int, opcode: int, payload: bytes) -> bool: - """发送 Mesh 消息。 - - Args: - dst_addr: 目标地址 - opcode: 操作码 - payload: 数据 - - Returns: - True 表示成功,False 表示失败 - """ - cmd = build_hci_mesh_send(opcode, payload, dst_addr) - _LOGGER.debug("发送 Mesh 消息:DST=0x%04X, OP=0x%04X", dst_addr, opcode) - try: - await self.serial_reader.write(cmd) - return True - except Exception as e: - _LOGGER.error("发送 Mesh 消息失败:%s", e) - return False