"""SigMesh Gateway 配置流程.""" from __future__ import annotations import logging from typing import Any import voluptuous as vol from homeassistant import config_entries from homeassistant.core import callback from homeassistant.data_entry_flow import FlowResult from homeassistant.helpers import selector from .const import ( CONF_APP_KEY, CONF_BAUDRATE, CONF_GROUP_ADDRESS, CONF_IV_INDEX, CONF_NETWORK_ID, CONF_NETWORK_KEY, CONF_SERIAL_DEVICE, DEFAULT_APP_KEY, DEFAULT_BAUDRATE, DEFAULT_GROUP_ADDRESS_START, DEFAULT_IV_INDEX, DEFAULT_NAME, DEFAULT_NETWORK_ID, DEFAULT_NETWORK_KEY, DOMAIN, PROV_TIMEOUT, ) _LOGGER = logging.getLogger(__name__) def _get_serial_ports() -> list: """获取可用串口列表(在同步线程中执行).""" import serial.tools.list_ports return serial.tools.list_ports.comports() def _test_serial_connection(device: str, baudrate: int) -> bool: """测试串口连接(在同步线程中执行).""" import serial test_serial = serial.Serial(device, baudrate=baudrate, timeout=0.5) test_serial.close() return True class SigMeshGatewayConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """SigMesh Gateway 配置流程.""" VERSION = 1 def __init__(self) -> None: """初始化配置流程.""" self._errors: dict[str, str] = {} self._user_input: dict[str, Any] | None = None self._prov_config: dict[str, Any] | None = None async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """处理用户配置步骤 - 串口配置.""" self._errors = {} if user_input is not None: # 验证串口连接(在线程池中执行,避免阻塞事件循环) try: await self.hass.async_add_executor_job( _test_serial_connection, user_input[CONF_SERIAL_DEVICE], user_input.get(CONF_BAUDRATE, DEFAULT_BAUDRATE), ) self._user_input = user_input # 进入配网配置步骤 return await self.async_step_prov_config() except (ImportError, serial.SerialException) as e: self._errors[CONF_SERIAL_DEVICE] = f"无法打开串口:{e}" except Exception as e: self._errors["base"] = f"验证失败:{e}" # 获取可用串口列表(在线程池中执行,避免阻塞事件循环) try: ports = await self.hass.async_add_executor_job(_get_serial_ports) port_list = [ selector.SelectOptionDict(value=p.device, label=f"{p.device} - {p.description}") for p in ports ] except Exception: port_list = [ selector.SelectOptionDict(value="/dev/ttyUSB0", label="/dev/ttyUSB0"), selector.SelectOptionDict(value="/dev/ttyUSB1", label="/dev/ttyUSB1"), selector.SelectOptionDict(value="/dev/ttyACM0", label="/dev/ttyACM0"), ] return self.async_show_form( step_id="user", data_schema=vol.Schema( { vol.Required(CONF_SERIAL_DEVICE, default="/dev/ttyUSB0"): selector.SelectSelector( selector.SelectSelectorConfig(options=port_list), ), vol.Required(CONF_BAUDRATE, default=DEFAULT_BAUDRATE): vol.Coerce(int), } ), errors=self._errors, ) async def async_step_prov_config( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """处理配网配置步骤 - 首次使用需要配置。 根据文档,USB dongle 首次使用需要设置配网参数: - Network Key (16 字节网络密钥) - App Key (16 字节应用密钥) - Network ID (2 字节网络 ID) - IV Index (4 字节) """ self._errors = {} if user_input is not None: self._prov_config = user_input # 保存所有配置,完成配置流程 return self.async_create_entry( title=self._user_input.get(CONF_SERIAL_DEVICE, DEFAULT_NAME), data={ **self._user_input, **user_input, }, ) # 配网配置表单 return self.async_show_form( step_id="prov_config", data_schema=vol.Schema( { vol.Required(CONF_NETWORK_KEY, default=DEFAULT_NETWORK_KEY): str, vol.Required(CONF_APP_KEY, default=DEFAULT_APP_KEY): str, vol.Required(CONF_NETWORK_ID, default=DEFAULT_NETWORK_ID): str, vol.Required(CONF_IV_INDEX, default=DEFAULT_IV_INDEX): int, vol.Required( CONF_GROUP_ADDRESS, default=hex(DEFAULT_GROUP_ADDRESS_START), ): str, } ), description_placeholders={ "timeout": PROV_TIMEOUT, }, errors=self._errors, ) @callback def async_get_options_flow( self, ) -> SigMeshGatewayOptionsFlow: """获取选项流程.""" return SigMeshGatewayOptionsFlow() class SigMeshGatewayOptionsFlow(config_entries.OptionsFlow): """SigMesh Gateway 选项流程.""" def __init__(self) -> None: """初始化选项流程.""" self._errors: dict[str, str] = {} self._prov_action: str | None = None async def async_step_init( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """管理选项.""" # 显示主菜单 return self.async_show_menu( step_id="init", menu_options=["poll_config", "prov_action", "group_config"], ) async def async_step_poll_config( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """配置轮询间隔.""" if user_input is not None: return self.async_create_entry(title="", data=user_input) return self.async_show_form( step_id="poll_config", data_schema=vol.Schema( { vol.Required( "poll_interval", default=self.config_entry.options.get("poll_interval", 30), ): int, } ), ) async def async_step_prov_action( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """配网操作选择.""" errors = {} if user_input is not None: action = user_input.get("action") if action == "start_scan": return await self.async_step_start_scan() elif action == "stop_prov": return await self.async_step_stop_prov() elif action == "bind_appkey": return await self.async_step_bind_appkey() errors["base"] = "无效的操作" return self.async_show_form( step_id="prov_action", data_schema=vol.Schema( { vol.Required("action"): selector.SelectSelector( selector.SelectSelectorConfig( options=[ selector.SelectOptionDict(value="start_scan", label="开始扫描设备"), selector.SelectOptionDict(value="stop_prov", label="停止配网"), selector.SelectOptionDict(value="bind_appkey", label="绑定 App Key"), ], ), ), } ), errors=errors, ) async def async_step_start_scan( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """开始扫描设备.""" if user_input is not None: # 调用配网管理器开始扫描 try: _LOGGER.info("开始调用 start_scanning,DOMAIN 数据:%s", list(self.hass.data.get(DOMAIN, {}).keys())) for coordinator in self.hass.data.get(DOMAIN, {}).values(): if hasattr(coordinator, 'start_scanning'): _LOGGER.info("调用 coordinator.start_scanning()") await coordinator.start_scanning() _LOGGER.info("coordinator.start_scanning() 调用完成") except Exception as e: _LOGGER.error("start_scanning 调用失败:%s", e, exc_info=True) self._errors["base"] = str(e) return await self.async_step_prov_action() return self.async_create_entry(title="", data={}) return self.async_show_form( step_id="start_scan", description_placeholders={"timeout": PROV_TIMEOUT}, data_schema=vol.Schema( { vol.Required("confirm"): bool, } ), errors=self._errors, ) async def async_step_stop_prov( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """停止配网.""" if user_input is not None: # 调用配网管理器停止配网 try: for coordinator in self.hass.data.get(DOMAIN, {}).values(): if hasattr(coordinator, 'stop_provisioning'): await coordinator.stop_provisioning() except Exception as e: self._errors["base"] = str(e) return await self.async_step_prov_action() return self.async_create_entry(title="", data={}) return self.async_show_form( step_id="stop_prov", data_schema=vol.Schema( { vol.Required("confirm"): bool, } ), errors=self._errors, ) async def async_step_bind_appkey( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """绑定 App Key.""" if user_input is not None: # 调用配网管理器绑定 App Key try: device_address = user_input.get("device_address") element_address = user_input.get("element_address", 0) for coordinator in self.hass.data.get(DOMAIN, {}).values(): if hasattr(coordinator, 'bind_app_key'): await coordinator.bind_app_key(device_address, element_address) except Exception as e: self._errors["base"] = str(e) return await self.async_step_prov_action() return self.async_create_entry(title="", data={}) return self.async_show_form( step_id="bind_appkey", data_schema=vol.Schema( { vol.Required("device_address"): str, vol.Required("element_address", default=0): int, } ), errors=self._errors, ) async def async_step_group_config( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """分组配置.""" if user_input is not None: action = user_input.get("group_action") if action == "add_to_group": return await self.async_step_add_to_group() elif action == "remove_from_group": return await self.async_step_remove_from_group() return self.async_show_form( step_id="group_config", data_schema=vol.Schema( { vol.Required("group_action"): selector.SelectSelector( selector.SelectSelectorConfig( options=[ selector.SelectOptionDict(value="add_to_group", label="添加到组"), selector.SelectOptionDict(value="remove_from_group", label="从组移除"), ], ), ), } ), ) async def async_step_add_to_group( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """添加设备到组.""" if user_input is not None: # 调用配网管理器添加设备到组 try: target_address = user_input.get("target_address") element_address = user_input.get("element_address", 0) group_address = user_input.get("group_address") model_id = user_input.get("model_id", 4352) is_sig = user_input.get("is_sig", True) # 解析组地址 if isinstance(group_address, str): group_address = int(group_address, 16) for coordinator in self.hass.data.get(DOMAIN, {}).values(): if hasattr(coordinator, 'add_device_to_group'): await coordinator.add_device_to_group( target_address, element_address, group_address, model_id, is_sig ) except Exception as e: self._errors["base"] = str(e) return await self.async_step_group_config() return self.async_create_entry(title="", data={}) return self.async_show_form( step_id="add_to_group", data_schema=vol.Schema( { vol.Required("target_address"): str, vol.Required("element_address", default=0): int, vol.Required("group_address", default=hex(DEFAULT_GROUP_ADDRESS_START)): str, vol.Required("model_id", default=4352): int, vol.Required("is_sig", default=True): bool, } ), description_placeholders={ "default_group": hex(DEFAULT_GROUP_ADDRESS_START), }, errors=self._errors, ) async def async_step_remove_from_group( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """从组中移除设备.""" if user_input is not None: # 调用配网管理器移除设备 try: target_address = user_input.get("target_address") element_address = user_input.get("element_address", 0) group_address = user_input.get("group_address") model_id = user_input.get("model_id", 4352) is_sig = user_input.get("is_sig", True) # 解析组地址 if isinstance(group_address, str): group_address = int(group_address, 16) for coordinator in self.hass.data.get(DOMAIN, {}).values(): if hasattr(coordinator, 'remove_device_from_group'): await coordinator.remove_device_from_group( target_address, element_address, group_address, model_id, is_sig ) except Exception as e: self._errors["base"] = str(e) return await self.async_step_group_config() return self.async_create_entry(title="", data={}) return self.async_show_form( step_id="remove_from_group", data_schema=vol.Schema( { vol.Required("target_address"): str, vol.Required("element_address", default=0): int, vol.Required("group_address", default=hex(DEFAULT_GROUP_ADDRESS_START)): str, vol.Required("model_id", default=4352): int, vol.Required("is_sig", default=True): bool, } ), errors=self._errors, )