diff --git a/custom_components/JackeryHome/config_flow.py b/custom_components/JackeryHome/config_flow.py index 34d6e53..1950024 100644 --- a/custom_components/JackeryHome/config_flow.py +++ b/custom_components/JackeryHome/config_flow.py @@ -21,7 +21,7 @@ DATA_SCHEMA = vol.Schema( ): str, vol.Required( "mqtt_broker", - default="192.168.0.101" + default="192.168.1.100" ): str, vol.Optional( "mqtt_port", diff --git a/custom_components/JackeryHome/sensor.py b/custom_components/JackeryHome/sensor.py index de4cfa8..e5779ef 100644 --- a/custom_components/JackeryHome/sensor.py +++ b/custom_components/JackeryHome/sensor.py @@ -2,6 +2,8 @@ import asyncio import json import logging +import time +import random from typing import Any from homeassistant.components import mqtt as ha_mqtt @@ -19,6 +21,23 @@ from . import DOMAIN _LOGGER = logging.getLogger(__name__) +# Meter SN 映射(传感器ID到meter_sn的映射) +METER_SN_MAP = { + "battery_soc": 21548033, + "solar_energy": 16961537, + "home_energy": 16936961, + "grid_import_energy": 16959489, + "grid_export_energy": 16960513, + "battery_charge_energy": 16952321, + "battery_discharge_energy": 16953345, + "solar_power": 1026001, + "home_power": 21171201, + "grid_import_power": 16930817, + "grid_export_power": 16930817, + "battery_charge_power": 16931841, + "battery_discharge_power": 16931841, +} + # 传感器配置 SENSORS = { # 功率传感器(实时监测) @@ -180,11 +199,14 @@ class JackeryHomeSensor(SensorEntity): "sw_version": "1.0.5", } self._topic = f"{topic_prefix}/{sensor_id}/state" - self._data_topic = "device/data" - self._data_get_topic = "device/data-get" + self._data_topic = "v1/iot_gw/cloud/data/#" # 接收设备响应数据的主题 + self._data_get_topic = "v1/iot_gw/cloud/data" # 发送数据请求的基础主题(需要加上 device_sn) + self._gw_lwt_topic = "v1/iot_gw/gw_lwt/#" self._attr_native_value = None self._attr_available = False self._data_task = None + self._device_sn = "" # 设备序列号(从 LWT 消息中获取) + self._meter_sn = METER_SN_MAP.get(sensor_id, 0) # 当前传感器的 meter_sn # 能源传感器标识 self._is_energy_sensor = device_class == SensorDeviceClass.ENERGY @@ -198,6 +220,28 @@ class JackeryHomeSensor(SensorEntity): """Set up the sensor.""" _LOGGER.info(f"JackeryHome sensor {self._sensor_id} added to Home Assistant") + # 订阅 LWT 主题以获取设备序列号 + @callback + def lwt_message_received(msg): + """Handle LWT messages to get device serial number.""" + try: + payload = msg.payload + if isinstance(payload, bytes): + payload = payload.decode("utf-8") + + _LOGGER.debug(f"Received LWT message: {payload}") + + try: + data = json.loads(payload) + if isinstance(data, dict) and "gw_sn" in data: + self._device_sn = data["gw_sn"] + _LOGGER.info(f"Device serial number updated: {self._device_sn}") + except json.JSONDecodeError: + _LOGGER.warning(f"Failed to parse LWT message: {payload}") + + except Exception as e: + _LOGGER.error(f"Error processing LWT message: {e}") + # 订阅 device/data topic 处理消息回调 @callback def data_message_received(msg): @@ -209,7 +253,24 @@ class JackeryHomeSensor(SensorEntity): _LOGGER.debug(f"Received data message for {self._sensor_id}: {payload}") - # 尝试解析 JSON + # 尝试解析 data_get 格式的数据 + try: + data = json.loads(payload) + # 检查是否是 data_get 格式 + if isinstance(data, dict) and data.get("cmd") == "data_get": + value = self._parse_data_get_response(data) + if value is not None: + self._attr_native_value = value + self._attr_available = True + self.async_write_ha_state() + _LOGGER.debug(f"Updated {self._sensor_id} with value: {value}") + else: + _LOGGER.debug(f"No matching data found for {self._sensor_id} in data_get response") + return + except json.JSONDecodeError: + pass + + # 兼容旧格式:尝试解析 JSON try: data = json.loads(payload) # 根据传感器ID从数据中提取对应的值 @@ -243,6 +304,15 @@ class JackeryHomeSensor(SensorEntity): except Exception as e: _LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}") + # 订阅 LWT topic 以获取设备序列号 + await ha_mqtt.async_subscribe( + self.hass, + self._gw_lwt_topic, + lwt_message_received, + 1 + ) + _LOGGER.info(f"Subscribed to MQTT topic: {self._gw_lwt_topic}") + # 订阅 device/data topic await ha_mqtt.async_subscribe( self.hass, @@ -257,19 +327,112 @@ class JackeryHomeSensor(SensorEntity): self._data_task = asyncio.create_task(self._periodic_data_request()) + def _construct_data_get_request(self) -> dict: + """构造 data_get 格式的请求数据.""" + data = { + "cmd": "data_get", + "gw_sn": self._device_sn if self._device_sn else "", + "timestamp": time.time(), + "token": random.randint(1000, 9999), + "info": { + "dev_list": [ + { + "dev_sn": "ems_" + self._device_sn if self._device_sn else "ems_", + "meter_list": [ + self._meter_sn, + ] + } + ] + } + } + return data + + def _parse_data_get_response(self, data: dict) -> Any: + """解析 data_get 格式的响应数据. + + 请求格式: meter_list 中只包含 meter_sn (整数) + 响应格式: meter_list 中包含 [meter_sn, meter_value] (列表) + """ + try: + cmd = data.get("cmd") + if cmd != "data_get": + return None + + info = data.get("info", {}) + dev_list = info.get("dev_list", []) + + for dev in dev_list: + meter_list = dev.get("meter_list", []) + for meter in meter_list: + # 响应格式:meter 是 [meter_sn, meter_value] 格式的列表 + if isinstance(meter, list) and len(meter) >= 2: + meter_sn = meter[0] + meter_value = meter[1] + + # 检查是否匹配当前传感器的 meter_sn + if meter_sn == self._meter_sn: + # 处理特殊逻辑(电网功率和电池功率) + if self._sensor_id == "grid_import_power": + # 电网功率:负值为购买,正值为出售 + if meter_value < 0: + return abs(meter_value) + else: + return 0 + elif self._sensor_id == "grid_export_power": + # 电网功率:负值为购买,正值为出售 + if meter_value > 0: + return meter_value + else: + return 0 + elif self._sensor_id == "battery_charge_power": + # 电池功率:负值为充电,正值为放电 + if meter_value < 0: + return abs(meter_value) + else: + return 0 + elif self._sensor_id == "battery_discharge_power": + # 电池功率:负值为充电,正值为放电 + if meter_value > 0: + return meter_value + else: + return 0 + else: + # 其他传感器直接返回值 + return meter_value + # 请求格式:meter 只是 meter_sn (整数),忽略请求 + elif isinstance(meter, (int, float)): + # 这是请求格式,不是响应,忽略 + pass + + return None + + except Exception as e: + _LOGGER.error(f"Error parsing data_get response: {e}") + return None + async def _periodic_data_request(self) -> None: """Periodically send data request to device/data-get topic.""" while True: try: + # 如果还没有设备序列号,等待一段时间再重试 + if not self._device_sn: + _LOGGER.debug(f"Device serial number not available yet for {self._sensor_id}, waiting...") + await asyncio.sleep(5) + continue + + # 构造 data_get 格式的请求 + request_data = self._construct_data_get_request() + # 发送数据获取请求 + topic = f"{self._data_get_topic}/{self._device_sn}" if self._device_sn else self._data_get_topic await ha_mqtt.async_publish( self.hass, - self._data_get_topic, - json.dumps({"request": "data", "sensor": self._sensor_id}), + topic, + json.dumps(request_data, ensure_ascii=False), 1, False ) - _LOGGER.debug(f"Sent data request for {self._sensor_id} to {self._data_get_topic}") + _LOGGER.debug(f"Sent data_get request for {self._sensor_id} (meter_sn: {self._meter_sn}) to {topic}") # 等待5秒 await asyncio.sleep(5) @@ -297,4 +460,6 @@ class JackeryHomeSensor(SensorEntity): "mqtt_topic": self._topic, "data_topic": self._data_topic, "data_get_topic": self._data_get_topic, + "meter_sn": self._meter_sn, + "device_sn": self._device_sn, } \ No newline at end of file diff --git a/data_transmission_example.py b/data_transmission_example.py index 4eb1d15..373912f 100644 --- a/data_transmission_example.py +++ b/data_transmission_example.py @@ -10,14 +10,15 @@ import random import paho.mqtt.client as mqtt class DataTransmissionExample: - """数据传输示例类""" - def __init__(self, broker="192.168.0.101", port=1883): + + def __init__(self, broker="192.168.1.100", port=1883): self.broker = broker self.port = port self.client = None self.running = False - + self.device_status = "offline" + self.device_sn = "" # 初始化能源累积数据(基准从1kWh开始) self.energy_data = { "solar_energy": 1.0, @@ -27,10 +28,120 @@ class DataTransmissionExample: "battery_charge_energy": 1.0, "battery_discharge_energy": 1.0, } - + # 系统SOC + battery_soc = 21548033 + + ## 能量累计 + solar_energy = 16961537 + home_energy = 16936961 + grid_import_energy = 16959489 + grid_export_energy = 16960513 + battery_charge_energy = 16952321 + battery_discharge_energy = 16953345 + ## 实时功率 + solar_power = 1026001 + home_power = 21171201 + grid_import_power = 16930817 + grid_export_power = 16930817 + battery_charge_power = 16931841 + battery_discharge_power = 16931841 + ## 构造发送数据 + def construct_send_data(self): + data = { + "cmd": "data_get", + "gw_sn": self.device_sn, + "timestamp": time.time(), + ## 随机数 + "token": random.randint(1000, 9999), + "info": { + "dev_list": [ + { + "dev_sn": "ems_" + self.device_sn, + "meter_list": [ + self.battery_soc, + self.solar_energy, + self.home_energy, + self.grid_import_energy, + self.grid_export_energy, + self.battery_charge_energy, + self.battery_discharge_energy, + self.solar_power, + self.home_power, + self.grid_import_power, + self.grid_export_power, + self.battery_charge_power, + self.battery_discharge_power, + ] + } + ] + } + } + return data + ## 解析数据 + def parse_data(self, payload): + data = json.loads(payload) + cmd = data["cmd"] + gw_sn = data["gw_sn"] + token = data["token"] + timestamp = data["timestamp"] + info = data["info"] + dev_list = info["dev_list"] + for dev in dev_list: + dev_sn = dev["dev_sn"] + meter_list = dev["meter_list"] + for meter in meter_list: + meter_sn = meter[0] + meter_value = meter[1] + print(f"📨 收到设备数据: {dev_sn} {meter_sn} {meter_value}") + if meter_sn == self.battery_soc: + self.battery_soc = meter_value + print(f"📨 收到电池电量: {self.battery_soc}") + if meter_sn == self.solar_energy: + self.solar_energy = meter_value + print(f"📨 收到太阳能能量: {self.solar_energy}") + if meter_sn == self.home_energy: + self.home_energy = meter_value + print(f"📨 收到家庭能量: {self.home_energy}") + if meter_sn == self.grid_import_energy: + self.grid_import_energy = meter_value + print(f"📨 收到电网购买能量: {self.grid_import_energy}") + if meter_sn == self.grid_export_energy: + self.grid_export_energy = meter_value + print(f"📨 收到电网出售能量: {self.grid_export_energy}") + if meter_sn == self.battery_charge_energy: + self.battery_charge_energy = meter_value + print(f"📨 收到电池充电能量: {self.battery_charge_energy}") + if meter_sn == self.battery_discharge_energy: + self.battery_discharge_energy = meter_value + print(f"📨 收到电池放电能量: {self.battery_discharge_energy}") + if meter_sn == self.solar_power: + self.solar_power = meter_value + print(f"📨 收到太阳能功率: {self.solar_power}") + if meter_sn == self.home_power: + self.home_power = meter_value + print(f"📨 收到家庭功率: {self.home_power}") + ## 电网功率 负值为购买,正值为出售 + if meter_sn == self.grid_import_power: + self.grid_import_power = meter_value + if meter_value < 0: + self.grid_import_power = -meter_value + print(f"📨 收到电网购买功率: {self.grid_import_power}") + else: + self.grid_export_power = meter_value + print(f"📨 收到电网出售功率: {self.grid_export_power}") + ## 电池充放电功率 负值为充电,正值为放电 + if meter_sn == self.battery_charge_power: + self.battery_charge_power = meter_value + if meter_value < 0: + self.battery_charge_power = -meter_value + print(f"📨 收到电池充电功率: {self.battery_charge_power}") + else: + self.battery_discharge_power = meter_value + print(f"📨 收到电池放电功率: {self.battery_discharge_power}") + def setup_mqtt(self): """设置 MQTT 客户端""" - self.client = mqtt.Client(client_id="energy_device_simulator", callback_api_version=mqtt.CallbackAPIVersion.VERSION2) + self.client = mqtt.Client(client_id="energy_device", callback_api_version=mqtt.CallbackAPIVersion.VERSION2) self.client.on_connect = self.on_connect self.client.on_message = self.on_message @@ -39,100 +150,44 @@ class DataTransmissionExample: if rc == 0: print("✅ 连接到 MQTT 代理成功") # 订阅数据获取请求主题 - client.subscribe("device/data-get") - print("✅ 订阅 device/data-get 主题成功") + client.subscribe("v1/iot_gw/cloud/data/#") + client.subscribe("v1/iot_gw/gw_lwt/") + print("✅ 订阅 v1/iot_gw/cloud/data/# 主题成功") + print("✅ 订阅 v1/iot_gw/gw_lwt/ 主题成功") else: print(f"❌ 连接 MQTT 代理失败,错误码: {rc}") def on_message(self, client, userdata, msg): """MQTT 消息接收回调""" - if msg.topic == "device/data-get": + if msg.topic == "v1/iot_gw/cloud/data/#": print(f"📨 收到数据请求: {msg.payload.decode()}") - # 模拟处理时间 - # time.sleep(0.1) - # 发送模拟数据 - # self.send_device_data() + # 解析JSON + data = json.loads(msg.payload) + self.parse_data(data) + if msg.topic == "v1/iot_gw/gw_lwt/#": + print(f"📨 收到设备状态: {msg.payload.decode()}") + # 解析JSON + data = json.loads(msg.payload) + self.device_sn = data["gw_sn"] + info = data["info"] + print(f"📨 收到设备状态: {self.device_sn} {info}") + # 更新设备状态 + self.device_status = info + print(f"📨 设备状态: {self.device_status}") - def generate_sample_data(self): - """生成模拟的设备数据""" - # 模拟太阳能发电(白天较高,夜晚较低) - hour = time.localtime().tm_hour - if 6 <= hour <= 18: # 白天 - solar_power = random.uniform(500, 3000) - else: # 夜晚 - solar_power = random.uniform(0, 100) - - # 模拟家庭用电 - home_power = random.uniform(800, 2500) - - # 计算电网功率(家庭用电 - 太阳能发电) - grid_power = home_power - solar_power - - # 分离电网功率为购买和出售 - grid_import = max(0, grid_power) # 从电网购买 - grid_export = max(0, -grid_power) # 向电网出售 - - # 模拟电池充放电 - battery_power = random.uniform(-800, 800) - battery_charge = max(0, -battery_power) # 充电 - battery_discharge = max(0, battery_power) # 放电 - - # 模拟电池电量(根据充放电状态变化) - if not hasattr(self, 'battery_soc'): - self.battery_soc = random.uniform(30, 90) - - # 根据充放电更新电量 - if battery_power > 0: # 放电 - self.battery_soc = max(0, self.battery_soc - 0.5) - elif battery_power < 0: # 充电 - self.battery_soc = min(100, self.battery_soc + 0.3) - - # 更新能源累积数据(每次增加0.1kWh) - # 根据功率值确定能源增长方向 - if solar_power > 0: - self.energy_data["solar_energy"] += 0.1 - if home_power > 0: - self.energy_data["home_energy"] += 0.1 - if grid_import > 0: - self.energy_data["grid_import_energy"] += 0.1 - if grid_export > 0: - self.energy_data["grid_export_energy"] += 0.1 - if battery_charge > 0: - self.energy_data["battery_charge_energy"] += 0.1 - if battery_discharge > 0: - self.energy_data["battery_discharge_energy"] += 0.1 - - return { - # 功率数据(实时监测) - "solar_power": round(solar_power, 2), - "home_power": round(home_power, 2), - "grid_import": round(grid_import, 2), - "grid_export": round(grid_export, 2), - "battery_charge": round(battery_charge, 2), - "battery_discharge": round(battery_discharge, 2), - "battery_soc": round(self.battery_soc, 1), - # 能源数据(累积值) - "solar_energy": round(self.energy_data["solar_energy"], 3), - "home_energy": round(self.energy_data["home_energy"], 3), - "grid_import_energy": round(self.energy_data["grid_import_energy"], 3), - "grid_export_energy": round(self.energy_data["grid_export_energy"], 3), - "battery_charge_energy": round(self.energy_data["battery_charge_energy"], 3), - "battery_discharge_energy": round(self.energy_data["battery_discharge_energy"], 3), - } def send_device_data(self): - """发送设备数据到 /device/data 主题""" - data = self.generate_sample_data() + data = self.construct_send_data() # 转换为 JSON 格式 json_data = json.dumps(data, ensure_ascii=False, indent=2) # 发布到 device/data 主题 - result = self.client.publish("device/data", json_data) + result = self.client.publish("v1/iot_gw/cloud/data/"+self.device_sn, json_data) if result.rc == mqtt.MQTT_ERR_SUCCESS: print("📤 发送设备数据:") - print(f" 主题: device/data") + print(f" 主题: v1/iot_gw/cloud/data/{self.device_sn}") print(f" 数据: {json_data}") print() else: @@ -187,13 +242,6 @@ def main(): """主函数""" print("🏠 Energy Monitor 数据传输示例") print("=" * 50) - print("这个示例演示了以下功能:") - print("1. 监听 device/data-get 请求") - print("2. 响应请求并发送设备数据到 device/data") - print("3. 模拟真实的能源监控数据(功率 + 能源累积)") - print("4. 功率数据:实时变化的功率值") - print("5. 能源数据:累积值,基准从1kWh开始,每次增加0.1kWh") - print("6. 每5秒的数据获取频率(由 Home Assistant 集成触发)") print() # 创建示例实例 diff --git a/main.py b/main.py index 03bbd9d..273a8c9 100644 --- a/main.py +++ b/main.py @@ -3,9 +3,9 @@ import time import random import paho.mqtt.client as mqtt -MQTT_BROKER = "192.168.0.101" +MQTT_BROKER = "192.168.1.100" MQTT_PORT = 1883 -MQTT_CLIENT_ID = "hem_simulator" +MQTT_CLIENT_ID = "jackery_home_simulator" # 模式控制变量 # 0: 自发自用模式 (Self-consumption)