feat: enhance data transmission and sensor handling

- Added new energy data points and updated the initialization of energy variables in data_transmission_example.py.
- Improved data construction and parsing methods to handle new data formats and ensure compatibility with updated MQTT topics.
- Refined sensor logic in JackeryHome component to correctly map meter serial numbers and handle power values more robustly.
- Ensured consistent data types for meter values and improved error handling during data processing.
This commit is contained in:
不求圣剑
2025-11-18 14:45:26 +08:00
parent 054fa172f2
commit 1c3933d5e3
2 changed files with 113 additions and 79 deletions

View File

@@ -206,6 +206,12 @@ class JackeryHomeSensor(SensorEntity):
self._attr_available = False self._attr_available = False
self._data_task = None self._data_task = None
self._device_sn = "" # 设备序列号(从 LWT 消息中获取) self._device_sn = "" # 设备序列号(从 LWT 消息中获取)
# 获取 meter_sn对于功率传感器使用对应的 _power 键
if sensor_id in ["grid_import", "grid_export"]:
self._meter_sn = METER_SN_MAP.get("grid_import_power", 0)
elif sensor_id in ["battery_charge", "battery_discharge"]:
self._meter_sn = METER_SN_MAP.get("battery_charge_power", 0)
else:
self._meter_sn = METER_SN_MAP.get(sensor_id, 0) # 当前传感器的 meter_sn self._meter_sn = METER_SN_MAP.get(sensor_id, 0) # 当前传感器的 meter_sn
# 能源传感器标识 # 能源传感器标识
@@ -332,8 +338,8 @@ class JackeryHomeSensor(SensorEntity):
data = { data = {
"cmd": "data_get", "cmd": "data_get",
"gw_sn": self._device_sn if self._device_sn else "", "gw_sn": self._device_sn if self._device_sn else "",
"timestamp": time.time(), "timestamp": str(int(time.time() * 1000)),
"token": random.randint(1000, 9999), "token": str(random.randint(1000, 9999)),
"info": { "info": {
"dev_list": [ "dev_list": [
{ {
@@ -366,31 +372,42 @@ class JackeryHomeSensor(SensorEntity):
for meter in meter_list: for meter in meter_list:
# 响应格式meter 是 [meter_sn, meter_value] 格式的列表 # 响应格式meter 是 [meter_sn, meter_value] 格式的列表
if isinstance(meter, list) and len(meter) >= 2: if isinstance(meter, list) and len(meter) >= 2:
meter_sn = meter[0] meter_sn_raw = meter[0]
meter_value = meter[1] meter_value_raw = meter[1]
# 检查是否匹配当前传感器的 meter_sn # 处理 meter_sn可能是字符串或整数统一转换为整数进行比较
if meter_sn == self._meter_sn: try:
meter_sn = int(meter_sn_raw) if isinstance(meter_sn_raw, str) else int(meter_sn_raw)
except (ValueError, TypeError):
meter_sn = meter_sn_raw
# 先转换为 float然后判断是否可以转换为 int
meter_value_float = float(meter_value_raw)
# 如果小数部分为 0则转换为 int否则保留 float
meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float
# 检查是否匹配当前传感器的 meter_sn支持字符串和整数比较
if int(meter_sn) == int(self._meter_sn):
# 处理特殊逻辑(电网功率和电池功率) # 处理特殊逻辑(电网功率和电池功率)
if self._sensor_id == "grid_import_power": if self._sensor_id == "grid_import":
# 电网功率:负值为购买,正值为出售 # 电网功率:负值为购买,正值为出售
if meter_value < 0: if meter_value < 0:
return abs(meter_value) return abs(meter_value)
else: else:
return 0 return 0
elif self._sensor_id == "grid_export_power": elif self._sensor_id == "grid_export":
# 电网功率:负值为购买,正值为出售 # 电网功率:负值为购买,正值为出售
if meter_value > 0: if meter_value > 0:
return meter_value return meter_value
else: else:
return 0 return 0
elif self._sensor_id == "battery_charge_power": elif self._sensor_id == "battery_charge":
# 电池功率:负值为充电,正值为放电 # 电池功率:负值为充电,正值为放电
if meter_value < 0: if meter_value < 0:
return abs(meter_value) return abs(meter_value)
else: else:
return 0 return 0
elif self._sensor_id == "battery_discharge_power": elif self._sensor_id == "battery_discharge":
# 电池功率:负值为充电,正值为放电 # 电池功率:负值为充电,正值为放电
if meter_value > 0: if meter_value > 0:
return meter_value return meter_value

View File

@@ -8,7 +8,21 @@ import json
import time import time
import random import random
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
battery_soc_point = "21548033"
## 能量累计
solar_energy_point = "16961537"
home_energy_point = "16936961"
grid_import_energy_point = "16959489"
grid_export_energy_point = "16960513"
battery_charge_energy_point = "16952321"
battery_discharge_energy_point = "16953345"
## 实时功率
solar_power_point = "1026001"
home_power_point = "21171201"
grid_import_power_point = "16930817"
grid_export_power_point = "16930817"
battery_charge_power_point = "16931841"
battery_discharge_power_point = "16931841"
class DataTransmissionExample: class DataTransmissionExample:
@@ -19,58 +33,50 @@ class DataTransmissionExample:
self.running = False self.running = False
self.device_status = "offline" self.device_status = "offline"
self.device_sn = "" self.device_sn = ""
# 初始化能源累积数据基准从1kWh开始 self.battery_soc = 0
self.energy_data = { self.solar_energy = 0
"solar_energy": 1.0, self.solar_energy = 0
"home_energy": 1.0, self.home_energy = 0
"grid_import_energy": 1.0, self.grid_import_energy = 0
"grid_export_energy": 1.0, self.grid_export_energy = 0
"battery_charge_energy": 1.0, self.battery_charge_energy = 0
"battery_discharge_energy": 1.0, self.battery_discharge_energy = 0
}
# 系统SOC self.energy_data = {
battery_soc = 21548033 "solar_energy": 0,
"home_energy": 0,
"grid_import_energy": 0,
"grid_export_energy": 0,
"battery_charge_energy": 0,
"battery_discharge_energy": 0,
}
## 能量累计
solar_energy = 16961537
home_energy = 16936961
grid_import_energy = 16959489
grid_export_energy = 16960513
battery_charge_energy = 16952321
battery_discharge_energy = 16953345
## 实时功率
solar_power = 1026001
home_power = 21171201
grid_import_power = 16930817
grid_export_power = 16930817
battery_charge_power = 16931841
battery_discharge_power = 16931841
## 构造发送数据 ## 构造发送数据
def construct_send_data(self): def construct_send_data(self):
data = { data = {
"cmd": "data_get", "cmd": "data_get",
"gw_sn": self.device_sn, "gw_sn": self.device_sn,
"timestamp": time.time(), "timestamp": str(int(time.time() * 1000)),
## 随机数 ## 随机数是字符串
"token": random.randint(1000, 9999), "token": str(random.randint(1000, 9999)),
"info": { "info": {
"dev_list": [ "dev_list": [
{ {
"dev_sn": "ems_" + self.device_sn, "dev_sn": "ems_" + self.device_sn,
"meter_list": [ "meter_list": [
self.battery_soc, battery_soc_point,
self.solar_energy, solar_energy_point,
self.home_energy, home_energy_point,
self.grid_import_energy, grid_import_energy_point,
self.grid_export_energy, grid_export_energy_point,
self.battery_charge_energy, battery_charge_energy_point,
self.battery_discharge_energy, battery_discharge_energy_point,
self.solar_power, solar_power_point,
self.home_power, home_power_point,
self.grid_import_power, grid_import_power_point,
self.grid_export_power, grid_export_power_point,
self.battery_charge_power, battery_charge_power_point,
self.battery_discharge_power, battery_discharge_power_point,
] ]
} }
] ]
@@ -79,11 +85,11 @@ class DataTransmissionExample:
return data return data
## 解析数据 ## 解析数据
def parse_data(self, payload): def parse_data(self, payload):
# payload 已经是字典类型,不需要再次解析
if isinstance(payload, str):
data = json.loads(payload) data = json.loads(payload)
cmd = data["cmd"] else:
gw_sn = data["gw_sn"] data = payload
token = data["token"]
timestamp = data["timestamp"]
info = data["info"] info = data["info"]
dev_list = info["dev_list"] dev_list = info["dev_list"]
for dev in dev_list: for dev in dev_list:
@@ -91,37 +97,40 @@ class DataTransmissionExample:
meter_list = dev["meter_list"] meter_list = dev["meter_list"]
for meter in meter_list: for meter in meter_list:
meter_sn = meter[0] meter_sn = meter[0]
meter_value = meter[1] # 先转换为 float然后判断是否可以转换为 int
meter_value_float = float(meter[1])
# 如果小数部分为 0则转换为 int否则保留 float
meter_value = int(meter_value_float) if meter_value_float == int(meter_value_float) else meter_value_float
print(f"📨 收到设备数据: {dev_sn} {meter_sn} {meter_value}") print(f"📨 收到设备数据: {dev_sn} {meter_sn} {meter_value}")
if meter_sn == self.battery_soc: if meter_sn == battery_soc_point:
self.battery_soc = meter_value self.battery_soc = meter_value
print(f"📨 收到电池电量: {self.battery_soc}") print(f"📨 收到电池电量: {self.battery_soc}")
if meter_sn == self.solar_energy: if meter_sn == solar_energy_point:
self.solar_energy = meter_value self.solar_energy = meter_value
print(f"📨 收到太阳能能量: {self.solar_energy}") print(f"📨 收到太阳能能量: {self.solar_energy}")
if meter_sn == self.home_energy: if meter_sn == home_energy_point:
self.home_energy = meter_value self.home_energy = meter_value
print(f"📨 收到家庭能量: {self.home_energy}") print(f"📨 收到家庭能量: {self.home_energy}")
if meter_sn == self.grid_import_energy: if meter_sn == grid_import_energy_point:
self.grid_import_energy = meter_value self.grid_import_energy = meter_value
print(f"📨 收到电网购买能量: {self.grid_import_energy}") print(f"📨 收到电网购买能量: {self.grid_import_energy}")
if meter_sn == self.grid_export_energy: if meter_sn == grid_export_energy_point:
self.grid_export_energy = meter_value self.grid_export_energy = meter_value
print(f"📨 收到电网出售能量: {self.grid_export_energy}") print(f"📨 收到电网出售能量: {self.grid_export_energy}")
if meter_sn == self.battery_charge_energy: if meter_sn == battery_charge_energy_point:
self.battery_charge_energy = meter_value self.battery_charge_energy = meter_value
print(f"📨 收到电池充电能量: {self.battery_charge_energy}") print(f"📨 收到电池充电能量: {self.battery_charge_energy}")
if meter_sn == self.battery_discharge_energy: if meter_sn == battery_discharge_energy_point:
self.battery_discharge_energy = meter_value self.battery_discharge_energy = meter_value
print(f"📨 收到电池放电能量: {self.battery_discharge_energy}") print(f"📨 收到电池放电能量: {self.battery_discharge_energy}")
if meter_sn == self.solar_power: if meter_sn == solar_power_point:
self.solar_power = meter_value self.solar_power = meter_value
print(f"📨 收到太阳能功率: {self.solar_power}") print(f"📨 收到太阳能功率: {self.solar_power}")
if meter_sn == self.home_power: if meter_sn == home_power_point:
self.home_power = meter_value self.home_power = meter_value
print(f"📨 收到家庭功率: {self.home_power}") print(f"📨 收到家庭功率: {self.home_power}")
## 电网功率 负值为购买,正值为出售 ## 电网功率 负值为购买,正值为出售
if meter_sn == self.grid_import_power: if meter_sn == grid_import_power_point:
self.grid_import_power = meter_value self.grid_import_power = meter_value
if meter_value < 0: if meter_value < 0:
self.grid_import_power = -meter_value self.grid_import_power = -meter_value
@@ -130,7 +139,7 @@ class DataTransmissionExample:
self.grid_export_power = meter_value self.grid_export_power = meter_value
print(f"📨 收到电网出售功率: {self.grid_export_power}") print(f"📨 收到电网出售功率: {self.grid_export_power}")
## 电池充放电功率 负值为充电,正值为放电 ## 电池充放电功率 负值为充电,正值为放电
if meter_sn == self.battery_charge_power: if meter_sn == battery_charge_power_point:
self.battery_charge_power = meter_value self.battery_charge_power = meter_value
if meter_value < 0: if meter_value < 0:
self.battery_charge_power = -meter_value self.battery_charge_power = -meter_value
@@ -150,21 +159,21 @@ class DataTransmissionExample:
if rc == 0: if rc == 0:
print("✅ 连接到 MQTT 代理成功") print("✅ 连接到 MQTT 代理成功")
# 订阅数据获取请求主题 # 订阅数据获取请求主题
client.subscribe("v1/iot_gw/cloud/data/#") client.subscribe("v1/iot_gw/gw/data")
client.subscribe("v1/iot_gw/gw_lwt/") client.subscribe("v1/iot_gw/gw_lwt")
print("✅ 订阅 v1/iot_gw/cloud/data/# 主题成功") print("✅ 订阅 v1/iot_gw/gw_data 主题成功")
print("✅ 订阅 v1/iot_gw/gw_lwt/ 主题成功") print("✅ 订阅 v1/iot_gw/gw_lwt 主题成功")
else: else:
print(f"❌ 连接 MQTT 代理失败,错误码: {rc}") print(f"❌ 连接 MQTT 代理失败,错误码: {rc}")
def on_message(self, client, userdata, msg): def on_message(self, client, userdata, msg):
"""MQTT 消息接收回调""" """MQTT 消息接收回调"""
if msg.topic == "v1/iot_gw/cloud/data/#": if msg.topic == "v1/iot_gw/gw/data":
print(f"📨 收到数据请求: {msg.payload.decode()}") print(f"📨 收到数据请求: {msg.payload.decode()}")
# 解析JSON # 解析JSON
data = json.loads(msg.payload) data = json.loads(msg.payload)
self.parse_data(data) self.parse_data(data)
if msg.topic == "v1/iot_gw/gw_lwt/#": if msg.topic == "v1/iot_gw/gw_lwt":
print(f"📨 收到设备状态: {msg.payload.decode()}") print(f"📨 收到设备状态: {msg.payload.decode()}")
# 解析JSON # 解析JSON
data = json.loads(msg.payload) data = json.loads(msg.payload)
@@ -183,11 +192,11 @@ class DataTransmissionExample:
json_data = json.dumps(data, ensure_ascii=False, indent=2) json_data = json.dumps(data, ensure_ascii=False, indent=2)
# 发布到 device/data 主题 # 发布到 device/data 主题
result = self.client.publish("v1/iot_gw/cloud/data/"+self.device_sn, json_data) result = self.client.publish("v1/iot_gw/cloud/data", json_data)
if result.rc == mqtt.MQTT_ERR_SUCCESS: if result.rc == mqtt.MQTT_ERR_SUCCESS:
print("📤 发送设备数据:") print("📤 发送设备数据:")
print(f" 主题: v1/iot_gw/cloud/data/{self.device_sn}") print(f" 主题: v1/iot_gw/cloud/data")
print(f" 数据: {json_data}") print(f" 数据: {json_data}")
print() print()
else: else:
@@ -211,7 +220,11 @@ class DataTransmissionExample:
# 发送初始数据 # 发送初始数据
print("📤 发送初始数据...") print("📤 发送初始数据...")
if self.device_sn != "":
self.send_device_data() self.send_device_data()
else:
print("❌ 设备SN为空无法发送数据")
# return
# 运行指定时间 # 运行指定时间
start_time = time.time() start_time = time.time()
@@ -221,7 +234,11 @@ class DataTransmissionExample:
# 每5秒发送一次数据模拟设备主动发送 # 每5秒发送一次数据模拟设备主动发送
if int(time.time() - start_time) % 5 == 0: if int(time.time() - start_time) % 5 == 0:
print("📤 设备主动发送数据...") print("📤 设备主动发送数据...")
if self.device_sn != "":
self.send_device_data() self.send_device_data()
else:
print("❌ 设备SN为空无法发送数据")
# return
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n⏹️ 用户中断模拟") print("\n⏹️ 用户中断模拟")