Hi all,
I have written a python script that reads Zigbee2MQTT
device status (online/offline) and sends a message through telegram to me if a device(s) go offline or come back online. Tested a couple of weeks and looks alright.
Internally, there’s a dict that holds the current status of the devices. I also save it on the disk (in json format) so it could survive restarts.
Here’s the code:
# This is apps.yaml
z2m_status:
module: z2m_status
class: Z2MStatus
enable_log: false
app_switch: input_boolean.z2m_status_app # could be empty
notifiers:
- telegram_me # defined in configuration.yaml as a notifier with a chat id
mqtt_topic:
- zig2mqtt
- z2m
- zigbee2mqtt
Note that since I have three instances of Zigbee2MQTT with three different topics (don’t ask me why) I added all three above.
# This is z2m_status.py
import adbase as ad
import json
class Z2MStatus(ad.ADBase):
"""
Class to read zigbee2mqtt device status (online/offline)
"""
def initialize(self):
# ---------------------------------------------
# read args from yaml
# ---------------------------------------------
for arg, val in self.args.items():
setattr(self, arg, val)
# ---------------------------------------------
# mqtt
# ---------------------------------------------
self.hass = self.get_plugin_api("HASS")
self.mqtt = self.get_plugin_api("MQTT")
# ---------------------------------------------
# class inits:
# ---------------------------------------------
self.status = {}
self.filename = '/config/tmp/appdaemon_z2m_status.json'
self.load_status()
# ---------------------------------------------
# start listen methods:
# ---------------------------------------------
self.mqtt.listen_event(self.mqtt_on_message, namespace='mqtt', event='MQTT_MESSAGE') #, wildcard=self.mqtt_topic + "/+/availability")
def log(self, *kwargs):
if self.enable_log:
# super().log(kwargs)
self.hass.log(kwargs)
else:
return
def mqtt_on_message(self, event, data, kwargs):
"""
unpacks corresponding mqtt message
"""
if self.app_switch is not None and self.hass.get_state(self.app_switch) == 'off':
return
self.log('mqtt detected')
# This is a list of user's accepted topics e.g.: z2m, zig2mqtt, ...
mqtt_topics = [self.mqtt_topic] if isinstance(self.mqtt_topic, str) else self.mqtt_topic
payload = data['payload']
topic = data['topic']
topic_split = topic.split('/')
if len(topic_split) != 3:
return
if topic_split[0] not in mqtt_topics:
return
if topic_split[2] != 'availability':
return
self.log(f"{topic_split=} {payload=}")
main_topic = topic_split[0]
device = topic_split[1]
if device in self.status:
self.log('device exists in status dict')
previous_status = self.status[device]
if previous_status != "online" and payload == "online":
self.notify(main_topic, device, payload)
elif previous_status != "offline" and payload == "offline":
self.notify(main_topic, device, payload)
else:
if payload == "offline":
self.notify(main_topic, device, payload)
# update dict or add the key to dict
self.status[device] = payload
self.save_status()
def load_status(self):
"""
checks if the file exists and loads it
"""
self.log('loading')
try:
with open(self.filename) as f:
self.status = json.load(f)
self.log(f"loading done -> {self.status=}")
except FileNotFoundError:
self.log('File does not exist, using empty dict')
def save_status(self):
"""
Saves the status dict in json format
"""
self.log('saving')
try:
with open(self.filename, 'w') as f:
json.dump(self.status, f, indent=4)
self.log('saving done')
except Exception as e:
self.log(f'Saving Error -> {e}')
def notify(self, main_topic, device, payload):
msg = main_topic + '->' + device + ": " + payload
self.notify_telegram(message=msg)
def notify_telegram(self, message, title=''):
for notifier in self.notifiers:
self.hass.call_service(f'notify/{notifier}', title=title, message=message)
ToDo:
- Potentially the
listen event
could be done periodically, e.g. every 10 minutes, rather than constantly to give a break to the processor. - Find a way to listen to
wildcard topic(s)
rather than all. Looks like it’s not possible withAppdaemon
yet. Is it?