Looking for push button double click/dimming state machine implementation

What I would like to do is use a Hue wall switch in a push button configuration to control on/off, double click and dimming (changing brightness up and down while the button is pressed).

The Hue wall switch produces left_press, left_hold and left_press_release with Z2M

Here’s a state diagram of the intended implementation:

I’ve been looking around but haven’t been able to find this anywhere. Does someone have something similar running that they can share?

Just to answer my own question. I couldn’t find anything so I ended up doing it myself instead.

Node-RED, AppDaemon and PyScript could probably all be used for the implementation but I decided to go for PyScript. I definitely prefer Python over a visual programming tool/Java and PyScript felt more intuitive than AppDaemon to me.

I didn’t know that there was a left_hold_release command when I created the first state diagram. I also decided to change the workings of it slightly when I settled on a way to send messages from the @state_trigger function to the state machine.

I have a bunch of these switches so I also decided wrap it into an app. Really happy that I went with PyScript!

import asyncio

registered_triggers = []

def make_wall_switch_action(config):

    action_q = asyncio.Queue(10)

    # handles the switch action and adds it to the queue.
    @state_trigger(f"{config['switch_id']} == '{config['button']}_press' or \
                     {config['switch_id']} == '{config['button']}_press_release' or \
                     {config['switch_id']} == '{config['button']}_hold' or \
                     {config['switch_id']} == '{config['button']}_hold_release'")
    def wall_switch_action(**kwargs):
        #log.info(f"{config['switch_id']} {config['button']} button. kwargs == {kwargs}")
        try:
            # attempt to add the trigger action to the queue.
            action_q.put_nowait(kwargs["value"])
        except asyncio.QueueFull:
            # queue is full (should never happen).
            log.error(f"{config['switch_id']} {config['button']} button. Queue is full.")
            

    # the switch state machine that implements the functionality of the switch.
    @time_trigger("startup")
    def wall_switch_state_machine():

        # time in seconds
        time_step_size = 0.1
        dimming_transition_time = time_step_size * 1.1
        full_dimming_time = 2
        
        # timeout iterations
        waiting_on_timeout_iterations = int(2 / time_step_size)
        waiting_off_timeout_iterations = int(2 / time_step_size)
        waiting_double_click_on_timeout_iterations = int(0.4 / time_step_size)
        waiting_double_click_off_timeout_iterations = int(0.4 / time_step_size)
        double_click_on_timeout_iterations = int(2 / time_step_size)
        double_click_off_timeout_iterations = int(2 / time_step_size)
        dim_light_timeout_iterations = int(2 / time_step_size)
        
        # initial state for the state machine.
        current_state = "idle"
        
        i = 0
        while True:
            # idle state.
            # waiting for an action is blocking (remain in the idle state until an action is available).
            if current_state == "idle":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = idle")
                # retrieve an item from the action queue (or wait until there is an item available).
                action = action_q.get()
                if action == f"{config['button']}_press":
                    # choice.
                    if state.get(config['light_id']) == 'on':
                        next_state = "waiting_on"
                    else:
                        next_state = "waiting_off"
                else:
                    error_msg = f"{config['switch_id']} {config['button']} button. current_state = idle. Erroneous action received = {action}"
                    next_state = "error"
            
            # no actions are handled in the error state.
            # check if an action is available.
            # if yes then rertieve it.
            # if no then set action to "none" and continue.
            elif current_state != "error":
                # retrieve an item from the action queue if one is available (continue if not).
                try:
                    action = action_q.get_nowait()
                except asyncio.QueueEmpty:
                    action = "none"
                    
            # waiting_on state.
            if current_state == "waiting_on":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = waiting_on")
                if action != "none":
                    if action == f"{config['button']}_press_release":
                        next_state = "waiting_double_click_on"
                    elif action == f"{config['button']}_hold":
                        next_state = "dim_light"
                    else:
                        error_msg = f"{config['switch_id']} {config['button']} button. current_state = waiting_on. Erroneous action received = {action}"
                        next_state = "error"
                elif i >= waiting_on_timeout_iterations:
                    # timeout.
                    error_msg = f"{config['switch_id']} {config['button']} button. current_state = waiting_on. Timeout"
                    next_state = "error"
            
            # waiting_off state.
            elif current_state == "waiting_off":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = waiting_off")
                if action != "none":
                    if action == f"{config['button']}_press_release":
                        next_state = "waiting_double_click_off"
                    elif action == f"{config['button']}_hold":
                        next_state = "dim_light"
                    else:
                        error_msg = f"{config['switch_id']} {config['button']} button. current_state = waiting_off. Erroneous action received = {action}"
                        next_state = "error"
                elif i >= waiting_off_timeout_iterations:
                    # timeout.
                    error_msg = f"{config['switch_id']} {config['button']} button. current_state = waiting_off. Timeout"
                    next_state = "error"

            # waiting_double_click_on state.
            elif current_state == "waiting_double_click_on":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = waiting_double_click_on")
                if action != "none":
                    if action == f"{config['button']}_press":
                        next_state = "double_click_on"
                    else:
                        error_msg = f"{config['switch_id']} {config['button']} button. current_state = waiting_double_click_on. Erroneous action received = {action}"
                        next_state = "error"
                elif i >= waiting_double_click_on_timeout_iterations:
                    # timeout.            
                    next_state = "idle"

            # waiting_double_click_off state.
            elif current_state == "waiting_double_click_off":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = waiting_double_click_off")
                
                if action != "none":
                    if action == f"{config['button']}_press":
                        next_state = "double_click_off"
                    else:
                        error_msg = f"{config['switch_id']} {config['button']} button. current_state = waiting_double_click_off. Erroneous action received = {action}"
                        next_state = "error"
                elif i >= waiting_double_click_off_timeout_iterations:
                    # timeout.
                    next_state = "idle"

            # double_click_on state.
            elif current_state == "double_click_on":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = double_click_on")
                if entry:
                    # double click action.
                    # default brightness and default light temperature.
                    light.turn_on(entity_id=config['light_id'], brightness=config['brightness'], kelvin=config['kelvin'])

                if action != "none":
                    if action == f"{config['button']}_press_release":
                        next_state = "idle"
                    elif action == f"{config['button']}_hold":
                        next_state = "dim_light"
                    else:
                        error_msg = f"{config['switch_id']} {config['button']} button. current_state = double_click_on. Erroneous action received = {action}"
                        next_state = "error"
                elif i >= double_click_on_timeout_iterations:
                    # timeout.
                    error_msg = f"{config['switch_id']} {config['button']} button. current_state = double_click_on. Timeout"
                    next_state = "error"
                
            # double_click_off state.
            elif current_state == "double_click_off":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = double_click_off")
                if entry:
                    # double click action.
                    # default brightness and default light temperature.
                    light.turn_on(entity_id=config['light_id'], brightness=config['brightness'], kelvin=config['kelvin'])

                if action != "none":
                    if action == f"{config['button']}_press_release":
                        next_state = "idle"
                    elif action == f"{config['button']}_hold":
                        next_state = "dim_light"
                    else:
                        error_msg = f"{config['switch_id']} {config['button']} button. current_state = double_click_off. Erroneous action received = {action}"
                        next_state = "error"
                elif i >= double_click_off_timeout_iterations:
                    # timeout.
                    error_msg = f"{config['switch_id']} {config['button']} button. current_state = double_click_off. Timeout"
                    next_state = "error"
                

            # dim_light state.
            elif current_state == "dim_light":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = dim_light")
                if entry:
                    attrs = state.getattr(config['light_id'])
                    brightness = int(attrs.get("brightness", 0))
                    brightness_step_size = int(255 / (full_dimming_time / time_step_size))
                    dimming_direction = "down"
                else:
                    if dimming_direction == "down":
                        brightness = brightness - brightness_step_size
                        if brightness <= 1:
                            brightness = 1
                            dimming_direction = "up"
                    else:
                        brightness = brightness + brightness_step_size
                        if brightness >= 255:
                            brightness = 255
                            dimming_direction = "down"
                    light.turn_on(entity_id=config['light_id'], brightness=brightness, transition=dimming_transition_time)
                
                if action != "none":
                    if action == f"{config['button']}_hold_release":
                        next_state = "idle"
                    elif action == f"{config['button']}_hold":
                        # reset iteration timer.
                        i = 0
                    else:
                        error_msg = f"{config['switch_id']} {config['button']} button. current_state = dim_light. Erroneous action received = {action}"
                        next_state = "error"
                elif i >= dim_light_timeout_iterations:
                    # timeout.
                    error_msg = f"{config['switch_id']} {config['button']} button. current_state = dim_light. Timeout"
                    next_state = "error"
                
            # error state.
            elif current_state == "error":
                #log.info(f"{config['switch_id']} {config['button']} button. current_state = error")
                log.error(error_msg)
                next_state = "idle"
            
            # change the state when requested.
            # set entry = True (entry into a new state).
            # reset iteration timer.
            # no sleep when changing state.
            if current_state != next_state:
                # transitions
                if current_state == "idle" and next_state == "waiting_off":
                    light.turn_on(entity_id=config['light_id'])
                elif current_state == "waiting_double_click_on" and next_state == "idle":
                    light.turn_off(entity_id=config['light_id'])
                
                current_state = next_state
                entry = True
                i = 0
                
            # set entry = False (not entrying into a new state).
            # increase iteration timer.
            else:
                entry = False
                i = i + 1
                task.sleep(time_step_size)
    
    # register it in the global scope so pyscript sees it.
    registered_triggers.append(wall_switch_action)
    registered_triggers.append(wall_switch_state_machine)


@time_trigger("startup")
def wall_switch_action_startup():
    #log.info("wall switch action startup.")
    for app in pyscript.app_config:
        #log.info(f"wall switch action startup. app = {app}")
        make_wall_switch_action(app)