Files
homeassistant-jackery/data_transmission_example.py
不求圣剑 a3d46e8f7a feat: 增加能源数据模拟发送功能
- 在data_transmission_example.py中增加energy类型数据的模拟发送
- 能源数据基准从1kWh开始,每次增加0.1kWh
- 简化传感器处理逻辑,直接显示累积值而不进行计算
- 支持所有能源类型:太阳能、家庭用电、电网购买/出售、电池充放电
2025-10-15 10:49:38 +08:00

208 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
数据传输示例
演示如何使用修改后的 Energy Monitor 系统进行数据传输
"""
import json
import time
import random
import paho.mqtt.client as mqtt
class DataTransmissionExample:
"""数据传输示例类"""
def __init__(self, broker="192.168.0.101", port=1883):
self.broker = broker
self.port = port
self.client = None
self.running = False
# 初始化能源累积数据基准从1kWh开始
self.energy_data = {
"solar_energy": 1.0,
"home_energy": 1.0,
"grid_import_energy": 1.0,
"grid_export_energy": 1.0,
"battery_charge_energy": 1.0,
"battery_discharge_energy": 1.0,
}
def setup_mqtt(self):
"""设置 MQTT 客户端"""
self.client = mqtt.Client(client_id="energy_device_simulator", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
def on_connect(self, client, userdata, flags, rc, properties):
"""MQTT 连接回调"""
if rc == 0:
print("✅ 连接到 MQTT 代理成功")
# 订阅数据获取请求主题
client.subscribe("device/data-get")
print("✅ 订阅 device/data-get 主题成功")
else:
print(f"❌ 连接 MQTT 代理失败,错误码: {rc}")
def on_message(self, client, userdata, msg):
"""MQTT 消息接收回调"""
if msg.topic == "device/data-get":
print(f"📨 收到数据请求: {msg.payload.decode()}")
# 模拟处理时间
# time.sleep(0.1)
# 发送模拟数据
# self.send_device_data()
def generate_sample_data(self):
"""生成模拟的设备数据"""
# 模拟太阳能发电(白天较高,夜晚较低)
hour = time.localtime().tm_hour
if 6 <= hour <= 18: # 白天
solar_power = random.uniform(500, 3000)
else: # 夜晚
solar_power = random.uniform(0, 100)
# 模拟家庭用电
home_power = random.uniform(800, 2500)
# 计算电网功率(家庭用电 - 太阳能发电)
grid_power = home_power - solar_power
# 分离电网功率为购买和出售
grid_import = max(0, grid_power) # 从电网购买
grid_export = max(0, -grid_power) # 向电网出售
# 模拟电池充放电
battery_power = random.uniform(-800, 800)
battery_charge = max(0, -battery_power) # 充电
battery_discharge = max(0, battery_power) # 放电
# 模拟电池电量(根据充放电状态变化)
if not hasattr(self, 'battery_soc'):
self.battery_soc = random.uniform(30, 90)
# 根据充放电更新电量
if battery_power > 0: # 放电
self.battery_soc = max(0, self.battery_soc - 0.5)
elif battery_power < 0: # 充电
self.battery_soc = min(100, self.battery_soc + 0.3)
# 更新能源累积数据每次增加0.1kWh
# 根据功率值确定能源增长方向
if solar_power > 0:
self.energy_data["solar_energy"] += 0.1
if home_power > 0:
self.energy_data["home_energy"] += 0.1
if grid_import > 0:
self.energy_data["grid_import_energy"] += 0.1
if grid_export > 0:
self.energy_data["grid_export_energy"] += 0.1
if battery_charge > 0:
self.energy_data["battery_charge_energy"] += 0.1
if battery_discharge > 0:
self.energy_data["battery_discharge_energy"] += 0.1
return {
# 功率数据(实时监测)
"solar_power": round(solar_power, 2),
"home_power": round(home_power, 2),
"grid_import": round(grid_import, 2),
"grid_export": round(grid_export, 2),
"battery_charge": round(battery_charge, 2),
"battery_discharge": round(battery_discharge, 2),
"battery_soc": round(self.battery_soc, 1),
# 能源数据(累积值)
"solar_energy": round(self.energy_data["solar_energy"], 3),
"home_energy": round(self.energy_data["home_energy"], 3),
"grid_import_energy": round(self.energy_data["grid_import_energy"], 3),
"grid_export_energy": round(self.energy_data["grid_export_energy"], 3),
"battery_charge_energy": round(self.energy_data["battery_charge_energy"], 3),
"battery_discharge_energy": round(self.energy_data["battery_discharge_energy"], 3),
}
def send_device_data(self):
"""发送设备数据到 /device/data 主题"""
data = self.generate_sample_data()
# 转换为 JSON 格式
json_data = json.dumps(data, ensure_ascii=False, indent=2)
# 发布到 device/data 主题
result = self.client.publish("device/data", json_data)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
print("📤 发送设备数据:")
print(f" 主题: device/data")
print(f" 数据: {json_data}")
print()
else:
print(f"❌ 发送数据失败,错误码: {result.rc}")
def start_simulation(self, duration=60):
"""启动数据模拟"""
print("🚀 启动数据传输模拟")
print(f"📡 MQTT 代理: {self.broker}:{self.port}")
print(f"⏱️ 运行时长: {duration}")
print("=" * 50)
try:
# 连接 MQTT 代理
self.client.connect(self.broker, self.port, 60)
self.client.loop_start()
self.running = True
# 等待连接建立
time.sleep(2)
# 发送初始数据
print("📤 发送初始数据...")
self.send_device_data()
# 运行指定时间
start_time = time.time()
while self.running and (time.time() - start_time) < duration:
time.sleep(1)
# 每5秒发送一次数据模拟设备主动发送
if int(time.time() - start_time) % 5 == 0:
print("📤 设备主动发送数据...")
self.send_device_data()
except KeyboardInterrupt:
print("\n⏹️ 用户中断模拟")
except Exception as e:
print(f"❌ 模拟出错: {e}")
finally:
self.stop_simulation()
def stop_simulation(self):
"""停止模拟"""
self.running = False
if self.client:
self.client.loop_stop()
self.client.disconnect()
print("✅ 模拟已停止")
def main():
"""主函数"""
print("🏠 Energy Monitor 数据传输示例")
print("=" * 50)
print("这个示例演示了以下功能:")
print("1. 监听 device/data-get 请求")
print("2. 响应请求并发送设备数据到 device/data")
print("3. 模拟真实的能源监控数据(功率 + 能源累积)")
print("4. 功率数据:实时变化的功率值")
print("5. 能源数据累积值基准从1kWh开始每次增加0.1kWh")
print("6. 每5秒的数据获取频率由 Home Assistant 集成触发)")
print()
# 创建示例实例
example = DataTransmissionExample()
example.setup_mqtt()
# 启动模拟运行60秒
example.start_simulation(duration=1000)
if __name__ == "__main__":
main()