Handling of Philips Hue switches

Tags: #<Tag:0x00007f73ae1616a0> #<Tag:0x00007f73ae161510>

I wrote an event translator in AppDaemon that takes raw deconz_events from a Philips Hue dimmer switch and processes them into a more usable event stream. In particular, it allows for things like a “double click” that doesn’t act on the first click. Or a “double click + hold” action (etc.). Question: Is there a better way to do this?

Right now I have an FSM that carries the state logic. The difficult part is issuing the “single_press_confirmed” event which has wait to make sure another press event is not incoming. To do this, I spawn a thread that waits for some time (e.g. 500ms) and then fires a “time lapse” event (which is then consumed by the FSM). It’s surprisingly reliable, but this could be very system specific.

import appdaemon.plugins.hass.hassapi as hass
import time
import threading

# INPUTS (from deconz, except 4 which we generate to indicate a "double click" time lapse)

# 0 = press
# 1 = hold
# 2 = release
# 3 = hold_release
# 4 = double_click lapse

# OUTPUTS

# single_press_confirmed is fired once we have waited
# long enough to know a double_click isn't coming

STATUS = {0 : 'press',
          1 : 'hold',
          2 : 'release',
          3 : 'hold_release',
          4 : 'double_click_press',
          5 : 'double_click_hold',
          6 : 'double_click_release',
          7 : 'double_click_hold_release',
          8 : 'single_press_confirmed'}

BUTTONS = {0 : 'ON',
           1 : 'UP',
           2 : 'DOWN',
           3 : 'OFF'}

# FSM STATES are
#
# 0 : neutral
# 1 : pressed down
# 2 : held down
# 3 : released after press
# 4 : pressed down again after press/release
# 5 : held down after press/release

# Logic of the FSM, with error correction
# built in if some events get lost

FSM = {(0,0) : (1,0),
       (0,1) : (2,1),
       (0,2) : (3,2),
       (0,3) : (3,3),
       (0,4) : (0,None),
       (1,0) : (1,0),
       (1,1) : (2,1),
       (1,2) : (3,2),
       (1,3) : (3,3),
       (1,4) : (1,None),
       (2,1) : (2,1),
       (2,2) : (0,3),
       (2,3) : (0,3),
       (2,4) : (2,None),
       (3,0) : (4,4),
       (3,1) : (5,5),
       (3,2) : (0,6),
       (3,3) : (5,5),
       (3,4) : (0,8),
       (4,1) : (5,5),
       (4,2) : (0,6),
       (4,4) : (4,None),
       (5,1) : (5,5),
       (5,2) : (0,7),
       (5,3) : (0,7),
       (5,4) : (5,None) }

DOUBLE_CLICK_GAP = 0.5

class SwitchServer(hass.Hass):

    def initialize(self):

        if self.args and 'switch' in self.args:
            self.device_id = self.args['switch']

        self.log('Loading switch: ' + str(self.device_id), log='debug')

        self.event_q = []
        self.states = [0,0,0,0]

        self.listen_event(self.event_handler, 'deconz_event', id=self.device_id)
        self.listen_event(self.event_handler, 'faux_deconz_event', id=self.device_id)

    def fsm_step(self, button, switch_input):
        state = self.states[button]
        new_state, output = FSM.get((self.states[button], switch_input), (None, None))
        self.log('switch: ' + self.device_id + '(' + str(state) + ', ' + str(switch_input) + ') => (' + str(new_state) + ', ' +
                 str(output) + ')', log='debug')
        if new_state != None:
            self.states[button] = new_state
            self.log('Returning ' + str(output), log='debug')
            return output
        else:
            return None

    def event_handler(self, event_name, data, kwargs):

        # we do this via an event so that AD schedules the events in approx. the correct order
        if (event_name == 'faux_deconz_event'):
            self.process_event(data['button'], 4, waiter=True)

        self.log('Deconz event logger: ' + str(data),log='debug')

        event_id = data.get('event', None)
        if not(event_id):
            self.log('ERROR: Event with no id')
            return

        self.event_q.append(event_id)
        self.process_queue()

    def process_queue(self):
        if len(self.event_q) > 0:
            event_id = self.event_q.pop(0)
            button = int(event_id/1000)-1
            switch_input = event_id % 10
            self.process_event(button, switch_input)
            #self.process_queue(device_id)

    def process_event(self, button, switch_input, waiter=False):

        self.log('Processing input ' + str(switch_input) + ' on button ' + str(button) + ' of ' +
                 self.device_id, log='debug')

        output = self.fsm_step(button, switch_input)

        if (output != None):
            self.fire_event('switch_server_event', id=self.device_id, output=STATUS[output], button=BUTTONS[button])

        # if just released after press
        if (output==2):
            if not(waiter):
                self.log('Spawning the child to wait',log='debug')
                waiter = WaitChild(self.fire_event, self.device_id, button, DOUBLE_CLICK_GAP)
                waiter.run()

class WaitChild(threading.Thread):

    def __init__(self, fire_event, device_id, button, sleep_time):

        threading.Thread.__init__(self)
        self.fire_event = fire_event
        self.sleep_time = sleep_time
        self.device_id = device_id
        self.button = button

    def run(self):

        time.sleep(self.sleep_time)
        self.fire_event('faux_deconz_event', id=self.device_id, button=self.button)