From b68566edd19d70984b837c2c49a605aaee0c28d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=8D=E6=B1=82=E5=9C=A3=E5=89=91?= Date: Tue, 18 Nov 2025 16:59:52 +0800 Subject: [PATCH] feat: implement data coordinator for JackeryHome integration - Introduced a new `JackeryDataCoordinator` class to manage MQTT subscriptions and data retrieval for all sensor entities. - Updated the setup process to create and start the coordinator, allowing shared data handling among sensors. - Enhanced sensor lifecycle management by registering and unregistering sensors with the coordinator. - Improved logging for better visibility into the coordinator's operations and sensor updates. --- custom_components/JackeryHome/__init__.py | 14 +- custom_components/JackeryHome/sensor.py | 485 +++++++++++----------- 2 files changed, 264 insertions(+), 235 deletions(-) diff --git a/custom_components/JackeryHome/__init__.py b/custom_components/JackeryHome/__init__.py index fd89024..ff2aa1a 100644 --- a/custom_components/JackeryHome/__init__.py +++ b/custom_components/JackeryHome/__init__.py @@ -15,9 +15,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up JackeryHome from a config entry.""" _LOGGER.info("Setting up JackeryHome integration") - # 存储配置数据 + # 初始化存储结构 hass.data.setdefault(DOMAIN, {}) - hass.data[DOMAIN][entry.entry_id] = entry.data + hass.data[DOMAIN][entry.entry_id] = { + "config": entry.data, + "coordinator": None, # 将在 sensor.py 中设置 + } # 加载传感器平台 await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) @@ -29,6 +32,13 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" _LOGGER.info("Unloading JackeryHome integration") + # 停止协调器 + entry_data = hass.data[DOMAIN].get(entry.entry_id, {}) + coordinator = entry_data.get("coordinator") + if coordinator: + await coordinator.async_stop() + _LOGGER.info("Coordinator stopped") + # 卸载传感器平台 unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) diff --git a/custom_components/JackeryHome/sensor.py b/custom_components/JackeryHome/sensor.py index ae70d33..5b3fd1d 100644 --- a/custom_components/JackeryHome/sensor.py +++ b/custom_components/JackeryHome/sensor.py @@ -4,7 +4,7 @@ import json import logging import time import random -from typing import Any +from typing import Any, Callable from homeassistant.components import mqtt as ha_mqtt from homeassistant.components.sensor import ( @@ -139,6 +139,230 @@ SENSORS = { } +class JackeryDataCoordinator: + """协调器:管理MQTT订阅和数据获取,供所有传感器实体共享使用.""" + + def __init__(self, hass: HomeAssistant, topic_prefix: str) -> None: + """初始化协调器.""" + self.hass = hass + self._topic_prefix = topic_prefix + self._data_topic = "v1/iot_gw/gw/data" # 接收设备响应数据的主题 + self._data_get_topic = "v1/iot_gw/cloud/data" # 发送数据请求的主题 + self._gw_lwt_topic = "v1/iot_gw/gw_lwt" # LWT 主题 + self._device_sn = "26392658575364" # 默认设备序列号 + self._sensors = {} # 存储所有传感器实体的引用 {sensor_id: entity} + self._data_task = None # 定时数据请求任务 + self._subscribed = False # 标记是否已订阅 + + def register_sensor(self, sensor_id: str, entity: "JackeryHomeSensor") -> None: + """注册传感器实体到协调器.""" + self._sensors[sensor_id] = entity + _LOGGER.debug(f"Registered sensor {sensor_id} to coordinator") + + def unregister_sensor(self, sensor_id: str) -> None: + """从协调器注销传感器实体.""" + if sensor_id in self._sensors: + del self._sensors[sensor_id] + _LOGGER.debug(f"Unregistered sensor {sensor_id} from coordinator") + + async def async_start(self) -> None: + """启动协调器:订阅MQTT主题并开始定期请求数据.""" + if self._subscribed: + return + + # 订阅 LWT topic + @callback + def lwt_message_received(msg): + """处理 LWT 消息.""" + self._handle_lwt_message(msg) + + await ha_mqtt.async_subscribe( + self.hass, + self._gw_lwt_topic, + lwt_message_received, + 1 + ) + _LOGGER.info(f"Coordinator subscribed to LWT topic: {self._gw_lwt_topic}") + + # 订阅数据响应 topic + @callback + def data_message_received(msg): + """处理数据响应消息.""" + self._handle_data_message(msg) + + await ha_mqtt.async_subscribe( + self.hass, + self._data_topic, + data_message_received, + 1 + ) + _LOGGER.info(f"Coordinator subscribed to data topic: {self._data_topic}") + + self._subscribed = True + + # 启动定时请求任务 + self._data_task = asyncio.create_task(self._periodic_data_request()) + + async def async_stop(self) -> None: + """停止协调器:取消定时任务.""" + if self._data_task and not self._data_task.done(): + self._data_task.cancel() + try: + await self._data_task + except asyncio.CancelledError: + pass + _LOGGER.info("Coordinator stopped") + + def _handle_lwt_message(self, msg) -> None: + """处理 LWT 消息,获取设备序列号.""" + try: + payload = msg.payload + if isinstance(payload, bytes): + payload = payload.decode("utf-8") + + _LOGGER.debug(f"Coordinator received LWT message: {payload}") + + data = json.loads(payload) + if isinstance(data, dict) and "gw_sn" in data: + self._device_sn = data["gw_sn"] + _LOGGER.info(f"Device serial number updated: {self._device_sn}") + + except Exception as e: + _LOGGER.error(f"Error processing LWT message: {e}") + + def _handle_data_message(self, msg) -> None: + """处理数据响应消息,解析后分发给所有传感器.""" + try: + payload = msg.payload + if isinstance(payload, bytes): + payload = payload.decode("utf-8") + + _LOGGER.debug(f"Coordinator received data message: {payload}") + + try: + data = json.loads(payload) + except json.JSONDecodeError: + _LOGGER.warning(f"Failed to parse data message: {payload}") + return + + # 处理 data_get 响应格式 + if isinstance(data, dict) and data.get("cmd") == "data_get": + self._parse_and_distribute_data(data) + + except Exception as e: + _LOGGER.error(f"Error processing data message: {e}") + + def _parse_and_distribute_data(self, data: dict) -> None: + """解析 data_get 响应并分发给对应的传感器.""" + try: + info = data.get("info", {}) + dev_list = info.get("dev_list", []) + + # 遍历所有设备和meter + for dev in dev_list: + meter_list = dev.get("meter_list", []) + for meter in meter_list: + # 响应格式:meter 是 [meter_sn, meter_value] + if not isinstance(meter, (list, tuple)) or len(meter) < 2: + continue + + meter_sn = str(meter[0]) + try: + meter_value_float = float(meter[1]) + # 如果小数部分为 0,转换为 int + meter_value = ( + int(meter_value_float) + if meter_value_float == int(meter_value_float) + else meter_value_float + ) + except (ValueError, TypeError): + _LOGGER.debug(f"Invalid meter value: {meter[1]}") + continue + + # 根据 meter_sn 找到对应的传感器并更新 + self._update_sensors_by_meter_sn(meter_sn, meter_value) + + except Exception as e: + _LOGGER.error(f"Error parsing and distributing data: {e}") + + def _update_sensors_by_meter_sn(self, meter_sn: str, meter_value: float) -> None: + """根据 meter_sn 更新对应的传感器.""" + # 遍历所有传感器,找到匹配的 meter_sn + for sensor_id, entity in self._sensors.items(): + if str(entity._meter_sn) == meter_sn: + # 处理特殊传感器的值(如正负分离) + processed_value = entity._process_meter_value(meter_value) + entity._update_sensor_value(processed_value) + + def _construct_data_get_request(self) -> dict: + """构造 data_get 请求,包含所有传感器的 meter_sn.""" + # 收集所有唯一的 meter_sn + meter_sns = set() + for entity in self._sensors.values(): + if entity._meter_sn: + meter_sns.add(entity._meter_sn) + + return { + "cmd": "data_get", + "gw_sn": self._device_sn or "", + "timestamp": str(int(time.time() * 1000)), + "token": str(random.randint(1000, 9999)), + "info": { + "dev_list": [ + { + "dev_sn": f"ems_{self._device_sn}" if self._device_sn else "ems_", + "meter_list": list(meter_sns), # 一次性请求所有 meter_sn + } + ] + } + } + + async def _periodic_data_request(self) -> None: + """定期发送数据请求(所有传感器共用一个请求).""" + _LOGGER.info("Coordinator starting periodic data request...") + await asyncio.sleep(2) # 等待 MQTT 连接建立 + + while True: + try: + if not self._device_sn: + _LOGGER.debug("Device serial number not available, waiting...") + await asyncio.sleep(REQUEST_INTERVAL) + continue + + if not self._sensors: + _LOGGER.debug("No sensors registered yet, waiting...") + await asyncio.sleep(REQUEST_INTERVAL) + continue + + try: + # 构造并发送包含所有 meter_sn 的请求 + request_data = self._construct_data_get_request() + await ha_mqtt.async_publish( + self.hass, + self._data_get_topic, + json.dumps(request_data, ensure_ascii=False), + 1, + False + ) + _LOGGER.debug( + f"Coordinator sent data_get request for {len(self._sensors)} sensors " + f"to {self._data_get_topic}" + ) + except Exception as mqtt_error: + _LOGGER.warning( + f"MQTT not ready: {mqtt_error}. Will retry in {REQUEST_INTERVAL} seconds..." + ) + + await asyncio.sleep(REQUEST_INTERVAL) + + except asyncio.CancelledError: + _LOGGER.info("Coordinator periodic data request task cancelled") + raise + except Exception as e: + _LOGGER.error(f"Unexpected error in coordinator periodic data request: {e}") + await asyncio.sleep(REQUEST_INTERVAL) + + async def async_setup_entry( hass: HomeAssistant, config_entry: ConfigEntry, @@ -153,6 +377,12 @@ async def async_setup_entry( _LOGGER.info(f"Topic prefix: {topic_prefix}") + # 创建协调器(全局唯一,所有传感器共享) + coordinator = JackeryDataCoordinator(hass, topic_prefix) + + # 将协调器存储到 hass.data 中,供其他地方使用 + hass.data[DOMAIN][config_entry.entry_id]["coordinator"] = coordinator + # 创建所有传感器实体 entities = [] for sensor_id, sensor_config in SENSORS.items(): @@ -165,11 +395,17 @@ async def async_setup_entry( state_class=sensor_config["state_class"], topic_prefix=topic_prefix, config_entry_id=config_entry.entry_id, + coordinator=coordinator, # 传入协调器 ) entities.append(entity) + # 添加实体 async_add_entities(entities) - _LOGGER.info(f"Added {len(entities)} JackeryHome sensors") + + # 启动协调器(在所有传感器添加后) + await coordinator.async_start() + + _LOGGER.info(f"Added {len(entities)} JackeryHome sensors with shared coordinator") class JackeryHomeSensor(SensorEntity): @@ -185,6 +421,7 @@ class JackeryHomeSensor(SensorEntity): state_class: SensorStateClass, topic_prefix: str, config_entry_id: str, + coordinator: JackeryDataCoordinator, ) -> None: """Initialize the sensor.""" self._sensor_id = sensor_id @@ -201,17 +438,12 @@ class JackeryHomeSensor(SensorEntity): "model": "Energy Monitor", "sw_version": "1.0.5", } - self._topic = f"{topic_prefix}/{sensor_id}/state" - self._data_topic = "v1/iot_gw/gw/data" # 接收设备响应数据的主题 - self._data_get_topic = "v1/iot_gw/cloud/data" # 发送数据请求的基础主题(需要加上 device_sn) - self._gw_lwt_topic = "v1/iot_gw/gw_lwt" self._attr_native_value = None self._attr_available = False - self._data_task = None - ###TODO: 从 LWT 消息中获取设备序列号 - self._device_sn = "26392658575364" # 设备序列号(从 LWT 消息中获取) self._attr_should_poll = False - self._attr_has_entity_name = True + self._attr_has_entity_name = True + self._coordinator = coordinator # 协调器引用 + # 获取 meter_sn,对于功率传感器,使用对应的 _power 键 meter_sn_key_map = { "grid_import": "grid_import_power", @@ -230,26 +462,6 @@ class JackeryHomeSensor(SensorEntity): """No polling needed.""" return False - def _handle_lwt_message(self, msg) -> None: - """Handle LWT messages to get device serial number.""" - try: - payload = msg.payload - if isinstance(payload, bytes): - payload = payload.decode("utf-8") - - _LOGGER.debug(f"Received LWT message: {payload}") - - try: - data = json.loads(payload) - if isinstance(data, dict) and "gw_sn" in data: - self._device_sn = data["gw_sn"] - _LOGGER.info(f"Device serial number updated: {self._device_sn}") - except json.JSONDecodeError: - _LOGGER.warning(f"Failed to parse LWT message: {payload}") - - except Exception as e: - _LOGGER.error(f"Error processing LWT message: {e}") - def _process_meter_value(self, meter_value: float) -> float: """根据传感器类型处理 meter 值.""" if self._sensor_id == "grid_import": @@ -263,216 +475,26 @@ class JackeryHomeSensor(SensorEntity): else: return meter_value - def _handle_data_message(self, msg) -> None: - """Handle new MQTT messages from device/data topic.""" - try: - payload = msg.payload - if isinstance(payload, bytes): - payload = payload.decode("utf-8") - - _LOGGER.debug(f"Received data message for {self._sensor_id}: {payload}") - - data = None - try: - data = json.loads(payload) - except json.JSONDecodeError: - data = None - - # 处理 data_get 响应 - if isinstance(data, dict) and data.get("cmd") == "data_get": - value = self._parse_data_get_response(data) - if value is not None: - self._update_sensor_value(value) - return - - # 兼容旧格式的数据 - if isinstance(data, dict): - if self._sensor_id in data: - value = data[self._sensor_id] - elif "value" in data: - value = data["value"] - else: - value = data - else: - try: - value = float(payload) - except ValueError: - self._attr_available = False - self.async_write_ha_state() - return - - if value is not None: - self._update_sensor_value(value) - - except Exception as e: - _LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}") - def _update_sensor_value(self, value: Any) -> None: - """更新传感器值并通知 Home Assistant.""" + """更新传感器值并通知 Home Assistant(由协调器调用).""" self._attr_native_value = value self._attr_available = True self.async_write_ha_state() _LOGGER.debug(f"Updated {self._sensor_id} with value: {value}") async def async_added_to_hass(self) -> None: - """Set up the sensor.""" + """传感器添加到 Home Assistant 时,注册到协调器.""" await super().async_added_to_hass() - _LOGGER.info(f"JackeryHome sensor {self._sensor_id} added to Home Assistant") - - # 创建 LWT 消息处理回调 - @callback - def lwt_message_received(msg): - """Callback wrapper for LWT messages.""" - self._handle_lwt_message(msg) - - # 创建数据消息处理回调 - @callback - def data_message_received(msg): - """Callback wrapper for data messages.""" - self._handle_data_message(msg) - - # 订阅 LWT topic 以获取设备序列号 - await ha_mqtt.async_subscribe( - self.hass, - self._gw_lwt_topic, - lwt_message_received, - 1 - ) - _LOGGER.info(f"Subscribed to MQTT topic: {self._gw_lwt_topic}") - - # 订阅 device/data topic - await ha_mqtt.async_subscribe( - self.hass, - self._data_topic, - data_message_received, - 1 - ) - _LOGGER.info(f"Subscribed to MQTT topic: {self._data_topic}") - - # 启动定时器,每隔5秒向 device/data-get 发送数据获取请求 - self._data_task = asyncio.create_task(self._periodic_data_request()) - - - def _construct_data_get_request(self) -> dict: - """构造 data_get 格式的请求数据.""" - return { - "cmd": "data_get", - "gw_sn": self._device_sn or "", - "timestamp": str(int(time.time() * 1000)), - "token": str(random.randint(1000, 9999)), - "info": { - "dev_list": [ - { - "dev_sn": f"ems_{self._device_sn}" if self._device_sn else "ems_", - "meter_list": [self._meter_sn], - } - ] - } - } - - def _parse_data_get_response(self, data: dict) -> Any: - """解析 data_get 格式的响应数据. - - 请求格式: meter_list 中只包含 meter_sn (整数) - 响应格式: meter_list 中包含 [meter_sn, meter_value] (列表) - """ - try: - cmd = data.get("cmd") - if cmd != "data_get": - return None - - info = data.get("info", {}) - dev_list = info.get("dev_list", []) - target_meter_sn = str(self._meter_sn) - - for dev in dev_list: - meter_list = dev.get("meter_list", []) - for meter in meter_list: - # 响应格式:meter 是 [meter_sn, meter_value] 格式的列表 - if not isinstance(meter, (list, tuple)) or len(meter) < 2: - continue - - meter_sn = str(meter[0]) - if meter_sn != target_meter_sn: - continue - - try: - meter_value_float = float(meter[1]) - except (ValueError, TypeError): - _LOGGER.debug(f"Invalid meter value for {self._sensor_id}: {meter[1]}") - continue - - # 如果小数部分为 0,则转换为 int,否则保留 float - meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float - - # 使用统一的方法处理 meter 值 - return self._process_meter_value(meter_value) - - _LOGGER.debug(f"No matching data found for {self._sensor_id} in data_get payload") - return None - - except Exception as e: - _LOGGER.error(f"Error parsing data_get response: {e}") - return None - - async def _periodic_data_request(self) -> None: - """Periodically send data request to device/data-get topic.""" - # 启动时等待一段时间,确保 MQTT 连接已建立 - _LOGGER.info(f"Starting periodic data request for {self._sensor_id}, waiting for MQTT connection...") - await asyncio.sleep(2) - - while True: - try: - # 如果还没有设备序列号,等待一段时间再重试 - if not self._device_sn: - _LOGGER.debug( - f"Device serial number not available yet for {self._sensor_id}, waiting..." - ) - await asyncio.sleep(REQUEST_INTERVAL) - continue - - # 检查 MQTT 是否可用 - try: - # 构造并发送 data_get 格式的请求 - request_data = self._construct_data_get_request() - await ha_mqtt.async_publish( - self.hass, - self._data_get_topic, - json.dumps(request_data, ensure_ascii=False), - 1, - False - ) - _LOGGER.debug( - f"Sent data_get request for {self._sensor_id} " - f"(meter_sn: {self._meter_sn}) to {self._data_get_topic}" - ) - except Exception as mqtt_error: - # MQTT 连接错误,记录警告但继续运行 - _LOGGER.warning( - f"MQTT not ready for {self._sensor_id}: {mqtt_error}. " - f"Will retry in {REQUEST_INTERVAL} seconds..." - ) - - await asyncio.sleep(REQUEST_INTERVAL) - - except asyncio.CancelledError: - # 任务被取消时正常退出 - _LOGGER.info(f"Periodic data request task cancelled for {self._sensor_id}") - raise - except Exception as e: - _LOGGER.error(f"Unexpected error in periodic data request for {self._sensor_id}: {e}") - await asyncio.sleep(REQUEST_INTERVAL) + # 注册到协调器 + self._coordinator.register_sensor(self._sensor_id, self) + _LOGGER.info(f"JackeryHome sensor {self._sensor_id} registered to coordinator") async def async_will_remove_from_hass(self) -> None: - """Clean up when sensor is removed.""" - if self._data_task and not self._data_task.done(): - self._data_task.cancel() - try: - await self._data_task - except asyncio.CancelledError: - pass - _LOGGER.info(f"JackeryHome sensor {self._sensor_id} removed from Home Assistant") + """传感器从 Home Assistant 移除时,从协调器注销.""" + # 从协调器注销 + self._coordinator.unregister_sensor(self._sensor_id) + _LOGGER.info(f"JackeryHome sensor {self._sensor_id} unregistered from coordinator") await super().async_will_remove_from_hass() @@ -481,9 +503,6 @@ class JackeryHomeSensor(SensorEntity): """Return the state attributes.""" return { "sensor_id": self._sensor_id, - "mqtt_topic": self._topic, - "data_topic": self._data_topic, - "data_get_topic": self._data_get_topic, "meter_sn": self._meter_sn, - "device_sn": self._device_sn, + "device_sn": self._coordinator._device_sn, } \ No newline at end of file