修复 HCI 命令格式和扫描前密钥配置

- hci_gateway.py: 修复 build_hci_command 始终包含长度字段
- hci_gateway.py: 增加响应解析的详细日志
- provisioning.py: start_scanning 先配置网络密钥和应用密钥
- 解决网关无响应问题,命令格式现在符合 E9 FF OPCODE LEN 规范

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
impressionyang 2026-04-17 11:03:50 +08:00
parent c3e2785d39
commit 1535b96e79
2 changed files with 46 additions and 9 deletions

View File

@ -45,15 +45,15 @@ HCI_MESH_PREFIX = b"\xe8\xff" # Mesh 数据命令头
def build_hci_command(opcode: int, payload: bytes = b"") -> bytes: def build_hci_command(opcode: int, payload: bytes = b"") -> bytes:
"""构建 HCI 命令包。 """构建 HCI 命令包。
根据 danglo 工具日志格式E9 FF [OPCODE(1)] 格式E9 FF [OPCODE(1)] [LEN(1)] [PAYLOAD...]
对于简单命令 payload只有 3 字节 长度字段始终存在即使为 0
""" """
# 简单格式:E9 FF OPCODE # E9 FF OPCODE LEN
cmd = HCI_CMD_PREFIX + bytes([opcode & 0xFF]) cmd = HCI_CMD_PREFIX + bytes([opcode & 0xFF, len(payload)])
# 如果有 payload添加长度和数据 # 如果有 payload添加数据
if payload: if payload:
cmd += bytes([len(payload)]) + payload cmd += payload
return cmd return cmd
@ -138,6 +138,8 @@ class HciGateway:
opcode = self._buffer[1] opcode = self._buffer[1]
length = self._buffer[2] length = self._buffer[2]
_LOGGER.info("收到 91 响应头opcode=0x%02X, length=%d", opcode, length)
if len(self._buffer) < 3 + length: if len(self._buffer) < 3 + length:
_LOGGER.debug("数据不完整:需要 %d 字节,当前 %d 字节", 3 + length, len(self._buffer)) _LOGGER.debug("数据不完整:需要 %d 字节,当前 %d 字节", 3 + length, len(self._buffer))
break # 数据不完整,等待更多 break # 数据不完整,等待更多
@ -148,7 +150,11 @@ class HciGateway:
# 唤醒等待的 future # 唤醒等待的 future
if opcode in self._response_futures: if opcode in self._response_futures:
_LOGGER.info("找到匹配的 future唤醒等待的命令 0x%02X", opcode)
self._response_futures[opcode].set_result(payload) self._response_futures[opcode].set_result(payload)
else:
_LOGGER.warning("未找到匹配的 future (opcode=0x%02X),已发送的命令有:%s",
opcode, list(self._response_futures.keys()))
self._buffer.clear() self._buffer.clear()
return return
@ -161,6 +167,8 @@ class HciGateway:
opcode = self._buffer[2] opcode = self._buffer[2]
length = self._buffer[3] length = self._buffer[3]
_LOGGER.info("收到 E8 事件头opcode=0x%02X, length=%d", opcode, length)
if len(self._buffer) < 4 + length: if len(self._buffer) < 4 + length:
_LOGGER.debug("事件数据不完整:需要 %d 字节,当前 %d 字节", 4 + length, len(self._buffer)) _LOGGER.debug("事件数据不完整:需要 %d 字节,当前 %d 字节", 4 + length, len(self._buffer))
break # 数据不完整 break # 数据不完整
@ -173,8 +181,9 @@ class HciGateway:
self._handle_event(event_data) self._handle_event(event_data)
else: else:
# 未知数据,清空 # 未知数据,打印警告并清空
_LOGGER.warning("未知 HCI 数据头0x%02X,清空缓冲区", self._buffer[0]) _LOGGER.warning("未知 HCI 数据头0x%02X,缓冲区前 20 字节:%s",
self._buffer[0], self._buffer[:20].hex().upper())
self._buffer.clear() self._buffer.clear()
return return
@ -279,3 +288,12 @@ class HciGateway:
response = await self.send_command(HciGatewayOp.CFG_APPKEY, appkey) response = await self.send_command(HciGatewayOp.CFG_APPKEY, appkey)
return response is not None return response is not None
async def reset_gateway(self) -> bool:
"""复位网关。
Returns:
True 表示成功False 表示失败
"""
response = await self.send_command(HciGatewayOp.GATEWAY_RESET)
return response is not None

View File

@ -154,11 +154,30 @@ class ProvisioningManager:
self._set_state(ProvState.PROV_FAILED) self._set_state(ProvState.PROV_FAILED)
return return
# 1. 先配置网关密钥(如果未配置)
try:
_LOGGER.info("正在配置网关密钥...")
netkey_bytes = bytes.fromhex(self.network_key)
appkey_bytes = bytes.fromhex(self.app_key)
# 配置网络密钥
netkey_result = await self.serial_reader.hci.configure_netkey(netkey_bytes)
_LOGGER.info("网络密钥配置结果:%s", "成功" if netkey_result else "失败")
# 配置应用密钥
appkey_result = await self.serial_reader.hci.configure_appkey(appkey_bytes)
_LOGGER.info("应用密钥配置结果:%s", "成功" if appkey_result else "失败")
# 等待网关处理
await asyncio.sleep(0.5)
except Exception as e:
_LOGGER.error("配置网关密钥失败:%s", e)
self._set_state(ProvState.SCANNING) self._set_state(ProvState.SCANNING)
self._devices = {} self._devices = {}
self._scan_result = [] self._scan_result = []
# 使用 HCI 协议发送扫描命令 # 2. 使用 HCI 协议发送扫描命令
try: try:
_LOGGER.debug("发送 HCI 扫描命令") _LOGGER.debug("发送 HCI 扫描命令")
result = await self.serial_reader.hci.start_scanning() result = await self.serial_reader.hci.start_scanning()