feat: implement data coordinator for JackeryHome integration

- Introduced a new `JackeryDataCoordinator` class to manage MQTT subscriptions and data retrieval for all sensor entities.
- Updated the setup process to create and start the coordinator, allowing shared data handling among sensors.
- Enhanced sensor lifecycle management by registering and unregistering sensors with the coordinator.
- Improved logging for better visibility into the coordinator's operations and sensor updates.
This commit is contained in:
不求圣剑
2025-11-18 16:59:52 +08:00
parent a0a310cf92
commit b68566edd1
2 changed files with 264 additions and 235 deletions

View File

@@ -15,9 +15,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up JackeryHome from a config entry."""
_LOGGER.info("Setting up JackeryHome integration")
# 存储配置数据
# 初始化存储结构
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = entry.data
hass.data[DOMAIN][entry.entry_id] = {
"config": entry.data,
"coordinator": None, # 将在 sensor.py 中设置
}
# 加载传感器平台
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
@@ -29,6 +32,13 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
_LOGGER.info("Unloading JackeryHome integration")
# 停止协调器
entry_data = hass.data[DOMAIN].get(entry.entry_id, {})
coordinator = entry_data.get("coordinator")
if coordinator:
await coordinator.async_stop()
_LOGGER.info("Coordinator stopped")
# 卸载传感器平台
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -4,7 +4,7 @@ import json
import logging
import time
import random
from typing import Any
from typing import Any, Callable
from homeassistant.components import mqtt as ha_mqtt
from homeassistant.components.sensor import (
@@ -139,6 +139,230 @@ SENSORS = {
}
class JackeryDataCoordinator:
"""协调器管理MQTT订阅和数据获取供所有传感器实体共享使用."""
def __init__(self, hass: HomeAssistant, topic_prefix: str) -> None:
"""初始化协调器."""
self.hass = hass
self._topic_prefix = topic_prefix
self._data_topic = "v1/iot_gw/gw/data" # 接收设备响应数据的主题
self._data_get_topic = "v1/iot_gw/cloud/data" # 发送数据请求的主题
self._gw_lwt_topic = "v1/iot_gw/gw_lwt" # LWT 主题
self._device_sn = "26392658575364" # 默认设备序列号
self._sensors = {} # 存储所有传感器实体的引用 {sensor_id: entity}
self._data_task = None # 定时数据请求任务
self._subscribed = False # 标记是否已订阅
def register_sensor(self, sensor_id: str, entity: "JackeryHomeSensor") -> None:
"""注册传感器实体到协调器."""
self._sensors[sensor_id] = entity
_LOGGER.debug(f"Registered sensor {sensor_id} to coordinator")
def unregister_sensor(self, sensor_id: str) -> None:
"""从协调器注销传感器实体."""
if sensor_id in self._sensors:
del self._sensors[sensor_id]
_LOGGER.debug(f"Unregistered sensor {sensor_id} from coordinator")
async def async_start(self) -> None:
"""启动协调器订阅MQTT主题并开始定期请求数据."""
if self._subscribed:
return
# 订阅 LWT topic
@callback
def lwt_message_received(msg):
"""处理 LWT 消息."""
self._handle_lwt_message(msg)
await ha_mqtt.async_subscribe(
self.hass,
self._gw_lwt_topic,
lwt_message_received,
1
)
_LOGGER.info(f"Coordinator subscribed to LWT topic: {self._gw_lwt_topic}")
# 订阅数据响应 topic
@callback
def data_message_received(msg):
"""处理数据响应消息."""
self._handle_data_message(msg)
await ha_mqtt.async_subscribe(
self.hass,
self._data_topic,
data_message_received,
1
)
_LOGGER.info(f"Coordinator subscribed to data topic: {self._data_topic}")
self._subscribed = True
# 启动定时请求任务
self._data_task = asyncio.create_task(self._periodic_data_request())
async def async_stop(self) -> None:
"""停止协调器:取消定时任务."""
if self._data_task and not self._data_task.done():
self._data_task.cancel()
try:
await self._data_task
except asyncio.CancelledError:
pass
_LOGGER.info("Coordinator stopped")
def _handle_lwt_message(self, msg) -> None:
"""处理 LWT 消息,获取设备序列号."""
try:
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
_LOGGER.debug(f"Coordinator received LWT message: {payload}")
data = json.loads(payload)
if isinstance(data, dict) and "gw_sn" in data:
self._device_sn = data["gw_sn"]
_LOGGER.info(f"Device serial number updated: {self._device_sn}")
except Exception as e:
_LOGGER.error(f"Error processing LWT message: {e}")
def _handle_data_message(self, msg) -> None:
"""处理数据响应消息,解析后分发给所有传感器."""
try:
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
_LOGGER.debug(f"Coordinator received data message: {payload}")
try:
data = json.loads(payload)
except json.JSONDecodeError:
_LOGGER.warning(f"Failed to parse data message: {payload}")
return
# 处理 data_get 响应格式
if isinstance(data, dict) and data.get("cmd") == "data_get":
self._parse_and_distribute_data(data)
except Exception as e:
_LOGGER.error(f"Error processing data message: {e}")
def _parse_and_distribute_data(self, data: dict) -> None:
"""解析 data_get 响应并分发给对应的传感器."""
try:
info = data.get("info", {})
dev_list = info.get("dev_list", [])
# 遍历所有设备和meter
for dev in dev_list:
meter_list = dev.get("meter_list", [])
for meter in meter_list:
# 响应格式meter 是 [meter_sn, meter_value]
if not isinstance(meter, (list, tuple)) or len(meter) < 2:
continue
meter_sn = str(meter[0])
try:
meter_value_float = float(meter[1])
# 如果小数部分为 0转换为 int
meter_value = (
int(meter_value_float)
if meter_value_float == int(meter_value_float)
else meter_value_float
)
except (ValueError, TypeError):
_LOGGER.debug(f"Invalid meter value: {meter[1]}")
continue
# 根据 meter_sn 找到对应的传感器并更新
self._update_sensors_by_meter_sn(meter_sn, meter_value)
except Exception as e:
_LOGGER.error(f"Error parsing and distributing data: {e}")
def _update_sensors_by_meter_sn(self, meter_sn: str, meter_value: float) -> None:
"""根据 meter_sn 更新对应的传感器."""
# 遍历所有传感器,找到匹配的 meter_sn
for sensor_id, entity in self._sensors.items():
if str(entity._meter_sn) == meter_sn:
# 处理特殊传感器的值(如正负分离)
processed_value = entity._process_meter_value(meter_value)
entity._update_sensor_value(processed_value)
def _construct_data_get_request(self) -> dict:
"""构造 data_get 请求,包含所有传感器的 meter_sn."""
# 收集所有唯一的 meter_sn
meter_sns = set()
for entity in self._sensors.values():
if entity._meter_sn:
meter_sns.add(entity._meter_sn)
return {
"cmd": "data_get",
"gw_sn": self._device_sn or "",
"timestamp": str(int(time.time() * 1000)),
"token": str(random.randint(1000, 9999)),
"info": {
"dev_list": [
{
"dev_sn": f"ems_{self._device_sn}" if self._device_sn else "ems_",
"meter_list": list(meter_sns), # 一次性请求所有 meter_sn
}
]
}
}
async def _periodic_data_request(self) -> None:
"""定期发送数据请求(所有传感器共用一个请求)."""
_LOGGER.info("Coordinator starting periodic data request...")
await asyncio.sleep(2) # 等待 MQTT 连接建立
while True:
try:
if not self._device_sn:
_LOGGER.debug("Device serial number not available, waiting...")
await asyncio.sleep(REQUEST_INTERVAL)
continue
if not self._sensors:
_LOGGER.debug("No sensors registered yet, waiting...")
await asyncio.sleep(REQUEST_INTERVAL)
continue
try:
# 构造并发送包含所有 meter_sn 的请求
request_data = self._construct_data_get_request()
await ha_mqtt.async_publish(
self.hass,
self._data_get_topic,
json.dumps(request_data, ensure_ascii=False),
1,
False
)
_LOGGER.debug(
f"Coordinator sent data_get request for {len(self._sensors)} sensors "
f"to {self._data_get_topic}"
)
except Exception as mqtt_error:
_LOGGER.warning(
f"MQTT not ready: {mqtt_error}. Will retry in {REQUEST_INTERVAL} seconds..."
)
await asyncio.sleep(REQUEST_INTERVAL)
except asyncio.CancelledError:
_LOGGER.info("Coordinator periodic data request task cancelled")
raise
except Exception as e:
_LOGGER.error(f"Unexpected error in coordinator periodic data request: {e}")
await asyncio.sleep(REQUEST_INTERVAL)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
@@ -153,6 +377,12 @@ async def async_setup_entry(
_LOGGER.info(f"Topic prefix: {topic_prefix}")
# 创建协调器(全局唯一,所有传感器共享)
coordinator = JackeryDataCoordinator(hass, topic_prefix)
# 将协调器存储到 hass.data 中,供其他地方使用
hass.data[DOMAIN][config_entry.entry_id]["coordinator"] = coordinator
# 创建所有传感器实体
entities = []
for sensor_id, sensor_config in SENSORS.items():
@@ -165,11 +395,17 @@ async def async_setup_entry(
state_class=sensor_config["state_class"],
topic_prefix=topic_prefix,
config_entry_id=config_entry.entry_id,
coordinator=coordinator, # 传入协调器
)
entities.append(entity)
# 添加实体
async_add_entities(entities)
_LOGGER.info(f"Added {len(entities)} JackeryHome sensors")
# 启动协调器(在所有传感器添加后)
await coordinator.async_start()
_LOGGER.info(f"Added {len(entities)} JackeryHome sensors with shared coordinator")
class JackeryHomeSensor(SensorEntity):
@@ -185,6 +421,7 @@ class JackeryHomeSensor(SensorEntity):
state_class: SensorStateClass,
topic_prefix: str,
config_entry_id: str,
coordinator: JackeryDataCoordinator,
) -> None:
"""Initialize the sensor."""
self._sensor_id = sensor_id
@@ -201,17 +438,12 @@ class JackeryHomeSensor(SensorEntity):
"model": "Energy Monitor",
"sw_version": "1.0.5",
}
self._topic = f"{topic_prefix}/{sensor_id}/state"
self._data_topic = "v1/iot_gw/gw/data" # 接收设备响应数据的主题
self._data_get_topic = "v1/iot_gw/cloud/data" # 发送数据请求的基础主题(需要加上 device_sn
self._gw_lwt_topic = "v1/iot_gw/gw_lwt"
self._attr_native_value = None
self._attr_available = False
self._data_task = None
###TODO: 从 LWT 消息中获取设备序列号
self._device_sn = "26392658575364" # 设备序列号(从 LWT 消息中获取)
self._attr_should_poll = False
self._attr_has_entity_name = True
self._attr_has_entity_name = True
self._coordinator = coordinator # 协调器引用
# 获取 meter_sn对于功率传感器使用对应的 _power 键
meter_sn_key_map = {
"grid_import": "grid_import_power",
@@ -230,26 +462,6 @@ class JackeryHomeSensor(SensorEntity):
"""No polling needed."""
return False
def _handle_lwt_message(self, msg) -> None:
"""Handle LWT messages to get device serial number."""
try:
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
_LOGGER.debug(f"Received LWT message: {payload}")
try:
data = json.loads(payload)
if isinstance(data, dict) and "gw_sn" in data:
self._device_sn = data["gw_sn"]
_LOGGER.info(f"Device serial number updated: {self._device_sn}")
except json.JSONDecodeError:
_LOGGER.warning(f"Failed to parse LWT message: {payload}")
except Exception as e:
_LOGGER.error(f"Error processing LWT message: {e}")
def _process_meter_value(self, meter_value: float) -> float:
"""根据传感器类型处理 meter 值."""
if self._sensor_id == "grid_import":
@@ -263,216 +475,26 @@ class JackeryHomeSensor(SensorEntity):
else:
return meter_value
def _handle_data_message(self, msg) -> None:
"""Handle new MQTT messages from device/data topic."""
try:
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
_LOGGER.debug(f"Received data message for {self._sensor_id}: {payload}")
data = None
try:
data = json.loads(payload)
except json.JSONDecodeError:
data = None
# 处理 data_get 响应
if isinstance(data, dict) and data.get("cmd") == "data_get":
value = self._parse_data_get_response(data)
if value is not None:
self._update_sensor_value(value)
return
# 兼容旧格式的数据
if isinstance(data, dict):
if self._sensor_id in data:
value = data[self._sensor_id]
elif "value" in data:
value = data["value"]
else:
value = data
else:
try:
value = float(payload)
except ValueError:
self._attr_available = False
self.async_write_ha_state()
return
if value is not None:
self._update_sensor_value(value)
except Exception as e:
_LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}")
def _update_sensor_value(self, value: Any) -> None:
"""更新传感器值并通知 Home Assistant."""
"""更新传感器值并通知 Home Assistant(由协调器调用)."""
self._attr_native_value = value
self._attr_available = True
self.async_write_ha_state()
_LOGGER.debug(f"Updated {self._sensor_id} with value: {value}")
async def async_added_to_hass(self) -> None:
"""Set up the sensor."""
"""传感器添加到 Home Assistant 时,注册到协调器."""
await super().async_added_to_hass()
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} added to Home Assistant")
# 创建 LWT 消息处理回调
@callback
def lwt_message_received(msg):
"""Callback wrapper for LWT messages."""
self._handle_lwt_message(msg)
# 创建数据消息处理回调
@callback
def data_message_received(msg):
"""Callback wrapper for data messages."""
self._handle_data_message(msg)
# 订阅 LWT topic 以获取设备序列号
await ha_mqtt.async_subscribe(
self.hass,
self._gw_lwt_topic,
lwt_message_received,
1
)
_LOGGER.info(f"Subscribed to MQTT topic: {self._gw_lwt_topic}")
# 订阅 device/data topic
await ha_mqtt.async_subscribe(
self.hass,
self._data_topic,
data_message_received,
1
)
_LOGGER.info(f"Subscribed to MQTT topic: {self._data_topic}")
# 启动定时器每隔5秒向 device/data-get 发送数据获取请求
self._data_task = asyncio.create_task(self._periodic_data_request())
def _construct_data_get_request(self) -> dict:
"""构造 data_get 格式的请求数据."""
return {
"cmd": "data_get",
"gw_sn": self._device_sn or "",
"timestamp": str(int(time.time() * 1000)),
"token": str(random.randint(1000, 9999)),
"info": {
"dev_list": [
{
"dev_sn": f"ems_{self._device_sn}" if self._device_sn else "ems_",
"meter_list": [self._meter_sn],
}
]
}
}
def _parse_data_get_response(self, data: dict) -> Any:
"""解析 data_get 格式的响应数据.
请求格式: meter_list 中只包含 meter_sn (整数)
响应格式: meter_list 中包含 [meter_sn, meter_value] (列表)
"""
try:
cmd = data.get("cmd")
if cmd != "data_get":
return None
info = data.get("info", {})
dev_list = info.get("dev_list", [])
target_meter_sn = str(self._meter_sn)
for dev in dev_list:
meter_list = dev.get("meter_list", [])
for meter in meter_list:
# 响应格式meter 是 [meter_sn, meter_value] 格式的列表
if not isinstance(meter, (list, tuple)) or len(meter) < 2:
continue
meter_sn = str(meter[0])
if meter_sn != target_meter_sn:
continue
try:
meter_value_float = float(meter[1])
except (ValueError, TypeError):
_LOGGER.debug(f"Invalid meter value for {self._sensor_id}: {meter[1]}")
continue
# 如果小数部分为 0则转换为 int否则保留 float
meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float
# 使用统一的方法处理 meter 值
return self._process_meter_value(meter_value)
_LOGGER.debug(f"No matching data found for {self._sensor_id} in data_get payload")
return None
except Exception as e:
_LOGGER.error(f"Error parsing data_get response: {e}")
return None
async def _periodic_data_request(self) -> None:
"""Periodically send data request to device/data-get topic."""
# 启动时等待一段时间,确保 MQTT 连接已建立
_LOGGER.info(f"Starting periodic data request for {self._sensor_id}, waiting for MQTT connection...")
await asyncio.sleep(2)
while True:
try:
# 如果还没有设备序列号,等待一段时间再重试
if not self._device_sn:
_LOGGER.debug(
f"Device serial number not available yet for {self._sensor_id}, waiting..."
)
await asyncio.sleep(REQUEST_INTERVAL)
continue
# 检查 MQTT 是否可用
try:
# 构造并发送 data_get 格式的请求
request_data = self._construct_data_get_request()
await ha_mqtt.async_publish(
self.hass,
self._data_get_topic,
json.dumps(request_data, ensure_ascii=False),
1,
False
)
_LOGGER.debug(
f"Sent data_get request for {self._sensor_id} "
f"(meter_sn: {self._meter_sn}) to {self._data_get_topic}"
)
except Exception as mqtt_error:
# MQTT 连接错误,记录警告但继续运行
_LOGGER.warning(
f"MQTT not ready for {self._sensor_id}: {mqtt_error}. "
f"Will retry in {REQUEST_INTERVAL} seconds..."
)
await asyncio.sleep(REQUEST_INTERVAL)
except asyncio.CancelledError:
# 任务被取消时正常退出
_LOGGER.info(f"Periodic data request task cancelled for {self._sensor_id}")
raise
except Exception as e:
_LOGGER.error(f"Unexpected error in periodic data request for {self._sensor_id}: {e}")
await asyncio.sleep(REQUEST_INTERVAL)
# 注册到协调器
self._coordinator.register_sensor(self._sensor_id, self)
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} registered to coordinator")
async def async_will_remove_from_hass(self) -> None:
"""Clean up when sensor is removed."""
if self._data_task and not self._data_task.done():
self._data_task.cancel()
try:
await self._data_task
except asyncio.CancelledError:
pass
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} removed from Home Assistant")
"""传感器从 Home Assistant 移除时,从协调器注销."""
# 从协调器注销
self._coordinator.unregister_sensor(self._sensor_id)
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} unregistered from coordinator")
await super().async_will_remove_from_hass()
@@ -481,9 +503,6 @@ class JackeryHomeSensor(SensorEntity):
"""Return the state attributes."""
return {
"sensor_id": self._sensor_id,
"mqtt_topic": self._topic,
"data_topic": self._data_topic,
"data_get_topic": self._data_get_topic,
"meter_sn": self._meter_sn,
"device_sn": self._device_sn,
"device_sn": self._coordinator._device_sn,
}