更新插座和CT的支持
This commit is contained in:
@@ -267,9 +267,11 @@ class JackeryDataCoordinator:
|
||||
|
||||
self._known_plugs = set() # Set of known plug SNs
|
||||
self.add_entities_callback = None # Callback to add new entities
|
||||
self._data_cache = {} # Cache for merged data from status and events
|
||||
|
||||
# Topic patterns
|
||||
self._topic_status_wildcard = f"{self._topic_root}/device/+/status"
|
||||
self._topic_event_wildcard = f"{self._topic_root}/device/+/event"
|
||||
|
||||
def register_sensor(self, sensor_id: str, entity: "JackerySensor") -> None:
|
||||
"""注册传感器实体."""
|
||||
@@ -299,6 +301,15 @@ class JackeryDataCoordinator:
|
||||
)
|
||||
_LOGGER.info(f"Coordinator subscribed to: {self._topic_status_wildcard}")
|
||||
|
||||
# Subscribe to event topic for sub-device data (Type 101)
|
||||
await ha_mqtt.async_subscribe(
|
||||
self.hass,
|
||||
self._topic_event_wildcard,
|
||||
message_received,
|
||||
1
|
||||
)
|
||||
_LOGGER.info(f"Coordinator subscribed to: {self._topic_event_wildcard}")
|
||||
|
||||
self._subscribed = True
|
||||
|
||||
# 启动定时轮询
|
||||
@@ -325,10 +336,11 @@ class JackeryDataCoordinator:
|
||||
if isinstance(payload, bytes):
|
||||
payload = payload.decode("utf-8")
|
||||
|
||||
# Extract device SN from topic: {prefix}/device/{sn}/status
|
||||
match = re.search(rf"{self._topic_root}/device/([^/]+)/status", topic)
|
||||
# Extract device SN from topic: {prefix}/device/{sn}/status OR .../event
|
||||
match = re.search(rf"{self._topic_root}/device/([^/]+)/(status|event)", topic)
|
||||
if match:
|
||||
sn = match.group(1)
|
||||
msg_type = match.group(2) # 'status' or 'event'
|
||||
if not self._device_sn:
|
||||
self._device_sn = sn
|
||||
_LOGGER.info(f"Discovered device SN: {self._device_sn}")
|
||||
@@ -337,51 +349,94 @@ class JackeryDataCoordinator:
|
||||
|
||||
# Parse Payload
|
||||
try:
|
||||
data = json.loads(payload)
|
||||
# 如果数据在 body 字段中,则提取 body
|
||||
if "body" in data and isinstance(data["body"], dict):
|
||||
data = data["body"]
|
||||
raw_data = json.loads(payload)
|
||||
msg_code = raw_data.get("type")
|
||||
body = raw_data.get("body")
|
||||
|
||||
# If body is missing or None, use empty dict or the raw_data itself if it looks like data
|
||||
# But protocol says data is in body.
|
||||
if body is None:
|
||||
# Some status messages might be flat? Assuming body per protocol.
|
||||
# If Type 101 and body is None, ignore.
|
||||
if msg_code == 101:
|
||||
return
|
||||
body = {}
|
||||
|
||||
# Merge logic
|
||||
# Type 101: Sub-device full data
|
||||
if msg_code == 101 and isinstance(body, dict):
|
||||
# Expect "plug": [ ... ]
|
||||
# Normalize to "plugs" and "cts" for internal usage
|
||||
raw_plugs = body.get("plug", [])
|
||||
if raw_plugs:
|
||||
# Separate CTs (devType=2?) and Plugs (others?)
|
||||
# User said CTs (2) and Plugs (6). Protocol example shows devType in item.
|
||||
|
||||
# Update "plugs" list (containing all sub-devices or just plugs?)
|
||||
# Let's keep "plugs" as the raw list of all sub-devices for _check_for_new_plugs to scan
|
||||
# Or better, separate them now.
|
||||
|
||||
current_cts = []
|
||||
current_plugs = []
|
||||
|
||||
for item in raw_plugs:
|
||||
dt = item.get("devType")
|
||||
# Assuming devType 2 is CT based on "Poll Sub-devices ... CTs (2)"
|
||||
if dt == 2:
|
||||
# Map keys for CT calculator: TphasePw -> gridBuy, TnphasePw -> gridSell ?
|
||||
# Or just pass item as is. _calculate_energy_flow expects item in "cts" list.
|
||||
current_cts.append(item)
|
||||
else:
|
||||
current_plugs.append(item)
|
||||
|
||||
self._data_cache["cts"] = current_cts
|
||||
# We store all in "plugs" for JackeryPlugSensor to find itself by SN
|
||||
self._data_cache["plugs"] = raw_plugs
|
||||
self._data_cache["plug"] = raw_plugs # Keep original key too
|
||||
|
||||
# Type 25 or Status: Main device data
|
||||
elif isinstance(body, dict):
|
||||
# Merge top-level keys
|
||||
self._data_cache.update(body)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
_LOGGER.warning(f"Invalid JSON payload on {topic}")
|
||||
return
|
||||
|
||||
# Enrich data with calculations
|
||||
data = self._calculate_energy_flow(data)
|
||||
# Enrich data with calculations using merged cache
|
||||
# operate on copy or direct? Direct is fine.
|
||||
self._data_cache = self._calculate_energy_flow(self._data_cache)
|
||||
|
||||
# Check for new plugs
|
||||
self._check_for_new_plugs(data)
|
||||
self._check_for_new_plugs(self._data_cache)
|
||||
|
||||
self._distribute_data(data)
|
||||
self._distribute_data(self._data_cache)
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"Error handling message: {e}")
|
||||
|
||||
def _check_for_new_plugs(self, data: dict) -> None:
|
||||
"""检查并添加新发现的插座."""
|
||||
plugs = data.get("plugs")
|
||||
# Check both keys
|
||||
plugs = data.get("plugs") or data.get("plug")
|
||||
if not plugs or not isinstance(plugs, list):
|
||||
return
|
||||
|
||||
new_entities = []
|
||||
for plug in plugs:
|
||||
sn = plug.get("sn")
|
||||
# Check SN key (could be 'sn' or 'deviceSn')
|
||||
sn = plug.get("deviceSn") or plug.get("sn")
|
||||
dev_type = plug.get("devType")
|
||||
|
||||
# Filter out CTs (devType 2) if we only want plugs here?
|
||||
# JackeryPlugSensor implies "Plug". CTs are usually main sensors.
|
||||
if dev_type == 2:
|
||||
continue
|
||||
|
||||
if sn and sn not in self._known_plugs:
|
||||
_LOGGER.info(f"Discovered new plug: {sn}")
|
||||
_LOGGER.info(f"Discovered new plug: {sn} (Type: {dev_type})")
|
||||
self._known_plugs.add(sn)
|
||||
|
||||
# Create plug sensor
|
||||
# Pass entry_id if needed, but entity init can handle without it for simple cases?
|
||||
# Need config_entry_id. We can store it in coordinator or pass it later.
|
||||
# Actually, standard pattern is to use the config_entry passed in setup.
|
||||
# We'll need to store entry_id in coordinator or make JackeryPlugSensor use it.
|
||||
# Let's attach config_entry_id to coordinator in __init__?
|
||||
# Or just pass it. Coordinator doesn't have it currently.
|
||||
# We'll use a property or modify init.
|
||||
# For now, let's assume async_add_entities callback handles the addition.
|
||||
# But we need to instantiate the entity class.
|
||||
|
||||
# Wait, I need config_entry_id to create proper device_info identifiers.
|
||||
# I'll add config_entry_id to JackeryDataCoordinator.
|
||||
if hasattr(self, "config_entry_id"):
|
||||
entity = JackeryPlugSensor(
|
||||
plug_sn=sn,
|
||||
@@ -777,24 +832,27 @@ class JackeryPlugSensor(SensorEntity):
|
||||
|
||||
def _update_from_coordinator(self, data: dict) -> None:
|
||||
"""Receive data from coordinator."""
|
||||
plugs = data.get("plugs")
|
||||
plugs = data.get("plugs") or data.get("plug")
|
||||
if not plugs or not isinstance(plugs, list):
|
||||
return
|
||||
|
||||
# Find my plug data
|
||||
my_plug = next((p for p in plugs if p.get("sn") == self._plug_sn), None)
|
||||
my_plug = next((p for p in plugs if (p.get("sn") == self._plug_sn or p.get("deviceSn") == self._plug_sn)), None)
|
||||
if not my_plug:
|
||||
return
|
||||
|
||||
# Update state (outPw)
|
||||
try:
|
||||
self._attr_native_value = float(my_plug.get("outPw", 0))
|
||||
|
||||
# Update name if available and changed?
|
||||
# name = my_plug.get("name")
|
||||
# if name and name != self._attr_name:
|
||||
# self._attr_name = name # Changing name dynamically might be tricky for entity registry.
|
||||
# Try specific plug keys from protocol or generic 'outPw'
|
||||
# Protocol example: { "a": 12, ... } doesn't show power explicitly.
|
||||
# Assuming 'outPw' or similar exists, or maybe 'p' or 'power'.
|
||||
# Existing code used 'outPw'. Let's stick to it or add fallbacks if known.
|
||||
val = my_plug.get("outPw")
|
||||
if val is None:
|
||||
val = my_plug.get("power") # Common alternative
|
||||
|
||||
if val is not None:
|
||||
self._attr_native_value = float(val)
|
||||
self._attr_available = True
|
||||
self.async_write_ha_state()
|
||||
except (TypeError, ValueError):
|
||||
|
||||
Reference in New Issue
Block a user