fix: improve MQTT topic resubscription and error handling in JackeryHome sensor
- Refactored the error handling logic to use an asynchronous function for resubscribing to MQTT topics after an error occurs. - Added detailed logging for LWT message processing and device serial number updates. - Ensured that the periodic data request task is initiated after resubscribing to the necessary topics.
This commit is contained in:
@@ -336,27 +336,54 @@ def data_message_received(msg):
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"Error processing data message for {self._sensor_id}: {e}")
|
||||
# 订阅 LWT topic 以获取设备序列号
|
||||
await ha_mqtt.async_subscribe(
|
||||
self.hass,
|
||||
self._gw_lwt_topic,
|
||||
lwt_message_received,
|
||||
1
|
||||
)
|
||||
_LOGGER.info(f"Subscribed to MQTT topic: {self._gw_lwt_topic}")
|
||||
# 在回调函数中不能直接使用 await,需要通过 async_create_task 执行异步操作
|
||||
async def _resubscribe_topics():
|
||||
"""重新订阅 MQTT 主题的异步函数"""
|
||||
# 定义 LWT 消息处理回调
|
||||
@callback
|
||||
def lwt_message_received(msg):
|
||||
"""Handle LWT messages to get device serial number."""
|
||||
try:
|
||||
payload = msg.payload
|
||||
if isinstance(payload, bytes):
|
||||
payload = payload.decode("utf-8")
|
||||
|
||||
_LOGGER.debug(f"Received LWT message: {payload}")
|
||||
|
||||
try:
|
||||
data = json.loads(payload)
|
||||
if isinstance(data, dict) and "gw_sn" in data:
|
||||
self._device_sn = data["gw_sn"]
|
||||
_LOGGER.info(f"Device serial number updated: {self._device_sn}")
|
||||
except json.JSONDecodeError:
|
||||
_LOGGER.warning(f"Failed to parse LWT message: {payload}")
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"Error processing LWT message: {e}")
|
||||
|
||||
# 订阅 LWT topic 以获取设备序列号
|
||||
await ha_mqtt.async_subscribe(
|
||||
self.hass,
|
||||
self._gw_lwt_topic,
|
||||
lwt_message_received,
|
||||
1
|
||||
)
|
||||
_LOGGER.info(f"Subscribed to MQTT topic: {self._gw_lwt_topic}")
|
||||
|
||||
# 订阅 device/data topic
|
||||
await ha_mqtt.async_subscribe(
|
||||
self.hass,
|
||||
self._data_topic,
|
||||
data_message_received,
|
||||
1
|
||||
)
|
||||
|
||||
_LOGGER.info(f"Subscribed to MQTT topic: {self._data_topic}")
|
||||
|
||||
# 启动定时器,每隔5秒向 device/data-get 发送数据获取请求
|
||||
self._data_task = asyncio.create_task(self._periodic_data_request())
|
||||
|
||||
# 订阅 device/data topic
|
||||
await ha_mqtt.async_subscribe(
|
||||
self.hass,
|
||||
self._data_topic,
|
||||
data_message_received,
|
||||
1
|
||||
)
|
||||
|
||||
_LOGGER.info(f"Subscribed to MQTT topic: {self._data_topic}")
|
||||
|
||||
# 启动定时器,每隔5秒向 device/data-get 发送数据获取请求
|
||||
self._data_task = asyncio.create_task(self._periodic_data_request())
|
||||
self.hass.async_create_task(_resubscribe_topics())
|
||||
|
||||
|
||||
def _construct_data_get_request(self) -> dict:
|
||||
|
||||
Reference in New Issue
Block a user