feat: 更新协议

This commit is contained in:
不求圣剑
2026-01-21 10:31:12 +08:00
parent 12ed7103f5
commit a66ad4dc88
14 changed files with 452 additions and 611 deletions

View File

@@ -1,15 +0,0 @@
{
"domain": "jackery_home",
"name": "JackeryHome",
"codeowners": [
"@suyulin"
],
"config_flow": true,
"dependencies": [
"mqtt"
],
"documentation": "https://github.com/suyulin/jackery_home",
"issue_tracker": "https://github.com/suyulin/jackery_home/issues",
"iot_class": "local_push",
"version": "1.1.35"
}

View File

@@ -1,535 +0,0 @@
"""JackeryHome Sensor Platform."""
import asyncio
import json
import logging
import time
import random
from typing import Any, Callable
from homeassistant.components import mqtt as ha_mqtt
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.const import UnitOfPower, UnitOfEnergy, PERCENTAGE
from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
# 常量定义
REQUEST_INTERVAL = 5 # 数据请求间隔(秒)
# Meter SN 映射传感器ID到meter_sn的映射
METER_SN_MAP = {
"battery_soc": "21548033",
"solar_energy": "16961537",
"eps_export_energy": "16998401",
"eps_import_energy": "16963585",
"eps_power": "16933889",
"grid_import_energy": "16962561",
"grid_export_energy": "16968705",
"battery_charge_energy": "16964609",
"battery_discharge_energy": "16965633",
"solar_power": "16932865",
"home_power": "16936961",
"grid_import_power": "16930817",
"grid_export_power": "16930817",
"battery_charge_power": "16931841",
"battery_discharge_power": "16931841",
}
# 传感器配置
SENSORS = {
"eps_power": {
"name": "EPS Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:home-lightning-bolt",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"eps_export_energy": {
"name": "EPS Export Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:home-lightning-bolt",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
"eps_import_energy": {
"name": "EPS Import Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:home-lightning-bolt",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
# 功率传感器(实时监测)
"solar_power": {
"name": "Solar Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:solar-power",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"home_power": {
"name": "Home Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:home-lightning-bolt",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"grid_import_power": {
"name": "Grid Import",
"unit": UnitOfPower.WATT,
"icon": "mdi:transmission-tower-import",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"grid_export_power": {
"name": "Grid Export",
"unit": UnitOfPower.WATT,
"icon": "mdi:transmission-tower-export",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"battery_charge_power": {
"name": "Battery Charge",
"unit": UnitOfPower.WATT,
"icon": "mdi:battery-charging",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"battery_discharge_power": {
"name": "Battery Discharge",
"unit": UnitOfPower.WATT,
"icon": "mdi:battery-minus",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"battery_soc": {
"name": "Battery State of Charge",
"unit": PERCENTAGE,
"icon": "mdi:battery-70",
"device_class": SensorDeviceClass.BATTERY,
"state_class": SensorStateClass.MEASUREMENT,
},
# 能源传感器(用于能源仪表板)
"solar_energy": {
"name": "Solar Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:solar-power",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
"home_energy": {
"name": "Home Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:home-lightning-bolt",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
"grid_import_energy": {
"name": "Grid Import Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:transmission-tower-import",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
"grid_export_energy": {
"name": "Grid Export Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:transmission-tower-export",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
"battery_charge_energy": {
"name": "Battery Charge Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:battery-charging",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
"battery_discharge_energy": {
"name": "Battery Discharge Energy",
"unit": UnitOfEnergy.KILO_WATT_HOUR,
"icon": "mdi:battery-minus",
"device_class": SensorDeviceClass.ENERGY,
"state_class": SensorStateClass.TOTAL_INCREASING,
},
}
class JackeryDataCoordinator:
"""协调器管理MQTT订阅和数据获取供所有传感器实体共享使用."""
def __init__(self, hass: HomeAssistant, topic_prefix: str) -> None:
"""初始化协调器."""
self.hass = hass
self._topic_prefix = topic_prefix
self._data_topic = "v1/iot_gw/gw/data" # 接收设备响应数据的主题
self._data_get_topic = "v1/iot_gw/cloud/data" # 发送数据请求的主题
self._gw_lwt_topic = "v1/iot_gw/gw_lwt" # LWT 主题
self._device_sn = "" # 默认设备序列号
self._sensors = {} # 存储所有传感器实体的引用 {sensor_id: entity}
self._data_task = None # 定时数据请求任务
self._subscribed = False # 标记是否已订阅
def register_sensor(self, sensor_id: str, entity: "JackeryHomeSensor") -> None:
"""注册传感器实体到协调器."""
self._sensors[sensor_id] = entity
_LOGGER.debug(f"Registered sensor {sensor_id} to coordinator")
def unregister_sensor(self, sensor_id: str) -> None:
"""从协调器注销传感器实体."""
if sensor_id in self._sensors:
del self._sensors[sensor_id]
_LOGGER.debug(f"Unregistered sensor {sensor_id} from coordinator")
async def async_start(self) -> None:
"""启动协调器订阅MQTT主题并开始定期请求数据."""
if self._subscribed:
return
try:
# 订阅 LWT topic
@callback
def lwt_message_received(msg):
"""处理 LWT 消息."""
self._handle_lwt_message(msg)
await ha_mqtt.async_subscribe(
self.hass,
self._gw_lwt_topic,
lwt_message_received,
1
)
_LOGGER.info(f"Coordinator subscribed to LWT topic: {self._gw_lwt_topic}")
# 订阅数据响应 topic
@callback
def data_message_received(msg):
"""处理数据响应消息."""
self._handle_data_message(msg)
await ha_mqtt.async_subscribe(
self.hass,
self._data_topic,
data_message_received,
1
)
_LOGGER.info(f"Coordinator subscribed to data topic: {self._data_topic}")
self._subscribed = True
# 启动定时请求任务
self._data_task = asyncio.create_task(self._periodic_data_request())
except Exception as e:
_LOGGER.error(
f"Failed to start coordinator. MQTT may not be connected: {e}. "
"Please check your MQTT integration settings."
)
async def async_stop(self) -> None:
"""停止协调器:取消定时任务."""
if self._data_task and not self._data_task.done():
self._data_task.cancel()
try:
await self._data_task
except asyncio.CancelledError:
pass
_LOGGER.info("Coordinator stopped")
def _handle_lwt_message(self, msg) -> None:
"""处理 LWT 消息,获取设备序列号."""
try:
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
_LOGGER.debug(f"Coordinator received LWT message: {payload}")
data = json.loads(payload)
if isinstance(data, dict) and "gw_sn" in data:
self._device_sn = data["gw_sn"]
_LOGGER.info(f"Device serial number updated: {self._device_sn}")
except Exception as e:
_LOGGER.error(f"Error processing LWT message: {e}")
def _handle_data_message(self, msg) -> None:
"""处理数据响应消息,解析后分发给所有传感器."""
try:
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
_LOGGER.debug(f"Coordinator received data message: {payload}")
try:
data = json.loads(payload)
except json.JSONDecodeError:
_LOGGER.warning(f"Failed to parse data message: {payload}")
return
# 处理 data_get 响应格式
if isinstance(data, dict) and data.get("cmd") == "data_get":
self._parse_and_distribute_data(data)
except Exception as e:
_LOGGER.error(f"Error processing data message: {e}")
def _parse_and_distribute_data(self, data: dict) -> None:
"""解析 data_get 响应并分发给对应的传感器."""
try:
info = data.get("info", {})
dev_list = info.get("dev_list", [])
# 遍历所有设备和meter
for dev in dev_list:
meter_list = dev.get("meter_list", [])
for meter in meter_list:
# 响应格式meter 是 [meter_sn, meter_value]
if not isinstance(meter, (list, tuple)) or len(meter) < 2:
continue
meter_sn = str(meter[0])
try:
meter_value_float = float(meter[1])
# 如果小数部分为 0转换为 int
meter_value = (
int(meter_value_float)
if meter_value_float == int(meter_value_float)
else meter_value_float
)
except (ValueError, TypeError):
_LOGGER.debug(f"Invalid meter value: {meter[1]}")
continue
# 根据 meter_sn 找到对应的传感器并更新
self._update_sensors_by_meter_sn(meter_sn, meter_value)
except Exception as e:
_LOGGER.error(f"Error parsing and distributing data: {e}")
def _update_sensors_by_meter_sn(self, meter_sn: str, meter_value: float) -> None:
"""根据 meter_sn 更新对应的传感器."""
# 遍历所有传感器,找到匹配的 meter_sn
for sensor_id, entity in self._sensors.items():
if str(entity._meter_sn) == meter_sn:
processed_value = entity._process_meter_value(meter_value)
entity._update_sensor_value(processed_value)
def _construct_data_get_request(self) -> dict:
"""构造 data_get 请求,包含所有传感器的 meter_sn."""
# 收集所有唯一的 meter_sn
meter_sns = set()
for entity in self._sensors.values():
if entity._meter_sn:
meter_sns.add(entity._meter_sn)
return {
"cmd": "data_get",
"gw_sn": self._device_sn or "",
"timestamp": str(int(time.time() * 1000)),
"token": str(random.randint(1000, 9999)),
"info": {
"dev_list": [
{
"dev_sn": f"ems_{self._device_sn}" if self._device_sn else "ems_",
"meter_list": list(meter_sns), # 一次性请求所有 meter_sn
}
]
}
}
async def _periodic_data_request(self) -> None:
"""定期发送数据请求(所有传感器共用一个请求)."""
_LOGGER.info("Coordinator starting periodic data request...")
await asyncio.sleep(2) # 等待 MQTT 连接建立
while True:
try:
if not self._device_sn:
_LOGGER.debug("Device serial number not available, waiting...")
await asyncio.sleep(REQUEST_INTERVAL)
continue
if not self._sensors:
_LOGGER.debug("No sensors registered yet, waiting...")
await asyncio.sleep(REQUEST_INTERVAL)
continue
try:
# 构造并发送包含所有 meter_sn 的请求
request_data = self._construct_data_get_request()
await ha_mqtt.async_publish(
self.hass,
self._data_get_topic,
json.dumps(request_data, ensure_ascii=False),
1,
False
)
_LOGGER.debug(
f"Coordinator sent data_get request for {len(self._sensors)} sensors "
f"to {self._data_get_topic}"
)
except Exception as mqtt_error:
_LOGGER.warning(
f"MQTT publish failed: {mqtt_error}. "
f"Please check MQTT broker connection. "
f"Will retry in {REQUEST_INTERVAL} seconds..."
)
await asyncio.sleep(REQUEST_INTERVAL)
except asyncio.CancelledError:
_LOGGER.info("Coordinator periodic data request task cancelled")
raise
except Exception as e:
_LOGGER.error(f"Unexpected error in coordinator periodic data request: {e}")
await asyncio.sleep(REQUEST_INTERVAL)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up JackeryHome sensors from a config entry."""
_LOGGER.info("Setting up JackeryHome sensors")
# 获取配置数据
config = config_entry.data
topic_prefix = config.get("topic_prefix", "homeassistant/sensor")
_LOGGER.info(f"Topic prefix: {topic_prefix}")
# 创建协调器(全局唯一,所有传感器共享)
coordinator = JackeryDataCoordinator(hass, topic_prefix)
# 将协调器存储到 hass.data 中,供其他地方使用
hass.data[DOMAIN][config_entry.entry_id]["coordinator"] = coordinator
# 创建所有传感器实体
entities = []
for sensor_id, sensor_config in SENSORS.items():
entity = JackeryHomeSensor(
sensor_id=sensor_id,
name=sensor_config["name"],
unit=sensor_config["unit"],
icon=sensor_config["icon"],
device_class=sensor_config["device_class"],
state_class=sensor_config["state_class"],
topic_prefix=topic_prefix,
config_entry_id=config_entry.entry_id,
coordinator=coordinator, # 传入协调器
)
entities.append(entity)
# 添加实体
async_add_entities(entities)
# 启动协调器(在所有传感器添加后)
await coordinator.async_start()
_LOGGER.info(f"Added {len(entities)} JackeryHome sensors with shared coordinator")
class JackeryHomeSensor(SensorEntity):
"""Representation of a JackeryHome Sensor."""
def __init__(
self,
sensor_id: str,
name: str,
unit: str,
icon: str,
device_class: SensorDeviceClass,
state_class: SensorStateClass,
topic_prefix: str,
config_entry_id: str,
coordinator: JackeryDataCoordinator,
) -> None:
"""Initialize the sensor."""
self._sensor_id = sensor_id
self._attr_name = name
self._attr_native_unit_of_measurement = unit
self._attr_icon = icon
self._attr_device_class = device_class
self._attr_state_class = state_class
self._attr_unique_id = f"jackery_home_{sensor_id}"
self._attr_device_info = {
"identifiers": {(DOMAIN, config_entry_id)},
"name": "JackeryHome",
"manufacturer": "Jackery",
"model": "Energy Monitor",
"sw_version": "1.0.5",
}
self._attr_native_value = None
self._attr_available = False
self._attr_should_poll = False
self._attr_has_entity_name = False
self._coordinator = coordinator # 协调器引用
# 获取 meter_sn直接根据传感器 ID包括 *_power 后缀)映射
self._meter_sn = METER_SN_MAP.get(sensor_id, 0)
@property
def should_poll(self) -> bool:
"""No polling needed."""
return False
def _process_meter_value(self, meter_value: float) -> float:
## 电池充放电功率 负值为充电,正值为放电
## 电网功率 负值为购买,正值为出售
if self._sensor_id == "grid_import_power":
return abs(meter_value) if meter_value < 0 else 0
elif self._sensor_id == "grid_export_power":
return meter_value if meter_value > 0 else 0
# # 电池功率:同一个 meter_sn同步更新 battery_charge_power / battery_discharge_power
elif self._sensor_id == "battery_charge_power":
return abs(meter_value) if meter_value < 0 else 0
elif self._sensor_id == "battery_discharge_power":
return (meter_value) if meter_value > 0 else 0
if self._sensor_id == "battery_soc":
# Battery SOC 需要乘以 0.1 转换为百分比
meter_value = meter_value * 0.1
return meter_value
def _update_sensor_value(self, value: Any) -> None:
"""更新传感器值并通知 Home Assistant由协调器调用."""
self._attr_native_value = value
self._attr_available = True
self.async_write_ha_state()
_LOGGER.debug(f"Updated {self._sensor_id} with value: {value}")
async def async_added_to_hass(self) -> None:
"""传感器添加到 Home Assistant 时,注册到协调器."""
await super().async_added_to_hass()
# 注册到协调器
self._coordinator.register_sensor(self._sensor_id, self)
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} registered to coordinator")
async def async_will_remove_from_hass(self) -> None:
"""传感器从 Home Assistant 移除时,从协调器注销."""
# 从协调器注销
self._coordinator.unregister_sensor(self._sensor_id)
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} unregistered from coordinator")
await super().async_will_remove_from_hass()
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes."""
return {
"sensor_id": self._sensor_id,
"meter_sn": self._meter_sn,
"device_sn": self._coordinator._device_sn,
}

View File

@@ -1,4 +1,4 @@
# JackeryHome - Home Assistant 自定义集成
# Jackery - Home Assistant 自定义集成
这是一个 Home Assistant 自定义集成,用于通过 MQTT 接收能源监控数据并创建传感器实体。
@@ -29,7 +29,7 @@
⚠️ **重要:本集成依赖 Home Assistant 的 MQTT 集成**
在安装 JackeryHome 之前,您必须先配置 MQTT 集成:
在安装 Jackery 之前,您必须先配置 MQTT 集成:
1. 进入 Home Assistant 的 **设置** → **设备与服务**
2. 点击 **添加集成**,搜索 **MQTT**
@@ -46,17 +46,17 @@
2. 进入 HACS → 集成
3. 点击右上角菜单 → 自定义仓库
4. 添加此仓库 URL 并选择类别为"集成"
5. 搜索 "JackeryHome" 并安装
5. 搜索 "Jackery" 并安装
6. 重启 Home Assistant
### 方式 B手动安装
`custom_components/JackeryHome` 文件夹复制到 Home Assistant 的 `config/custom_components/` 目录下:
`custom_components/Jackery` 文件夹复制到 Home Assistant 的 `config/custom_components/` 目录下:
```
config/
custom_components/
JackeryHome/
Jackery/
__init__.py
manifest.json
sensor.py
@@ -71,7 +71,7 @@ config/
1. 进入 Home Assistant 的 **设置** → **设备与服务**
2. 点击右下角的 **添加集成** 按钮
3. 搜索 "JackeryHome"
3. 搜索 "Jackery"
4. 输入 MQTT 主题前缀(可选,默认:`homeassistant/sensor`
5. 点击提交完成配置
@@ -172,7 +172,7 @@ config/
python main.py
```
3. 模拟器会自动发布传感器数据到 MQTT
4. Home Assistant 的 JackeryHome 集成会自动接收并显示数据
4. Home Assistant 的 Jackery 集成会自动接收并显示数据
## 查看传感器
@@ -239,7 +239,7 @@ entities:
logger:
default: info
logs:
custom_components.jackery_home: debug
custom_components.jackery: debug
homeassistant.components.mqtt: debug
```

View File

@@ -8,13 +8,13 @@ from homeassistant.components import mqtt
_LOGGER = logging.getLogger(__name__)
DOMAIN = "jackery_home"
DOMAIN = "jackery"
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up JackeryHome from a config entry."""
_LOGGER.info("Setting up JackeryHome integration")
"""Set up Jackery from a config entry."""
_LOGGER.info("Setting up Jackery integration")
# 检查 MQTT 集成是否已配置和可用
if not await mqtt.async_wait_for_mqtt_client(hass):
@@ -42,7 +42,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
_LOGGER.info("Unloading JackeryHome integration")
_LOGGER.info("Unloading Jackery integration")
# 停止协调器
entry_data = hass.data[DOMAIN].get(entry.entry_id, {})

View File

@@ -18,14 +18,14 @@ DATA_SCHEMA = vol.Schema(
{
vol.Optional(
"topic_prefix",
default="homeassistant/sensor"
default="hb"
): str,
}
)
class JackeryHomeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for JackeryHome."""
class JackeryConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Jackery."""
VERSION = 1
@@ -44,12 +44,12 @@ class JackeryHomeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
errors["base"] = "mqtt_not_configured"
else:
_LOGGER.info(
f"Creating JackeryHome config entry with topic_prefix: "
f"{user_input.get('topic_prefix', 'homeassistant/sensor')}"
f"Creating Jackery config entry with topic_prefix: "
f"{user_input.get('topic_prefix', 'hb')}"
)
return self.async_create_entry(
title="JackeryHome",
title="Jackery",
data=user_input,
)
@@ -58,7 +58,7 @@ class JackeryHomeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
data_schema=DATA_SCHEMA,
errors=errors,
description_placeholders={
"topic_prefix": "MQTT topic prefix (e.g., homeassistant/sensor)",
"topic_prefix": "Protocol root topic (default: hb)",
},
)

View File

@@ -0,0 +1,15 @@
{
"domain": "jackery",
"name": "Jackery",
"codeowners": [
"@suyulin"
],
"config_flow": true,
"dependencies": [
"mqtt"
],
"documentation": "https://github.com/suyulin/jackery",
"issue_tracker": "https://github.com/suyulin/jackery/issues",
"iot_class": "local_push",
"version": "1.1.35"
}

View File

@@ -0,0 +1,376 @@
"""Jackery Sensor Platform."""
import asyncio
import json
import logging
import time
import random
import re
from typing import Any, Callable
from homeassistant.components import mqtt as ha_mqtt
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.const import UnitOfPower, UnitOfEnergy, PERCENTAGE, UnitOfTemperature
from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
# 常量定义
REQUEST_INTERVAL = 10 # 数据请求间隔(秒)
# 传感器配置
SENSORS = {
# 电池相关
"battery_soc": {
"json_key": "batSoc",
"name": "Battery SOC",
"unit": PERCENTAGE,
"icon": "mdi:battery-50",
"device_class": SensorDeviceClass.BATTERY,
"state_class": SensorStateClass.MEASUREMENT,
},
"battery_charge_power": {
"json_key": "batInPw",
"name": "Battery Charge Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:battery-charging",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"battery_discharge_power": {
"json_key": "batOutPw",
"name": "Battery Discharge Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:battery-minus",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"battery_temperature": {
"json_key": "cellTemp",
"name": "Battery Temperature",
"unit": UnitOfTemperature.CELSIUS,
"icon": "mdi:thermometer",
"device_class": SensorDeviceClass.TEMPERATURE,
"state_class": SensorStateClass.MEASUREMENT,
},
"battery_count": {
"json_key": "batNum",
"name": "Battery Count",
"unit": None,
"icon": "mdi:battery-multiple",
"device_class": None,
"state_class": SensorStateClass.MEASUREMENT,
},
# 太阳能
"solar_power": {
"json_key": "pvPw",
"name": "Solar Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:solar-power",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
# 电网相关
"grid_import_power": { # Grid -> System (outOngridPw)
"json_key": "outOngridPw",
"name": "Grid Import Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:transmission-tower-import",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"grid_export_power": { # System -> Grid/Home (inOngirdPw)
"json_key": "inOngirdPw",
"name": "Grid Export Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:transmission-tower-export",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
# EPS (离网输出)
"eps_power": {
"json_key": "swEpsOutPw",
"name": "EPS Output Power",
"unit": UnitOfPower.WATT,
"icon": "mdi:power-plug",
"device_class": SensorDeviceClass.POWER,
"state_class": SensorStateClass.MEASUREMENT,
},
"eps_state": {
"json_key": "swEpsState",
"name": "EPS State",
"unit": None,
"icon": "mdi:power-settings",
"device_class": None,
"state_class": None,
},
# Limits & Settings
"soc_charge_limit": {
"json_key": "socChgLimit",
"name": "SOC Charge Limit",
"unit": PERCENTAGE,
"icon": "mdi:battery-arrow-up",
"device_class": None,
"state_class": SensorStateClass.MEASUREMENT,
},
"soc_discharge_limit": {
"json_key": "socDischgLimit",
"name": "SOC Discharge Limit",
"unit": PERCENTAGE,
"icon": "mdi:battery-arrow-down",
"device_class": None,
"state_class": SensorStateClass.MEASUREMENT,
}
}
class JackeryDataCoordinator:
"""协调器管理MQTT订阅和数据获取供所有传感器实体共享使用."""
def __init__(self, hass: HomeAssistant, topic_prefix: str) -> None:
"""初始化协调器."""
self.hass = hass
self._topic_prefix = topic_prefix
# The new protocol generally uses 'hb' as root, but we respect the configured prefix if given.
# If the user configured 'homeassistant/sensor' (default), we might want to ignore it
# and use 'hb' if the new protocol is strict.
# For now, let's assume 'hb' is the fixed root for this new protocol version.
self._topic_root = "hb"
self._device_sn = "" # 设备序列号
self._sensors = {} # {sensor_id: entity}
self._data_task = None
self._subscribed = False
# Topic patterns
self._topic_status_wildcard = f"{self._topic_root}/device/+/status"
def register_sensor(self, sensor_id: str, entity: "JackerySensor") -> None:
"""注册传感器实体."""
self._sensors[sensor_id] = entity
def unregister_sensor(self, sensor_id: str) -> None:
"""注销传感器实体."""
if sensor_id in self._sensors:
del self._sensors[sensor_id]
async def async_start(self) -> None:
"""启动协调器."""
if self._subscribed:
return
try:
# 订阅状态主题 (Wildcard) 以发现设备和接收数据
@callback
def message_received(msg):
self._handle_message(msg)
await ha_mqtt.async_subscribe(
self.hass,
self._topic_status_wildcard,
message_received,
1
)
_LOGGER.info(f"Coordinator subscribed to: {self._topic_status_wildcard}")
self._subscribed = True
# 启动定时轮询
self._data_task = asyncio.create_task(self._periodic_data_request())
except Exception as e:
_LOGGER.error(f"Failed to start coordinator: {e}")
async def async_stop(self) -> None:
"""停止协调器."""
if self._data_task and not self._data_task.done():
self._data_task.cancel()
try:
await self._data_task
except asyncio.CancelledError:
pass
_LOGGER.info("Coordinator stopped")
def _handle_message(self, msg) -> None:
"""处理接收到的 MQTT 消息."""
try:
topic = msg.topic
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
# Extract device SN from topic: hb/device/{sn}/status
match = re.search(r"hb/device/([^/]+)/status", topic)
if match:
sn = match.group(1)
if not self._device_sn:
self._device_sn = sn
_LOGGER.info(f"Discovered device SN: {self._device_sn}")
elif self._device_sn != sn:
_LOGGER.debug(f"Received data from another device: {sn}")
# Parse Payload
try:
data = json.loads(payload)
except json.JSONDecodeError:
_LOGGER.warning(f"Invalid JSON payload on {topic}")
return
self._distribute_data(data)
except Exception as e:
_LOGGER.error(f"Error handling message: {e}")
def _distribute_data(self, data: dict) -> None:
"""分发数据给传感器."""
for sensor_id, entity in self._sensors.items():
json_key = SENSORS[sensor_id].get("json_key")
if json_key and json_key in data:
raw_value = data[json_key]
entity._update_from_coordinator(raw_value)
async def _periodic_data_request(self) -> None:
"""定期发送 'type: 25' 指令请求全量数据."""
_LOGGER.info("Starting periodic data polling...")
await asyncio.sleep(2)
while True:
try:
if not self._device_sn:
_LOGGER.debug("Waiting for device SN discovery...")
await asyncio.sleep(5)
continue
# Construct Action Topic
action_topic = f"{self._topic_root}/device/{self._device_sn}/action"
# Construct Payload
payload = {
"type": 25,
"eventId": 0,
"messageId": random.randint(1000, 9999),
"ts": int(time.time()),
"body": None
}
await ha_mqtt.async_publish(
self.hass,
action_topic,
json.dumps(payload),
0,
False
)
_LOGGER.debug(f"Sent poll request to {action_topic}")
await asyncio.sleep(REQUEST_INTERVAL)
except asyncio.CancelledError:
break
except Exception as e:
_LOGGER.error(f"Error in polling task: {e}")
await asyncio.sleep(REQUEST_INTERVAL)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up Jackery sensors."""
config = config_entry.data
topic_prefix = config.get("topic_prefix", "hb")
coordinator = JackeryDataCoordinator(hass, topic_prefix)
hass.data[DOMAIN][config_entry.entry_id]["coordinator"] = coordinator
entities = []
for sensor_id, sensor_config in SENSORS.items():
if sensor_config.get("json_key") is None:
continue
entity = JackerySensor(
sensor_id=sensor_id,
coordinator=coordinator,
config_entry_id=config_entry.entry_id,
)
entities.append(entity)
async_add_entities(entities)
await coordinator.async_start()
class JackerySensor(SensorEntity):
"""Jackery Sensor."""
def __init__(
self,
sensor_id: str,
coordinator: JackeryDataCoordinator,
config_entry_id: str,
) -> None:
"""Initialize."""
self._sensor_id = sensor_id
self._coordinator = coordinator
self._config = SENSORS[sensor_id]
self._attr_name = self._config["name"]
self._attr_native_unit_of_measurement = self._config["unit"]
self._attr_icon = self._config["icon"]
self._attr_device_class = self._config["device_class"]
self._attr_state_class = self._config["state_class"]
self._attr_unique_id = f"jackery_{sensor_id}"
self._attr_has_entity_name = True
self._attr_device_info = {
"identifiers": {(DOMAIN, config_entry_id)},
"name": "Jackery",
"manufacturer": "Jackery",
"model": "Energy Monitor",
}
@property
def should_poll(self) -> bool:
return False
async def async_added_to_hass(self) -> None:
await super().async_added_to_hass()
self._coordinator.register_sensor(self._sensor_id, self)
async def async_will_remove_from_hass(self) -> None:
self._coordinator.unregister_sensor(self._sensor_id)
await super().async_will_remove_from_hass()
def _update_from_coordinator(self, value: Any) -> None:
"""Receive data from coordinator."""
# Process specific conversions
if self._sensor_id == "battery_temperature":
# cellTemp is 0.1 C
try:
self._attr_native_value = float(value) * 0.1
except (TypeError, ValueError):
pass
elif self._sensor_id == "battery_soc":
self._attr_native_value = value
else:
self._attr_native_value = value
self._attr_available = True
self.async_write_ha_state()
@property
def extra_state_attributes(self) -> dict[str, Any]:
return {
"device_sn": self._coordinator._device_sn,
"raw_key": self._config.get("json_key")
}

View File

@@ -2,8 +2,8 @@
"config": {
"step": {
"user": {
"title": "配置 JackeryHome",
"description": "设置您的 JackeryHome 能源监控集成。注意:必须先配置 MQTT 集成。",
"title": "配置 Jackery",
"description": "设置您的 Jackery 能源监控集成。注意:必须先配置 MQTT 集成。",
"data": {
"topic_prefix": "MQTT 主题前缀"
}

View File

@@ -2,8 +2,8 @@
"config": {
"step": {
"user": {
"title": "配置 JackeryHome",
"description": "设置您的 JackeryHome 能源监控集成。注意:必须先配置 MQTT 集成。",
"title": "配置 Jackery",
"description": "设置您的 Jackery 能源监控集成。注意:必须先配置 MQTT 集成。",
"data": {
"topic_prefix": "MQTT 主题前缀"
}