From b61d99c2e074a54b6126931873b3de98a75e1442 Mon Sep 17 00:00:00 2001 From: impressionyang Date: Thu, 16 Apr 2026 17:24:28 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=20HCI=20=E7=BD=91?= =?UTF-8?q?=E5=85=B3=E5=8D=8F=E8=AE=AE=E6=94=AF=E6=8C=81=20E104-BT12USP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 新建 hci_gateway.py - HCI 协议实现 - HCI 命令包构建和解析 - 支持配网扫描、配置密钥等操作 - 支持 Mesh 消息发送 2. 更新 serial_reader.py - 集成 HciGateway - 使用 HCI 协议解析数据(而非 AT 命令) 3. 更新 provisioning.py - 使用 HCI 协议发送扫描命令 - 移除 AT+PROV=SCAN 命令 原因:E104-BT12USP 网关使用 HCI 固件,不是 AT 命令固件 --- .../sigmesh_gateway/hci_gateway.py | 347 ++++++++++++++++++ .../sigmesh_gateway/provisioning.py | 23 +- .../sigmesh_gateway/serial_reader.py | 42 +-- 3 files changed, 380 insertions(+), 32 deletions(-) create mode 100644 custom_components/sigmesh_gateway/hci_gateway.py diff --git a/custom_components/sigmesh_gateway/hci_gateway.py b/custom_components/sigmesh_gateway/hci_gateway.py new file mode 100644 index 0000000..18d147a --- /dev/null +++ b/custom_components/sigmesh_gateway/hci_gateway.py @@ -0,0 +1,347 @@ +"""E104-BT12USP HCI 网关协议模块. + +根据亿佰特 E104-BT12USP 网关的 HCI 协议实现通信。 +该网关使用 HCI 数据包格式,不是 AT 命令。 +""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from enum import IntEnum + +_LOGGER = logging.getLogger(__name__) + + +class HciGatewayOp(IntEnum): + """HCI 网关操作码.""" + + # 网关控制 + GATEWAY_RESET = 0x0001 # 网关复位 + GATEWAY_VERSION = 0x0002 # 获取版本 + + # 配网相关 + PROV_GET_STS = 0x000C # 获取配网状态 + PROV_START = 0x0010 # 开始配网 + PROV_STOP = 0x0011 # 停止配网 + PROV_SCAN = 0x0012 # 扫描设备 + + # 密钥配置 + CFG_NETKEY = 0x0020 # 配置网络密钥 + CFG_APPKEY = 0x0021 # 配置应用密钥 + + # 数据发送 + MESH_SEND = 0x0030 # 发送 Mesh 数据 + MESH_RECV = 0x0031 # 接收 Mesh 数据 + + # 事件 + EVENT_PROV_DEVICE = 0x0040 # 配网设备事件 + EVENT_MESH_DATA = 0x0041 # Mesh 数据事件 + + +# HCI 数据包格式: +# [0xE9][0xFF][Opcode(2)][Length(2)][Payload(N)][Checksum(1)] +# 或 +# [0xE8][0xFF][Opcode(2)][Length(2)][Payload(N)] + +HCI_CMD_PREFIX = b"\xe9\xff" +HCI_RSP_PREFIX = b"\x91" +HCI_EVT_PREFIX = b"\xe8\xff" + + +@dataclass +class HciPacket: + """HCI 数据包.""" + + opcode: int + payload: bytes + is_response: bool = False + + +def build_hci_command(opcode: int, payload: bytes = b"") -> bytes: + """构建 HCI 命令包。 + + 格式:E9 FF [OPCODE(2)] [LEN(2)] [PAYLOAD] [CHECKSUM] + """ + length = len(payload) + cmd = HCI_CMD_PREFIX + opcode.to_bytes(2, "little") + length.to_bytes(2, "little") + payload + + # 计算校验和(简单累加) + checksum = sum(cmd) & 0xFF + return cmd + bytes([checksum]) + + +def build_hci_mesh_send(opcode: int, payload: bytes, dst_addr: int = 0xFFFF) -> bytes: + """构建 Mesh 发送命令。 + + 格式:E9 FF 00 30 [LEN] [DST_ADDR(2)] [OPCODE(2)] [PAYLOAD] [CHECKSUM] + """ + inner_payload = dst_addr.to_bytes(2, "little") + opcode.to_bytes(2, "little") + payload + return build_hci_command(HciGatewayOp.MESH_SEND, inner_payload) + + +def parse_hci_response(data: bytes) -> tuple[int, bytes] | None: + """解析 HCI 响应数据。 + + 返回:(opcode, payload) 或 None + """ + if len(data) < 7: + return None + + # 检查响应头 + if data[0] != 0x91: + return None + + opcode = int.from_bytes(data[1:3], "little") + payload = data[3:] + + return (opcode, payload) + + +def parse_hci_event(data: bytes) -> tuple[int, bytes] | None: + """解析 HCI 事件数据。 + + 返回:(opcode, payload) 或 None + """ + if len(data) < 6: + return None + + # 检查事件头 E8 FF + if data[0:2] != b"\xe8\xff": + return None + + opcode = int.from_bytes(data[2:4], "little") + length = int.from_bytes(data[4:6], "little") + + if len(data) < 6 + length: + return None + + payload = data[6 : 6 + length] + return (opcode, payload) + + +class HciGateway: + """HCI 网关通信类.""" + + def __init__(self, serial_reader) -> None: + """初始化 HCI 网关。 + + Args: + serial_reader: SerialReader 实例 + """ + self.serial_reader = serial_reader + self._buffer = bytearray() + self._response_futures: dict[int, asyncio.Future] = {} + + async def send_command(self, opcode: int, payload: bytes = b"", timeout: float = 5.0) -> bytes | None: + """发送 HCI 命令并等待响应。 + + Args: + opcode: 操作码 + payload: 命令数据 + timeout: 超时时间(秒) + + Returns: + 响应数据或 None + """ + cmd = build_hci_command(opcode, payload) + _LOGGER.debug("发送 HCI 命令:0x%04X, 数据:%s", opcode, cmd.hex().upper()) + + # 创建响应 future + loop = asyncio.get_event_loop() + future = loop.create_future() + self._response_futures[opcode] = future + + try: + # 发送命令 + await self.serial_reader.write(cmd) + + # 等待响应 + response = await asyncio.wait_for(future, timeout) + _LOGGER.debug("收到 HCI 响应:0x%04X, 数据:%s", opcode, response.hex().upper()) + return response + + except asyncio.TimeoutError: + _LOGGER.warning("HCI 命令超时:0x%04X", opcode) + return None + except Exception as e: + _LOGGER.error("HCI 命令失败:0x%04X, 错误:%s", opcode, e) + return None + finally: + self._response_futures.pop(opcode, None) + + def handle_data(self, data: bytes) -> None: + """处理接收到的数据。 + + Args: + data: 原始串口数据 + """ + self._buffer.extend(data) + _LOGGER.debug("HCI 接收原始数据:%s", data.hex().upper()) + + # 尝试解析数据包 + while len(self._buffer) >= 7: + # 检查响应头 0x91 + if self._buffer[0] == 0x91: + if len(self._buffer) < 4: + break + + # 解析响应 + opcode = int.from_bytes(self._buffer[1:3], "little") + # 响应数据长度需要根据具体opcode确定 + # 简化处理:取剩余所有数据 + payload = bytes(self._buffer[3:]) + + # 唤醒等待的 future + if opcode in self._response_futures: + self._response_futures[opcode].set_result(payload) + + self._buffer.clear() + return + + # 检查事件头 E8 FF + elif self._buffer[0:2] == b"\xe8\xff": + if len(self._buffer) < 6: + break + + length = int.from_bytes(self._buffer[4:6], "little") + if len(self._buffer) < 6 + length: + break + + event_data = bytes(self._buffer[: 6 + length]) + self._buffer = self._buffer[6 + length :] + + _LOGGER.debug("HCI 事件:%s", event_data.hex().upper()) + # 事件处理由上层负责 + self._handle_event(event_data) + + else: + # 未知数据,清空 + _LOGGER.warning("未知 HCI 数据头:%s", self._buffer[0:2].hex().upper()) + self._buffer.clear() + return + + def _handle_event(self, event_data: bytes) -> None: + """处理 HCI 事件。 + + Args: + event_data: 事件数据(包含头) + """ + # 由上层回调处理 + pass + + async def get_version(self) -> str | None: + """获取网关版本。 + + Returns: + 版本字符串或 None + """ + response = await self.send_command(HciGatewayOp.GATEWAY_VERSION) + if response: + # 解析版本响应 + try: + return response.decode("utf-8", errors="ignore").strip() + except Exception: + return response.hex().upper() + return None + + async def get_prov_state(self) -> dict | None: + """获取配网状态。 + + Returns: + 配网状态字典或 None + """ + response = await self.send_command(HciGatewayOp.PROV_GET_STS) + if response: + # 解析配网状态 + # 根据 danglo 日志:91 8b 00 [state][...] + state_byte = response[0] if len(response) > 0 else 0 + return { + "provisioned": state_byte == 0x01, + "raw": response.hex().upper(), + } + return None + + async def start_scanning(self) -> bool: + """开始扫描设备。 + + Returns: + True 表示成功,False 表示失败 + """ + response = await self.send_command(HciGatewayOp.PROV_SCAN) + return response is not None + + async def stop_scanning(self) -> bool: + """停止扫描。 + + Returns: + True 表示成功,False 表示失败 + """ + response = await self.send_command(HciGatewayOp.PROV_STOP) + return response is not None + + async def start_provisioning(self, device_mac: bytes) -> bool: + """开始配网设备。 + + Args: + device_mac: 设备 MAC 地址(6 字节) + + Returns: + True 表示成功,False 表示失败 + """ + payload = device_mac + response = await self.send_command(HciGatewayOp.PROV_START, payload) + return response is not None + + async def configure_netkey(self, netkey: bytes) -> bool: + """配置网络密钥。 + + Args: + netkey: 16 字节网络密钥 + + Returns: + True 表示成功,False 表示失败 + """ + if len(netkey) != 16: + _LOGGER.error("网络密钥长度错误:%d", len(netkey)) + return False + + response = await self.send_command(HciGatewayOp.CFG_NETKEY, netkey) + return response is not None + + async def configure_appkey(self, appkey: bytes) -> bool: + """配置应用密钥。 + + Args: + appkey: 16 字节应用密钥 + + Returns: + True 表示成功,False 表示失败 + """ + if len(appkey) != 16: + _LOGGER.error("应用密钥长度错误:%d", len(appkey)) + return False + + response = await self.send_command(HciGatewayOp.CFG_APPKEY, appkey) + return response is not None + + async def mesh_send(self, dst_addr: int, opcode: int, payload: bytes) -> bool: + """发送 Mesh 消息。 + + Args: + dst_addr: 目标地址 + opcode: 操作码 + payload: 数据 + + Returns: + True 表示成功,False 表示失败 + """ + cmd = build_hci_mesh_send(opcode, payload, dst_addr) + _LOGGER.debug("发送 Mesh 消息:DST=0x%04X, OP=0x%04X", dst_addr, opcode) + try: + await self.serial_reader.write(cmd) + return True + except Exception as e: + _LOGGER.error("发送 Mesh 消息失败:%s", e) + return False diff --git a/custom_components/sigmesh_gateway/provisioning.py b/custom_components/sigmesh_gateway/provisioning.py index 0cb0b81..9b629c0 100644 --- a/custom_components/sigmesh_gateway/provisioning.py +++ b/custom_components/sigmesh_gateway/provisioning.py @@ -17,6 +17,7 @@ from .const import ( MeshSigOp, PROV_TIMEOUT, ) +from .hci_gateway import HciGateway, HciGatewayOp from .serial_reader import SerialReader _LOGGER = logging.getLogger(__name__) @@ -55,7 +56,7 @@ class GroupConfig: class ProvisioningManager: - """配网管理器.""" + """配网管理器(HCI 协议).""" def __init__( self, @@ -129,7 +130,7 @@ class ProvisioningManager: self._prov_timeout_handle = None async def start_scanning(self) -> None: - """开始扫描设备.""" + """开始扫描设备(HCI 协议).""" _LOGGER.debug("start_scanning 被调用,当前状态:%s", self._state.value) # 如果已经在扫描中,先重置状态(允许用户重新启动扫描) @@ -147,12 +148,20 @@ class ProvisioningManager: self._devices = {} self._scan_result = [] - # 发送扫描命令:AT+PROV=SCAN - # 网关会开始扫描周围的配网设备,设备响应后通过串口上报 + # 使用 HCI 协议发送扫描命令 try: - _LOGGER.debug("准备发送扫描命令:AT+PROV=SCAN") - await self.serial_reader.write_command("AT+PROV=SCAN") - _LOGGER.info("已发送扫描命令,等待设备响应...") + if not self.serial_reader.hci: + _LOGGER.error("HCI 网关未初始化") + self._set_state(ProvState.PROV_FAILED) + return + + _LOGGER.debug("发送 HCI 扫描命令") + result = await self.serial_reader.hci.start_scanning() + if result: + _LOGGER.info("已发送 HCI 扫描命令,等待设备响应...") + else: + _LOGGER.error("HCI 扫描命令无响应") + self._set_state(ProvState.PROV_FAILED) except Exception as e: _LOGGER.error("发送扫描命令失败:%s", e) self._set_state(ProvState.PROV_FAILED) diff --git a/custom_components/sigmesh_gateway/serial_reader.py b/custom_components/sigmesh_gateway/serial_reader.py index e9db749..4cfa19a 100644 --- a/custom_components/sigmesh_gateway/serial_reader.py +++ b/custom_components/sigmesh_gateway/serial_reader.py @@ -19,6 +19,7 @@ from .const import ( SERIAL_PROV_DEVICE_JOINED, SERIAL_PROV_DEVICE_LEFT, ) +from .hci_gateway import HciGateway _LOGGER = logging.getLogger(__name__) @@ -54,7 +55,7 @@ class ProvDeviceEvent: class SerialReader: - """异步串口读取器.""" + """异步串口读取器(HCI 协议).""" def __init__( self, @@ -78,6 +79,9 @@ class SerialReader: self._read_task: asyncio.Task | None = None self._buffer = bytearray() + # HCI 网关 + self.hci: HciGateway | None = None + # 回调函数 self._on_data_callback: Callable[[SerialDataEvent], None] | None = None self._on_mesh_message_callback: Callable[[MeshMessageEvent], None] | None = None @@ -223,6 +227,8 @@ class SerialReader: exclusive=True, ) self._running = True + # 初始化 HCI 网关 + self.hci = HciGateway(self) _LOGGER.info( "串口已连接:%s, 波特率:%d", self.device, @@ -257,25 +263,17 @@ class SerialReader: self._read_task = asyncio.create_task(self._read_loop()) async def _read_loop(self) -> None: - """串口读取循环.""" - _LOGGER.debug("开始串口读取循环") + """串口读取循环(HCI 协议).""" + _LOGGER.debug("开始串口读取循环(HCI 模式)") while self._running: try: if self._serial and self._serial.in_waiting: data = self._serial.read(self._serial.in_waiting) - self._buffer.extend(data) - # 按行处理 - while b"\r\n" in self._buffer: - line_bytes, self._buffer = self._buffer.split(b"\r\n", 1) - try: - line = line_bytes.decode("utf-8").strip() - _LOGGER.debug("串口接收:%s", line) - _LOGGER.debug("串口接收原始数据:%s", line_bytes.hex().upper()) - self._parse_event_line(line) - except UnicodeDecodeError as e: - _LOGGER.warning("解码失败:%s, 错误:%s", line_bytes, e) + # 直接交给 HCI 网关处理 + if self.hci: + self.hci.handle_data(data) await asyncio.sleep(0.01) # 避免 CPU 占用过高 @@ -292,7 +290,7 @@ class SerialReader: async def write(self, data: bytes) -> int: """写入数据到串口.""" - _LOGGER.debug("write 方法:is_connected=%s, _serial=%s", self.is_connected, self._serial) + _LOGGER.debug("串口写入:%d 字节", len(data)) if self._serial is None: _LOGGER.error("串口对象未初始化") raise RuntimeError("串口未连接") @@ -302,20 +300,14 @@ class SerialReader: _LOGGER.debug("串口写入成功:%d 字节", result) return result except Exception as e: - _LOGGER.error("串口写入失败:%s (is_open=%s)", e, self._serial.is_open) + _LOGGER.error("串口写入失败:%s", e) raise async def write_command(self, command: str) -> int: - """写入 AT 命令.""" + """写入 AT 命令(兼容旧接口,实际不使用).""" + _LOGGER.warning("write_command 被调用,但 HCI 模式下应使用 hci.send_command") cmd_bytes = f"{command}\r\n".encode() - _LOGGER.debug("发送命令:%s (原始数据:%s)", command, cmd_bytes.hex()) - try: - result = await self.write(cmd_bytes) - _LOGGER.debug("命令发送成功,发送了 %d 字节", result) - return result - except Exception as e: - _LOGGER.error("命令发送失败:%s", e) - raise + return await self.write(cmd_bytes) def list_serial_ports() -> list[dict[str, str]]: