diff --git a/custom_components/jackery/sensor.py b/custom_components/jackery/sensor.py index 2511b47..126f25a 100644 --- a/custom_components/jackery/sensor.py +++ b/custom_components/jackery/sensor.py @@ -267,9 +267,11 @@ class JackeryDataCoordinator: self._known_plugs = set() # Set of known plug SNs self.add_entities_callback = None # Callback to add new entities + self._data_cache = {} # Cache for merged data from status and events # Topic patterns self._topic_status_wildcard = f"{self._topic_root}/device/+/status" + self._topic_event_wildcard = f"{self._topic_root}/device/+/event" def register_sensor(self, sensor_id: str, entity: "JackerySensor") -> None: """注册传感器实体.""" @@ -299,6 +301,15 @@ class JackeryDataCoordinator: ) _LOGGER.info(f"Coordinator subscribed to: {self._topic_status_wildcard}") + # Subscribe to event topic for sub-device data (Type 101) + await ha_mqtt.async_subscribe( + self.hass, + self._topic_event_wildcard, + message_received, + 1 + ) + _LOGGER.info(f"Coordinator subscribed to: {self._topic_event_wildcard}") + self._subscribed = True # 启动定时轮询 @@ -325,10 +336,11 @@ class JackeryDataCoordinator: if isinstance(payload, bytes): payload = payload.decode("utf-8") - # Extract device SN from topic: {prefix}/device/{sn}/status - match = re.search(rf"{self._topic_root}/device/([^/]+)/status", topic) + # Extract device SN from topic: {prefix}/device/{sn}/status OR .../event + match = re.search(rf"{self._topic_root}/device/([^/]+)/(status|event)", topic) if match: sn = match.group(1) + msg_type = match.group(2) # 'status' or 'event' if not self._device_sn: self._device_sn = sn _LOGGER.info(f"Discovered device SN: {self._device_sn}") @@ -337,51 +349,94 @@ class JackeryDataCoordinator: # Parse Payload try: - data = json.loads(payload) - # 如果数据在 body 字段中,则提取 body - if "body" in data and isinstance(data["body"], dict): - data = data["body"] + raw_data = json.loads(payload) + msg_code = raw_data.get("type") + body = raw_data.get("body") + + # If body is missing or None, use empty dict or the raw_data itself if it looks like data + # But protocol says data is in body. + if body is None: + # Some status messages might be flat? Assuming body per protocol. + # If Type 101 and body is None, ignore. + if msg_code == 101: + return + body = {} + + # Merge logic + # Type 101: Sub-device full data + if msg_code == 101 and isinstance(body, dict): + # Expect "plug": [ ... ] + # Normalize to "plugs" and "cts" for internal usage + raw_plugs = body.get("plug", []) + if raw_plugs: + # Separate CTs (devType=2?) and Plugs (others?) + # User said CTs (2) and Plugs (6). Protocol example shows devType in item. + + # Update "plugs" list (containing all sub-devices or just plugs?) + # Let's keep "plugs" as the raw list of all sub-devices for _check_for_new_plugs to scan + # Or better, separate them now. + + current_cts = [] + current_plugs = [] + + for item in raw_plugs: + dt = item.get("devType") + # Assuming devType 2 is CT based on "Poll Sub-devices ... CTs (2)" + if dt == 2: + # Map keys for CT calculator: TphasePw -> gridBuy, TnphasePw -> gridSell ? + # Or just pass item as is. _calculate_energy_flow expects item in "cts" list. + current_cts.append(item) + else: + current_plugs.append(item) + + self._data_cache["cts"] = current_cts + # We store all in "plugs" for JackeryPlugSensor to find itself by SN + self._data_cache["plugs"] = raw_plugs + self._data_cache["plug"] = raw_plugs # Keep original key too + + # Type 25 or Status: Main device data + elif isinstance(body, dict): + # Merge top-level keys + self._data_cache.update(body) + except json.JSONDecodeError: _LOGGER.warning(f"Invalid JSON payload on {topic}") return - # Enrich data with calculations - data = self._calculate_energy_flow(data) + # Enrich data with calculations using merged cache + # operate on copy or direct? Direct is fine. + self._data_cache = self._calculate_energy_flow(self._data_cache) # Check for new plugs - self._check_for_new_plugs(data) + self._check_for_new_plugs(self._data_cache) - self._distribute_data(data) + self._distribute_data(self._data_cache) except Exception as e: _LOGGER.error(f"Error handling message: {e}") def _check_for_new_plugs(self, data: dict) -> None: """检查并添加新发现的插座.""" - plugs = data.get("plugs") + # Check both keys + plugs = data.get("plugs") or data.get("plug") if not plugs or not isinstance(plugs, list): return new_entities = [] for plug in plugs: - sn = plug.get("sn") + # Check SN key (could be 'sn' or 'deviceSn') + sn = plug.get("deviceSn") or plug.get("sn") + dev_type = plug.get("devType") + + # Filter out CTs (devType 2) if we only want plugs here? + # JackeryPlugSensor implies "Plug". CTs are usually main sensors. + if dev_type == 2: + continue + if sn and sn not in self._known_plugs: - _LOGGER.info(f"Discovered new plug: {sn}") + _LOGGER.info(f"Discovered new plug: {sn} (Type: {dev_type})") self._known_plugs.add(sn) - # Create plug sensor - # Pass entry_id if needed, but entity init can handle without it for simple cases? - # Need config_entry_id. We can store it in coordinator or pass it later. - # Actually, standard pattern is to use the config_entry passed in setup. - # We'll need to store entry_id in coordinator or make JackeryPlugSensor use it. - # Let's attach config_entry_id to coordinator in __init__? - # Or just pass it. Coordinator doesn't have it currently. - # We'll use a property or modify init. - # For now, let's assume async_add_entities callback handles the addition. - # But we need to instantiate the entity class. - - # Wait, I need config_entry_id to create proper device_info identifiers. - # I'll add config_entry_id to JackeryDataCoordinator. if hasattr(self, "config_entry_id"): entity = JackeryPlugSensor( plug_sn=sn, @@ -777,26 +832,29 @@ class JackeryPlugSensor(SensorEntity): def _update_from_coordinator(self, data: dict) -> None: """Receive data from coordinator.""" - plugs = data.get("plugs") + plugs = data.get("plugs") or data.get("plug") if not plugs or not isinstance(plugs, list): return # Find my plug data - my_plug = next((p for p in plugs if p.get("sn") == self._plug_sn), None) + my_plug = next((p for p in plugs if (p.get("sn") == self._plug_sn or p.get("deviceSn") == self._plug_sn)), None) if not my_plug: return # Update state (outPw) try: - self._attr_native_value = float(my_plug.get("outPw", 0)) + # Try specific plug keys from protocol or generic 'outPw' + # Protocol example: { "a": 12, ... } doesn't show power explicitly. + # Assuming 'outPw' or similar exists, or maybe 'p' or 'power'. + # Existing code used 'outPw'. Let's stick to it or add fallbacks if known. + val = my_plug.get("outPw") + if val is None: + val = my_plug.get("power") # Common alternative - # Update name if available and changed? - # name = my_plug.get("name") - # if name and name != self._attr_name: - # self._attr_name = name # Changing name dynamically might be tricky for entity registry. - - self._attr_available = True - self.async_write_ha_state() + if val is not None: + self._attr_native_value = float(val) + self._attr_available = True + self.async_write_ha_state() except (TypeError, ValueError): pass