diff --git a/custom_components/sigmesh_gateway/hci_gateway.py b/custom_components/sigmesh_gateway/hci_gateway.py index d9149bd..f637b04 100644 --- a/custom_components/sigmesh_gateway/hci_gateway.py +++ b/custom_components/sigmesh_gateway/hci_gateway.py @@ -297,3 +297,26 @@ class HciGateway: """ response = await self.send_command(HciGatewayOp.GATEWAY_RESET) return response is not None + + async def test_at_command(self) -> bool: + """发送 AT 命令测试网关响应。 + + 用于检测网关是否为 AT 命令固件。 + + Returns: + True 表示收到响应,False 表示无响应 + """ + # 发送简单的 AT 命令 + test_cmd = b"AT\r\n" + _LOGGER.info("=== 发送 AT 测试命令 ===") + _LOGGER.info("CMD: %s", test_cmd.decode()) + + try: + await self.serial_reader.write(test_cmd) + # 等待响应 2 秒 + await asyncio.sleep(2) + _LOGGER.info("AT 测试完成,检查是否有响应") + return True + except Exception as e: + _LOGGER.error("AT 测试失败:%s", e) + return False diff --git a/custom_components/sigmesh_gateway/services.py b/custom_components/sigmesh_gateway/services.py index d871fe6..ac1a439 100644 --- a/custom_components/sigmesh_gateway/services.py +++ b/custom_components/sigmesh_gateway/services.py @@ -54,6 +54,10 @@ SERVICE_SCHEMA_SEND_VENDOR_COMMAND = { vol.Required("payload"): cv.string, } +SERVICE_SCHEMA_SEND_RAW_COMMAND = { + vol.Required("hex_data"): cv.string, +} + def setup_services(hass, coordinators: dict) -> None: """设置 HA 服务。 @@ -141,6 +145,20 @@ def setup_services(hass, coordinators: dict) -> None: target_address, element_address, opcode, payload ) + async def handle_send_raw_command(call) -> None: + """处理发送原始串口命令服务调用.""" + hex_data = call.data.get("hex_data") + try: + raw_data = bytes.fromhex(hex_data) + for coordinator in coordinators.values(): + if coordinator.serial_reader.is_connected: + await coordinator.serial_reader.write(raw_data) + _LOGGER.info("已发送原始命令:%s", hex_data) + else: + _LOGGER.warning("串口未连接,无法发送命令") + except ValueError as e: + _LOGGER.error("无效的十六进制数据:%s, 错误:%s", hex_data, e) + # 注册服务 hass.services.async_register( DOMAIN, @@ -190,3 +208,10 @@ def setup_services(hass, coordinators: dict) -> None: handle_send_vendor_command, schema=vol.Schema(SERVICE_SCHEMA_SEND_VENDOR_COMMAND), ) + + hass.services.async_register( + DOMAIN, + "send_raw_command", + handle_send_raw_command, + schema=vol.Schema(SERVICE_SCHEMA_SEND_RAW_COMMAND), + )