impress_sig_mesh_hacs/custom_components/sigmesh_gateway/config_flow.py
impressionyang e7a34282a3 fix: config_flow.py 缺少 _LOGGER 定义
添加 logging 导入和 _LOGGER 定义
2026-04-16 17:33:32 +08:00

428 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SigMesh Gateway 配置流程."""
from __future__ import annotations
import logging
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import selector
from .const import (
CONF_APP_KEY,
CONF_BAUDRATE,
CONF_GROUP_ADDRESS,
CONF_IV_INDEX,
CONF_NETWORK_ID,
CONF_NETWORK_KEY,
CONF_SERIAL_DEVICE,
DEFAULT_APP_KEY,
DEFAULT_BAUDRATE,
DEFAULT_GROUP_ADDRESS_START,
DEFAULT_IV_INDEX,
DEFAULT_NAME,
DEFAULT_NETWORK_ID,
DEFAULT_NETWORK_KEY,
DOMAIN,
PROV_TIMEOUT,
)
_LOGGER = logging.getLogger(__name__)
def _get_serial_ports() -> list:
"""获取可用串口列表(在同步线程中执行)."""
import serial.tools.list_ports
return serial.tools.list_ports.comports()
def _test_serial_connection(device: str, baudrate: int) -> bool:
"""测试串口连接(在同步线程中执行)."""
import serial
test_serial = serial.Serial(device, baudrate=baudrate, timeout=0.5)
test_serial.close()
return True
class SigMeshGatewayConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""SigMesh Gateway 配置流程."""
VERSION = 1
def __init__(self) -> None:
"""初始化配置流程."""
self._errors: dict[str, str] = {}
self._user_input: dict[str, Any] | None = None
self._prov_config: dict[str, Any] | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""处理用户配置步骤 - 串口配置."""
self._errors = {}
if user_input is not None:
# 验证串口连接(在线程池中执行,避免阻塞事件循环)
try:
await self.hass.async_add_executor_job(
_test_serial_connection,
user_input[CONF_SERIAL_DEVICE],
user_input.get(CONF_BAUDRATE, DEFAULT_BAUDRATE),
)
self._user_input = user_input
# 进入配网配置步骤
return await self.async_step_prov_config()
except (ImportError, serial.SerialException) as e:
self._errors[CONF_SERIAL_DEVICE] = f"无法打开串口:{e}"
except Exception as e:
self._errors["base"] = f"验证失败:{e}"
# 获取可用串口列表(在线程池中执行,避免阻塞事件循环)
try:
ports = await self.hass.async_add_executor_job(_get_serial_ports)
port_list = [
selector.SelectOptionDict(value=p.device, label=f"{p.device} - {p.description}")
for p in ports
]
except Exception:
port_list = [
selector.SelectOptionDict(value="/dev/ttyUSB0", label="/dev/ttyUSB0"),
selector.SelectOptionDict(value="/dev/ttyUSB1", label="/dev/ttyUSB1"),
selector.SelectOptionDict(value="/dev/ttyACM0", label="/dev/ttyACM0"),
]
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_SERIAL_DEVICE, default="/dev/ttyUSB0"): selector.SelectSelector(
selector.SelectSelectorConfig(options=port_list),
),
vol.Required(CONF_BAUDRATE, default=DEFAULT_BAUDRATE): vol.Coerce(int),
}
),
errors=self._errors,
)
async def async_step_prov_config(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""处理配网配置步骤 - 首次使用需要配置。
根据文档USB dongle 首次使用需要设置配网参数:
- Network Key (16 字节网络密钥)
- App Key (16 字节应用密钥)
- Network ID (2 字节网络 ID)
- IV Index (4 字节)
"""
self._errors = {}
if user_input is not None:
self._prov_config = user_input
# 保存所有配置,完成配置流程
return self.async_create_entry(
title=self._user_input.get(CONF_SERIAL_DEVICE, DEFAULT_NAME),
data={
**self._user_input,
**user_input,
},
)
# 配网配置表单
return self.async_show_form(
step_id="prov_config",
data_schema=vol.Schema(
{
vol.Required(CONF_NETWORK_KEY, default=DEFAULT_NETWORK_KEY): str,
vol.Required(CONF_APP_KEY, default=DEFAULT_APP_KEY): str,
vol.Required(CONF_NETWORK_ID, default=DEFAULT_NETWORK_ID): str,
vol.Required(CONF_IV_INDEX, default=DEFAULT_IV_INDEX): int,
vol.Required(
CONF_GROUP_ADDRESS,
default=hex(DEFAULT_GROUP_ADDRESS_START),
): str,
}
),
description_placeholders={
"timeout": PROV_TIMEOUT,
},
errors=self._errors,
)
@callback
def async_get_options_flow(
self,
) -> SigMeshGatewayOptionsFlow:
"""获取选项流程."""
return SigMeshGatewayOptionsFlow()
class SigMeshGatewayOptionsFlow(config_entries.OptionsFlow):
"""SigMesh Gateway 选项流程."""
def __init__(self) -> None:
"""初始化选项流程."""
self._errors: dict[str, str] = {}
self._prov_action: str | None = None
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""管理选项."""
# 显示主菜单
return self.async_show_menu(
step_id="init",
menu_options=["poll_config", "prov_action", "group_config"],
)
async def async_step_poll_config(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""配置轮询间隔."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
return self.async_show_form(
step_id="poll_config",
data_schema=vol.Schema(
{
vol.Required(
"poll_interval",
default=self.config_entry.options.get("poll_interval", 30),
): int,
}
),
)
async def async_step_prov_action(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""配网操作选择."""
errors = {}
if user_input is not None:
action = user_input.get("action")
if action == "start_scan":
return await self.async_step_start_scan()
elif action == "stop_prov":
return await self.async_step_stop_prov()
elif action == "bind_appkey":
return await self.async_step_bind_appkey()
errors["base"] = "无效的操作"
return self.async_show_form(
step_id="prov_action",
data_schema=vol.Schema(
{
vol.Required("action"): selector.SelectSelector(
selector.SelectSelectorConfig(
options=[
selector.SelectOptionDict(value="start_scan", label="开始扫描设备"),
selector.SelectOptionDict(value="stop_prov", label="停止配网"),
selector.SelectOptionDict(value="bind_appkey", label="绑定 App Key"),
],
),
),
}
),
errors=errors,
)
async def async_step_start_scan(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""开始扫描设备."""
if user_input is not None:
# 调用配网管理器开始扫描
try:
_LOGGER.info("开始调用 start_scanningDOMAIN 数据:%s", list(self.hass.data.get(DOMAIN, {}).keys()))
for coordinator in self.hass.data.get(DOMAIN, {}).values():
if hasattr(coordinator, 'start_scanning'):
_LOGGER.info("调用 coordinator.start_scanning()")
await coordinator.start_scanning()
_LOGGER.info("coordinator.start_scanning() 调用完成")
except Exception as e:
_LOGGER.error("start_scanning 调用失败:%s", e, exc_info=True)
self._errors["base"] = str(e)
return await self.async_step_prov_action()
return self.async_create_entry(title="", data={})
return self.async_show_form(
step_id="start_scan",
description_placeholders={"timeout": PROV_TIMEOUT},
data_schema=vol.Schema(
{
vol.Required("confirm"): bool,
}
),
errors=self._errors,
)
async def async_step_stop_prov(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""停止配网."""
if user_input is not None:
# 调用配网管理器停止配网
try:
for coordinator in self.hass.data.get(DOMAIN, {}).values():
if hasattr(coordinator, 'stop_provisioning'):
await coordinator.stop_provisioning()
except Exception as e:
self._errors["base"] = str(e)
return await self.async_step_prov_action()
return self.async_create_entry(title="", data={})
return self.async_show_form(
step_id="stop_prov",
data_schema=vol.Schema(
{
vol.Required("confirm"): bool,
}
),
errors=self._errors,
)
async def async_step_bind_appkey(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""绑定 App Key."""
if user_input is not None:
# 调用配网管理器绑定 App Key
try:
device_address = user_input.get("device_address")
element_address = user_input.get("element_address", 0)
for coordinator in self.hass.data.get(DOMAIN, {}).values():
if hasattr(coordinator, 'bind_app_key'):
await coordinator.bind_app_key(device_address, element_address)
except Exception as e:
self._errors["base"] = str(e)
return await self.async_step_prov_action()
return self.async_create_entry(title="", data={})
return self.async_show_form(
step_id="bind_appkey",
data_schema=vol.Schema(
{
vol.Required("device_address"): str,
vol.Required("element_address", default=0): int,
}
),
errors=self._errors,
)
async def async_step_group_config(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""分组配置."""
if user_input is not None:
action = user_input.get("group_action")
if action == "add_to_group":
return await self.async_step_add_to_group()
elif action == "remove_from_group":
return await self.async_step_remove_from_group()
return self.async_show_form(
step_id="group_config",
data_schema=vol.Schema(
{
vol.Required("group_action"): selector.SelectSelector(
selector.SelectSelectorConfig(
options=[
selector.SelectOptionDict(value="add_to_group", label="添加到组"),
selector.SelectOptionDict(value="remove_from_group", label="从组移除"),
],
),
),
}
),
)
async def async_step_add_to_group(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""添加设备到组."""
if user_input is not None:
# 调用配网管理器添加设备到组
try:
target_address = user_input.get("target_address")
element_address = user_input.get("element_address", 0)
group_address = user_input.get("group_address")
model_id = user_input.get("model_id", 4352)
is_sig = user_input.get("is_sig", True)
# 解析组地址
if isinstance(group_address, str):
group_address = int(group_address, 16)
for coordinator in self.hass.data.get(DOMAIN, {}).values():
if hasattr(coordinator, 'add_device_to_group'):
await coordinator.add_device_to_group(
target_address, element_address, group_address, model_id, is_sig
)
except Exception as e:
self._errors["base"] = str(e)
return await self.async_step_group_config()
return self.async_create_entry(title="", data={})
return self.async_show_form(
step_id="add_to_group",
data_schema=vol.Schema(
{
vol.Required("target_address"): str,
vol.Required("element_address", default=0): int,
vol.Required("group_address", default=hex(DEFAULT_GROUP_ADDRESS_START)): str,
vol.Required("model_id", default=4352): int,
vol.Required("is_sig", default=True): bool,
}
),
description_placeholders={
"default_group": hex(DEFAULT_GROUP_ADDRESS_START),
},
errors=self._errors,
)
async def async_step_remove_from_group(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""从组中移除设备."""
if user_input is not None:
# 调用配网管理器移除设备
try:
target_address = user_input.get("target_address")
element_address = user_input.get("element_address", 0)
group_address = user_input.get("group_address")
model_id = user_input.get("model_id", 4352)
is_sig = user_input.get("is_sig", True)
# 解析组地址
if isinstance(group_address, str):
group_address = int(group_address, 16)
for coordinator in self.hass.data.get(DOMAIN, {}).values():
if hasattr(coordinator, 'remove_device_from_group'):
await coordinator.remove_device_from_group(
target_address, element_address, group_address, model_id, is_sig
)
except Exception as e:
self._errors["base"] = str(e)
return await self.async_step_group_config()
return self.async_create_entry(title="", data={})
return self.async_show_form(
step_id="remove_from_group",
data_schema=vol.Schema(
{
vol.Required("target_address"): str,
vol.Required("element_address", default=0): int,
vol.Required("group_address", default=hex(DEFAULT_GROUP_ADDRESS_START)): str,
vol.Required("model_id", default=4352): int,
vol.Required("is_sig", default=True): bool,
}
),
errors=self._errors,
)