Here is a Python script I’ve written to communicate with my Kello alarm clock.
Should hopefully give you an example to get started.
It mainly handles receiving messages from MQTT:
Click here to display code
#!/usr/bin/python
# -*- coding: utf-8 -*-
import time
import paho.mqtt.client as mqtt
import subprocess
import datetime
# Connection settings for the MQTT broker. The values below are placeholders
# and must be replaced with real ones before running.
MQTT_Host = "your_mqtt_ip_address"
# The port is stored as a string here.
MQTT_Port = "1883"
MQTT_User = "your_mqtt_user"
MQTT_Password = "your_mqtt_password"
# Registered with atexit so the MQTT connection is closed when the program stops.
def Goodbye():
    """Disconnect the module-level MQTT ``client`` at interpreter exit.

    Best effort: any failure (including ``client`` not having been created
    yet) is reported and otherwise ignored so that shutdown can continue.
    """
    # print() with a single argument behaves the same on Python 2 and 3.
    print("Closing MQTT Connection for Kello...")
    try:
        client.disconnect()  # disconnect
        client.loop_stop()  # stop loop
    except Exception as err:
        # Catch Exception rather than using a bare except, so SystemExit and
        # KeyboardInterrupt are not swallowed.
        print("Kello MQTT shutdown skipped: %s" % (err,))


import atexit
atexit.register(Goodbye)
#define callbacks
def on_message(client, userdata, message):
    """Handle messages whose payload is a greeting command.

    ``build_greeting``: remove the published Kello_Greeting.mp3, copy the
    newest MP3 from the TTS folder in its place, delete the TTS MP3s and
    send ``0d41`` to the clock (the same command the PlayStatus "OFF"
    handler sends).

    ``play_greeting``: send a ``0p41uri:`` command pointing at the greeting
    MP3 (presumably "play this URI" -- confirm against the Kello protocol).

    Any other payload is ignored.
    """
    payload = message.payload.decode("utf-8")  # decode once, reuse below

    if payload == "build_greeting":
        print("%s " % datetime.datetime.now().strftime("%d/%m/%y %H:%M") + payload)
        # NOTE(review): the sudo password is embedded in plain text in these
        # shell commands; consider a sudoers NOPASSWD rule instead.
        # Each command is started without waiting for it to finish; the
        # sleeps below are the only thing ordering the steps.
        cmd = 'echo ichmonix | sudo -S rm /var/www/html/audio/Kello_Greeting.mp3'
        subprocess.Popen(cmd, shell=True)
        time.sleep(1)
        cmd = 'echo ichmonix | sudo -S cp `ls -rt /home/cctv/docker_ha/tts/*.mp3 | tail -1` /var/www/html/audio/Kello_Greeting.mp3'
        subprocess.Popen(cmd, shell=True)
        time.sleep(5)
        cmd = 'echo ichmonix | sudo -S rm `ls -rt /home/cctv/docker_ha/tts/*.mp3`'
        subprocess.Popen(cmd, shell=True)
        cmd = 'printf "\n0d41\r" | nc 192.168.0.17 4444 2>/dev/null'
        subprocess.Popen(cmd, shell=True)
    elif payload == "play_greeting":
        # Raw string: the backslashes before ':' and '/' are sent as-is and
        # are not valid Python escape sequences in a normal string literal.
        # The timestamp makes every URI unique (presumably to defeat caching).
        uri = r'http\:\/\/mydomain\/audio\/Kello_Greeting.mp3?time=' + str(time.time())
        cmd = 'printf "\n0p41uri:' + uri + '\r" | nc 192.168.0.17 4444'
        subprocess.Popen(cmd, shell=True)
def on_message_volume(client, userdata, message):
    """Set the clock volume from a decimal payload.

    The level is sent as ``0p40`` followed by the level in hexadecimal,
    zero-padded to at least two digits. Payloads that are not a
    non-negative integer are reported and ignored instead of raising
    inside the MQTT callback.
    """
    raw = message.payload.decode("utf-8")
    try:
        level = int(raw)
    except ValueError:
        print("Ignoring invalid Kello volume: %r" % (raw,))
        return
    if level < 0:
        print("Ignoring negative Kello volume: %r" % (raw,))
        return

    # Pad on the hex width, not the decimal width: levels 10-15 are two
    # decimal digits but only one hex digit ("a".."f").
    vol = format(level, "02x")
    cmd = 'printf "\n0p40' + vol + '\r" | nc 192.168.0.17 4444 2>/dev/null'
    subprocess.Popen(cmd, shell=True)
def on_message_MuteAlarms(client, userdata, message):
    """Send ``0p231`` to the clock when the payload is "ON", else ``0d231``."""
    state = str(message.payload.decode("utf-8"))
    code = "0p231" if state == "ON" else "0d231"
    mute_cmd = 'printf "\n' + code + '\r" | nc 192.168.0.17 4444 2>/dev/null'
    subprocess.Popen(mute_cmd, shell=True)
def on_message_PlayStatus(client, userdata, message):
    """Send ``0d41`` to the clock when the payload is "OFF"; ignore anything else."""
    state = str(message.payload.decode("utf-8"))
    if state != "OFF":
        return
    stop_cmd = 'printf "\n0d41\r" | nc 192.168.0.17 4444 2>/dev/null'
    subprocess.Popen(stop_cmd, shell=True)
def on_message_radio(client, userdata, message):
    """Send a ``0p41uri:`` command for the station named in the payload.

    Known station names are mapped to their stream URI. An unknown name is
    reported and ignored; previously it left the command unset and the
    callback raised.
    """
    # Raw strings: the backslashes before ':' and '/' are part of what is
    # sent and are not valid escape sequences in a normal string literal.
    stations = {
        "RTL2": r'http\:\/\/streaming.radio.rtl2.fr\:80\/rtl2-1-44-96',
        "AbsoluteRadio": r'http\:\/\/icy-e-bab-04-cr.sharp-stream.com\/absoluteradio.mp3',
        "HitWest": r'http\:\/\/broadcast.infomaniak.ch\/hitwest-high.mp3',
        "Chill": r'http\:\/\/media-the.musicradio.com\/ChillMP3',
        "WeatherForecast": r'http\:\/\/mydomain\/audio\/Kello_Greeting.mp3',
        "ConservatoryWindowsLeftOpen": r'http\:\/\/mydomain\/audio\/Conservatory_Skylights_Left_Open.mp3',
    }

    name = message.payload.decode("utf-8")
    uri = stations.get(name)
    if uri is None:
        stamp = datetime.datetime.now().strftime("%d/%m/%y %H:%M")
        print("%s Unknown Kello radio station: %r" % (stamp, name))
        return

    time.sleep(1)
    print("%s " % datetime.datetime.now().strftime("%d/%m/%y %H:%M") + name)
    cmd = 'printf "\n0p41uri:' + uri + '\r" | nc 192.168.0.17 4444'
    subprocess.Popen(cmd, shell=True)
# Create the client, route each Kello topic to its handler, then block in the
# network loop processing incoming messages.
client = mqtt.Client("MQTT_Kello_Sub", clean_session=False)  # must be unique on MQTT network
client.message_callback_add('Kello/PlayStatus', on_message_PlayStatus)
client.message_callback_add('Kello/Volume', on_message_volume)
client.message_callback_add('Kello/Radio', on_message_radio)
client.message_callback_add('Kello/MuteAlarms', on_message_MuteAlarms)
# Messages on topics without a dedicated callback above go to on_message.
client.on_message = on_message
client.username_pw_set(str(MQTT_User), str(MQTT_Password))
# connect() expects a numeric port; a string fails the port range check on
# Python 3.
client.connect(MQTT_Host, port=int(MQTT_Port), keepalive=60)
# NOTE(review): no network loop is running during this pause, so it may be
# unnecessary -- confirm before removing.
time.sleep(4)
client.subscribe("Kello/#", qos=1)  # subscribe
print("Subscribing to Kello on MQTT")
# Blocks here processing received messages. Once it returns there is nothing
# left to do, so the script ends rather than idling in a sleep loop.
client.loop_forever()
If you need to simply send messages from your script to MQTT (as that’s what times out, I guess sending commands still works?), then it’s even easier.
Below is an example of a command sent from my Raspberry Pi to my HA server to report motion on a PIR sensor connected to the Pi:
# Publish "ON" to the RPi/PIRConservatory topic by running the mosquitto_pub
# command-line client. The arguments are passed as a list, so no shell is
# involved.
MQTT_Command = ["mosquitto_pub", "-h", MQTT_Host, "-p", "1883", "-u", MQTT_User, "-P", MQTT_Password, "-t", "RPi/PIRConservatory", "-r", "-m", "ON"]
# Popen starts the command and returns without waiting for it to finish.
ps = subprocess.Popen(MQTT_Command)
All you’d have to do is write a script that runs in a loop, uses commands similar to those in your command_line configuration in HA, and posts the results to MQTT for HA to pick up.