"""Jackery Sensor Platform.""" import asyncio import json import logging import time import random import re from typing import Any, Callable from homeassistant.components import mqtt as ha_mqtt from homeassistant.components.sensor import ( SensorDeviceClass, SensorEntity, SensorStateClass, ) from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.const import UnitOfPower, UnitOfEnergy, PERCENTAGE, UnitOfTemperature from . import DOMAIN _LOGGER = logging.getLogger(__name__) # 常量定义 REQUEST_INTERVAL = 10 # 数据请求间隔(秒) # 传感器配置 SENSORS = { # 电池相关 "battery_soc": { "json_key": "batSoc", "name": "Battery SOC", "unit": PERCENTAGE, "icon": "mdi:battery-50", "device_class": SensorDeviceClass.BATTERY, "state_class": SensorStateClass.MEASUREMENT, }, "battery_charge_power": { "json_key": "batInPw", "name": "Battery Charge Power", "unit": UnitOfPower.WATT, "icon": "mdi:battery-charging", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "battery_discharge_power": { "json_key": "batOutPw", "name": "Battery Discharge Power", "unit": UnitOfPower.WATT, "icon": "mdi:battery-minus", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "battery_temperature": { "json_key": "cellTemp", "name": "Battery Temperature", "unit": UnitOfTemperature.CELSIUS, "icon": "mdi:thermometer", "device_class": SensorDeviceClass.TEMPERATURE, "state_class": SensorStateClass.MEASUREMENT, }, "battery_count": { "json_key": "batNum", "name": "Battery Count", "unit": None, "icon": "mdi:battery-multiple", "device_class": None, "state_class": SensorStateClass.MEASUREMENT, }, # 太阳能 "solar_power": { "json_key": "pvPw", "name": "Solar Power", "unit": UnitOfPower.WATT, "icon": "mdi:solar-power", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "solar_power_pv1": { "json_key": "pv1", "name": "Solar Power PV1", "unit": UnitOfPower.WATT, "icon": "mdi:solar-panel", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "solar_power_pv2": { "json_key": "pv2", "name": "Solar Power PV2", "unit": UnitOfPower.WATT, "icon": "mdi:solar-panel", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "solar_power_pv3": { "json_key": "pv3", "name": "Solar Power PV3", "unit": UnitOfPower.WATT, "icon": "mdi:solar-panel", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "solar_power_pv4": { "json_key": "pv4", "name": "Solar Power PV4", "unit": UnitOfPower.WATT, "icon": "mdi:solar-panel", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, # 电网相关 "grid_import_power": { # Grid -> System (outOngridPw) "json_key": "outOngridPw", "name": "Grid Import Power", "unit": UnitOfPower.WATT, "icon": "mdi:transmission-tower-import", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "grid_export_power": { # System -> Grid/Home (inOngirdPw) "json_key": "inOngirdPw", "name": "Grid Export Power", "unit": UnitOfPower.WATT, "icon": "mdi:transmission-tower-export", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "max_output_power": { "json_key": "maxOutPw", "name": "Max Output Power (OnGrid)", "unit": UnitOfPower.WATT, "icon": "mdi:speedometer", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, # EPS (离网输出) "eps_output_power": { "json_key": "swEpsOutPw", "name": "EPS Output Power", "unit": UnitOfPower.WATT, "icon": "mdi:power-plug", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "eps_input_power": { "json_key": "swEpsInPw", "name": "EPS Input Power", "unit": UnitOfPower.WATT, "icon": "mdi:power-plug", "device_class": SensorDeviceClass.POWER, "state_class": SensorStateClass.MEASUREMENT, }, "eps_state": { "json_key": "swEpsState", "name": "EPS State", "unit": None, "icon": "mdi:power-settings", "device_class": None, "state_class": None, # 1-Normal, 0-Abnormal }, "eps_switch": { "json_key": "swEps", "name": "EPS Switch Status", "unit": None, "icon": "mdi:toggle-switch", "device_class": None, "state_class": None, # 1-On, 0-Off }, # Limits & Settings & Status "soc_charge_limit": { "json_key": "socChgLimit", "name": "SOC Charge Limit", "unit": PERCENTAGE, "icon": "mdi:battery-arrow-up", "device_class": None, "state_class": SensorStateClass.MEASUREMENT, }, "soc_discharge_limit": { "json_key": "socDischgLimit", "name": "SOC Discharge Limit", "unit": PERCENTAGE, "icon": "mdi:battery-arrow-down", "device_class": None, "state_class": SensorStateClass.MEASUREMENT, }, "is_auto_standby": { "json_key": "isAutoStandby", "name": "Auto Standby Allowed", "unit": None, "icon": "mdi:power-sleep", "device_class": None, "state_class": None, # 1-Allowed, 0-Not Allowed }, "auto_standby_status": { "json_key": "autoStandby", "name": "Auto Standby Status", "unit": None, "icon": "mdi:power-sleep", "device_class": None, "state_class": None, # 0-Invalid, 1-Sleep/Off, 2-On } } class JackeryDataCoordinator: """协调器:管理MQTT订阅和数据获取,供所有传感器实体共享使用.""" def __init__(self, hass: HomeAssistant, topic_prefix: str) -> None: """初始化协调器.""" self.hass = hass self._topic_prefix = topic_prefix self._topic_root = "hb" self._device_sn = "" # 设备序列号 self._sensors = {} # {sensor_id: entity} self._data_task = None self._subscribed = False # Topic patterns self._topic_status_wildcard = f"{self._topic_root}/device/+/status" def register_sensor(self, sensor_id: str, entity: "JackerySensor") -> None: """注册传感器实体.""" self._sensors[sensor_id] = entity def unregister_sensor(self, sensor_id: str) -> None: """注销传感器实体.""" if sensor_id in self._sensors: del self._sensors[sensor_id] async def async_start(self) -> None: """启动协调器.""" if self._subscribed: return try: # 订阅状态主题 (Wildcard) 以发现设备和接收数据 @callback def message_received(msg): self._handle_message(msg) await ha_mqtt.async_subscribe( self.hass, self._topic_status_wildcard, message_received, 1 ) _LOGGER.info(f"Coordinator subscribed to: {self._topic_status_wildcard}") self._subscribed = True # 启动定时轮询 self._data_task = asyncio.create_task(self._periodic_data_request()) except Exception as e: _LOGGER.error(f"Failed to start coordinator: {e}") async def async_stop(self) -> None: """停止协调器.""" if self._data_task and not self._data_task.done(): self._data_task.cancel() try: await self._data_task except asyncio.CancelledError: pass _LOGGER.info("Coordinator stopped") def _handle_message(self, msg) -> None: """处理接收到的 MQTT 消息.""" try: topic = msg.topic payload = msg.payload if isinstance(payload, bytes): payload = payload.decode("utf-8") # Extract device SN from topic: hb/device/{sn}/status match = re.search(r"hb/device/([^/]+)/status", topic) if match: sn = match.group(1) if not self._device_sn: self._device_sn = sn _LOGGER.info(f"Discovered device SN: {self._device_sn}") elif self._device_sn != sn: _LOGGER.debug(f"Received data from another device: {sn}") # Parse Payload try: data = json.loads(payload) except json.JSONDecodeError: _LOGGER.warning(f"Invalid JSON payload on {topic}") return self._distribute_data(data) except Exception as e: _LOGGER.error(f"Error handling message: {e}") def _distribute_data(self, data: dict) -> None: """分发数据给传感器.""" for sensor_id, entity in self._sensors.items(): json_key = SENSORS[sensor_id].get("json_key") if json_key and json_key in data: raw_value = data[json_key] entity._update_from_coordinator(raw_value) async def _periodic_data_request(self) -> None: """定期发送 'type: 25' 指令请求全量数据.""" _LOGGER.info("Starting periodic data polling...") await asyncio.sleep(2) while True: try: if not self._device_sn: _LOGGER.debug("Waiting for device SN discovery...") await asyncio.sleep(5) continue # Construct Action Topic action_topic = f"{self._topic_root}/device/{self._device_sn}/action" # Construct Payload payload = { "type": 25, "eventId": 0, "messageId": random.randint(1000, 9999), "ts": int(time.time()), "body": None } await ha_mqtt.async_publish( self.hass, action_topic, json.dumps(payload), 0, False ) _LOGGER.debug(f"Sent poll request to {action_topic}") await asyncio.sleep(REQUEST_INTERVAL) except asyncio.CancelledError: break except Exception as e: _LOGGER.error(f"Error in polling task: {e}") await asyncio.sleep(REQUEST_INTERVAL) async def async_setup_entry( hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback, ) -> None: """Set up Jackery sensors.""" config = config_entry.data topic_prefix = config.get("topic_prefix", "hb") coordinator = JackeryDataCoordinator(hass, topic_prefix) hass.data[DOMAIN][config_entry.entry_id]["coordinator"] = coordinator entities = [] for sensor_id, sensor_config in SENSORS.items(): if sensor_config.get("json_key") is None: continue entity = JackerySensor( sensor_id=sensor_id, coordinator=coordinator, config_entry_id=config_entry.entry_id, ) entities.append(entity) async_add_entities(entities) await coordinator.async_start() class JackerySensor(SensorEntity): """Jackery Sensor.""" def __init__( self, sensor_id: str, coordinator: JackeryDataCoordinator, config_entry_id: str, ) -> None: """Initialize.""" self._sensor_id = sensor_id self._coordinator = coordinator self._config = SENSORS[sensor_id] self._attr_name = self._config["name"] self._attr_native_unit_of_measurement = self._config["unit"] self._attr_icon = self._config["icon"] self._attr_device_class = self._config["device_class"] self._attr_state_class = self._config["state_class"] self._attr_unique_id = f"jackery_{sensor_id}" self._attr_has_entity_name = True self._attr_device_info = { "identifiers": {(DOMAIN, config_entry_id)}, "name": "Jackery", "manufacturer": "Jackery", "model": "Energy Monitor", } @property def should_poll(self) -> bool: return False async def async_added_to_hass(self) -> None: await super().async_added_to_hass() self._coordinator.register_sensor(self._sensor_id, self) async def async_will_remove_from_hass(self) -> None: self._coordinator.unregister_sensor(self._sensor_id) await super().async_will_remove_from_hass() def _update_from_coordinator(self, value: Any) -> None: """Receive data from coordinator.""" # Process specific conversions if self._sensor_id == "battery_temperature": # cellTemp is 0.1 C try: self._attr_native_value = float(value) * 0.1 except (TypeError, ValueError): pass elif self._sensor_id == "battery_soc": self._attr_native_value = value elif self._sensor_id.startswith("solar_power_pv") and isinstance(value, dict): # Handle dictionary for PV if it occurs, trying to find common value keys # Assumption based on "PV1发电总功率" -> it might contain power if "w" in value: self._attr_native_value = value["w"] elif "power" in value: self._attr_native_value = value["power"] else: # Fallback: display raw dict as string or extract first numeric value? # Using str(value) for safety if structure is unknown self._attr_native_value = str(value) else: self._attr_native_value = value self._attr_available = True self.async_write_ha_state() @property def extra_state_attributes(self) -> dict[str, Any]: return { "device_sn": self._coordinator._device_sn, "raw_key": self._config.get("json_key") }