I wrote an event translator in AppDaemon that takes raw deconz_events from a Philips Hue dimmer switch and processes them into a more usable event stream. In particular, it allows for things like a “double click” that doesn’t act on the first click. Or a “double click + hold” action (etc.). Question: Is there a better way to do this?
Right now I have an FSM that carries the state logic. The difficult part is issuing the “single_press_confirmed” event, which has to wait to make sure another press event is not incoming. To do this, I spawn a thread that waits for some time (e.g. 500ms) and then fires a “time lapse” event (which is then consumed by the FSM). It’s surprisingly reliable, but this could be very system specific.
import appdaemon.plugins.hass.hassapi as hass
import time
import threading
# INPUTS (from deconz, except 4 which we generate to indicate a "double click" time lapse)
# 0 = press
# 1 = hold
# 2 = release
# 3 = hold_release
# 4 = double_click lapse
# OUTPUTS
# single_press_confirmed is fired once we have waited
# long enough to know a double_click isn't coming
# Output code -> event name. The position of each name in the tuple is its
# numeric output code; codes 0-3 reuse the names of the raw inputs.
STATUS = dict(enumerate((
    'press',
    'hold',
    'release',
    'hold_release',
    'double_click_press',
    'double_click_hold',
    'double_click_release',
    'double_click_hold_release',
    'single_press_confirmed',
)))
# Button index -> button label, in index order 0..3.
BUTTONS = dict(zip(range(4), ('ON', 'UP', 'DOWN', 'OFF')))
# FSM STATES are
#
# 0 : neutral
# 1 : pressed down
# 2 : held down
# 3 : released after press
# 4 : pressed down again after press/release
# 5 : held down after press/release
# Logic of the FSM, with error correction
# built in if some events get lost
# Transition table keyed by (current_state, input) with values
# (next_state, output). An output of None means "emit nothing"; any other
# output is a key of STATUS. Pairs that are absent from the table are
# treated by the caller as "no transition".
#
# The table is written as one row per state and flattened into the
# (state, input) keyed dict by the comprehension.
FSM = {
    (state, switch_input): transition
    for state, row in {
        # 0: neutral
        0: {0: (1, 0), 1: (2, 1), 2: (3, 2), 3: (3, 3), 4: (0, None)},
        # 1: pressed down
        1: {0: (1, 0), 1: (2, 1), 2: (3, 2), 3: (3, 3), 4: (1, None)},
        # 2: held down
        2: {1: (2, 1), 2: (0, 3), 3: (0, 3), 4: (2, None)},
        # 3: released after press (the only state where the time lapse
        # input 4 produces an output: 8, single_press_confirmed)
        3: {0: (4, 4), 1: (5, 5), 2: (0, 6), 3: (5, 5), 4: (0, 8)},
        # 4: pressed down again after press/release
        4: {1: (5, 5), 2: (0, 6), 4: (4, None)},
        # 5: held down after press/release
        5: {1: (5, 5), 2: (0, 7), 3: (0, 7), 4: (5, None)},
    }.items()
    for switch_input, transition in row.items()
}

# Seconds to wait after a release before deciding no second press is coming.
DOUBLE_CLICK_GAP = 0.5
class SwitchServer(hass.Hass):
    """Translate raw deconz switch events into a richer event stream.

    Each button has its own state, advanced through the module-level FSM
    table. Every transition that yields an output is re-published as a
    'switch_server_event' carrying the switch id, the output name (from
    STATUS) and the button label (from BUTTONS).

    After a plain 'release' output a WaitChild thread is started; once
    DOUBLE_CLICK_GAP has elapsed it fires a 'faux_deconz_event', which is fed
    back into the FSM as input 4 (the "double click time lapse").
    """

    def initialize(self):
        """Read the 'switch' argument and subscribe to its events.

        Without a 'switch' argument nothing is subscribed and an error is
        logged.
        """
        if not self.args or 'switch' not in self.args:
            self.log("ERROR: SwitchServer needs a 'switch' argument")
            return

        self.device_id = self.args['switch']
        # NOTE(review): `log='debug'` is passed through to self.log unchanged;
        # confirm it names a configured log rather than an intended level.
        self.log('Loading switch: ' + str(self.device_id), log='debug')
        self.event_q = []
        # One FSM state per button, all starting in state 0 (neutral).
        self.states = [0] * len(BUTTONS)
        self.listen_event(self.event_handler, 'deconz_event', id=self.device_id)
        self.listen_event(self.event_handler, 'faux_deconz_event', id=self.device_id)

    def fsm_step(self, button, switch_input):
        """Advance one button's state machine by a single input.

        Args:
            button: index into self.states.
            switch_input: input code (0-3 from deconz, 4 for the time lapse).

        Returns:
            The output code of the transition, or None when the transition
            emits nothing or the (state, input) pair is not in FSM. In the
            latter case the state is left unchanged.
        """
        state = self.states[button]
        new_state, output = FSM.get((state, switch_input), (None, None))
        self.log('switch: ' + self.device_id + '(' + str(state) + ', ' + str(switch_input) +
                 ') => (' + str(new_state) + ', ' + str(output) + ')', log='debug')
        if new_state is None:
            return None
        self.states[button] = new_state
        self.log('Returning ' + str(output), log='debug')
        return output

    def event_handler(self, event_name, data, kwargs):
        """Callback for both 'deconz_event' and 'faux_deconz_event'."""
        # The time lapse arrives as an event so that it is scheduled in
        # approximately the correct order relative to the real events.
        if event_name == 'faux_deconz_event':
            self.process_event(data['button'], 4, waiter=True)
            # A faux event carries no 'event' id; stop here so it is not
            # reported as a malformed deconz event below.
            return

        self.log('Deconz event logger: ' + str(data), log='debug')
        event_id = data.get('event', None)
        if not event_id:
            self.log('ERROR: Event with no id')
            return
        self.event_q.append(event_id)
        self.process_queue()

    def process_queue(self):
        """Pop the oldest queued event id, decode it and process it.

        The id is split as ``button = id // 1000 - 1`` and
        ``input = id % 10`` (e.g. 1002 -> button 0, input 2).
        """
        if not self.event_q:
            return
        event_id = self.event_q.pop(0)
        button = int(event_id // 1000) - 1
        switch_input = event_id % 10
        self.process_event(button, switch_input)

    def process_event(self, button, switch_input, waiter=False):
        """Run one input through the FSM and publish any resulting output.

        Args:
            button: button index (a key of BUTTONS).
            switch_input: input code for the FSM.
            waiter: True when the input is the time lapse generated by a
                WaitChild; in that case no further WaitChild is started.
        """
        self.log('Processing input ' + str(switch_input) + ' on button ' + str(button) + ' of ' +
                 self.device_id, log='debug')
        if button not in BUTTONS:
            # An unexpected event id would otherwise raise on the state or
            # label lookup (or silently alias the last button for index -1).
            self.log('ERROR: Unknown button index ' + str(button))
            return

        output = self.fsm_step(button, switch_input)
        if output is None:
            return
        self.fire_event('switch_server_event', id=self.device_id,
                        output=STATUS[output], button=BUTTONS[button])

        # Output 2 is 'release' directly after a press: start the timer that
        # will later confirm no second press followed.
        if output == 2 and not waiter:
            self.log('Spawning the child to wait', log='debug')
            wait_child = WaitChild(self.fire_event, self.device_id, button, DOUBLE_CLICK_GAP)
            # start() runs WaitChild.run in a new thread; calling run()
            # directly would sleep in this callback and block it.
            wait_child.start()
class WaitChild(threading.Thread):
    """Thread whose body sleeps, then fires a 'faux_deconz_event'.

    Args:
        fire_event: callable invoked as
            ``fire_event('faux_deconz_event', id=..., button=...)``.
        device_id: value passed as the event's ``id``.
        button: value passed as the event's ``button``.
        sleep_time: seconds to sleep before firing.
    """

    def __init__(self, fire_event, device_id, button, sleep_time):
        super().__init__()
        self.device_id, self.button = device_id, button
        self.sleep_time = sleep_time
        self.fire_event = fire_event

    def run(self):
        """Sleep for ``sleep_time`` and then fire the faux event once."""
        time.sleep(self.sleep_time)
        payload = {'id': self.device_id, 'button': self.button}
        self.fire_event('faux_deconz_event', **payload)