impress_sig_mesh_hacs/custom_components/sigmesh_gateway/hci_gateway.py
impressionyang e007735197 fix: 修复 HCI 协议格式
根据 danglo 工具日志,协议格式应为:
- 命令:E9 FF [OPCODE(1)]  (3 字节,无 payload)
- 响应:91 [OPCODE(1)] [LEN(1)] [PAYLOAD...]

之前使用 2 字节 opcode 和 2 字节长度字段是错误的。
2026-04-16 19:59:52 +08:00

280 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""E104-BT12USP HCI 网关协议模块.
根据亿佰特 E104-BT12USP 网关的 HCI 协议实现通信。
该网关使用 HCI 数据包格式,不是 AT 命令。
协议格式根据 danglo 工具日志分析:
- 命令E9 FF [OPCODE(1)]
- 响应91 [OPCODE(1)] [LEN(1)] [PAYLOAD...]
"""
from __future__ import annotations
import asyncio
import logging
from enum import IntEnum
_LOGGER = logging.getLogger(__name__)
class HciGatewayOp(IntEnum):
"""HCI 网关操作码."""
# 网关控制
GATEWAY_RESET = 0x01
GATEWAY_VERSION = 0x02
# 配网相关
PROV_GET_STS = 0x0C # 获取配网状态
PROV_START = 0x10 # 开始配网
PROV_STOP = 0x11 # 停止配网
PROV_SCAN = 0x12 # 扫描设备
# 密钥配置
CFG_NETKEY = 0x20 # 配置网络密钥
CFG_APPKEY = 0x21 # 配置应用密钥
# 数据发送
MESH_SEND = 0x30 # 发送 Mesh 数据
MESH_RECV = 0x31 # 接收 Mesh 数据
# 事件
EVENT_PROV_DEVICE = 0x40 # 配网设备事件
EVENT_MESH_DATA = 0x41 # Mesh 数据事件
# HCI 数据包格式根据 danglo 工具日志:
# 命令E9 FF [OPCODE(1)]
# 响应91 [OPCODE(1)] [LEN(1)] [PAYLOAD...]
HCI_CMD_PREFIX = b"\xe9\xff"
HCI_RSP_PREFIX = b"\x91"
HCI_EVT_PREFIX = b"\xe8\xff"
def build_hci_command(opcode: int, payload: bytes = b"") -> bytes:
"""构建 HCI 命令包。
根据 danglo 工具日志格式E9 FF [OPCODE(1)]
对于简单命令(无 payload只有 3 字节。
"""
# 简单格式E9 FF OPCODE
cmd = HCI_CMD_PREFIX + bytes([opcode & 0xFF])
# 如果有 payload添加长度和数据
if payload:
cmd += bytes([len(payload)]) + payload
return cmd
class HciGateway:
"""HCI 网关通信类."""
def __init__(self, serial_reader) -> None:
"""初始化 HCI 网关。
Args:
serial_reader: SerialReader 实例
"""
self.serial_reader = serial_reader
self._buffer = bytearray()
self._response_futures: dict[int, asyncio.Future] = {}
async def send_command(self, opcode: int, payload: bytes = b"", timeout: float = 5.0) -> bytes | None:
"""发送 HCI 命令并等待响应。
Args:
opcode: 操作码
payload: 命令数据
timeout: 超时时间(秒)
Returns:
响应数据或 None
"""
cmd = build_hci_command(opcode, payload)
_LOGGER.debug("发送 HCI 命令0x%02X, 数据:%s", opcode, cmd.hex().upper())
# 创建响应 future
loop = asyncio.get_event_loop()
future = loop.create_future()
self._response_futures[opcode] = future
try:
# 发送命令
await self.serial_reader.write(cmd)
# 等待响应
response = await asyncio.wait_for(future, timeout)
_LOGGER.debug("收到 HCI 响应0x%02X, 数据:%s", opcode, response.hex().upper())
return response
except asyncio.TimeoutError:
_LOGGER.warning("HCI 命令超时0x%02X", opcode)
return None
except Exception as e:
_LOGGER.error("HCI 命令失败0x%02X, 错误:%s", opcode, e)
return None
finally:
self._response_futures.pop(opcode, None)
def handle_data(self, data: bytes) -> None:
"""处理接收到的数据。
Args:
data: 原始串口数据
"""
self._buffer.extend(data)
_LOGGER.debug("HCI 接收原始数据:%s", data.hex().upper())
# 尝试解析数据包
while len(self._buffer) >= 3:
# 检查响应头 0x91
if self._buffer[0] == 0x91:
if len(self._buffer) < 3:
break
# 解析响应91 [OPCODE(1)] [LEN(1)] [PAYLOAD...]
opcode = self._buffer[1]
length = self._buffer[2]
if len(self._buffer) < 3 + length:
break # 数据不完整,等待更多
payload = bytes(self._buffer[3 : 3 + length])
_LOGGER.debug("HCI 响应opcode=0x%02X, payload=%s", opcode, payload.hex().upper())
# 唤醒等待的 future
if opcode in self._response_futures:
self._response_futures[opcode].set_result(payload)
self._buffer.clear()
return
# 检查事件头 E8 FF
elif self._buffer[0:2] == b"\xe8\xff":
if len(self._buffer) < 4:
break
opcode = self._buffer[2]
length = self._buffer[3]
if len(self._buffer) < 4 + length:
break # 数据不完整
event_data = bytes(self._buffer[: 4 + length])
self._buffer = self._buffer[4 + length :]
_LOGGER.debug("HCI 事件opcode=0x%02X, data=%s", opcode, event_data.hex().upper())
# 事件处理由上层负责
self._handle_event(event_data)
else:
# 未知数据,清空
_LOGGER.warning("未知 HCI 数据头:%s", self._buffer[0:1].hex().upper())
self._buffer.clear()
return
def _handle_event(self, event_data: bytes) -> None:
"""处理 HCI 事件。
Args:
event_data: 事件数据(包含头)
"""
# 由上层回调处理
pass
async def get_version(self) -> str | None:
"""获取网关版本。
Returns:
版本字符串或 None
"""
response = await self.send_command(HciGatewayOp.GATEWAY_VERSION)
if response:
try:
return response.decode("utf-8", errors="ignore").strip()
except Exception:
return response.hex().upper()
return None
async def get_prov_state(self) -> dict | None:
"""获取配网状态。
Returns:
配网状态字典或 None
"""
response = await self.send_command(HciGatewayOp.PROV_GET_STS)
if response:
# 根据 danglo 日志91 8b 00 [state][...]
state_byte = response[0] if len(response) > 0 else 0
return {
"provisioned": state_byte == 0x01,
"raw": response.hex().upper(),
}
return None
async def start_scanning(self) -> bool:
"""开始扫描设备。
Returns:
True 表示成功False 表示失败
"""
response = await self.send_command(HciGatewayOp.PROV_SCAN)
return response is not None
async def stop_scanning(self) -> bool:
"""停止扫描。
Returns:
True 表示成功False 表示失败
"""
response = await self.send_command(HciGatewayOp.PROV_STOP)
return response is not None
async def start_provisioning(self, device_mac: bytes) -> bool:
"""开始配网设备。
Args:
device_mac: 设备 MAC 地址6 字节)
Returns:
True 表示成功False 表示失败
"""
payload = device_mac
response = await self.send_command(HciGatewayOp.PROV_START, payload)
return response is not None
async def configure_netkey(self, netkey: bytes) -> bool:
"""配置网络密钥。
Args:
netkey: 16 字节网络密钥
Returns:
True 表示成功False 表示失败
"""
if len(netkey) != 16:
_LOGGER.error("网络密钥长度错误:%d", len(netkey))
return False
response = await self.send_command(HciGatewayOp.CFG_NETKEY, netkey)
return response is not None
async def configure_appkey(self, appkey: bytes) -> bool:
"""配置应用密钥。
Args:
appkey: 16 字节应用密钥
Returns:
True 表示成功False 表示失败
"""
if len(appkey) != 16:
_LOGGER.error("应用密钥长度错误:%d", len(appkey))
return False
response = await self.send_command(HciGatewayOp.CFG_APPKEY, appkey)
return response is not None