Zigbee2MQTT device availability check

Hi all,
I have written a Python script that reads the Zigbee2MQTT device status (online/offline) and sends me a Telegram message when a device goes offline or comes back online. I have tested it for a couple of weeks and it looks all right.

Internally, a dict holds the current status of each device. I also save it to disk (in JSON format) so that it survives restarts.
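
A minimal sketch of that persistence idea, separate from the app itself. The function names and the temp-file swap are assumptions for illustration: writing to a temporary file first and then replacing the target means a restart in the middle of a write should not leave a half-written JSON file behind.

```python
import json
import os
import tempfile


def save_status(status, filename):
    """Write the status dict as JSON via a temp file, then swap it into place."""
    directory = os.path.dirname(filename) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(status, f, indent=4)
        # Replace the old file in one step (same directory, so same filesystem).
        os.replace(tmp_path, filename)
    except Exception:
        os.unlink(tmp_path)
        raise


def load_status(filename):
    """Return the saved dict, or an empty dict if the file is missing or corrupt."""
    try:
        with open(filename) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
```

Falling back to an empty dict on a corrupt file trades a lost history for an app that still starts.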

Here’s the code:

# This is apps.yaml
z2m_status:
  module: z2m_status
  class: Z2MStatus
  enable_log: false
  app_switch: input_boolean.z2m_status_app  # could be empty
  notifiers:
    - telegram_me  # defined in configuration.yaml as a notifier with a chat id
  mqtt_topic: 
    - zig2mqtt
    - z2m
    - zigbee2mqtt

Note that since I have three instances of Zigbee2MQTT with three different base topics (don’t ask me why), I added all three above.
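
To illustrate how a single topic or a list of base topics can be handled the same way, here is a small standalone sketch. The helper name parse_availability_topic is hypothetical, and it assumes availability topics have the form <base>/<device>/availability, so device names containing a slash would not match.

```python
def parse_availability_topic(topic, base_topics):
    """Return (base_topic, device) for '<base>/<device>/availability', else None."""
    # Accept either one topic string or a list of them, as in apps.yaml.
    if isinstance(base_topics, str):
        base_topics = [base_topics]
    parts = topic.split("/")
    if len(parts) != 3 or parts[2] != "availability":
        return None
    if parts[0] not in base_topics:
        return None
    return parts[0], parts[1]


# Example with the three base topics from apps.yaml and a made-up device name.
print(parse_availability_topic("z2m/kitchen_lamp/availability",
                               ["zig2mqtt", "z2m", "zigbee2mqtt"]))
```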

# This is z2m_status.py
import adbase as ad
import json


class Z2MStatus(ad.ADBase):
    """
    Class to read zigbee2mqtt device status (online/offline)
    """

    def initialize(self):
        # ---------------------------------------------
        # read args from yaml
        # ---------------------------------------------
        for arg, val in self.args.items():
            setattr(self, arg, val)        

        # ---------------------------------------------
        # mqtt
        # ---------------------------------------------
        self.hass = self.get_plugin_api("HASS")
        self.mqtt = self.get_plugin_api("MQTT")

        # ---------------------------------------------
        # class inits:
        # ---------------------------------------------
        self.status = {}
        self.filename = '/config/tmp/appdaemon_z2m_status.json'
        self.load_status()

        # ---------------------------------------------
        # start listen methods:
        # ---------------------------------------------
        self.mqtt.listen_event(self.mqtt_on_message, namespace='mqtt', event='MQTT_MESSAGE')  #, wildcard=self.mqtt_topic + "/+/availability")


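    def log(self, *args):
        # Log only when enabled in apps.yaml; join the arguments into one
        # message string instead of passing the raw tuple to the logger.
        if self.enable_log:
            self.hass.log(' '.join(str(a) for a in args))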
    
    def mqtt_on_message(self, event, data, kwargs):
        """
        unpacks corresponding mqtt message
        """
        if self.app_switch is not None and self.hass.get_state(self.app_switch) == 'off':
            return
        
        self.log('mqtt detected')

        # This is a list of user's accepted topics e.g.: z2m, zig2mqtt, ...
        mqtt_topics = [self.mqtt_topic] if isinstance(self.mqtt_topic, str) else self.mqtt_topic

        payload = data['payload']
        topic = data['topic']
        topic_split = topic.split('/')

        if len(topic_split) != 3:
            return
        if topic_split[0] not in mqtt_topics:
            return
        if topic_split[2] != 'availability':
            return

        self.log(f"{topic_split=} {payload=}")
        
        main_topic = topic_split[0]
        device = topic_split[1]

        if device in self.status:
            self.log('device exists in status dict')
            previous_status = self.status[device]
            if previous_status != "online" and payload == "online":
                self.notify(main_topic, device, payload)
            elif previous_status != "offline" and payload == "offline":
                self.notify(main_topic, device, payload)
        else:
            if payload == "offline":
                self.notify(main_topic, device, payload)
        
        # update dict or add the key to dict
        self.status[device] = payload
        self.save_status()

    def load_status(self):
        """
        checks if the file exists and loads it
        """
        self.log('loading')
        try:
            with open(self.filename) as f:
                self.status = json.load(f)
                self.log(f"loading done -> {self.status=}")
        except (FileNotFoundError, json.JSONDecodeError):
            # A missing or corrupt file should not stop the app from starting
            self.log('File missing or unreadable, using empty dict')

    def save_status(self):
        """
        Saves the status dict in json format
        """
        self.log('saving')
        try:
            with open(self.filename, 'w') as f:
                json.dump(self.status, f, indent=4)
                self.log('saving done')
        except Exception as e:
            self.log(f'Saving Error -> {e}')


    def notify(self, main_topic, device, payload):
        msg = main_topic + '->' + device + ": " + payload
        self.notify_telegram(message=msg)
    
    def notify_telegram(self, message, title=''):
        for notifier in self.notifiers:
            self.hass.call_service(f'notify/{notifier}', title=title, message=message)
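
The notification rule in mqtt_on_message can also be written as two small pure functions, which makes it easy to test without MQTT. This is only a sketch: the function names are hypothetical, and the JSON branch assumes that some Zigbee2MQTT setups publish availability as an object such as {"state": "online"} rather than the plain string, which is worth checking against your own broker traffic.

```python
import json


def normalize_availability(payload):
    """Return 'online' or 'offline' from a plain or JSON payload, else None."""
    text = payload.strip()
    if text in ("online", "offline"):
        return text
    try:
        # Assumed alternative format: {"state": "online"}
        state = json.loads(text).get("state")
    except (ValueError, AttributeError):
        return None
    return state if state in ("online", "offline") else None


def should_notify(previous, current):
    """Notify on a status change, or when a new device is first seen offline."""
    if current not in ("online", "offline"):
        return False
    if previous is None:
        # Unknown device: stay quiet if it is online, as in the app above.
        return current == "offline"
    return previous != current
```

With these, the handler would compute should_notify(self.status.get(device), status) before updating the dict.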

ToDo:

  • The listening could potentially be done periodically, e.g. every 10 minutes, rather than constantly, to give the processor a break.
  • Find a way to listen to wildcard topic(s) rather than all topics. It looks like that’s not possible with AppDaemon yet. Is it?
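
On the wildcard question: whether the MQTT plugin can filter by wildcard depends on the AppDaemon version, so its documentation is the place to check. If it cannot, the filtering can be done inside the callback. Below is a simplified sketch of MQTT-style matching with "+" (one level) and "#" (all remaining levels). The function name is hypothetical, and special cases such as $-prefixed system topics are ignored.

```python
def mqtt_topic_matches(pattern, topic):
    """Return True if topic matches an MQTT filter using '+' and '#' wildcards."""
    pattern_parts = pattern.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(pattern_parts):
        if part == "#":
            # Multi-level wildcard: valid only as the last level of the filter.
            return i == len(pattern_parts) - 1
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    # Without '#', both must have the same number of levels.
    return len(pattern_parts) == len(topic_parts)


# Example: only availability messages under the z2m base topic pass the filter.
print(mqtt_topic_matches("z2m/+/availability", "z2m/kitchen_lamp/availability"))
```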