-
Firstly I add serial port to my system (because it’s vitural connected via Virtual Machine eSXI), then I get new /dev/ttyUSB0 device and then when I start AD and my application I see the initialization message. Then I restart the application (by changing somecode there) or it is possible to stop AD addon → in this case I see “close” message
-
It is USB-RS485 reader.
-
Just the regular data, nothing special. A kind of Modbus protocol
-
As soon as data is requested from AD
-
yes.
-
I use VM with DEbian Linux installed inside. VM is under eSXI solution
-
Version supervisor-2021.04.0
import hassapi as hass
import datetime
import serial
import json
import mqttapi as mqtt
import struct
import binascii
from binascii import a2b_hex, b2a_hex
INITIAL_MODBUS = 0xFFFF
INITIAL_DF1 = 0x0000
table = (
0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241,
0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440,
0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40,
0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841,
0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40,
0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41,
0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641,
0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040,
0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240,
0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441,
0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41,
0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840,
0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41,
0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40,
0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640,
0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041,
0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240,
0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441,
0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41,
0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840,
0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41,
0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40,
0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640,
0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041,
0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241,
0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440,
0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40,
0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841,
0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40,
0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41,
0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641,
0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040 )
# - mqtt topic
# - mqtt payload
# - address 0x3412 = Low byte: 34, High byte:12
###input_number.curtains_a2_1
gt_mqtt_units = [
["/devices/a2/curtains_1", "OPEN", "0x0101"],
["/devices/a2/curtains_1", "CLOSE", "0x0101"],
["/devices/a2/curtains_1", "STOP", "0x0101"],
["/devices/a2/curtains_1", "ADDR", " "],
["/devices/a2/curtains_2", "OPEN", "0xFEFE"],
["/devices/a2/curtains_2", "CLOSE", "0xFEFE"],
["/devices/a2/curtains_2", "STOP", "0xFEFE"]
]
gt_units = [
#---------------------------------------------------------------------------
# loggia
["input_number.curtains_a2_1", "0x0101"], #0x3412
["input_number.curtains_a2_2", "0xFEFE"],
# ["input_number.curtains_a2_2", "0x0a02"]
]
class Context():
serial_port = None
# REMOVED extra lines
# $output = pack("C*",0x55,0xFE,0xFE,0x03,0x01,0xB9,0x24); #55FEFE0301B924
class curtain(mqtt.Mqtt, hass.Hass):
def initialize(self):
self.debug("Hello from AppDaemon")
self.debug("Hello from AppDaemon ddd " + str(Context.serial_port))
try:
if Context.serial_port == None:
self.debug("MQTT: -> open serial ... ")
Context.serial_port = serial.Serial(
port = '/dev/ttyUSB0', #/dev/ttyUSB0
baudrate = 9600, # 9600 bauds
bytesize = serial.EIGHTBITS, # 7bits
parity = serial.PARITY_NONE, # even parity
stopbits = serial.STOPBITS_ONE, # 1 stop bit
xonxoff = False, # no flow control
timeout = 1
)
self.debug("MQTT: -> serial is opened: " + Context.serial_port.name)
except Exception as e:
self.debug("MQTT: -> open serial ERROR " + str(e))
# unit[0] - virtual sensor (var)
# unit[1] - real sensor
for unit in gt_mqtt_units:
# run listen
self.listen_event(self.mqtt_callback, 'MQTT_MESSAGE', topic=unit[0], payload=unit[1], v_addr=unit[2], namespace = "mqtt")
for unit in gt_units:
# run listen
self.listen_state(self.callback, unit[0], addr = unit[1])
# except Exception as e:
# self.debug("MQTT: -> open serial ERROR ")
# pass
# finally:
# self.debug("MQTT: -> open serial ERROR ")
# self.ser = serial.Serial(
# port = '/dev/ttyUSB00000', #/dev/ttyUSB0
# baudrate = 9600, # 9600 bauds
# bytesize = serial.EIGHTBITS, # 7bits
# parity = serial.PARITY_NONE, # even parity
# stopbits = serial.STOPBITS_ONE, # 1 stop bit
# xonxoff = False, # no flow control
# timeout = 1
# )
# self.debug("MQTT: -> open serial " + self.ser.name)
# timer is needed to get the position
time_1min = datetime.time(0, 0, 0)
self.run_minutely(self.check_pos, time_1min)
# ser.close()
#----------------------------------------------------
def terminate(self):
self.debug("MQTT: -> terminate enter ")
if Context.serial_port != None:
Context.serial_port.close()
self.debug("MQTT: -> serial port closed ")
self.debug("MQTT: -> terminate exit ")
def check_pos(self, kwargs):
self.debug("MQTT: timer -> get position ")
for unit in gt_units:
# unit[0] = input_number.curtains_a2_1
# unit[1] = addr
str_out = self.make_serial_out_get_position(addr = unit[1])
self.debug("MQTT: serial out " +str(str_out))
if Context.serial_port != None:
if Context.serial_port.isOpen():
Context.serial_port.write(str_out)
Context.serial_port.flush()
try:
#read
doc = Context.serial_port.readline()
self.debug("MQTT 2. Recieved 1a:" + str(doc))
except Exception as e:
self.debug("MQTT Error : try to parse an incomplete message")
pass
# str_out
str2 = struct.unpack('B B B B B B B B', doc)
str1 = []
str1.append(struct.pack('B', str2[0]))
str1.append(struct.pack('B', str2[1]))
str1.append(struct.pack('B', str2[2]))
str1.append(struct.pack('B', str2[3]))
str1.append(struct.pack('B', str2[4]))
str1.append(struct.pack('B', str2[5]))
# str1.append(struct.pack('B', int('{:02x}'.format(doc[1]))))
crc1 = calcString(str1, INITIAL_MODBUS)
crc1_high, crc1_low = bytes(crc1)
self.debug("MQTT 2. Recieved 1b:" + str(str2))
self.debug("MQTT 2. Recieved 1c:" + str(str2[5]))
if ( crc1_low == '0x{:02x}'.format(str2[6]) and
crc1_high == '0x{:02x}'.format(str2[7]) ):
self.debug("MQTT CRC OK:")
position = int('0x{:02x}'.format(doc[5]),16)
#doc[5] просто в int значение
# self.call_service("input_number/set_value", entity_id = unit[0], value = position)
self.call_service("input_number/set_value", entity_id = unit[0], value = position)
# self.set_value(unit[0], position)
self.debug("MQTT 2. set_value:" + str(unit[0]) + " " + str(position))
self.debug("MQTT 2. Recieved:" + str(position))
else:
self.debug("MQTT: Error - serial closed ")
#----------------------------------------------------
def callback(self, entity, attribute, old, new, kwargs):
addr_high, addr_low = bytes(int(kwargs.get('addr'),16))
if isInt(int(float(self.get_state(entity)))):
self.debug("MQTT: Position" + str(int(float(self.get_state(entity)))))
position = hex(int(float(self.get_state(entity))))
command = '0x04'
str_out = self.make_serial_out(addr = kwargs.get('addr'), command = command, data = position)
# command = '0x01'
# str_out = self.make_serial_out(addr = kwargs.get('addr'), command = command, data = "")
self.debug("MQTT: serial out " +str(str_out))
if Context.serial_port.isOpen():
Context.serial_port.write(str_out)
Context.serial_port.flush()
try:
#read
doc = Context.serial_port.readline()
self.debug("MQTT: Position Recieved 1a:" + str(doc))
except Exception as e:
self.debug("MQTT Error : try to parse an incomplete message")
pass
else:
self.debug("MQTT: Error - serial closed")
else:
self.debug("MQTT: Position is not int" + self.get_state(entity))
#----------------------------------------------------
def mqtt_callback(self, event_name, data, kwargs): #data, kwargs): #self, event_name, *args, **kwargs):
payload = kwargs.get('payload')
self.debug("MQTT-mqtt_callback: Command ")
if payload == 'OPEN':
addr_high, addr_low = bytes(int(kwargs.get('v_addr'),16))
command = '0x01'
str_out = self.make_serial_out(addr = kwargs.get('v_addr'), command = command, data = "")
elif payload == 'CLOSE':
addr_high, addr_low = bytes(int(kwargs.get('v_addr'),16))
command = '0x02'
str_out = self.make_serial_out(addr = kwargs.get('v_addr'), command = command, data = "")
elif payload == 'STOP':
addr_high, addr_low = bytes(int(kwargs.get('v_addr'),16))
command = '0x03'
str_out = self.make_serial_out(addr = kwargs.get('v_addr'), command = command, data = "")
elif payload == 'ADDR':
# self.check_pos(self)
command = ''
str_out = self.make_serial_out_set_addr(addr_curr = "0x0000", addr = self.get_state('input_text.curtains_new_addr'))
self.debug("MQTT: Command " + command)
self.debug("MQTT: serial out " + str(str_out))
if Context.serial_port.isOpen():
Context.serial_port.write(str_out)
try:
#read
doc = Context.serial_port.readline()
self.debug("MQTT-mqtt_callback: Command Recieved 1a:" + str(doc))
except Exception as e:
self.debug("MQTT Error : try to parse an incomplete message")
pass
Context.serial_port.flush()
else:
self.debug("MQTT: Error - serial closed ")
#--------------------------------------------------------
# compose serial string | commands: OPEN CLOSE STOP
#--------------------------------------------------------
def make_serial_out(self, **kwargs):
command = kwargs.get('command')
data = kwargs.get('data')
addr_high, addr_low = bytes(int(kwargs.get('addr'),16))
str1 = []
str1.append(struct.pack('B', int('0x55',16)))
str1.append(struct.pack('B', int(addr_low,16)))
str1.append(struct.pack('B', int(addr_high,16)))
str1.append(struct.pack('B', int('0x03',16)))
str1.append(struct.pack('B', int(command,16)))
if data != "":
str1.append(struct.pack('B', int(data,16)))
crc1 = calcString(str1, INITIAL_MODBUS)
crc1_high, crc1_low = bytes(crc1)
str1.append(struct.pack('B', int(crc1_low,16)))
str1.append(struct.pack('B', int(crc1_high,16)))
if data != "":
str_out = struct.pack('B B B B B B B B',
int('0x55',16),
int(addr_low,16),
int(addr_high,16),
int('0x03',16),
int(command,16),
int(data,16),
int(crc1_low,16),
int(crc1_high,16))
else:
str_out = struct.pack('B B B B B B B',
int('0x55',16),
int(addr_low,16),
int(addr_high,16),
int('0x03',16),
int(command,16),
int(crc1_low,16),
int(crc1_high,16))
return str_out
#--------------------------------------------------------
# compose serial string | command: set new address
#--------------------------------------------------------
def make_serial_out_set_addr(self, **kwargs):
addr_high, addr_low = bytes(int(kwargs.get('addr_curr'),16))
addr_n_high, addr_n_low = bytes(int(kwargs.get('addr'),16))
str1 = []
str1.append(struct.pack('B', int('0x55',16)))
str1.append(struct.pack('B', int(addr_low,16)))
str1.append(struct.pack('B', int(addr_high,16)))
str1.append(struct.pack('B', int('0x02',16)))
str1.append(struct.pack('B', int('0x00',16)))
str1.append(struct.pack('B', int('0x02',16)))
str1.append(struct.pack('B', int(addr_n_low,16)))
str1.append(struct.pack('B', int(addr_n_high,16)))
crc1 = calcString(str1, INITIAL_MODBUS)
crc1_high, crc1_low = bytes(crc1)
str1.append(struct.pack('B', int(crc1_low,16)))
str1.append(struct.pack('B', int(crc1_high,16)))
str_out = struct.pack('B B B B B B B B B B',
int('0x55',16),
int(addr_low,16),
int(addr_high,16),
int('0x02',16),
int('0x00',16),
int('0x02',16),
int(addr_n_low,16),
int(addr_n_high,16),
int(crc1_low,16),
int(crc1_high,16))
return str_out
#--------------------------------------------------------
# compose serial string
#--------------------------------------------------------
def make_serial_out_get_position(self, **kwargs):
addr_high, addr_low = bytes(int(kwargs.get('addr'),16))
str1 = []
str1.append(struct.pack('B', int('0x55',16)))
str1.append(struct.pack('B', int(addr_low,16)))
str1.append(struct.pack('B', int(addr_high,16)))
str1.append(struct.pack('B', int('0x01',16)))
str1.append(struct.pack('B', int('0x02',16)))
str1.append(struct.pack('B', int('0x01',16)))
crc1 = calcString(str1, INITIAL_MODBUS)
crc1_high, crc1_low = bytes(crc1)
str1.append(struct.pack('B', int(crc1_low,16)))
str1.append(struct.pack('B', int(crc1_high,16)))
str_out = struct.pack('B B B B B B B B',
int('0x55',16),
int(addr_low,16),
int(addr_high,16),
int('0x01',16),
int('0x02',16),
int('0x01',16),
int(crc1_low,16),
int(crc1_high,16))
return str_out
#--------------------------------------------------------
def do_process (self, entity, attribute, old, new, kwargs):
clicks = '1'
Context.serial_port.write(bytes(clicks.encode('ascii')))
buffer = ""
while True:
try:
buffer = ser.readline()
self.log(buffer)
doc = json.loads(buffer)
read_b1 = doc["b1"]
read_b2 = doc["b2"]
read_s1 = doc["s1"]
read_s2 = doc["s2"]
self.log(read_b1)
buffer = ""
except json.JSONDecodeError:
self.log("Error : try to parse an incomplete message")
#----------------------------------------------------
def debug(self, text):
if self.args["DEBUG"] == 1:
self.log("Curtain " + text)
#----------------------------------------------------
def isInt(value):
try:
int(value)
return True
except ValueError:
return False
except TypeError:
return False
def calcByte( ch, crc):
#Given a new Byte and previous CRC, Calc a new CRC-16"""
if type(ch) == type("c"):
by = ord(ch)
else:
by = ch
crc = (crc >> 8) ^ table[(crc ^ by) & 0xFF]
return (crc & 0xFFFF)
def calcString(st, crc):
#Given a bunary string and starting CRC, Calc a final CRC-16 """
for ch in st:
crc = (crc >> 8) ^ table[(crc ^ ord(ch)) & 0xFF]
return crc
def bytes(num):
return hex(num >> 8), hex(num & 0xFF)
def ascii_to_packed_hex(string_data):
# convert each pair of hex digits in the string into an integer
# then convert those to characters and join them into a string
return ''.join(chr(int(digits, 16)) for digits in
(string_data[x:x+2] for x in range(0, len(string_data), 2)))
def decode_int(num):
# remove 0x at the start
dec = hex(num)[2:]
# if num is long hex() will add L at the end
dec = dec.replace("L", "", 1)
if len(dec) % 2 == 1:
dec = "0" + dec
return a2b_hex(dec)
#v_sensor = kwargs.get('v_sensor')
# aaa = bytes.fromhex('01')
# self.debug("MQTT: HEX " +str(ascii_to_packed_hex('55')))
# ppp = decode_int('\x55')
# self.debug("MQTT: st1 " +str(ppp))
# st = "\x55" + "\x12\x34" + "\x03" + "\x04" + "\x32"
#55 - фиксированный префикс
#12 34 - адрес устройства
#03 - отправить команду
#04 - положение карниза
#32 - число 50 в hex
#С9 38 - CRC16/modbus с обратным расположением байтов
# crc = calcString(st, INITIAL_MODBUS)
# crc_high, crc_low = bytes(crc)
#addr_high, addr_low = bytes(int(kwargs.get('v_addr'),16))
##position = hex(int(float(self.get_state(entity))))
#command = '0x04'
# aaa = []
# aaa.append(ascii_to_packed_hex('55'))
# aaa.append(ascii_to_packed_hex('aa'))
# aaa.append(ascii_to_packed_hex('01'))
# self.debug("MQTT: HEX " +str(aaa))
# self.debug("MQTT: st " +st)
# str1 = struct.pack('B B B B B B B B',
# int('0x55',16),
# int(addr_low,16),
# int(addr_high,16),
# int('0x03',16),
# int('0x04',16),
# int(position,16),
# int(crc_low,16),
# int(crc_high,16))
# aaa1 = []
#aaa1.append(ascii_to_packed_hex(int('0x55',16)))
#byte_body = binascii.a2b_hex(struct.pack('B',int('0x55',16)))
# hexadecimal_string = "55"
# byte_array = bytearray.fromhex(hexadecimal_string)
# self.debug("MQTT: byte_body " +str(byte_array))
# self.debug("MQTT: st11111 " +str(crc1_high) + " " + str(crc1_low))
# self.debug("MQTT: st222 " +str(str2))
# #str1 = struct.pack('H H H H H H H', '55', addr_low, addr_high, '3', position, crc_low, crc_high)
# self.ser.write(b'hello')
# self.debug("MQTT: crc" + str(crc_low) + " " + str(crc_high))
# self.debug("MQTT: addr" + str(addr_low) + " " + str(addr_high))
# self.debug("MQTT: pos" + str(position))
# self.debug("MQTT: str1 " +str(str2))
# if ser.isOpen():
#send data
# ser.write(data)
# ser.flush()
# try:
# #read
# doc = ser.readline()
# self.log(2)
# except Exception as e:
# self.log("Error : try to parse an incomplete message")
# pass
# ser.close()
# doc = json.loads(doc)
# self.log("2. Recieved: %s\n",doc)
# str1 = []
# str1.append(struct.pack('B', int('0x55',16)))
# str1.append(struct.pack('B', int(addr_low,16)))
# str1.append(struct.pack('B', int(addr_high,16)))
# str1.append(struct.pack('B', int('0x03',16)))
# str1.append(struct.pack('B', int(command,16)))
# crc1 = calcString(str1, INITIAL_MODBUS)
# crc1_high, crc1_low = bytes(crc1)
# str1.append(struct.pack('B', int(crc1_low,16)))
# str1.append(struct.pack('B', int(crc1_high,16)))
# str_out = struct.pack('B B B B B B B',
# int('0x55',16),
### int(addr_low,16),
# int(addr_high,16),
# int('0x03',16),
## int(command,16),
# int(crc1_low,16),
# int(crc1_high,16))
# if ser.isOpen():
#send data
# ser.write(data)
# ser.flush()
# try:
# #read
# doc = ser.readline()
# self.log(2)
# except Exception as e:
# self.log("Error : try to parse an incomplete message")
# pass
# ser.close()
# doc = json.loads(doc)
# self.log("2. Recieved: %s\n",doc)
# else:
# self.debug("MQTT: not Int " + self.get_state(entity))