feat: 实现 HCI 网关协议支持 E104-BT12USP
1. 新建 hci_gateway.py - HCI 协议实现 - HCI 命令包构建和解析 - 支持配网扫描、配置密钥等操作 - 支持 Mesh 消息发送 2. 更新 serial_reader.py - 集成 HciGateway - 使用 HCI 协议解析数据(而非 AT 命令) 3. 更新 provisioning.py - 使用 HCI 协议发送扫描命令 - 移除 AT+PROV=SCAN 命令 原因:E104-BT12USP 网关使用 HCI 固件,不是 AT 命令固件
This commit is contained in:
parent
93778ef861
commit
b61d99c2e0
347
custom_components/sigmesh_gateway/hci_gateway.py
Normal file
347
custom_components/sigmesh_gateway/hci_gateway.py
Normal file
@ -0,0 +1,347 @@
|
|||||||
|
"""E104-BT12USP HCI 网关协议模块.
|
||||||
|
|
||||||
|
根据亿佰特 E104-BT12USP 网关的 HCI 协议实现通信。
|
||||||
|
该网关使用 HCI 数据包格式,不是 AT 命令。
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import IntEnum
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class HciGatewayOp(IntEnum):
|
||||||
|
"""HCI 网关操作码."""
|
||||||
|
|
||||||
|
# 网关控制
|
||||||
|
GATEWAY_RESET = 0x0001 # 网关复位
|
||||||
|
GATEWAY_VERSION = 0x0002 # 获取版本
|
||||||
|
|
||||||
|
# 配网相关
|
||||||
|
PROV_GET_STS = 0x000C # 获取配网状态
|
||||||
|
PROV_START = 0x0010 # 开始配网
|
||||||
|
PROV_STOP = 0x0011 # 停止配网
|
||||||
|
PROV_SCAN = 0x0012 # 扫描设备
|
||||||
|
|
||||||
|
# 密钥配置
|
||||||
|
CFG_NETKEY = 0x0020 # 配置网络密钥
|
||||||
|
CFG_APPKEY = 0x0021 # 配置应用密钥
|
||||||
|
|
||||||
|
# 数据发送
|
||||||
|
MESH_SEND = 0x0030 # 发送 Mesh 数据
|
||||||
|
MESH_RECV = 0x0031 # 接收 Mesh 数据
|
||||||
|
|
||||||
|
# 事件
|
||||||
|
EVENT_PROV_DEVICE = 0x0040 # 配网设备事件
|
||||||
|
EVENT_MESH_DATA = 0x0041 # Mesh 数据事件
|
||||||
|
|
||||||
|
|
||||||
|
# HCI 数据包格式:
|
||||||
|
# [0xE9][0xFF][Opcode(2)][Length(2)][Payload(N)][Checksum(1)]
|
||||||
|
# 或
|
||||||
|
# [0xE8][0xFF][Opcode(2)][Length(2)][Payload(N)]
|
||||||
|
|
||||||
|
HCI_CMD_PREFIX = b"\xe9\xff"
|
||||||
|
HCI_RSP_PREFIX = b"\x91"
|
||||||
|
HCI_EVT_PREFIX = b"\xe8\xff"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HciPacket:
|
||||||
|
"""HCI 数据包."""
|
||||||
|
|
||||||
|
opcode: int
|
||||||
|
payload: bytes
|
||||||
|
is_response: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
def build_hci_command(opcode: int, payload: bytes = b"") -> bytes:
|
||||||
|
"""构建 HCI 命令包。
|
||||||
|
|
||||||
|
格式:E9 FF [OPCODE(2)] [LEN(2)] [PAYLOAD] [CHECKSUM]
|
||||||
|
"""
|
||||||
|
length = len(payload)
|
||||||
|
cmd = HCI_CMD_PREFIX + opcode.to_bytes(2, "little") + length.to_bytes(2, "little") + payload
|
||||||
|
|
||||||
|
# 计算校验和(简单累加)
|
||||||
|
checksum = sum(cmd) & 0xFF
|
||||||
|
return cmd + bytes([checksum])
|
||||||
|
|
||||||
|
|
||||||
|
def build_hci_mesh_send(opcode: int, payload: bytes, dst_addr: int = 0xFFFF) -> bytes:
|
||||||
|
"""构建 Mesh 发送命令。
|
||||||
|
|
||||||
|
格式:E9 FF 00 30 [LEN] [DST_ADDR(2)] [OPCODE(2)] [PAYLOAD] [CHECKSUM]
|
||||||
|
"""
|
||||||
|
inner_payload = dst_addr.to_bytes(2, "little") + opcode.to_bytes(2, "little") + payload
|
||||||
|
return build_hci_command(HciGatewayOp.MESH_SEND, inner_payload)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_hci_response(data: bytes) -> tuple[int, bytes] | None:
|
||||||
|
"""解析 HCI 响应数据。
|
||||||
|
|
||||||
|
返回:(opcode, payload) 或 None
|
||||||
|
"""
|
||||||
|
if len(data) < 7:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 检查响应头
|
||||||
|
if data[0] != 0x91:
|
||||||
|
return None
|
||||||
|
|
||||||
|
opcode = int.from_bytes(data[1:3], "little")
|
||||||
|
payload = data[3:]
|
||||||
|
|
||||||
|
return (opcode, payload)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_hci_event(data: bytes) -> tuple[int, bytes] | None:
|
||||||
|
"""解析 HCI 事件数据。
|
||||||
|
|
||||||
|
返回:(opcode, payload) 或 None
|
||||||
|
"""
|
||||||
|
if len(data) < 6:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 检查事件头 E8 FF
|
||||||
|
if data[0:2] != b"\xe8\xff":
|
||||||
|
return None
|
||||||
|
|
||||||
|
opcode = int.from_bytes(data[2:4], "little")
|
||||||
|
length = int.from_bytes(data[4:6], "little")
|
||||||
|
|
||||||
|
if len(data) < 6 + length:
|
||||||
|
return None
|
||||||
|
|
||||||
|
payload = data[6 : 6 + length]
|
||||||
|
return (opcode, payload)
|
||||||
|
|
||||||
|
|
||||||
|
class HciGateway:
|
||||||
|
"""HCI 网关通信类."""
|
||||||
|
|
||||||
|
def __init__(self, serial_reader) -> None:
|
||||||
|
"""初始化 HCI 网关。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
serial_reader: SerialReader 实例
|
||||||
|
"""
|
||||||
|
self.serial_reader = serial_reader
|
||||||
|
self._buffer = bytearray()
|
||||||
|
self._response_futures: dict[int, asyncio.Future] = {}
|
||||||
|
|
||||||
|
async def send_command(self, opcode: int, payload: bytes = b"", timeout: float = 5.0) -> bytes | None:
|
||||||
|
"""发送 HCI 命令并等待响应。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
opcode: 操作码
|
||||||
|
payload: 命令数据
|
||||||
|
timeout: 超时时间(秒)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
响应数据或 None
|
||||||
|
"""
|
||||||
|
cmd = build_hci_command(opcode, payload)
|
||||||
|
_LOGGER.debug("发送 HCI 命令:0x%04X, 数据:%s", opcode, cmd.hex().upper())
|
||||||
|
|
||||||
|
# 创建响应 future
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
future = loop.create_future()
|
||||||
|
self._response_futures[opcode] = future
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 发送命令
|
||||||
|
await self.serial_reader.write(cmd)
|
||||||
|
|
||||||
|
# 等待响应
|
||||||
|
response = await asyncio.wait_for(future, timeout)
|
||||||
|
_LOGGER.debug("收到 HCI 响应:0x%04X, 数据:%s", opcode, response.hex().upper())
|
||||||
|
return response
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
_LOGGER.warning("HCI 命令超时:0x%04X", opcode)
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
_LOGGER.error("HCI 命令失败:0x%04X, 错误:%s", opcode, e)
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
self._response_futures.pop(opcode, None)
|
||||||
|
|
||||||
|
def handle_data(self, data: bytes) -> None:
|
||||||
|
"""处理接收到的数据。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: 原始串口数据
|
||||||
|
"""
|
||||||
|
self._buffer.extend(data)
|
||||||
|
_LOGGER.debug("HCI 接收原始数据:%s", data.hex().upper())
|
||||||
|
|
||||||
|
# 尝试解析数据包
|
||||||
|
while len(self._buffer) >= 7:
|
||||||
|
# 检查响应头 0x91
|
||||||
|
if self._buffer[0] == 0x91:
|
||||||
|
if len(self._buffer) < 4:
|
||||||
|
break
|
||||||
|
|
||||||
|
# 解析响应
|
||||||
|
opcode = int.from_bytes(self._buffer[1:3], "little")
|
||||||
|
# 响应数据长度需要根据具体opcode确定
|
||||||
|
# 简化处理:取剩余所有数据
|
||||||
|
payload = bytes(self._buffer[3:])
|
||||||
|
|
||||||
|
# 唤醒等待的 future
|
||||||
|
if opcode in self._response_futures:
|
||||||
|
self._response_futures[opcode].set_result(payload)
|
||||||
|
|
||||||
|
self._buffer.clear()
|
||||||
|
return
|
||||||
|
|
||||||
|
# 检查事件头 E8 FF
|
||||||
|
elif self._buffer[0:2] == b"\xe8\xff":
|
||||||
|
if len(self._buffer) < 6:
|
||||||
|
break
|
||||||
|
|
||||||
|
length = int.from_bytes(self._buffer[4:6], "little")
|
||||||
|
if len(self._buffer) < 6 + length:
|
||||||
|
break
|
||||||
|
|
||||||
|
event_data = bytes(self._buffer[: 6 + length])
|
||||||
|
self._buffer = self._buffer[6 + length :]
|
||||||
|
|
||||||
|
_LOGGER.debug("HCI 事件:%s", event_data.hex().upper())
|
||||||
|
# 事件处理由上层负责
|
||||||
|
self._handle_event(event_data)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# 未知数据,清空
|
||||||
|
_LOGGER.warning("未知 HCI 数据头:%s", self._buffer[0:2].hex().upper())
|
||||||
|
self._buffer.clear()
|
||||||
|
return
|
||||||
|
|
||||||
|
def _handle_event(self, event_data: bytes) -> None:
|
||||||
|
"""处理 HCI 事件。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_data: 事件数据(包含头)
|
||||||
|
"""
|
||||||
|
# 由上层回调处理
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def get_version(self) -> str | None:
|
||||||
|
"""获取网关版本。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
版本字符串或 None
|
||||||
|
"""
|
||||||
|
response = await self.send_command(HciGatewayOp.GATEWAY_VERSION)
|
||||||
|
if response:
|
||||||
|
# 解析版本响应
|
||||||
|
try:
|
||||||
|
return response.decode("utf-8", errors="ignore").strip()
|
||||||
|
except Exception:
|
||||||
|
return response.hex().upper()
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_prov_state(self) -> dict | None:
|
||||||
|
"""获取配网状态。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
配网状态字典或 None
|
||||||
|
"""
|
||||||
|
response = await self.send_command(HciGatewayOp.PROV_GET_STS)
|
||||||
|
if response:
|
||||||
|
# 解析配网状态
|
||||||
|
# 根据 danglo 日志:91 8b 00 [state][...]
|
||||||
|
state_byte = response[0] if len(response) > 0 else 0
|
||||||
|
return {
|
||||||
|
"provisioned": state_byte == 0x01,
|
||||||
|
"raw": response.hex().upper(),
|
||||||
|
}
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def start_scanning(self) -> bool:
|
||||||
|
"""开始扫描设备。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 表示成功,False 表示失败
|
||||||
|
"""
|
||||||
|
response = await self.send_command(HciGatewayOp.PROV_SCAN)
|
||||||
|
return response is not None
|
||||||
|
|
||||||
|
async def stop_scanning(self) -> bool:
|
||||||
|
"""停止扫描。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 表示成功,False 表示失败
|
||||||
|
"""
|
||||||
|
response = await self.send_command(HciGatewayOp.PROV_STOP)
|
||||||
|
return response is not None
|
||||||
|
|
||||||
|
async def start_provisioning(self, device_mac: bytes) -> bool:
|
||||||
|
"""开始配网设备。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
device_mac: 设备 MAC 地址(6 字节)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 表示成功,False 表示失败
|
||||||
|
"""
|
||||||
|
payload = device_mac
|
||||||
|
response = await self.send_command(HciGatewayOp.PROV_START, payload)
|
||||||
|
return response is not None
|
||||||
|
|
||||||
|
async def configure_netkey(self, netkey: bytes) -> bool:
|
||||||
|
"""配置网络密钥。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
netkey: 16 字节网络密钥
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 表示成功,False 表示失败
|
||||||
|
"""
|
||||||
|
if len(netkey) != 16:
|
||||||
|
_LOGGER.error("网络密钥长度错误:%d", len(netkey))
|
||||||
|
return False
|
||||||
|
|
||||||
|
response = await self.send_command(HciGatewayOp.CFG_NETKEY, netkey)
|
||||||
|
return response is not None
|
||||||
|
|
||||||
|
async def configure_appkey(self, appkey: bytes) -> bool:
|
||||||
|
"""配置应用密钥。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
appkey: 16 字节应用密钥
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 表示成功,False 表示失败
|
||||||
|
"""
|
||||||
|
if len(appkey) != 16:
|
||||||
|
_LOGGER.error("应用密钥长度错误:%d", len(appkey))
|
||||||
|
return False
|
||||||
|
|
||||||
|
response = await self.send_command(HciGatewayOp.CFG_APPKEY, appkey)
|
||||||
|
return response is not None
|
||||||
|
|
||||||
|
async def mesh_send(self, dst_addr: int, opcode: int, payload: bytes) -> bool:
|
||||||
|
"""发送 Mesh 消息。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
dst_addr: 目标地址
|
||||||
|
opcode: 操作码
|
||||||
|
payload: 数据
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True 表示成功,False 表示失败
|
||||||
|
"""
|
||||||
|
cmd = build_hci_mesh_send(opcode, payload, dst_addr)
|
||||||
|
_LOGGER.debug("发送 Mesh 消息:DST=0x%04X, OP=0x%04X", dst_addr, opcode)
|
||||||
|
try:
|
||||||
|
await self.serial_reader.write(cmd)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
_LOGGER.error("发送 Mesh 消息失败:%s", e)
|
||||||
|
return False
|
||||||
@ -17,6 +17,7 @@ from .const import (
|
|||||||
MeshSigOp,
|
MeshSigOp,
|
||||||
PROV_TIMEOUT,
|
PROV_TIMEOUT,
|
||||||
)
|
)
|
||||||
|
from .hci_gateway import HciGateway, HciGatewayOp
|
||||||
from .serial_reader import SerialReader
|
from .serial_reader import SerialReader
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
@ -55,7 +56,7 @@ class GroupConfig:
|
|||||||
|
|
||||||
|
|
||||||
class ProvisioningManager:
|
class ProvisioningManager:
|
||||||
"""配网管理器."""
|
"""配网管理器(HCI 协议)."""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -129,7 +130,7 @@ class ProvisioningManager:
|
|||||||
self._prov_timeout_handle = None
|
self._prov_timeout_handle = None
|
||||||
|
|
||||||
async def start_scanning(self) -> None:
|
async def start_scanning(self) -> None:
|
||||||
"""开始扫描设备."""
|
"""开始扫描设备(HCI 协议)."""
|
||||||
_LOGGER.debug("start_scanning 被调用,当前状态:%s", self._state.value)
|
_LOGGER.debug("start_scanning 被调用,当前状态:%s", self._state.value)
|
||||||
|
|
||||||
# 如果已经在扫描中,先重置状态(允许用户重新启动扫描)
|
# 如果已经在扫描中,先重置状态(允许用户重新启动扫描)
|
||||||
@ -147,12 +148,20 @@ class ProvisioningManager:
|
|||||||
self._devices = {}
|
self._devices = {}
|
||||||
self._scan_result = []
|
self._scan_result = []
|
||||||
|
|
||||||
# 发送扫描命令:AT+PROV=SCAN
|
# 使用 HCI 协议发送扫描命令
|
||||||
# 网关会开始扫描周围的配网设备,设备响应后通过串口上报
|
|
||||||
try:
|
try:
|
||||||
_LOGGER.debug("准备发送扫描命令:AT+PROV=SCAN")
|
if not self.serial_reader.hci:
|
||||||
await self.serial_reader.write_command("AT+PROV=SCAN")
|
_LOGGER.error("HCI 网关未初始化")
|
||||||
_LOGGER.info("已发送扫描命令,等待设备响应...")
|
self._set_state(ProvState.PROV_FAILED)
|
||||||
|
return
|
||||||
|
|
||||||
|
_LOGGER.debug("发送 HCI 扫描命令")
|
||||||
|
result = await self.serial_reader.hci.start_scanning()
|
||||||
|
if result:
|
||||||
|
_LOGGER.info("已发送 HCI 扫描命令,等待设备响应...")
|
||||||
|
else:
|
||||||
|
_LOGGER.error("HCI 扫描命令无响应")
|
||||||
|
self._set_state(ProvState.PROV_FAILED)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
_LOGGER.error("发送扫描命令失败:%s", e)
|
_LOGGER.error("发送扫描命令失败:%s", e)
|
||||||
self._set_state(ProvState.PROV_FAILED)
|
self._set_state(ProvState.PROV_FAILED)
|
||||||
|
|||||||
@ -19,6 +19,7 @@ from .const import (
|
|||||||
SERIAL_PROV_DEVICE_JOINED,
|
SERIAL_PROV_DEVICE_JOINED,
|
||||||
SERIAL_PROV_DEVICE_LEFT,
|
SERIAL_PROV_DEVICE_LEFT,
|
||||||
)
|
)
|
||||||
|
from .hci_gateway import HciGateway
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -54,7 +55,7 @@ class ProvDeviceEvent:
|
|||||||
|
|
||||||
|
|
||||||
class SerialReader:
|
class SerialReader:
|
||||||
"""异步串口读取器."""
|
"""异步串口读取器(HCI 协议)."""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -78,6 +79,9 @@ class SerialReader:
|
|||||||
self._read_task: asyncio.Task | None = None
|
self._read_task: asyncio.Task | None = None
|
||||||
self._buffer = bytearray()
|
self._buffer = bytearray()
|
||||||
|
|
||||||
|
# HCI 网关
|
||||||
|
self.hci: HciGateway | None = None
|
||||||
|
|
||||||
# 回调函数
|
# 回调函数
|
||||||
self._on_data_callback: Callable[[SerialDataEvent], None] | None = None
|
self._on_data_callback: Callable[[SerialDataEvent], None] | None = None
|
||||||
self._on_mesh_message_callback: Callable[[MeshMessageEvent], None] | None = None
|
self._on_mesh_message_callback: Callable[[MeshMessageEvent], None] | None = None
|
||||||
@ -223,6 +227,8 @@ class SerialReader:
|
|||||||
exclusive=True,
|
exclusive=True,
|
||||||
)
|
)
|
||||||
self._running = True
|
self._running = True
|
||||||
|
# 初始化 HCI 网关
|
||||||
|
self.hci = HciGateway(self)
|
||||||
_LOGGER.info(
|
_LOGGER.info(
|
||||||
"串口已连接:%s, 波特率:%d",
|
"串口已连接:%s, 波特率:%d",
|
||||||
self.device,
|
self.device,
|
||||||
@ -257,25 +263,17 @@ class SerialReader:
|
|||||||
self._read_task = asyncio.create_task(self._read_loop())
|
self._read_task = asyncio.create_task(self._read_loop())
|
||||||
|
|
||||||
async def _read_loop(self) -> None:
|
async def _read_loop(self) -> None:
|
||||||
"""串口读取循环."""
|
"""串口读取循环(HCI 协议)."""
|
||||||
_LOGGER.debug("开始串口读取循环")
|
_LOGGER.debug("开始串口读取循环(HCI 模式)")
|
||||||
|
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
if self._serial and self._serial.in_waiting:
|
if self._serial and self._serial.in_waiting:
|
||||||
data = self._serial.read(self._serial.in_waiting)
|
data = self._serial.read(self._serial.in_waiting)
|
||||||
self._buffer.extend(data)
|
|
||||||
|
|
||||||
# 按行处理
|
# 直接交给 HCI 网关处理
|
||||||
while b"\r\n" in self._buffer:
|
if self.hci:
|
||||||
line_bytes, self._buffer = self._buffer.split(b"\r\n", 1)
|
self.hci.handle_data(data)
|
||||||
try:
|
|
||||||
line = line_bytes.decode("utf-8").strip()
|
|
||||||
_LOGGER.debug("串口接收:%s", line)
|
|
||||||
_LOGGER.debug("串口接收原始数据:%s", line_bytes.hex().upper())
|
|
||||||
self._parse_event_line(line)
|
|
||||||
except UnicodeDecodeError as e:
|
|
||||||
_LOGGER.warning("解码失败:%s, 错误:%s", line_bytes, e)
|
|
||||||
|
|
||||||
await asyncio.sleep(0.01) # 避免 CPU 占用过高
|
await asyncio.sleep(0.01) # 避免 CPU 占用过高
|
||||||
|
|
||||||
@ -292,7 +290,7 @@ class SerialReader:
|
|||||||
|
|
||||||
async def write(self, data: bytes) -> int:
|
async def write(self, data: bytes) -> int:
|
||||||
"""写入数据到串口."""
|
"""写入数据到串口."""
|
||||||
_LOGGER.debug("write 方法:is_connected=%s, _serial=%s", self.is_connected, self._serial)
|
_LOGGER.debug("串口写入:%d 字节", len(data))
|
||||||
if self._serial is None:
|
if self._serial is None:
|
||||||
_LOGGER.error("串口对象未初始化")
|
_LOGGER.error("串口对象未初始化")
|
||||||
raise RuntimeError("串口未连接")
|
raise RuntimeError("串口未连接")
|
||||||
@ -302,20 +300,14 @@ class SerialReader:
|
|||||||
_LOGGER.debug("串口写入成功:%d 字节", result)
|
_LOGGER.debug("串口写入成功:%d 字节", result)
|
||||||
return result
|
return result
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
_LOGGER.error("串口写入失败:%s (is_open=%s)", e, self._serial.is_open)
|
_LOGGER.error("串口写入失败:%s", e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def write_command(self, command: str) -> int:
|
async def write_command(self, command: str) -> int:
|
||||||
"""写入 AT 命令."""
|
"""写入 AT 命令(兼容旧接口,实际不使用)."""
|
||||||
|
_LOGGER.warning("write_command 被调用,但 HCI 模式下应使用 hci.send_command")
|
||||||
cmd_bytes = f"{command}\r\n".encode()
|
cmd_bytes = f"{command}\r\n".encode()
|
||||||
_LOGGER.debug("发送命令:%s (原始数据:%s)", command, cmd_bytes.hex())
|
return await self.write(cmd_bytes)
|
||||||
try:
|
|
||||||
result = await self.write(cmd_bytes)
|
|
||||||
_LOGGER.debug("命令发送成功,发送了 %d 字节", result)
|
|
||||||
return result
|
|
||||||
except Exception as e:
|
|
||||||
_LOGGER.error("命令发送失败:%s", e)
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def list_serial_ports() -> list[dict[str, str]]:
|
def list_serial_ports() -> list[dict[str, str]]:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user