impress_sig_mesh_hacs/custom_components/sigmesh_gateway/hci_gateway.py
impressionyang c3e2785d39 feat: 添加详细的串口调试日志
添加详细的日志输出用于调试:
1. 串口原始数据输入(字节数和 HEX)
2. HCI 命令发送详情(OPCODE、HEX、长度)
3. HCI 响应解析详情
4. 数据完整性检查日志
5. 超时时的详细提示信息

日志级别:
- INFO: 关键操作和响应
- DEBUG: 详细数据流
- WARNING: 异常情况
2026-04-17 10:16:00 +08:00

282 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""E104-BT12USP HCI 网关协议模块.
根据亿佰特 E104-BT12USP 网关的 HCI 协议实现通信。
该网关使用 HCI 数据包格式,不是 AT 命令。
协议格式根据 danglo 工具配置文件分析:
- 网关配置命令E9 FF [OPCODE(1)] [LEN(1)] [PAYLOAD...]
- Mesh 数据命令E8 FF [固定字段 (6)] [目标地址 (2)] [OPCODE(2)] [参数...]
- 网关响应91 [OPCODE(1)] [LEN(1)] [PAYLOAD...]
"""
from __future__ import annotations
import asyncio
import logging
from enum import IntEnum
_LOGGER = logging.getLogger(__name__)
class HciGatewayOp(IntEnum):
"""HCI 网关操作码E9 FF 格式)."""
# 网关控制
GATEWAY_RESET = 0x01
GATEWAY_VERSION = 0x02
# 配网相关
PROV_GET_STS = 0x0C # 获取配网状态
PROV_START = 0x10 # 开始配网
PROV_STOP = 0x11 # 停止配网
PROV_SCAN = 0x12 # 扫描设备
# 密钥配置
CFG_NETKEY = 0x20 # 配置网络密钥
CFG_APPKEY = 0x21 # 配置应用密钥
# HCI 数据包格式
HCI_CMD_PREFIX = b"\xe9\xff" # 网关配置命令头
HCI_RSP_PREFIX = b"\x91" # 网关响应头
HCI_MESH_PREFIX = b"\xe8\xff" # Mesh 数据命令头
def build_hci_command(opcode: int, payload: bytes = b"") -> bytes:
"""构建 HCI 命令包。
根据 danglo 工具日志格式E9 FF [OPCODE(1)]
对于简单命令(无 payload只有 3 字节。
"""
# 简单格式E9 FF OPCODE
cmd = HCI_CMD_PREFIX + bytes([opcode & 0xFF])
# 如果有 payload添加长度和数据
if payload:
cmd += bytes([len(payload)]) + payload
return cmd
class HciGateway:
"""HCI 网关通信类."""
def __init__(self, serial_reader) -> None:
"""初始化 HCI 网关。
Args:
serial_reader: SerialReader 实例
"""
self.serial_reader = serial_reader
self._buffer = bytearray()
self._response_futures: dict[int, asyncio.Future] = {}
async def send_command(self, opcode: int, payload: bytes = b"", timeout: float = 5.0) -> bytes | None:
"""发送 HCI 命令并等待响应。
Args:
opcode: 操作码
payload: 命令数据
timeout: 超时时间(秒)
Returns:
响应数据或 None
"""
cmd = build_hci_command(opcode, payload)
_LOGGER.info("=== 发送 HCI 命令 ===")
_LOGGER.info("OPCODE: 0x%02X", opcode)
_LOGGER.info("CMD HEX: %s", cmd.hex().upper())
_LOGGER.info("CMD LEN: %d 字节", len(cmd))
# 创建响应 future
loop = asyncio.get_event_loop()
future = loop.create_future()
self._response_futures[opcode] = future
try:
# 发送命令
await self.serial_reader.write(cmd)
_LOGGER.info("命令已发送到串口")
# 等待响应
response = await asyncio.wait_for(future, timeout)
_LOGGER.info("收到 HCI 响应opcode=0x%02X, data=%s", opcode, response.hex().upper())
return response
except asyncio.TimeoutError:
_LOGGER.warning("HCI 命令超时0x%02X (超时 %d 秒)", opcode, timeout)
_LOGGER.warning("未收到任何响应,请检查:")
_LOGGER.warning("1. 网关是否已正确配置 (使用 danglo 工具)")
_LOGGER.warning("2. 串口连接是否正常")
_LOGGER.warning("3. 网关固件是否支持该命令")
return None
except Exception as e:
_LOGGER.error("HCI 命令失败0x%02X, 错误:%s", opcode, e)
return None
finally:
self._response_futures.pop(opcode, None)
def handle_data(self, data: bytes) -> None:
"""处理接收到的数据。
Args:
data: 原始串口数据
"""
self._buffer.extend(data)
_LOGGER.debug("=== HCI 接收原始数据 (%d 字节) ===", len(data))
_LOGGER.debug("HEX: %s", data.hex().upper())
_LOGGER.debug("BUF: %s", bytes(self._buffer).hex().upper())
# 尝试解析数据包
while len(self._buffer) >= 3:
# 检查响应头 0x91
if self._buffer[0] == 0x91:
if len(self._buffer) < 3:
break
# 解析响应91 [OPCODE(1)] [LEN(1)] [PAYLOAD...]
opcode = self._buffer[1]
length = self._buffer[2]
if len(self._buffer) < 3 + length:
_LOGGER.debug("数据不完整:需要 %d 字节,当前 %d 字节", 3 + length, len(self._buffer))
break # 数据不完整,等待更多
payload = bytes(self._buffer[3 : 3 + length])
_LOGGER.info("HCI 响应opcode=0x%02X, len=%d, payload=%s", opcode, length, payload.hex().upper())
# 唤醒等待的 future
if opcode in self._response_futures:
self._response_futures[opcode].set_result(payload)
self._buffer.clear()
return
# 检查事件头 E8 FF
elif self._buffer[0:2] == b"\xe8\xff":
if len(self._buffer) < 4:
break
opcode = self._buffer[2]
length = self._buffer[3]
if len(self._buffer) < 4 + length:
_LOGGER.debug("事件数据不完整:需要 %d 字节,当前 %d 字节", 4 + length, len(self._buffer))
break # 数据不完整
event_data = bytes(self._buffer[: 4 + length])
self._buffer = self._buffer[4 + length :]
_LOGGER.info("HCI 事件opcode=0x%02X, len=%d, data=%s", opcode, length, event_data.hex().upper())
# 事件处理由上层负责
self._handle_event(event_data)
else:
# 未知数据,清空
_LOGGER.warning("未知 HCI 数据头0x%02X,清空缓冲区", self._buffer[0])
self._buffer.clear()
return
def _handle_event(self, event_data: bytes) -> None:
"""处理 HCI 事件。
Args:
event_data: 事件数据(包含头)
"""
# 由上层回调处理
pass
async def get_version(self) -> str | None:
"""获取网关版本。
Returns:
版本字符串或 None
"""
response = await self.send_command(HciGatewayOp.GATEWAY_VERSION)
if response:
try:
return response.decode("utf-8", errors="ignore").strip()
except Exception:
return response.hex().upper()
return None
async def get_prov_state(self) -> dict | None:
"""获取配网状态。
Returns:
配网状态字典或 None
"""
response = await self.send_command(HciGatewayOp.PROV_GET_STS)
if response:
# 根据 danglo 日志91 8b 00 [state][...]
state_byte = response[0] if len(response) > 0 else 0
return {
"provisioned": state_byte == 0x01,
"raw": response.hex().upper(),
}
return None
async def start_scanning(self) -> bool:
"""开始扫描设备。
Returns:
True 表示成功False 表示失败
"""
response = await self.send_command(HciGatewayOp.PROV_SCAN)
return response is not None
async def stop_scanning(self) -> bool:
"""停止扫描。
Returns:
True 表示成功False 表示失败
"""
response = await self.send_command(HciGatewayOp.PROV_STOP)
return response is not None
async def start_provisioning(self, device_mac: bytes) -> bool:
"""开始配网设备。
Args:
device_mac: 设备 MAC 地址6 字节)
Returns:
True 表示成功False 表示失败
"""
payload = device_mac
response = await self.send_command(HciGatewayOp.PROV_START, payload)
return response is not None
async def configure_netkey(self, netkey: bytes) -> bool:
"""配置网络密钥。
Args:
netkey: 16 字节网络密钥
Returns:
True 表示成功False 表示失败
"""
if len(netkey) != 16:
_LOGGER.error("网络密钥长度错误:%d", len(netkey))
return False
response = await self.send_command(HciGatewayOp.CFG_NETKEY, netkey)
return response is not None
async def configure_appkey(self, appkey: bytes) -> bool:
"""配置应用密钥。
Args:
appkey: 16 字节应用密钥
Returns:
True 表示成功False 表示失败
"""
if len(appkey) != 16:
_LOGGER.error("应用密钥长度错误:%d", len(appkey))
return False
response = await self.send_command(HciGatewayOp.CFG_APPKEY, appkey)
return response is not None