From 15215ab232b563236acdfc2d967303a065f045d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=8D=E6=B1=82=E5=9C=A3=E5=89=91?= Date: Tue, 18 Nov 2025 16:08:40 +0800 Subject: [PATCH] refactor: improve sensor handling and data processing in JackeryHome component - Introduced a constant for request interval to standardize data request timing. - Refactored meter serial number mapping logic for clarity and maintainability. - Enhanced LWT and data message handling with dedicated methods for improved readability and error handling. - Updated periodic data request logic to utilize the new request interval constant. --- custom_components/JackeryHome/sensor.py | 378 ++++++++++-------------- 1 file changed, 155 insertions(+), 223 deletions(-) diff --git a/custom_components/JackeryHome/sensor.py b/custom_components/JackeryHome/sensor.py index 0926765..1b219fd 100644 --- a/custom_components/JackeryHome/sensor.py +++ b/custom_components/JackeryHome/sensor.py @@ -21,6 +21,9 @@ from . import DOMAIN _LOGGER = logging.getLogger(__name__) +# 常量定义 +REQUEST_INTERVAL = 5 # 数据请求间隔(秒) + # Meter SN 映射(传感器ID到meter_sn的映射) METER_SN_MAP = { "battery_soc": "21548033", @@ -206,15 +209,16 @@ class JackeryHomeSensor(SensorEntity): self._attr_available = False self._data_task = None self._device_sn = "" # 设备序列号(从 LWT 消息中获取) + # 获取 meter_sn,对于功率传感器,使用对应的 _power 键 - if sensor_id == "grid_import": - self._meter_sn = METER_SN_MAP.get("grid_import_power", 0) - elif sensor_id == "grid_export": - self._meter_sn = METER_SN_MAP.get("grid_export_power", 0) - elif sensor_id in ["battery_charge", "battery_discharge"]: - self._meter_sn = METER_SN_MAP.get("battery_charge_power", 0) - else: - self._meter_sn = METER_SN_MAP.get(sensor_id, 0) # 当前传感器的 meter_sn + meter_sn_key_map = { + "grid_import": "grid_import_power", + "grid_export": "grid_export_power", + "battery_charge": "battery_charge_power", + "battery_discharge": "battery_discharge_power", + } + meter_sn_key = meter_sn_key_map.get(sensor_id, sensor_id) + self._meter_sn = METER_SN_MAP.get(meter_sn_key, 0) # 能源传感器标识 self._is_energy_sensor = device_class == SensorDeviceClass.ENERGY @@ -224,187 +228,144 @@ class JackeryHomeSensor(SensorEntity): """No polling needed.""" return False - async def async_added_to_hass(self) -> None: - """Set up the sensor.""" - _LOGGER.info(f"JackeryHome sensor {self._sensor_id} added to Home Assistant") - - # 订阅 LWT 主题以获取设备序列号 - @callback - def lwt_message_received(msg): - """Handle LWT messages to get device serial number.""" - try: - payload = msg.payload - if isinstance(payload, bytes): - payload = payload.decode("utf-8") - - _LOGGER.debug(f"Received LWT message: {payload}") - - try: - data = json.loads(payload) - if isinstance(data, dict) and "gw_sn" in data: - self._device_sn = data["gw_sn"] - _LOGGER.info(f"Device serial number updated: {self._device_sn}") - except json.JSONDecodeError: - _LOGGER.warning(f"Failed to parse LWT message: {payload}") - - except Exception as e: - _LOGGER.error(f"Error processing LWT message: {e}") - - # 订阅 device/data topic 处理消息回调 - -@callback -def data_message_received(msg): - """Handle new MQTT messages from device/data topic.""" - try: - payload = msg.payload - if isinstance(payload, bytes): - payload = payload.decode("utf-8") - - _LOGGER.debug(f"Received data message for {self._sensor_id}: {payload}") - - data = None + def _handle_lwt_message(self, msg) -> None: + """Handle LWT messages to get device serial number.""" try: - data = json.loads(payload) - except json.JSONDecodeError: - data = None - - # data_get 响应,解析方式参考 data_transmission_example.py - if isinstance(data, dict) and data.get("cmd") == "data_get": + payload = msg.payload + if isinstance(payload, bytes): + payload = payload.decode("utf-8") + + _LOGGER.debug(f"Received LWT message: {payload}") + try: - target_meter_sn = str(self._meter_sn) - info = data.get("info", {}) - dev_list = info.get("dev_list", []) - for dev in dev_list: - meter_list = dev.get("meter_list", []) - for meter in meter_list: - if not isinstance(meter, (list, tuple)) or len(meter) < 2: - continue + data = json.loads(payload) + if isinstance(data, dict) and "gw_sn" in data: + self._device_sn = data["gw_sn"] + _LOGGER.info(f"Device serial number updated: {self._device_sn}") + except json.JSONDecodeError: + _LOGGER.warning(f"Failed to parse LWT message: {payload}") + + except Exception as e: + _LOGGER.error(f"Error processing LWT message: {e}") - meter_sn = str(meter[0]) - try: - meter_value_float = float(meter[1]) - except (ValueError, TypeError): - continue - - meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float - - if meter_sn != target_meter_sn: - continue - - if self._sensor_id == "grid_import": - value = abs(meter_value) if meter_value < 0 else 0 - elif self._sensor_id == "grid_export": - value = meter_value if meter_value > 0 else 0 - elif self._sensor_id == "battery_charge": - value = abs(meter_value) if meter_value < 0 else 0 - elif self._sensor_id == "battery_discharge": - value = meter_value if meter_value > 0 else 0 - else: - value = meter_value - - self._attr_native_value = value - self._attr_available = True - self.async_write_ha_state() - _LOGGER.debug(f"Updated {self._sensor_id} with value: {value}") - return - - _LOGGER.debug(f"No matching data found for {self._sensor_id} in data_get payload") - except Exception as parse_err: - _LOGGER.error(f"Error parsing data_get response: {parse_err}") - return - - # 兼容旧格式的数据 - if isinstance(data, dict): - if self._sensor_id in data: - value = data[self._sensor_id] - elif "value" in data: - value = data["value"] - else: - value = data + def _process_meter_value(self, meter_value: float) -> float: + """根据传感器类型处理 meter 值.""" + if self._sensor_id == "grid_import": + return abs(meter_value) if meter_value < 0 else 0 + elif self._sensor_id == "grid_export": + return meter_value if meter_value > 0 else 0 + elif self._sensor_id == "battery_charge": + return abs(meter_value) if meter_value < 0 else 0 + elif self._sensor_id == "battery_discharge": + return meter_value if meter_value > 0 else 0 else: + return meter_value + + def _handle_data_message(self, msg) -> None: + """Handle new MQTT messages from device/data topic.""" + try: + payload = msg.payload + if isinstance(payload, bytes): + payload = payload.decode("utf-8") + + _LOGGER.debug(f"Received data message for {self._sensor_id}: {payload}") + + data = None try: - value = float(payload) - except ValueError: - self._attr_available = False - self.async_write_ha_state() + data = json.loads(payload) + except json.JSONDecodeError: + data = None + + # 处理 data_get 响应 + if isinstance(data, dict) and data.get("cmd") == "data_get": + value = self._parse_data_get_response(data) + if value is not None: + self._update_sensor_value(value) return + # 兼容旧格式的数据 + if isinstance(data, dict): + if self._sensor_id in data: + value = data[self._sensor_id] + elif "value" in data: + value = data["value"] + else: + value = data + else: + try: + value = float(payload) + except ValueError: + self._attr_available = False + self.async_write_ha_state() + return + + if value is not None: + self._update_sensor_value(value) + + except Exception as e: + _LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}") + + def _update_sensor_value(self, value: Any) -> None: + """更新传感器值并通知 Home Assistant.""" self._attr_native_value = value self._attr_available = True self.async_write_ha_state() _LOGGER.debug(f"Updated {self._sensor_id} with value: {value}") - except Exception as e: - _LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}") - # 在回调函数中不能直接使用 await,需要通过 async_create_task 执行异步操作 - async def _resubscribe_topics(): - """重新订阅 MQTT 主题的异步函数""" - # 定义 LWT 消息处理回调 - @callback - def lwt_message_received(msg): - """Handle LWT messages to get device serial number.""" - try: - payload = msg.payload - if isinstance(payload, bytes): - payload = payload.decode("utf-8") - - _LOGGER.debug(f"Received LWT message: {payload}") - - try: - data = json.loads(payload) - if isinstance(data, dict) and "gw_sn" in data: - self._device_sn = data["gw_sn"] - _LOGGER.info(f"Device serial number updated: {self._device_sn}") - except json.JSONDecodeError: - _LOGGER.warning(f"Failed to parse LWT message: {payload}") - - except Exception as e: - _LOGGER.error(f"Error processing LWT message: {e}") - - # 订阅 LWT topic 以获取设备序列号 - await ha_mqtt.async_subscribe( - self.hass, - self._gw_lwt_topic, - lwt_message_received, - 1 - ) - _LOGGER.info(f"Subscribed to MQTT topic: {self._gw_lwt_topic}") - - # 订阅 device/data topic - await ha_mqtt.async_subscribe( - self.hass, - self._data_topic, - data_message_received, - 1 - ) - - _LOGGER.info(f"Subscribed to MQTT topic: {self._data_topic}") - - # 启动定时器,每隔5秒向 device/data-get 发送数据获取请求 - self._data_task = asyncio.create_task(self._periodic_data_request()) + async def async_added_to_hass(self) -> None: + """Set up the sensor.""" + _LOGGER.info(f"JackeryHome sensor {self._sensor_id} added to Home Assistant") - self.hass.async_create_task(_resubscribe_topics()) + # 创建 LWT 消息处理回调 + @callback + def lwt_message_received(msg): + """Callback wrapper for LWT messages.""" + self._handle_lwt_message(msg) + + # 创建数据消息处理回调 + @callback + def data_message_received(msg): + """Callback wrapper for data messages.""" + self._handle_data_message(msg) + + # 订阅 LWT topic 以获取设备序列号 + await ha_mqtt.async_subscribe( + self.hass, + self._gw_lwt_topic, + lwt_message_received, + 1 + ) + _LOGGER.info(f"Subscribed to MQTT topic: {self._gw_lwt_topic}") + + # 订阅 device/data topic + await ha_mqtt.async_subscribe( + self.hass, + self._data_topic, + data_message_received, + 1 + ) + _LOGGER.info(f"Subscribed to MQTT topic: {self._data_topic}") + + # 启动定时器,每隔5秒向 device/data-get 发送数据获取请求 + self._data_task = asyncio.create_task(self._periodic_data_request()) def _construct_data_get_request(self) -> dict: """构造 data_get 格式的请求数据.""" - data = { + return { "cmd": "data_get", - "gw_sn": self._device_sn if self._device_sn else "", + "gw_sn": self._device_sn or "", "timestamp": str(int(time.time() * 1000)), "token": str(random.randint(1000, 9999)), "info": { "dev_list": [ { - "dev_sn": "ems_" + self._device_sn if self._device_sn else "ems_", - "meter_list": [ - self._meter_sn, - ] + "dev_sn": f"ems_{self._device_sn}" if self._device_sn else "ems_", + "meter_list": [self._meter_sn], } ] } } - return data def _parse_data_get_response(self, data: dict) -> Any: """解析 data_get 格式的响应数据. @@ -419,61 +380,32 @@ def data_message_received(msg): info = data.get("info", {}) dev_list = info.get("dev_list", []) + target_meter_sn = str(self._meter_sn) for dev in dev_list: meter_list = dev.get("meter_list", []) for meter in meter_list: # 响应格式:meter 是 [meter_sn, meter_value] 格式的列表 - if isinstance(meter, list) and len(meter) >= 2: - meter_sn_raw = meter[0] - meter_value_raw = meter[1] - - # 处理 meter_sn:可能是字符串或整数,统一转换为整数进行比较 - try: - meter_sn = int(meter_sn_raw) if isinstance(meter_sn_raw, str) else int(meter_sn_raw) - except (ValueError, TypeError): - meter_sn = meter_sn_raw - - # 先转换为 float,然后判断是否可以转换为 int - meter_value_float = float(meter_value_raw) - # 如果小数部分为 0,则转换为 int,否则保留 float - meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float - - # 检查是否匹配当前传感器的 meter_sn(支持字符串和整数比较) - if int(meter_sn) == int(self._meter_sn): - # 处理特殊逻辑(电网功率和电池功率) - if self._sensor_id == "grid_import": - # 电网功率:负值为购买,正值为出售 - if meter_value < 0: - return abs(meter_value) - else: - return 0 - elif self._sensor_id == "grid_export": - # 电网功率:负值为购买,正值为出售 - if meter_value > 0: - return meter_value - else: - return 0 - elif self._sensor_id == "battery_charge": - # 电池功率:负值为充电,正值为放电 - if meter_value < 0: - return abs(meter_value) - else: - return 0 - elif self._sensor_id == "battery_discharge": - # 电池功率:负值为充电,正值为放电 - if meter_value > 0: - return meter_value - else: - return 0 - else: - # 其他传感器直接返回值 - return meter_value - # 请求格式:meter 只是 meter_sn (整数),忽略请求 - elif isinstance(meter, (int, float)): - # 这是请求格式,不是响应,忽略 - pass + if not isinstance(meter, (list, tuple)) or len(meter) < 2: + continue + + meter_sn = str(meter[0]) + if meter_sn != target_meter_sn: + continue + + try: + meter_value_float = float(meter[1]) + except (ValueError, TypeError): + _LOGGER.debug(f"Invalid meter value for {self._sensor_id}: {meter[1]}") + continue + + # 如果小数部分为 0,则转换为 int,否则保留 float + meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float + + # 使用统一的方法处理 meter 值 + return self._process_meter_value(meter_value) + _LOGGER.debug(f"No matching data found for {self._sensor_id} in data_get payload") return None except Exception as e: @@ -486,31 +418,31 @@ def data_message_received(msg): try: # 如果还没有设备序列号,等待一段时间再重试 if not self._device_sn: - _LOGGER.debug(f"Device serial number not available yet for {self._sensor_id}, waiting...") - await asyncio.sleep(5) + _LOGGER.debug( + f"Device serial number not available yet for {self._sensor_id}, waiting..." + ) + await asyncio.sleep(REQUEST_INTERVAL) continue - # 构造 data_get 格式的请求 + # 构造并发送 data_get 格式的请求 request_data = self._construct_data_get_request() - - # 发送数据获取请求 - topic = self._data_get_topic await ha_mqtt.async_publish( self.hass, - topic, + self._data_get_topic, json.dumps(request_data, ensure_ascii=False), 1, False ) - _LOGGER.debug(f"Sent data_get request for {self._sensor_id} (meter_sn: {self._meter_sn}) to {topic}") + _LOGGER.debug( + f"Sent data_get request for {self._sensor_id} " + f"(meter_sn: {self._meter_sn}) to {self._data_get_topic}" + ) - # 等待5秒 - await asyncio.sleep(5) + await asyncio.sleep(REQUEST_INTERVAL) except Exception as e: _LOGGER.error(f"Error in periodic data request for {self._sensor_id}: {e}") - # 出错时等待5秒再重试 - await asyncio.sleep(5) + await asyncio.sleep(REQUEST_INTERVAL) async def async_will_remove_from_hass(self) -> None: """Clean up when sensor is removed."""