Kincony KC868-H4 and H8

Hello,

I’m trying to put together a fully functional integration for the Kincony module.
The outputs can be controlled from Node-RED or with a command_line switch, for example:

switch:
  - platform: command_line
    switches:
      relay01:
        command_on: 'echo -n RELAY-SET-1,1,1|nc -w 1 192.168.1.18 4196'
        command_off: 'echo -n RELAY-SET-1,1,0|nc -w 1 192.168.1.18 4196'
        value_template: '{{ "1,OK" in value}}'
        friendly_name: "kincony 4IO Relais 1"
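
For comparison, the same exchange can be done from Python instead of echo and nc. This is only a minimal sketch: the helper names (relay_set_command, send_command, relay_is_on) and the parameter names device and relay are made up for illustration, and the reply format is assumed from the "1,OK" test in the value_template above.

```python
import socket


def relay_set_command(device: int, relay: int, on: bool) -> bytes:
    """Build a RELAY-SET command like the one sent with echo/nc above.

    The names `device` and `relay` for the first two numbers are assumptions.
    """
    return "RELAY-SET-{},{},{}".format(device, relay, 1 if on else 0).encode()


def send_command(sock: socket.socket, command: bytes, timeout: float = 1.0) -> str:
    """Send one command on an already connected socket and return the reply text."""
    sock.settimeout(timeout)
    sock.sendall(command)
    return sock.recv(256).decode(errors="replace")


def relay_is_on(reply: str) -> bool:
    """Same test as the value_template: on if the reply contains '1,OK'."""
    return "1,OK" in reply


# Possible usage against the device from the YAML above (not executed here):
# with socket.create_connection(("192.168.1.18", 4196), timeout=1) as s:
#     print(relay_is_on(send_command(s, relay_set_command(1, 1, True))))
```

Opening the connection is left to the caller so that the same helpers could be reused on a connection that stays open.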

But for the inputs, I can’t find any easy way. The interface sends an “ALARM” message when an input changes state (only from off to on), and that’s all… Moreover, the “ALARM” message arrives with a very long delay.

That’s not useful for me because I want to put push buttons on these inputs. Of course, there are other ways to get this function (like buying an input module), but I’d prefer to try with what I have…

Unfortunately, I’m a newbie in Home Assistant. I already use this interface successfully with Domoticz, but the Home Assistant UI really looks better :slight_smile:
So I’m trying to port my Domoticz plugin to Home Assistant.
Under Domoticz, the plugin was written in Python, so I installed AppDaemon. After some difficulty, I can now use my interface successfully under Home Assistant.

Here is kincony.py, and below it the apps.yaml. Some comments may still be in French, sorry :wink:
In the next post, I explain how it runs and the problems I have.

kincony.py:

import appdaemon.plugins.hass.hassapi as hass
import os, sys, threading, time, select, socket, math

#
# Kincony App
#
# Args: address(txt), port(txt), friendly_name(txt), inputs(int), outputs(int), frequency(int)
#       frequency is the number of inputs read in one second
#

class Kincony(hass.Hass):

    def initialize(self):
        # Assign args to variables
        self.address = self.args["address"]
        self.port = int(self.args["port"])
        self.friendlyName = self.args["friendly_name"]
        self.inputs = self.args["inputs"]
        self.outputs = self.args["outputs"]
        self.frequency = round(0.36*self.args["frequency"]*self.args["frequency"] - 4.5*self.args["frequency"] + 15.14)
        self.log("Multiples of 50 ms: %s", self.frequency)
        self.connexionOK = False
        self.stopThread = False
        self.checkInputs = threading.Thread(name="ThreadCheckInputs", target = Kincony.KinconyCheckInputs, args= (self,))
        self.debug = self.args["debug"]
        self.run = self.args["activation"]

        # Logs
        self.log("Class Test initialization", level="INFO")

        # Input sensor creation
        for i in range(1,self.inputs + 1):
            inputName = "binary_sensor.{}_Input_{}".format(self.friendlyName, i)
            friendlyName = "{} Input {}".format(self.friendlyName, i)
            self.set_state(inputName, state = "off", attributes = {"friendly_name": friendlyName})
        
        # Connexion
        self.connexionOK = self.KinconyConnexion()
        # TODO : after connexion, choice between reset outputs or read output state
        self.checkInputs.start()

    def terminate(self):
        self.log("Kincony : End")
        # Ask the polling loop to stop before closing the socket it uses
        self.run = False
        self.connexion_TCP.close()

    def KinconyCheckInputs(self):
        """
        Check loop of inputs state. 
        """
        # Log
        self.log("KinconyCheckInputs called", level="INFO")

        # Loop
        while self.run:
            self.Update(True, False)
            # for i in range(0,self.frequency):
            #     time.sleep(0.05)
            #     if self.stopThread:
            #         break
            if not self.run:
                break
        self.log("KinconyCheckInputs - Thread stop", level="INFO")

    def KinconyConnexion(self):
        # Device connexion
        self.connexion_TCP = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        self.connexion_TCP.settimeout(2)
        try:
            self.connexion_TCP.connect((self.address,self.port))
        except socket.timeout:
            self.log("KinconyConnexion - Comm error, timeout", level="ERROR")
            self.connexion_TCP.close()
            return False
        except Exception as err:
            self.log("KinconyConnexion - Error :'%s'", str(err), level="ERROR")
            self.connexion_TCP.close()
            return False
        self.log("KinconyConnexion - Try to join Kincony at IP:'%s'", self.address, level="INFO")
        KinconyRx = self.KinconyScan()
        if ("RELAY-SCAN_DEVICE-CHANNEL_" in KinconyRx) and (",OK" in KinconyRx):
            self.log("KinconyConnexion - Device detected, try to comm", level="INFO")
            KinconyRx = self.KinconyTest()
            if ("OK" in KinconyRx):
                self.log("KinconyConnexion - Comm OK with Kincony at IP:'%s'", self.address, level="INFO")
                return True
            else:
                self.log("KinconyConnexion - Comm error: '%s'", KinconyRx, level="ERROR")
                return False
        else:
            self.log("KinconyConnexion - Comm error: '%s'", KinconyRx, level="ERROR")
            return False

    def KinconyScan(self):
        # First time connexion
        # Send RELAY-SCAN_DEVICE-NOW to the device and check the answer (should include 'OK')
        
        # Log
        self.log("KinconyScan called", level="INFO")
        
        # Send
        KinconyTx = "RELAY-SCAN_DEVICE-NOW"
        # The socket is already connected, so sendall() needs no address
        self.connexion_TCP.sendall(KinconyTx.encode())
        while True:
            try:
                KinconyRx = self.connexion_TCP.recv(256)
            except socket.timeout:
                self.log("KinconyScan - comm error, timeout", level="ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            except Exception as err:
                self.log("KinconyScan - Error : '%s'", str(err), level="ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            # decode() returns a new str; it does not modify the bytes object
            KinconyRx = KinconyRx.decode(errors="replace")
            if "RELAY-ALARM" in KinconyRx:
                self.log("KinconyScan - ALARM", level="INFO")
                continue
            elif ("RELAY-SCAN_DEVICE" in KinconyRx) and (",OK" in KinconyRx):
                start = KinconyRx.find("RELAY-SCAN_DEVICE")
                end = (KinconyRx.find(",OK"))+3
                self.log("KinconyScan - receive OK :'%s'", KinconyRx[start:end], level="INFO")
                return(KinconyRx[start:end])
            else:
                self.log("KinconyScan - Error : '%s'", KinconyRx, level="ERROR")
                return ("ERROR")

    def KinconyTest(self):
        # Second time connexion
        # Send 'RELAY-TEST-NOW' to the device and check the answer (should be 'HOST-TEST-START')
        
        # Log
        self.log("KinconyTest called", level="INFO")
        
        # Send
        KinconyTx = "RELAY-TEST-NOW"
        # The socket is already connected, so sendall() needs no address
        self.connexion_TCP.sendall(KinconyTx.encode())
        while True:
            try:
                KinconyRx = self.connexion_TCP.recv(256)
            except socket.timeout:
                self.log("KinconyTest - comm error, timeout", level= "ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            except Exception as err:
                self.log("KinconyTest - Error : '%s'", str(err), level= "ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            # decode() returns a new str; it does not modify the bytes object
            KinconyRx = KinconyRx.decode(errors="replace")
            if "RELAY-ALARM" in KinconyRx:
                self.log("KinconyTest - ALARM", level="INFO")
                continue
            elif "HOST-TEST-START" in KinconyRx:
                self.log("KinconyTest - Comm OK", level="INFO")
                return ("OK")
            else:
                self.log("KinconyTest - Error : '%s'", KinconyRx, level= "ERROR")
                return("ERROR")

    def KinconyReadInputs(self):
        # Log
        self.log("KinconyReadInputs called", level="INFO")
        
        # Send
        KinconyTx = "RELAY-GET_INPUT-1"
        # The socket is already connected, so sendall() needs no address
        self.connexion_TCP.sendall(KinconyTx.encode())
        while True:
            try:
                KinconyRx = self.connexion_TCP.recv(256)
            except socket.timeout:
                self.log("KinconyReadInputs - comm error, timeout", level="ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            except Exception as err:
                self.log("KinconyReadInputs - Error :'%s'", str(err), level="ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            # decode() returns a new str; it does not modify the bytes object
            KinconyRx = KinconyRx.decode(errors="replace")
            if "RELAY-ALARM" in KinconyRx:
                self.log("KinconyReadInputs - ALARM", level="INFO")
                continue
            elif ("RELAY-GET_INPUT-1," in KinconyRx) and (",OK" in KinconyRx):
                start = KinconyRx.find("RELAY-GET_INPUT-1,")
                end = KinconyRx.find(",OK")
                self.log("KinconyReadInputs - Receive OK :'%s'", KinconyRx[start:end+3], level="INFO")
                start = start + len("RELAY-GET_INPUT-1,")
                return(KinconyRx[start:end])
            else:
                self.log("KinconyReadInputs - Error '%s'", KinconyRx, level="ERROR")
                return ("ERROR")

    def KinconyReadOutputs(self):
        """
        Read the board outputs. Returns the value of each output byte of the board
        (integers, in the form byte3, byte2, byte1, byte0 for a 32-output board,
        byte1, byte0 for a 16-output board, byte0 for boards with 8 outputs
        or fewer) if OK, otherwise "ERROR"
        """
        # Log
        self.log("KinconyReadOutputs called", level="INFO")
        
        # Send
        KinconyTx = "RELAY-STATE-1"
        # The socket is already connected, so sendall() needs no address
        self.connexion_TCP.sendall(KinconyTx.encode())
        while True:
            try:
                KinconyRx = self.connexion_TCP.recv(256)
            except socket.timeout:
                self.log("KinconyReadOutputs - comm error, timeout", level="ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            except Exception as err:
                self.log("KinconyReadOutputs - Error :'%s'", str(err), level="ERROR")
                self.connexion_TCP.close()
                self.connexionOK = False
                return ("ERROR")
            # decode() returns a new str; it does not modify the bytes object
            KinconyRx = KinconyRx.decode(errors="replace")
            if "RELAY-ALARM" in KinconyRx:
                self.log("KinconyReadOutputs - ALARM", level="INFO")
                continue
            elif ("RELAY-STATE-1," in KinconyRx) and (",OK" in KinconyRx):
                start = KinconyRx.find("RELAY-STATE-1,")
                end = KinconyRx.find(",OK")
                self.log("KinconyReadOutputs - Receive OK :'%s'", KinconyRx[start:end+3], level="INFO")
                start = start + len("RELAY-STATE-1,")
                return(KinconyRx[start:end])
            else:
                self.log("KinconyReadOutputs - Error :'%s'", KinconyRx, level="ERROR")
                return ("ERROR")

    def Update(self, Inputs, Outputs):
        """
        Update inputs/outputs in Home Assistant
        """
        # Log
        self.log("Update called", level="INFO")
        
        # Inputs treatment
        if Inputs:
            # Inputs
            KinconyRx = self.KinconyReadInputs()
            if ("ERROR" in KinconyRx):
                self.log("Update - Input state error: '%s'", KinconyRx)
                return
            self.log("Update - inputs state OK :'%s'", KinconyRx, level="INFO")
            # Convert the integer to a binary string:
            # invert the bits (^255), drop the '0b' prefix ([2:])
            # and pad with zeros to get an 8-bit word (zfill(8)),
            # so that each character is the state of one input
            inputsStates = bin(int(KinconyRx)^255)[2:].zfill(8)
            self.log("Inputs states : %s", inputsStates, level="INFO")
            bits = 7
            # Update the sensors
            for i in range(1,self.inputs + 1):
                inputName = "binary_sensor.{}_Input_{}".format(self.friendlyName, i)
                self.set_state(inputName, state = "on" if inputsStates[bits] == "1" else "off")
                bits -= 1
                
        # # Outputs treatment
        # if Outputs:
        #     KinconyRx = self.KinconyReadOutputs()
        #     # STATE processing of the outputs (read the outputs and update Domoticz)
        #     if ("ERROR" in KinconyRx):
        #         Domoticz.Error("UpdateDomoticz - Error receiving output states: '" + KinconyRx + "'")
        #         return
        #     Debug("UpdateDomoticz - Output states received OK : '" + KinconyRx + "'")
        #     mots = list()
        #     mots = KinconyRx.split(",")
        #     mots.reverse()
        #     nb_mots = len(mots)
        #     # Number of words returned by the board
        #     if nb_mots != 1:
        #         for mot_en_cours in range(nb_mots):
        #             etat_sorties = bin(int(mots[mot_en_cours]))[2:].zfill(8)
        #             no_bit = 7
        #             for sortie in range(1+(mot_en_cours*8), 9+(mot_en_cours*8)):
        #                 if Devices[int(sortie)].nValue != int(etat_sorties[no_bit]):
        #                     Debug("UpdateDomoticz - Output " + str(sortie) + " value mismatch, updating")
        #                     Devices[int(sortie)].Update(nValue = int(etat_sorties[no_bit]), sValue = "On" if etat_sorties[no_bit] == 1 else "Off")
        #                 no_bit -= 1
        #     else:
        #         etat_sorties = bin(int(mots[0]))[2:].zfill(8)
        #         no_bit = 7
        #         for sortie in range(1, self.nb_sorties+1):
        #             if Devices[int(sortie)].nValue != int(etat_sorties[no_bit]):
        #                 Debug("UpdateDomoticz - Output " + str(sortie) + " value mismatch, updating")
        #                 Devices[int(sortie)].Update(nValue = int(etat_sorties[no_bit]), sValue = "On" if etat_sorties[no_bit] == 1 else "Off")
        #             no_bit -= 1

apps.yaml:

kincony:
  module: kincony
  class: Kincony
  friendly_name: 'Kincony_4IO'
  address: '192.168.1.18'
  port: '4196'
  inputs: 4
  outputs: 4
  frequency: 4
  debug: True
  activation: True


How it works
At init time, the input sensors are created if they don’t exist, the connection is initialized and the “CheckInputs” thread is launched.
This thread calls the Update function, which calls KinconyReadInputs and (in the future) KinconyReadOutputs to update the state of the inputs (and outputs) in Home Assistant. The thread runs in a loop, and a time.sleep is meant to set the polling frequency.
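
The loop described above could be written with a threading.Event instead of a boolean flag and time.sleep. This is just a sketch of that idea, not the code from kincony.py: the Poller class and its parameter names are hypothetical, and update stands for whatever performs one read/update cycle.

```python
import threading


class Poller:
    """Call `update` repeatedly in a background thread until stopped."""

    def __init__(self, update, interval: float):
        self._update = update        # callable doing one read/update cycle
        self._interval = interval    # pause between two cycles, in seconds
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=self._loop, name="ThreadCheckInputs", daemon=True
        )

    def start(self):
        self._thread.start()

    def stop(self, timeout: float = 2.0):
        """Ask the loop to end and wait for the thread to finish."""
        self._stop.set()
        self._thread.join(timeout)

    def is_alive(self) -> bool:
        return self._thread.is_alive()

    def _loop(self):
        while not self._stop.is_set():
            self._update()
            # wait() returns as soon as stop() is called, whereas
            # time.sleep() would always sleep for the full interval.
            self._stop.wait(self._interval)
```

With something like this, terminate() could call stop() so that no old thread keeps running when the app is reloaded.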

How to use
In apps.yaml, provide all the settings:

  • friendly_name : also used for the entity ID, so avoid spaces (string)
  • address : IP address of the Kincony device (string)
  • port : port of the Kincony device (string)
  • inputs : number of inputs (integer). Doesn’t work for more than 8 inputs
  • outputs : number of outputs (integer). Doesn’t work (yet) for more than 8 outputs
  • frequency : number of updates per second (integer). Doesn’t work very well (see Troubleshooting)
  • debug : enables or disables INFO logging (boolean). Will be implemented soon.
  • activation : enables or disables the thread (boolean). Should be set to False before modifying kincony.py (otherwise there is a risk of several threads running if the old one isn’t stopped first)
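
To illustrate the 8-input limit, here is the bit decoding done in Update, pulled out as a standalone sketch. The function names decode_inputs and sensor_entity_ids are made up for this example; the logic (invert with ^255, input 1 on the lowest bit) and the entity name pattern follow kincony.py.

```python
def decode_inputs(raw_value: str, count: int) -> list:
    """Turn the number returned by RELAY-GET_INPUT-1 into a list of booleans.

    Element 0 is input 1. The device value is inverted (a 0 bit means the
    input is on), hence the XOR with 0xFF, as in kincony.py.
    """
    if not 1 <= count <= 8:
        # Only one byte is decoded here, so more than 8 inputs cannot be handled
        raise ValueError("count must be between 1 and 8")
    value = int(raw_value) ^ 0xFF
    return [bool((value >> bit) & 1) for bit in range(count)]


def sensor_entity_ids(friendly_name: str, count: int) -> list:
    """Build the sensor names the same way kincony.py does."""
    return [
        "binary_sensor.{}_Input_{}".format(friendly_name, i)
        for i in range(1, count + 1)
    ]
```

For example, decode_inputs("254", 4) gives [True, False, False, False]: only input 1 is on.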

Troubleshooting

  • The polling delay is long, very long. Even without time.sleep, I can only read the input states every 500 ms, which is too slow. I have to check whether it’s possible to optimize the code and speed up the thread. Maybe with less logging…
  • If the thread fails, there is no ‘restart’ function at the moment (maybe in the future). Under Domoticz, there is a ‘Heartbeat’ procedure, and it’s easy to check every x seconds whether the thread is still running.
  • The output command sometimes causes errors in the input read loop. A RELAY-SET message is sometimes read by the thread (when an output command occurs) and the socket gets cleared, so the command isn’t sent to the device.
  • I’m not happy with this code, I think it’s not … clean. Maybe there is a better way to program this.
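
One possible direction for the RELAY-SET problem is to make every send/receive exchange exclusive with a lock. This sketch assumes the relay commands would go through the same AppDaemon app and the same socket as the polling thread (not through a separate nc connection); the DeviceLink class and its method names are hypothetical.

```python
import socket
import threading


class DeviceLink:
    """One TCP connection shared by the polling thread and the relay commands."""

    def __init__(self, sock: socket.socket):
        self._sock = sock
        self._lock = threading.Lock()

    def transaction(self, command: str, expected: str) -> str:
        """Send `command` and return the first reply containing `expected`.

        The lock keeps any other caller from sending or reading until this
        exchange is finished, so a reply cannot be read by the wrong caller.
        """
        with self._lock:
            self._sock.sendall(command.encode())
            while True:
                reply = self._sock.recv(256).decode(errors="replace")
                if not reply:
                    raise ConnectionError("device closed the connection")
                # Check for the expected reply first: TCP may deliver an
                # ALARM message and the reply in the same recv() call.
                if expected in reply:
                    return reply
                if "RELAY-ALARM" in reply:
                    continue  # unsolicited message, skipped as in kincony.py
                raise ValueError("unexpected reply: {!r}".format(reply))
```

The polling thread would then call transaction("RELAY-GET_INPUT-1", "RELAY-GET_INPUT-1,") and an output command would call transaction with its RELAY-SET string, each one waiting for the other to finish.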

As you can see, this custom component isn’t finished. So if someone could give me some ideas for improving it, or some help, it would be appreciated. Remember, I’m not a programmer and I’m new to Home Assistant, so please be gentle :grin: And I’m French, so sorry if my English is not good :slightly_smiling_face:
