How about just a simple tool you can use from the shell, and then pipe into other tools? Just something I’ve hacked up a year or two ago; it’s both quick and dirty. You’ll maybe need to install some python libraries, like paho MQTT if not already on your Linux/macOS/whatever box that can run python3. Season to taste.
#!/usr/bin/env python3
#
# Louis Mamakos <[email protected]>
#
import argparse
import paho.mqtt.client as mqtt
import json
import time
import re
import os
import sys
import logging
VERSION='0.2'
DEFAULT_MQTT_BROKER = os.getenv('MQTT_BROKER', '127.0.0.1')
DEFAULT_MQTT_PORT = int(os.getenv('MQTT_PORT', '1883'))
DEFAULT_MQTT_TOPICS = os.getenv('MQTT_TOPICS', '19916/info/#,$SYS/broker/log/#,owntracks/#').split(',')
def on_connect(client, opts, flags, rc):
if opts.verbose:
logging.info("Connected ({:d})".format(rc))
def on_message(client, opts, msg):
# unless we want to see retained messages, return
if msg.retain and not opts.retained:
return
# exclude messages based on simple substrings in the topic
if opts.exclude:
for pat in opts.exclude:
if pat in msg.topic:
return
# exclude messages based on regular expression matches against the topic
if opts.exclude_regex:
for regex in opts.exclude_regex:
if re.search(regex, msg.topic):
return
if not opts.json:
print("[{}] {}: {}".format(time.strftime('%Y-%m-%d %H:%M:%S'),msg.topic, msg.payload.decode()))
else:
payload = msg.payload.decode()
try:
p = json.loads(payload)
except json.decoder.JSONDecodeError:
p = payload
print(json.dumps( {
'topic': msg.topic,
'qos' : msg.qos,
'retain' : msg.retain,
'epochtime': time.time(),
'payload' : p,
} )
)
if opts.unbuffered:
sys.stdout.flush()
def main():
"""
Simple MQTT monitoring tool, can subscribe to one or more MQTT topics,
and optionally filter out the remaining messages by matching against the
topic with simple substrings or regular expressions. JSON formatted output
is also available for piping into, e.g., jq or other tools.
"""
parser = argparse.ArgumentParser(description=main.__doc__)
parser.add_argument('-j', '--json', action='store_true', help='JSON formatted output')
parser.add_argument('--qos', default=0, type=int, choices=[0,1,2], help='QoS of subscribe')
parser.add_argument('--broker', action='store', help='MQTT broker address/name', default=DEFAULT_MQTT_BROKER)
parser.add_argument('--port', action='store', type=int, help='MQTT broker port number', default=DEFAULT_MQTT_PORT)
parser.add_argument('--keepalive', action='store', type=int, default=60, help='MQTT session keepalive')
parser.add_argument('--retained', action='store_true', default=False, help='receive retained messages')
parser.add_argument('--tls', action='store_true', default=False, help='TLS session')
parser.add_argument('-v', '--verbose', action='store_true', default=False, help='verbose mode')
parser.add_argument('-d', '--debug', action='store_true', default=False, help='debug mode')
parser.add_argument('-x', '--exclude', action='append', metavar='substring', help='one or more topic substrings to exclude')
parser.add_argument('-X', '--exclude-regex', action='append', metavar='regex', help='one or more topic regex atterns to exclude')
parser.add_argument('-u', '--unbuffered', action='store_true', default=False, help="Unbuffered output")
parser.add_argument('--default-topics', action='store_true', default=False, help='subscribe to some default topics')
parser.add_argument('topics', nargs='*', default=None, help='list of MQTT topics to subscribe to')
args = parser.parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.WARNING)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
elif args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
client = mqtt.Client(clean_session=True, userdata=args)
client.on_message = on_message
client.on_connect = on_connect
from socket import gaierror
try:
if args.tls:
import ssl
client.tls_set(ca_certs=None, certfile=None, keyfile=None, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(True)
logging.debug("Attempt TLS connection to MQTT broker {}:{}".format(args.broker, args.port))
client.connect(args.broker, args.port, args.keepalive)
else:
logging.debug("Attempt connection to MQTT broker {}:{}".format(args.broker, args.port))
client.connect(args.broker, args.port, args.keepalive)
except gaierror:
logging.error("Can't connect to MQTT broker {}:{}".format(args.broker, args.port))
sys.exit(1)
if args.default_topics:
# default topics, from static list or environment variable if --default-topics option
# is used
topics = DEFAULT_MQTT_TOPICS
elif args.topics:
# or from topic specifed as arguments
topics = args.topics
else:
# otherwise, everything!
topics = ('#')
for topic in topics:
client.subscribe((topic, args.qos))
logging.info("Subscribe to {:s} qos {:d}".format(topic, args.qos))
client.loop_forever()
main()