添加 send_raw_command 服务用于调试

This commit is contained in:
impressionyang 2026-04-17 11:09:56 +08:00
parent 1535b96e79
commit 0ecf542270
2 changed files with 48 additions and 0 deletions

View File

@ -297,3 +297,26 @@ class HciGateway:
""" """
response = await self.send_command(HciGatewayOp.GATEWAY_RESET) response = await self.send_command(HciGatewayOp.GATEWAY_RESET)
return response is not None return response is not None
async def test_at_command(self) -> bool:
"""发送 AT 命令测试网关响应。
用于检测网关是否为 AT 命令固件
Returns:
True 表示收到响应False 表示无响应
"""
# 发送简单的 AT 命令
test_cmd = b"AT\r\n"
_LOGGER.info("=== 发送 AT 测试命令 ===")
_LOGGER.info("CMD: %s", test_cmd.decode())
try:
await self.serial_reader.write(test_cmd)
# 等待响应 2 秒
await asyncio.sleep(2)
_LOGGER.info("AT 测试完成,检查是否有响应")
return True
except Exception as e:
_LOGGER.error("AT 测试失败:%s", e)
return False

View File

@ -54,6 +54,10 @@ SERVICE_SCHEMA_SEND_VENDOR_COMMAND = {
vol.Required("payload"): cv.string, vol.Required("payload"): cv.string,
} }
SERVICE_SCHEMA_SEND_RAW_COMMAND = {
vol.Required("hex_data"): cv.string,
}
def setup_services(hass, coordinators: dict) -> None: def setup_services(hass, coordinators: dict) -> None:
"""设置 HA 服务。 """设置 HA 服务。
@ -141,6 +145,20 @@ def setup_services(hass, coordinators: dict) -> None:
target_address, element_address, opcode, payload target_address, element_address, opcode, payload
) )
async def handle_send_raw_command(call) -> None:
"""处理发送原始串口命令服务调用."""
hex_data = call.data.get("hex_data")
try:
raw_data = bytes.fromhex(hex_data)
for coordinator in coordinators.values():
if coordinator.serial_reader.is_connected:
await coordinator.serial_reader.write(raw_data)
_LOGGER.info("已发送原始命令:%s", hex_data)
else:
_LOGGER.warning("串口未连接,无法发送命令")
except ValueError as e:
_LOGGER.error("无效的十六进制数据:%s, 错误:%s", hex_data, e)
# 注册服务 # 注册服务
hass.services.async_register( hass.services.async_register(
DOMAIN, DOMAIN,
@ -190,3 +208,10 @@ def setup_services(hass, coordinators: dict) -> None:
handle_send_vendor_command, handle_send_vendor_command,
schema=vol.Schema(SERVICE_SCHEMA_SEND_VENDOR_COMMAND), schema=vol.Schema(SERVICE_SCHEMA_SEND_VENDOR_COMMAND),
) )
hass.services.async_register(
DOMAIN,
"send_raw_command",
handle_send_raw_command,
schema=vol.Schema(SERVICE_SCHEMA_SEND_RAW_COMMAND),
)