修复实体设备不显示问题

- 重写 sensor.py,使用 Home Assistant 内置 MQTT 组件
- 修复配置流程标题显示
- 添加更好的错误处理和日志记录
- 创建 MQTT 测试脚本
- 统一所有日志信息为 JackeryHome
This commit is contained in:
不求圣剑
2025-10-14 11:04:32 +08:00
parent 7f5b5492ca
commit 04821cb501
4 changed files with 173 additions and 187 deletions

97
test_mqtt.py Normal file
View File

@@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
测试 MQTT 数据发送脚本
用于测试 JackeryHome 集成是否能正确接收数据
"""
import json
import time
import paho.mqtt.client as mqtt
import random
# MQTT 配置
MQTT_BROKER = "192.168.0.101" # 修改为你的 MQTT Broker 地址
MQTT_PORT = 1883
TOPIC_PREFIX = "homeassistant/sensor"
def on_connect(client, userdata, flags, rc):
"""连接回调"""
if rc == 0:
print("✅ 成功连接到 MQTT Broker")
else:
print(f"❌ 连接失败,错误代码: {rc}")
def on_publish(client, userdata, mid):
"""发布回调"""
print(f"📤 消息已发布ID: {mid}")
def main():
"""主函数"""
print("🚀 启动 JackeryHome MQTT 测试脚本")
print(f"📡 MQTT Broker: {MQTT_BROKER}:{MQTT_PORT}")
print(f"📂 Topic Prefix: {TOPIC_PREFIX}")
print("-" * 50)
# 创建 MQTT 客户端
client = mqtt.Client(client_id="jackery_home_test", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_publish = on_publish
try:
# 连接到 MQTT Broker
print("🔗 正在连接到 MQTT Broker...")
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
# 等待连接
time.sleep(2)
# 模拟数据
sensors = {
"solar_power": {"min": 200, "max": 3000, "unit": "W"},
"home_power": {"min": 500, "max": 3500, "unit": "W"},
"grid_import": {"min": 0, "max": 2000, "unit": "W"},
"grid_export": {"min": 0, "max": 1500, "unit": "W"},
"battery_charge": {"min": 0, "max": 1000, "unit": "W"},
"battery_discharge": {"min": 0, "max": 1000, "unit": "W"},
"battery_soc": {"min": 20, "max": 100, "unit": "%"},
}
print("📊 开始发送测试数据...")
print("按 Ctrl+C 停止")
print("-" * 50)
count = 0
while True:
count += 1
print(f"\n🔄 第 {count} 轮数据发送:")
for sensor_id, config in sensors.items():
# 生成随机值
value = random.randint(config["min"], config["max"])
# 构建主题
topic = f"{TOPIC_PREFIX}/{sensor_id}/state"
# 发送数据
result = client.publish(topic, str(value))
if result.rc == mqtt.MQTT_ERR_SUCCESS:
print(f"{sensor_id}: {value} {config['unit']} -> {topic}")
else:
print(f"{sensor_id}: 发送失败")
# 等待 5 秒
time.sleep(5)
except KeyboardInterrupt:
print("\n\n⏹️ 用户中断,正在停止...")
except Exception as e:
print(f"\n❌ 发生错误: {e}")
finally:
# 清理资源
client.loop_stop()
client.disconnect()
print("🔚 测试脚本已停止")
if __name__ == "__main__":
main()