Hello,
I’m trying to develop something to get a fully functional Kincony module.
Outputs can be controlled by Node-RED or by a command-line switch, for example:
# Home Assistant command_line switch driving relay 1 of the Kincony board.
# Each command sends one RELAY-SET frame to the board over TCP with netcat.
switch:
  - platform: command_line
    switches:
      relay01:
        command_on: 'echo -n RELAY-SET-1,1,1|nc -w 1 192.168.1.18 4196'
        command_off: 'echo -n RELAY-SET-1,1,0|nc -w 1 192.168.1.18 4196'
        value_template: '{{ "1,OK" in value}}'
        friendly_name: "kincony 4IO Relais 1"
But for the inputs, I can’t find any easy way. The interface sends an “ALARM” message when an input changes state (only from off to on), and that’s all… Moreover, the delay of the “ALARM” message is very long.
That is not useful for me because I want to connect push buttons to these inputs. Of course, it’s possible to get this function another way (for example by buying an input module), but I prefer to try with what I have…
Unfortunately, I’m a newbie with Home Assistant. I already use this interface successfully with Domoticz, but the graphics of Home Assistant are really better.
So I am trying to port my Domoticz plugin to Home Assistant.
Under Domoticz, the plugin was written in Python, so I installed AppDaemon. After some difficulty, I can now use my interface successfully under Home Assistant.
Here is kincony.py, and below it apps.yaml. Some comments may still be in French, I’m sorry.
In the next post, I will explain how it runs and the problems I have.
kincony.py:
import appdaemon.plugins.hass.hassapi as hass
import os, sys, threading, time, select, socket, math
#
# Kincony App
#
# Args: address(txt), port(txt), friendly_name(txt), inputs(int), outputs(int), frequency(int)
# frequency is the number of inputs read in one second
#
class Kincony(hass.Hass):
    """AppDaemon app that mirrors the inputs of a Kincony relay board.

    The app opens one TCP connection to the board, creates one
    ``binary_sensor`` per input in Home Assistant and polls the board from a
    background thread to keep those sensors up to date.

    App arguments (``apps.yaml``):
        address: IP address or host name of the board.
        port: TCP port of the board (int or numeric string).
        friendly_name: prefix used for entity ids and friendly names.
        inputs: number of inputs to expose.
        outputs: number of outputs of the board (stored, not used yet).
        frequency: requested number of input reads per second; converted
            into a pause expressed in multiples of 50 ms.
        debug: optional, enables the per-poll log lines (default False).
        activation: optional, set to False to disable polling (default True).
    """

    # Duration of one pause step of the polling loop, in seconds.
    POLL_STEP = 0.05
    # Pause steps to wait before retrying a failed connection (100 * 50 ms).
    RECONNECT_STEPS = 100
    # Timeout applied to connect/send/receive, in seconds.
    SOCKET_TIMEOUT = 2
    # Maximum number of bytes read per recv() call.
    RECV_SIZE = 256

    def initialize(self):
        """Read the app arguments, create the sensors and start polling."""
        # Arguments to attributes
        self.address = self.args["address"]
        self.port = int(self.args["port"])
        self.friendlyName = self.args["friendly_name"]
        self.inputs = int(self.args["inputs"])
        self.outputs = int(self.args["outputs"])
        frequency = self.args["frequency"]
        # Empirical mapping from "reads per second" to a number of 50 ms steps.
        self.frequency = round(0.36 * frequency * frequency - 4.5 * frequency + 15.14)
        self.log("Multiples de 50ms : %s", self.frequency)
        self.debug = self.args.get("debug", False)
        self.run = self.args.get("activation", True)
        self.connexionOK = False
        self.stopThread = False
        self.connexion_TCP = None
        # Daemon thread so that a blocked poll can never prevent shutdown.
        self.checkInputs = threading.Thread(
            name="ThreadCheckInputs",
            target=self.KinconyCheckInputs,
            daemon=True,
        )
        self.log("Class Test initialization", level="INFO")
        # Input sensor creation
        for i in range(1, self.inputs + 1):
            inputName = "binary_sensor.{}_Input_{}".format(self.friendlyName, i)
            friendlyName = "{} Input {}".format(self.friendlyName, i)
            self.set_state(inputName, state="off", attributes={"friendly_name": friendlyName})
        # Connection; if it fails the polling thread keeps retrying.
        self.connexionOK = self.KinconyConnexion()
        # TODO : after connexion, choice between reset outputs or read output state
        self.checkInputs.start()

    def terminate(self):
        """Stop the polling thread, then close the connection."""
        self.log("Kincony : End")
        self.stopThread = True
        worker = getattr(self, "checkInputs", None)
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            # The thread may be blocked in recv() for up to SOCKET_TIMEOUT.
            worker.join(timeout=self.SOCKET_TIMEOUT + 1)
        self._closeSocket()

    def _logDebug(self, msg, *args):
        """Log a high-volume message, only when the ``debug`` argument is set."""
        if self.debug:
            self.log(msg, *args, level="INFO")

    def _closeSocket(self):
        """Close the current socket, if any; safe to call repeatedly."""
        sock = getattr(self, "connexion_TCP", None)
        if sock is not None:
            sock.close()

    def _waitSteps(self, steps):
        """Sleep ``steps`` * 50 ms, returning early when a stop is requested."""
        for _ in range(steps):
            if self.stopThread or not self.run:
                return
            time.sleep(self.POLL_STEP)

    def _exchange(self, command, marker, caller):
        """Send ``command`` and return the first received chunk containing ``marker``.

        Chunks that only carry an unsolicited ``RELAY-ALARM`` message are
        skipped. ``caller`` is the method name used as prefix in log messages.

        Returns the decoded chunk, or ``None`` on failure. On a socket error,
        a timeout or a connection closed by the device, the socket is closed
        and ``connexionOK`` is cleared so that the polling loop reconnects.
        An unexpected reply is logged and returns ``None`` without closing.
        """
        if self.connexion_TCP is None:
            self.log("%s - Error : '%s'", caller, "not connected", level="ERROR")
            self.connexionOK = False
            return None
        try:
            # The socket is connected, so sendall() is used rather than sendto().
            self.connexion_TCP.sendall(command.encode())
            while True:
                chunk = self.connexion_TCP.recv(self.RECV_SIZE)
                if not chunk:
                    # An empty read on a TCP socket means the peer closed it.
                    raise ConnectionError("connection closed by the device")
                reply = chunk.decode(errors="replace")
                # Test the expected marker first: an alarm and the reply can
                # arrive in the same chunk.
                if marker in reply:
                    return reply
                if "RELAY-ALARM" in reply:
                    self._logDebug("%s - ALARM", caller)
                    continue
                self.log("%s - Error : '%s'", caller, reply, level="ERROR")
                return None
        except socket.timeout:
            self.log("%s - comm error, timeout", caller, level="ERROR")
        except OSError as err:
            self.log("%s - Error : '%s'", caller, str(err), level="ERROR")
        self._closeSocket()
        self.connexionOK = False
        return None

    @staticmethod
    def _extractFrame(reply, prefix):
        """Return the ``prefix ... ,OK`` frame found in ``reply``, or ``None``.

        The ``,OK`` terminator is searched after the prefix so that an
        earlier frame in the same chunk cannot be taken for the terminator.
        """
        start = reply.find(prefix)
        if start == -1:
            return None
        end = reply.find(",OK", start)
        if end == -1:
            return None
        return reply[start:end + 3]

    def KinconyCheckInputs(self):
        """
        Check loop of inputs state (body of the polling thread).

        Runs until ``run`` is cleared or ``stopThread`` is set. Reconnects
        when the connection is down and pauses ``frequency`` * 50 ms between
        two reads.
        """
        self.log("KinconyCheckInputs called", level="INFO")
        while self.run and not self.stopThread:
            if not self.connexionOK:
                self.connexionOK = self.KinconyConnexion()
                if not self.connexionOK:
                    self._waitSteps(self.RECONNECT_STEPS)
                    continue
            try:
                self.Update(True, False)
            except Exception as err:
                # Thread boundary: log and keep polling instead of dying silently.
                self.log("KinconyCheckInputs - Error :'%s'", str(err), level="ERROR")
            self._waitSteps(self.frequency)
        self.log("KinconyCheckInputs - Thread stop", level="INFO")

    def KinconyConnexion(self):
        """Open the TCP connection and run the scan/test handshake.

        Returns True when the board answered both handshake commands as
        expected, False otherwise (the socket is then closed).
        """
        # Drop any previous socket before opening a new one (reconnection).
        self._closeSocket()
        self.log("KinconyConnexion - Try to join Kincony at IP:'%s'", self.address, level="INFO")
        self.connexion_TCP = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connexion_TCP.settimeout(self.SOCKET_TIMEOUT)
        try:
            self.connexion_TCP.connect((self.address, self.port))
        except socket.timeout:
            self.log("KinconyConnexion - Comm error, timeout", level="ERROR")
            self._closeSocket()
            return False
        except Exception as err:
            self.log("KinconyConnexion - Error :'%s'", str(err), level="ERROR")
            self._closeSocket()
            return False
        KinconyRx = self.KinconyScan()
        if "RELAY-SCAN_DEVICE-CHANNEL_" not in KinconyRx or ",OK" not in KinconyRx:
            self.log("KinconyConnexion - Comm error: '%s'", KinconyRx, level="ERROR")
            self._closeSocket()
            return False
        self.log("KinconyConnexion - Device detected, try to comm", level="INFO")
        KinconyRx = self.KinconyTest()
        if "OK" not in KinconyRx:
            self.log("KinconyConnexion - Comm error: '%s'", KinconyRx, level="ERROR")
            self._closeSocket()
            return False
        self.log("KinconyConnexion - Comm OK with Kincony at IP:'%s'", self.address, level="INFO")
        return True

    def KinconyScan(self):
        """First handshake step: send ``RELAY-SCAN_DEVICE-NOW``.

        Returns the ``RELAY-SCAN_DEVICE...,OK`` frame sent back by the board,
        or ``"ERROR"``.
        """
        self.log("KinconyScan called", level="INFO")
        reply = self._exchange("RELAY-SCAN_DEVICE-NOW", "RELAY-SCAN_DEVICE", "KinconyScan")
        if reply is None:
            return "ERROR"
        frame = self._extractFrame(reply, "RELAY-SCAN_DEVICE")
        if frame is None:
            self.log("KinconyScan - Error : '%s'", reply, level="ERROR")
            return "ERROR"
        self.log("KinconyScan - receive OK :'%s'", frame, level="INFO")
        return frame

    def KinconyTest(self):
        """Second handshake step: send ``RELAY-TEST-NOW``.

        Returns ``"OK"`` when the board answers ``HOST-TEST-START``,
        otherwise ``"ERROR"``.
        """
        self.log("KinconyTest called", level="INFO")
        reply = self._exchange("RELAY-TEST-NOW", "HOST-TEST-START", "KinconyTest")
        if reply is None:
            return "ERROR"
        self.log("KinconyTest - Comm OK", level="INFO")
        return "OK"

    def KinconyReadInputs(self):
        """Read the input states with ``RELAY-GET_INPUT-1``.

        Returns the text found between ``RELAY-GET_INPUT-1,`` and ``,OK`` in
        the reply, or ``"ERROR"``.
        """
        prefix = "RELAY-GET_INPUT-1,"
        self._logDebug("KinconyReadInputs called")
        reply = self._exchange("RELAY-GET_INPUT-1", prefix, "KinconyReadInputs")
        if reply is None:
            return "ERROR"
        frame = self._extractFrame(reply, prefix)
        if frame is None:
            self.log("KinconyReadInputs - Error '%s'", reply, level="ERROR")
            return "ERROR"
        self._logDebug("KinconyReadInputs - Receive OK :'%s'", frame)
        # Strip the prefix and the trailing ",OK".
        return frame[len(prefix):-3]

    def KinconyReadOutputs(self):
        """
        Read the outputs of the board with ``RELAY-STATE-1``.

        Returns the text found between ``RELAY-STATE-1,`` and ``,OK`` in the
        reply, or ``"ERROR"``. Per the original author's notes this text holds
        the value of each output byte as integers: "byte3,byte2,byte1,byte0"
        for a 32-output board, "byte1,byte0" for 16 outputs, "byte0" for
        boards with 8 outputs or fewer.
        """
        prefix = "RELAY-STATE-1,"
        self._logDebug("KinconyReadOutputs called")
        reply = self._exchange("RELAY-STATE-1", prefix, "KinconyReadOutputs")
        if reply is None:
            return "ERROR"
        frame = self._extractFrame(reply, prefix)
        if frame is None:
            self.log("KinconyReadOutputs - Error :'%s'", reply, level="ERROR")
            return "ERROR"
        self._logDebug("KinconyReadOutputs - Receive OK :'%s'", frame)
        # Strip the prefix and the trailing ",OK".
        return frame[len(prefix):-3]

    def Update(self, Inputs, Outputs):
        """
        Update inputs/outputs in Home Assistant.

        Inputs: when true, read the inputs and refresh the binary sensors.
        Outputs: accepted for compatibility; output synchronisation is not
            implemented yet.
        """
        self._logDebug("Update called")
        if Inputs:
            KinconyRx = self.KinconyReadInputs()
            if "ERROR" in KinconyRx:
                self.log("Update - Input state error: '%s'", KinconyRx)
                return
            self._logDebug("Update - inputs state OK :'%s'", KinconyRx)
            try:
                rawValue = int(KinconyRx)
            except ValueError:
                self.log("Update - Input state error: '%s'", KinconyRx, level="ERROR")
                return
            # The byte is inverted (^255) so that a set bit means "on";
            # bit 0 is input 1.
            inputsStates = rawValue ^ 255
            self._logDebug("Inputs states : %s", bin(inputsStates)[2:].zfill(8))
            # Only one byte is decoded, so at most 8 inputs are refreshed.
            for i in range(1, min(self.inputs, 8) + 1):
                inputName = "binary_sensor.{}_Input_{}".format(self.friendlyName, i)
                isOn = (inputsStates >> (i - 1)) & 1
                self.set_state(inputName, state="on" if isOn else "off")
        # TODO: outputs treatment. The former Domoticz plugin called
        # KinconyReadOutputs(), split the reply on ",", reversed the list so
        # that the lowest byte came first, and compared each bit (bit 0 =
        # first output of the byte) with the stored device state.
apps.yaml:
# AppDaemon app declaration for the Kincony board.
kincony:
  module: kincony
  class: Kincony
  friendly_name: 'Kincony_4IO'
  address: '192.168.1.18'
  port: '4196'
  inputs: 4
  outputs: 4
  # Number of input reads per second
  frequency: 4
  debug: True
  activation: True