Wwa_partition: Possible solution for SK6812 WWA (#260)

Please excuse my first post here, and first attempt to write an external component.
The size of the post isn’t meant to offend, but there’s a fair bit going on so I’ve tried to be complete, rather than concise.
Hopefully I’ve tucked most of the irrelevant code into packages.

I assume that reading all the code inline is preferred?

I have made myself a strip light mounted under my shaving cabinet in the bathroom running on a generic ESP32-C3 “Supermini” that’s powered from 5V USB and contains a number of ‘pixel’ LEDs.
At each end are 6x WS2812 RGB LEDs for notifications (easy to see from the shower), and 36x SK6812 WWA LEDs in the middle for illumination of the sink.
The light faces down onto the sink, and on the side of the strip is a LD2410B radar detector module which is connected by twisted pair to a UART.
Picture attached for context.

This light doubles as a bluetooth proxy for Bermuda Trilateration, but that’s another story!

The idea is that the brightness and color temperature of the WWA LEDs are varied, such that:

  1. brightness is inversely proportional to the detected distance, and off when nobody present.
  2. at closest detected distance (hand washing) full brightness is required during the day time (all three color temperatures need to be lit).
  3. Color temperature becomes warmer during night time, and brightness is reduced.

So, I’ve come up with an external component I’ve called wwa_partition for controlling the sk6812 as a partition of a esp32_rmt_led_strip, I’ve got the color temperature working nicely over the entire range, with full brightness of all 3 LEDs possible at the warm temp. The light presents itself as a normal color temperature light to HA, with the amber temperature being the warm temp. Without transitions the light behaves perfectly in both brightness and color temperature.

The idea of using a partition might seem a bit off, but bear with me:

  1. This actually started with an experiment where I had 6x WS2812 + 36 SK6812 + 6x WS2812 in a single strip, so the code developed with the need just to address the middle section. I have since moved the SK6812 and WS2812 onto separate RMT channels because the first two LEDs in the second group of WS2812 tended to glitch a bit (note the two blue LEDs in the picture).
  2. Esphome already knows how to talk to sk6812 via esp32_rmt_led_strip, so my intention is only to add the wwa functionality, not to reinvent anything else.

So, the relevant code for this device currently looks something like this:

# bathsink_snippet.yaml

substitutions:

  devicename: bathsink
  DeviceName: "BathSink"

packages:

  supermini: !include _common/packages/boards/supermini.yaml
  ld2410:    !include _common/packages/ld2410.yaml

external_components:

  - source:
      type: local
      path: my_components
    components: [wwa_partition]

light:

  - platform: esp32_rmt_led_strip
    rgb_order: GRB
    pin: GPIO5
    num_leds: 36
    rmt_channel: 0
    chipset: sk6812
    name: "${DeviceName} Sink"
    id: "${devicename}_sink_strip"
    max_refresh_rate: 125ms
    internal: true

  - platform: wwa_partition
    name: "${DeviceName} Sink"
    id: ${devicename}_sink
    cold_white_color_temperature: 6500K
    warm_white_color_temperature: 2000K
    segments:
      - id: ${devicename}_sink_strip
        from: 0
        to: 35
    gamma_correct: 1.0
    #~ default_transition_length: 1250ms
    default_transition_length: 200ms

  - platform: esp32_rmt_led_strip
    rgb_order: GRB
    pin: GPIO9
    num_leds: 6
    rmt_channel: 1
    chipset: ws2812
    name: "${DeviceName} Alert"
    id: "${devicename}_alert"
    max_refresh_rate: 125ms

number:

  - platform: template
    name: "Smoothing Alpha Towards"
    id: smoothing_alpha_towards
    min_value: 0.5
    max_value: 1.0
    step: 0.01
    unit_of_measurement: ""
    optimistic: true
    initial_value: 0.95  # Default value for fast reaction towards the sensor
    entity_category: CONFIG

  - platform: template
    name: "Smoothing Alpha Away"
    id: smoothing_alpha_away
    min_value: 0.01
    max_value: 0.2
    step: 0.01
    unit_of_measurement: ""
    optimistic: true
    initial_value: 0.1  # Default value for slow reaction away from the sensor
    entity_category: CONFIG


sensor:

  - platform: template
    name: "${DeviceName} Smoothed Detection Distance"
    id: ${devicename}_smoothed_detection_distance
    unit_of_measurement: "cm"
    update_interval: 100ms
    lambda: |-
      static float previous_value = 0.0;
      const float detection_distance = id(bathsink_ld2410_detection_distance).state;
      const bool presence = id(bathsink_ld2410_presence).state;

      // Get the dynamic smoothing factors from the Home Assistant sliders
      const float alpha_towards = id(smoothing_alpha_towards).state;
      const float alpha_away = id(smoothing_alpha_away).state;

      if (presence) {
        if (detection_distance < previous_value) {
          // Moving towards sensor, react faster
          previous_value = alpha_towards * detection_distance + (1 - alpha_towards) * previous_value;
        } else {
          // Moving away from sensor, react slower
          previous_value = alpha_away * detection_distance + (1 - alpha_away) * previous_value;
        }
      } else {
        // No presence, reset to 0
        previous_value = 0.0;
      }

      return previous_value;
  - platform: template
    name: "${DeviceName} Distance Brightness"
    id: ${devicename}_distance_brightness
    unit_of_measurement: '%'
    #~ update_interval: 100ms
    update_interval: 250ms
    lambda: |-
      float brightness = 0;
      const float min_distance = 80.0;
      const float max_distance = 300.0;

      if (id(${devicename}_smoothed_detection_distance).state == 0) {
        brightness = 0;  // No detection, set brightness to 0
      } else if (id(${devicename}_smoothed_detection_distance).state <= min_distance) {
        brightness = 100.0;  // If closer than min distance, set brightness to 100%
      } else {
        // Scale brightness linearly between min and max distance
        brightness = (max_distance - id(${devicename}_smoothed_detection_distance).state) / (max_distance - min_distance) * 100.0;
      }

      return brightness;

    on_value:
      then:
        - if:
            condition:
              switch.is_on: ${devicename}_auto_brightness
            then:
              - if:
                  condition:
                    lambda: 'return( id(bathsink_distance_brightness).state <= 0.0 );'
                  then:
                    - if:
                        condition:
                          light.is_on: ${devicename}_sink
                        then:
                          - light.turn_off:
                              id: ${devicename}_sink
                              #~ transition_length: 2200ms
                  else:
                    - if:
                        condition:
                          lambda: 'return abs(id(bathsink_distance_brightness).state - id(${devicename}_sink).current_values.get_brightness()) > 0.05;'
                        then:
                          - if:
                              condition:
                                lambda: 'return( id(bathsink_distance_brightness).state > 90.0 );'
                              then:
                                - light.turn_on:
                                    id: ${devicename}_sink
                                    brightness: 1.0
                                    color_mode: COLOR_TEMPERATURE
                                    color_temperature: 3250K
                                    #~ transition_length: 500ms
                              else:
                                - light.turn_on:
                                    id: ${devicename}_sink
                                    brightness: !lambda |-
                                      return ( id(bathsink_distance_brightness).state / 100) ;
                                    color_mode: COLOR_TEMPERATURE
                                    color_temperature: 4500K

# light.py
# shamelessly scraped and mangled from:
# https://gitlab.lukasslaby.cz/root/esphome/-/blob/bc7c11be96666236fd0101d8a451d49b1ff454bd/esphome/components/partition/light.py

import esphome.codegen as cg
import esphome.config_validation as cv
import esphome.final_validate as fv
from esphome.components import light
from esphome.const import (
    CONF_ID,
    CONF_OUTPUT_ID,
    CONF_FROM,
    CONF_TO,
    CONF_SEGMENTS,
    CONF_COLD_WHITE_COLOR_TEMPERATURE,
    CONF_WARM_WHITE_COLOR_TEMPERATURE,
    CONF_REVERSED,
    CONF_SINGLE_LIGHT_ID,
    CONF_ADDRESSABLE_LIGHT_ID,
    CONF_LIGHT_ID,
    CONF_NUM_LEDS,
)

CODEOWNERS = ["@mzb"]

wwa_partitions_ns = cg.esphome_ns.namespace('wwa_partition')

AddressableSegment = cg.esphome_ns.class_("AddressableSegment")
AddressableLightWrapper = cg.esphome_ns.namespace("light").class_(
    "AddressableLightWrapper"
)
PartitionOutput = wwa_partitions_ns.class_(
    "PartitionOutput", light.AddressableLight
)

def validate_from_to(value):
    if CONF_ID in value and value[CONF_FROM] > value[CONF_TO]:
        raise cv.Invalid(
            f"From ({value[CONF_FROM]}) must not be larger than to ({value[CONF_TO]})"
        )
    return value

def validate_segment(config):
    fconf = fv.full_config.get()

    if CONF_ID in config:  # only validate addressable segments
        path = fconf.get_path_for_id(config[CONF_ID])[:-1]
        segment_light_config = fconf.get_config_for_path(path)

        if CONF_NUM_LEDS in segment_light_config:
            segment_len = segment_light_config[CONF_NUM_LEDS]
            if config[CONF_FROM] >= segment_len:
                raise cv.Invalid(
                    f"FROM ({config[CONF_FROM]}) must be less than the number of LEDs in light '{config[CONF_ID]}' ({segment_len})",
                    [CONF_FROM],
                )
            if config[CONF_TO] >= segment_len:
                raise cv.Invalid(
                    f"TO ({config[CONF_TO]}) must be less than the number of LEDs in light '{config[CONF_ID]}' ({segment_len})",
                    [CONF_TO],
                )


ADDRESSABLE_SEGMENT_SCHEMA = cv.Schema(
    {
        cv.Required(CONF_ID): cv.use_id(light.AddressableLightState),
        cv.Required(CONF_FROM): cv.positive_int,
        cv.Required(CONF_TO): cv.positive_int,
        cv.Optional(CONF_REVERSED, default=False): cv.boolean,
    }
)

NONADDRESSABLE_SEGMENT_SCHEMA = cv.COMPONENT_SCHEMA.extend(
    {
        cv.Required(CONF_SINGLE_LIGHT_ID): cv.use_id(light.LightState),
        cv.GenerateID(CONF_ADDRESSABLE_LIGHT_ID): cv.declare_id(
            AddressableLightWrapper
        ),
        cv.GenerateID(CONF_LIGHT_ID): cv.declare_id(light.types.LightState),
    }
)

CONFIG_SCHEMA = light.ADDRESSABLE_LIGHT_SCHEMA.extend(
    {
        cv.GenerateID(CONF_OUTPUT_ID): cv.declare_id(PartitionOutput),
        cv.Required(CONF_COLD_WHITE_COLOR_TEMPERATURE): cv.color_temperature,
        cv.Required(CONF_WARM_WHITE_COLOR_TEMPERATURE): cv.color_temperature,
        cv.Required(CONF_SEGMENTS): cv.All(
            cv.ensure_list(
                cv.Any(ADDRESSABLE_SEGMENT_SCHEMA, NONADDRESSABLE_SEGMENT_SCHEMA),
                validate_from_to,
            ),
            cv.Length(min=1),
        ),
    }
)

FINAL_VALIDATE_SCHEMA = cv.Schema(
    {
        cv.Required(CONF_SEGMENTS): [validate_segment],
    },
    extra=cv.ALLOW_EXTRA,
)


async def to_code(config):
    segments = []
    for conf in config[CONF_SEGMENTS]:
        if CONF_SINGLE_LIGHT_ID in conf:
            wrapper = cg.new_Pvariable(
                conf[CONF_ADDRESSABLE_LIGHT_ID],
                await cg.get_variable(conf[CONF_SINGLE_LIGHT_ID]),
            )
            light_state = cg.new_Pvariable(
                conf[CONF_LIGHT_ID],
                conf[CONF_COLD_WHITE_COLOR_TEMPERATURE],
                conf[CONF_WARM_WHITE_COLOR_TEMPERATURE],
                wrapper
            )
            cg.add(light_state.set_supported_color_modes({ColorMode.COLOR_TEMPERATURE}))

            # Set color temperature values
            if 'cold_white_color_temperature' in conf:
                cg.add(light_state.set_cold_white_color_temperature(config['cold_white_color_temperature']))
            if 'warm_white_color_temperature' in conf:
                cg.add(light_state.set_warm_white_color_temperature(config['warm_white_color_temperature']))

            await cg.register_component(light_state, conf)
            segments.append(wwa_partitions_ns.AddressableSegment(light_state, 0, 1, False))

        else:
            segments.append(
                wwa_partitions_ns.AddressableSegment(
                    await cg.get_variable(conf[CONF_ID]),
                    conf[CONF_FROM],
                    conf[CONF_TO] - conf[CONF_FROM] + 1,
                    conf[CONF_REVERSED],
                )
            )

    var = cg.new_Pvariable(config[CONF_OUTPUT_ID], segments)
    await cg.register_component(var, config)
    await light.register_light(var, config)
// wwa_partition.cpp
#include "wwa_partition.h"
#include "esphome/core/log.h"

namespace esphome {
namespace wwa_partition {

static constace esphome
// wwa_partition.h
#pragma once

#include <utility>
#include <vector>

#include "esphome/core/component.h"
#include "esphome/core/log.h"
#include "esphome/components/light/addressable_light.h"

// Define the color temperatures of the LEDs in Kelvin
const float T_AMBER = 2000.0f;  // Amber LED color temperature
const float T_WARM  = 3250.0f;  // Warm White LED color temperature
const float T_COLD  = 6500.0f;  // Cold White LED color temperature

namespace esphome {
namespace wwa_partition {

class AddressableSegment {
 public:
  AddressableSegment(light::LightState *src, int32_t src_offset, int32_t size, bool reversed)
      : src_(static_cast<light::AddressableLight *>(src->get_output())),
        src_offset_(src_offset),
        size_(size) {}

  light::AddressableLight *get_src() const { return this->src_; }
  int32_t get_src_offset() const { return this->src_offset_; }
  int32_t get_size() const { return this->size_; }
  int32_t get_dst_offset() const { return this->dst_offset_; }
  void set_dst_offset(int32_t dst_offset) { this->dst_offset_ = dst_offset; }

  void update_segment() {
    this->src_->schedule_show ();
  }

 protected:
  light::AddressableLight *src_;
  int32_t src_offset_;
  int32_t size_;
  int32_t dst_offset_;
};

class PartitionOutput : public light::AddressableLight {
 public:

  // Member variables to handle transitions
  float start_brightness_ = 0.0f;
  float start_color_temp_ = 0.0f;
  float target_brightness_ = 0.0f;
  float target_color_temp_ = 0.0f;
  uint32_t transition_start_time_ = 0;
  uint32_t transition_length_ = 0;
  bool transitioning_ = false;
  uint32_t brightening_transition_length_{1000}; // 1 second
  uint32_t dimming_transition_length_{5000};     // 5 seconds

  light::LightState *state_{nullptr};

  explicit PartitionOutput(std::vector<AddressableSegment> segments) : segments_(std::move(segments)) {
    int32_t off = 0;
    for (auto &seg : this->segments_) {
      seg.set_dst_offset(off);
      off += seg.get_size();
    }
  }

  int32_t size() const override {
    auto &last_seg = this->segments_[this->segments_.size() - 1];
    return last_seg.get_dst_offset() + last_seg.get_size();
  }

  void clear_effect_data() override {
    for (auto &seg : this->segments_) {
      seg.get_src()->clear_effect_data();
    }
  }

  // Override the get_traits method
  light::LightTraits get_traits() override {
    auto traits = light::LightTraits();
    traits.set_supported_color_modes({light::ColorMode::COLOR_TEMPERATURE, light::ColorMode::BRIGHTNESS});
    traits.set_min_mireds(153);  // Approx 6500K
    traits.set_max_mireds(500);  // Approx 2000K
    return traits;
  }

  void write_state(light::LightState *state) override {
      float brightness = 1.0f;
      state->current_values_as_brightness(&brightness);

      float color_temp = state->current_values.get_color_temperature();

      // If color_temp is not set, default to 4444K
      if (std::isnan(color_temp)) {
          color_temp = 3250.0f;
      }

      apply_state_(brightness, color_temp);
  }


  void loop() override {
    if (!transitioning_)
      return;

    // Compute progress
    uint32_t now = millis();
    float progress = 1.0f;

    if (transition_length_ > 0) {
      uint32_t elapsed = now - transition_start_time_;
      if (elapsed < transition_length_) {
        progress = static_cast<float>(elapsed) / transition_length_;
      } else {
        progress = 1.0f;  // Transition complete
        transitioning_ = false;
      }

      //~ ESP_LOGW("WWA_LIGHT", "TRANSITIONING %f", progress);
    }

    // Interpolate brightness and color temperature
    float brightness = start_brightness_ +
                       (target_brightness_ - start_brightness_) * progress;
    float color_temp = start_color_temp_ +
                       (target_color_temp_ - start_color_temp_) * progress;

    // Apply the interpolated state
    apply_state_(brightness, color_temp);
  }

 protected:
  light::ESPColorView get_view_internal(int32_t index) const override {
    uint32_t lo = 0;
    uint32_t hi = this->segments_.size() - 1;
    while (lo < hi) {
      uint32_t mid = (lo + hi) / 2;
      int32_t begin = this->segments_[mid].get_dst_offset();
      int32_t end = begin + this->segments_[mid].get_size();
      if (index < begin) {
        hi = mid - 1;
      } else if (index >= end) {
        lo = mid + 1;
      } else {
        lo = hi = mid;
      }
    }
    auto &seg = this->segments_[lo];
    // offset within the segment
    int32_t seg_off = index - seg.get_dst_offset();
    // offset within the src
    int32_t src_off;
    src_off = seg.get_src_offset() + seg_off;

    auto view = (*seg.get_src())[src_off];
    view.raw_set_color_correction(&this->correction_);
    return view;
  }

  std::vector<AddressableSegment> segments_;

  float color_temperature_ww_ = 500.0;
  float color_temperature_cw_ = 130.0;

 private:

    float current_brightness_ = 0.0f;
    float current_color_temp_ = 0.0f;

    void apply_state_(float brightness, float color_temp) {
      // Update current brightness and color temperature
      current_brightness_ = brightness;
      current_color_temp_ = color_temp;

      // Calculate color components
      float amber, warm, cold;

      compute_led_intensities(color_temp, brightness, amber, warm, cold);


      //~ ESP_LOGW("WWA_LIGHT", "%f %f %f", amber,warm,cold );

      // Apply the calculated color to the segments
      for (auto &seg : this->segments_) {
        for (int i = 0; i < seg.get_size(); i++) {
          int index = seg.get_src_offset() + i;
          auto color_view = seg.get_src()->operator[](index);
          color_view.set_rgb(
              amber * 255 * brightness,
              cold * 255 * brightness,
              warm * 255 * brightness
          );
        }
        seg.update_segment();
      }
    }

    // Function to convert Kelvin to Mireds
    float kelvin_to_mireds(float kelvin) {
        return 1e6 / kelvin;
    }

    // Function to convert Mireds to Kelvin
    float mireds_to_kelvin(float mireds) {
        return 1e6 / mireds;
    }

    void compute_led_intensities(float color_temp_mireds, float brightness, float &i_amber, float &i_warm, float &i_cold) {
        const float T_AMBER = 2000.0f;
        const float T_WARM = 3250.0f;
        const float T_COLD = 6500.0f;
        const float GAMMA = 3.0f; // Gamma value for correction

        // Convert color temperature from mireds to Kelvin
        float color_temp = 1000000.0f / color_temp_mireds;

        // Clamp the desired temperature between the minimum and maximum
        float t_desired = std::max(T_AMBER, std::min(T_COLD, color_temp));

        // Apply gamma correction to the brightness input
        float adjusted_brightness = powf(brightness, 1.0f / GAMMA);

        if (t_desired <= T_WARM) {
            // Between T_AMBER and T_WARM
            i_amber = 1.0f;
            i_warm = (t_desired - T_AMBER) / (T_WARM - T_AMBER);
            i_cold = 0.0f;
        } else {
            // Between T_WARM and T_COLD
            i_amber = (T_COLD - t_desired) / (T_COLD - T_WARM);
            i_warm = (T_COLD - t_desired) / (T_COLD - T_WARM);
            i_cold = (t_desired - T_WARM) / (T_COLD - T_WARM);
        }

        // At T_WARM, ensure all intensities are 1.0
        if (fabsf(t_desired - T_WARM) < 1e-3f) {
            i_amber = 1.0f;
            i_warm = 1.0f;
            i_cold = 1.0f;
        }

        // Scale by the adjusted brightness
        i_amber *= adjusted_brightness;
        i_warm  *= adjusted_brightness;
        i_cold  *= adjusted_brightness;

        // Ensure intensities do not exceed 1.0
        i_amber = std::max(0.0f, std::min(1.0f, i_amber));
        i_warm  = std::max(0.0f, std::min(1.0f, i_warm));
        i_cold  = std::max(0.0f, std::min(1.0f, i_cold));
    }
};

}  // namespace wwa_partition
}  // namespace esphome

The issue I’m trying to resolve is how transitions work, but here my ignorance is really hitting hard.

How are transitions supposed to be handled by the write_state?

My current solution has been to keep default_transition_length to be very short, and slow down the max_refresh_rate so that full brightness is achieved properly, but the underlying issue is my lack of understanding of the transition mechanism within esphome.

I much appreciate pointers and suggestions if you’ve got some time.

Thanks for reading!

ps: __init__.py is empty file but must exist.

pps: this could be a start to addressing Feature Request #260