refactor: improve sensor handling and data processing in JackeryHome component

- Introduced a constant for request interval to standardize data request timing.
- Refactored meter serial number mapping logic for clarity and maintainability.
- Enhanced LWT and data message handling with dedicated methods for improved readability and error handling.
- Updated periodic data request logic to utilize the new request interval constant.
This commit is contained in:
不求圣剑
2025-11-18 16:08:40 +08:00
parent 6a8fd88478
commit 15215ab232

View File

@@ -21,6 +21,9 @@ from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
# 常量定义
REQUEST_INTERVAL = 5 # 数据请求间隔(秒)
# Meter SN 映射传感器ID到meter_sn的映射
METER_SN_MAP = {
"battery_soc": "21548033",
@@ -206,15 +209,16 @@ class JackeryHomeSensor(SensorEntity):
self._attr_available = False
self._data_task = None
self._device_sn = "" # 设备序列号(从 LWT 消息中获取)
# 获取 meter_sn对于功率传感器使用对应的 _power 键
if sensor_id == "grid_import":
self._meter_sn = METER_SN_MAP.get("grid_import_power", 0)
elif sensor_id == "grid_export":
self._meter_sn = METER_SN_MAP.get("grid_export_power", 0)
elif sensor_id in ["battery_charge", "battery_discharge"]:
self._meter_sn = METER_SN_MAP.get("battery_charge_power", 0)
else:
self._meter_sn = METER_SN_MAP.get(sensor_id, 0) # 当前传感器的 meter_sn
meter_sn_key_map = {
"grid_import": "grid_import_power",
"grid_export": "grid_export_power",
"battery_charge": "battery_charge_power",
"battery_discharge": "battery_discharge_power",
}
meter_sn_key = meter_sn_key_map.get(sensor_id, sensor_id)
self._meter_sn = METER_SN_MAP.get(meter_sn_key, 0)
# 能源传感器标识
self._is_energy_sensor = device_class == SensorDeviceClass.ENERGY
@@ -224,13 +228,7 @@ class JackeryHomeSensor(SensorEntity):
"""No polling needed."""
return False
async def async_added_to_hass(self) -> None:
"""Set up the sensor."""
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} added to Home Assistant")
# 订阅 LWT 主题以获取设备序列号
@callback
def lwt_message_received(msg):
def _handle_lwt_message(self, msg) -> None:
"""Handle LWT messages to get device serial number."""
try:
payload = msg.payload
@@ -250,10 +248,20 @@ class JackeryHomeSensor(SensorEntity):
except Exception as e:
_LOGGER.error(f"Error processing LWT message: {e}")
# 订阅 device/data topic 处理消息回调
def _process_meter_value(self, meter_value: float) -> float:
"""根据传感器类型处理 meter 值."""
if self._sensor_id == "grid_import":
return abs(meter_value) if meter_value < 0 else 0
elif self._sensor_id == "grid_export":
return meter_value if meter_value > 0 else 0
elif self._sensor_id == "battery_charge":
return abs(meter_value) if meter_value < 0 else 0
elif self._sensor_id == "battery_discharge":
return meter_value if meter_value > 0 else 0
else:
return meter_value
@callback
def data_message_received(msg):
def _handle_data_message(self, msg) -> None:
"""Handle new MQTT messages from device/data topic."""
try:
payload = msg.payload
@@ -268,49 +276,11 @@ def data_message_received(msg):
except json.JSONDecodeError:
data = None
# data_get 响应,解析方式参考 data_transmission_example.py
# 处理 data_get 响应
if isinstance(data, dict) and data.get("cmd") == "data_get":
try:
target_meter_sn = str(self._meter_sn)
info = data.get("info", {})
dev_list = info.get("dev_list", [])
for dev in dev_list:
meter_list = dev.get("meter_list", [])
for meter in meter_list:
if not isinstance(meter, (list, tuple)) or len(meter) < 2:
continue
meter_sn = str(meter[0])
try:
meter_value_float = float(meter[1])
except (ValueError, TypeError):
continue
meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float
if meter_sn != target_meter_sn:
continue
if self._sensor_id == "grid_import":
value = abs(meter_value) if meter_value < 0 else 0
elif self._sensor_id == "grid_export":
value = meter_value if meter_value > 0 else 0
elif self._sensor_id == "battery_charge":
value = abs(meter_value) if meter_value < 0 else 0
elif self._sensor_id == "battery_discharge":
value = meter_value if meter_value > 0 else 0
else:
value = meter_value
self._attr_native_value = value
self._attr_available = True
self.async_write_ha_state()
_LOGGER.debug(f"Updated {self._sensor_id} with value: {value}")
return
_LOGGER.debug(f"No matching data found for {self._sensor_id} in data_get payload")
except Exception as parse_err:
_LOGGER.error(f"Error parsing data_get response: {parse_err}")
value = self._parse_data_get_response(data)
if value is not None:
self._update_sensor_value(value)
return
# 兼容旧格式的数据
@@ -329,37 +299,34 @@ def data_message_received(msg):
self.async_write_ha_state()
return
if value is not None:
self._update_sensor_value(value)
except Exception as e:
_LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}")
def _update_sensor_value(self, value: Any) -> None:
"""更新传感器值并通知 Home Assistant."""
self._attr_native_value = value
self._attr_available = True
self.async_write_ha_state()
_LOGGER.debug(f"Updated {self._sensor_id} with value: {value}")
except Exception as e:
_LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}")
# 在回调函数中不能直接使用 await需要通过 async_create_task 执行异步操作
async def _resubscribe_topics():
"""重新订阅 MQTT 主题的异步函数"""
# 定义 LWT 消息处理回调
async def async_added_to_hass(self) -> None:
"""Set up the sensor."""
_LOGGER.info(f"JackeryHome sensor {self._sensor_id} added to Home Assistant")
# 创建 LWT 消息处理回调
@callback
def lwt_message_received(msg):
"""Handle LWT messages to get device serial number."""
try:
payload = msg.payload
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
"""Callback wrapper for LWT messages."""
self._handle_lwt_message(msg)
_LOGGER.debug(f"Received LWT message: {payload}")
try:
data = json.loads(payload)
if isinstance(data, dict) and "gw_sn" in data:
self._device_sn = data["gw_sn"]
_LOGGER.info(f"Device serial number updated: {self._device_sn}")
except json.JSONDecodeError:
_LOGGER.warning(f"Failed to parse LWT message: {payload}")
except Exception as e:
_LOGGER.error(f"Error processing LWT message: {e}")
# 创建数据消息处理回调
@callback
def data_message_received(msg):
"""Callback wrapper for data messages."""
self._handle_data_message(msg)
# 订阅 LWT topic 以获取设备序列号
await ha_mqtt.async_subscribe(
@@ -377,34 +344,28 @@ def data_message_received(msg):
data_message_received,
1
)
_LOGGER.info(f"Subscribed to MQTT topic: {self._data_topic}")
# 启动定时器每隔5秒向 device/data-get 发送数据获取请求
self._data_task = asyncio.create_task(self._periodic_data_request())
self.hass.async_create_task(_resubscribe_topics())
def _construct_data_get_request(self) -> dict:
"""构造 data_get 格式的请求数据."""
data = {
return {
"cmd": "data_get",
"gw_sn": self._device_sn if self._device_sn else "",
"gw_sn": self._device_sn or "",
"timestamp": str(int(time.time() * 1000)),
"token": str(random.randint(1000, 9999)),
"info": {
"dev_list": [
{
"dev_sn": "ems_" + self._device_sn if self._device_sn else "ems_",
"meter_list": [
self._meter_sn,
]
"dev_sn": f"ems_{self._device_sn}" if self._device_sn else "ems_",
"meter_list": [self._meter_sn],
}
]
}
}
return data
def _parse_data_get_response(self, data: dict) -> Any:
"""解析 data_get 格式的响应数据.
@@ -419,61 +380,32 @@ def data_message_received(msg):
info = data.get("info", {})
dev_list = info.get("dev_list", [])
target_meter_sn = str(self._meter_sn)
for dev in dev_list:
meter_list = dev.get("meter_list", [])
for meter in meter_list:
# 响应格式meter 是 [meter_sn, meter_value] 格式的列表
if isinstance(meter, list) and len(meter) >= 2:
meter_sn_raw = meter[0]
meter_value_raw = meter[1]
if not isinstance(meter, (list, tuple)) or len(meter) < 2:
continue
meter_sn = str(meter[0])
if meter_sn != target_meter_sn:
continue
# 处理 meter_sn可能是字符串或整数统一转换为整数进行比较
try:
meter_sn = int(meter_sn_raw) if isinstance(meter_sn_raw, str) else int(meter_sn_raw)
meter_value_float = float(meter[1])
except (ValueError, TypeError):
meter_sn = meter_sn_raw
_LOGGER.debug(f"Invalid meter value for {self._sensor_id}: {meter[1]}")
continue
# 先转换为 float然后判断是否可以转换为 int
meter_value_float = float(meter_value_raw)
# 如果小数部分为 0则转换为 int否则保留 float
meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float
# 检查是否匹配当前传感器的 meter_sn支持字符串和整数比较
if int(meter_sn) == int(self._meter_sn):
# 处理特殊逻辑(电网功率和电池功率)
if self._sensor_id == "grid_import":
# 电网功率:负值为购买,正值为出售
if meter_value < 0:
return abs(meter_value)
else:
return 0
elif self._sensor_id == "grid_export":
# 电网功率:负值为购买,正值为出售
if meter_value > 0:
return meter_value
else:
return 0
elif self._sensor_id == "battery_charge":
# 电池功率:负值为充电,正值为放电
if meter_value < 0:
return abs(meter_value)
else:
return 0
elif self._sensor_id == "battery_discharge":
# 电池功率:负值为充电,正值为放电
if meter_value > 0:
return meter_value
else:
return 0
else:
# 其他传感器直接返回值
return meter_value
# 请求格式meter 只是 meter_sn (整数),忽略请求
elif isinstance(meter, (int, float)):
# 这是请求格式,不是响应,忽略
pass
# 使用统一的方法处理 meter 值
return self._process_meter_value(meter_value)
_LOGGER.debug(f"No matching data found for {self._sensor_id} in data_get payload")
return None
except Exception as e:
@@ -486,31 +418,31 @@ def data_message_received(msg):
try:
# 如果还没有设备序列号,等待一段时间再重试
if not self._device_sn:
_LOGGER.debug(f"Device serial number not available yet for {self._sensor_id}, waiting...")
await asyncio.sleep(5)
_LOGGER.debug(
f"Device serial number not available yet for {self._sensor_id}, waiting..."
)
await asyncio.sleep(REQUEST_INTERVAL)
continue
# 构造 data_get 格式的请求
# 构造并发送 data_get 格式的请求
request_data = self._construct_data_get_request()
# 发送数据获取请求
topic = self._data_get_topic
await ha_mqtt.async_publish(
self.hass,
topic,
self._data_get_topic,
json.dumps(request_data, ensure_ascii=False),
1,
False
)
_LOGGER.debug(f"Sent data_get request for {self._sensor_id} (meter_sn: {self._meter_sn}) to {topic}")
_LOGGER.debug(
f"Sent data_get request for {self._sensor_id} "
f"(meter_sn: {self._meter_sn}) to {self._data_get_topic}"
)
# 等待5秒
await asyncio.sleep(5)
await asyncio.sleep(REQUEST_INTERVAL)
except Exception as e:
_LOGGER.error(f"Error in periodic data request for {self._sensor_id}: {e}")
# 出错时等待5秒再重试
await asyncio.sleep(5)
await asyncio.sleep(REQUEST_INTERVAL)
async def async_will_remove_from_hass(self) -> None:
"""Clean up when sensor is removed."""