My First Custom Component

Hi All,

I am currently trying to write my first custom component. Luckily I found a similar project from @dries007. This one I was able to customize :slight_smile:

My first step was to get a light.py running. The nice thing is that it already works to create and control an ON/OFF lamp. :confetti_ball: :tada:

Now I’m trying to rewrite the whole thing into the dimmable version.
Unfortunately I have the problem that the lamp is always displayed in the HA interface only as ON/OFF lamp (I would rather expect a slider here).
If I then switch on the lamp via the HA interface is in the log only a 0 for the brightness to read…

Does anyone have an idea what I am missing here ?
Thanks for your help :slight_smile:

For info I am running on a Pi4 with HA OS :slight_smile:
Cheers
Marvin

here once the log when switching on the lamp:

2023-04-28 20:34:49.110 INFO (MainThread) [anlox_can] Argumente brightness: 0
2023-04-28 20:34:50.444 INFO (MainThread) [anlox_can] Argumente turn_on: {}

And here the code of my light.py

import asyncio
import logging
from typing import Any

import can
#from homeassistant.components.light import LightEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_LIGHTS, CONF_NAME, EVENT_HOMEASSISTANT_STOP
from homeassistant.components.light import (ATTR_BRIGHTNESS, PLATFORM_SCHEMA, LightEntity)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback

from .const import *


_LOGGER = logging.getLogger(DOMAIN)
_LOGGER.setLevel(logging.DEBUG)


async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback):
    data = hass.data[DOMAIN][config_entry.entry_id]
    _LOGGER.debug('async_setup_entry %r %r', data, config_entry)

    # Load CAN bus. Must be operational already (done by external network tool).
    # Setting bitrate might work, but ideally that should also be set already.
    # We only care about messages related to feedback from a SET command or the reply from a GET command.
    bus = can.Bus(bustype=data[CONF_INTERFACE], channel=data[CONF_CHANNEL], bitrate=125000, receive_own_messages=True, can_filters=[
        {"can_id": 0x1, "can_mask": 0x700, "extended": False},  # Reply
        {"can_id": 0x100, "can_mask": 0x700, "extended": False},  # Reply
    ])

    # Global CAN bus lock, required since the reply to a GET does not include any differentiator.
    # This means we must lock, then send out a GET request.
    # The reply will then only be acked by the entity that holds the lock.
    # I don't like this, it smells, but it works and IDK how to do it better.
    lock = asyncio.Lock()

    entities = [anloxLight(bus, e, lock, config_entry.entry_id) for e in data[CONF_LIGHTS]]
    notifier = can.Notifier(bus, [e.on_can_message_received for e in entities], loop=asyncio.get_running_loop())
    async_add_entities(entities)

    @callback
    def stop(event):
        notifier.stop()
        bus.shutdown()

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop)


# noinspection PyAbstractClass
class anloxLight(LightEntity):
    _attr_has_entity_name = True
    _attr_name = None

    @property
    def device_info(self):
        return {
            "identifiers": {
                (DOMAIN, self.unique_id)
            },
            "name": self._name,
        }

    def __init__(self, bus: can.BusABC, o: dict, lock: asyncio.Lock, prefix: str= None): 
        self._bus = bus
        self._lock = lock
        # Unpack some config values
        self._name: str = o[CONF_NAME]
        self._module = o[CONF_MODULE]
        self._relay = o[CONF_RELAY]
        # Prepare fixed ids & payloads
        self._set_id: int = self._module
        self._bytes_off: bytes = bytes((1, self._relay, 0x00, 0x00))
        self._bytes_on: bytes = bytes((1, self._relay, 0xFF, 0xFF))
        self._bytes_status: bytes = bytes((3, self._relay))
        # Internal state of light
        self._is_on: bool = False
        self._brightness = 0 #None
        #self._light.brightness = None
        self._MSB = 0 #>> 8
        self._LSB = 0 #& 0b11111111
        self._bytes_dimmvalue: bytes = bytes((1, self._relay, self._MSB, self._LSB)) #wie es soll
        #self._bytes_dimmvalue: bytes = bytes((1, self._relay, self._brightness, self._brightness)) #test
        #self._dimmvalue = self._MSB, self._LSB
        # Internals to do locking & support GET operation
        self._awaiting_update = False
        self._event_update = asyncio.Event()
        # Logger
        self._log = _LOGGER.getChild(self._name)
        self._attr_unique_id = f"anlox.{prefix}.{self._module}.{self._relay}"

    @property
    def is_on(self) -> bool:
        return self._is_on
    
    @is_on.setter
    def is_on(self, value: bool):
        self._is_on = value
        # This call makes HA update the internal state after getting an update via CAN.
        self.hass.async_add_job(self.async_update_ha_state)

    @property
    def brightness(self):
    
        """Return the brightness of the light.
        This method is optional. Removing it indicates to Home Assistant
        that brightness is not supported for this light.
        """
        _LOGGER.info("Argumente brightness: %i", self._brightness)
        return self._brightness
    
    @brightness.setter
    def brightness(self, value: bool):
        _LOGGER.info(f"Argumente brightnesssetter : ")
        self._attr_brightness = value
        # This call makes HA update the internal state after getting an update via CAN.
        self.hass.async_add_job(self.async_update_ha_state)

    @property
    def unique_id(self) -> str:
        return self._attr_unique_id
    

    async def async_turn_on(self, **kwargs: Any) -> None:
        _LOGGER.info(f"Argumente turn_on: {kwargs}")
        self.brightness = kwargs.get(ATTR_BRIGHTNESS, 255) * 257
        self._MSB = (self.brightness >> 8)
        self._LSB = (self.brightness & 0x11111111)

        self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_dimmvalue, is_extended_id=False), timeout=.1)
        #_LOGGER.info("Argumente turn_on; MSB: %i LSB: %i \n dimmvalue: %i", self._MSB, self._MSB, self._dimmvalue)

        #self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_on, is_extended_id=False), timeout=.1) #on/off

    async def async_turn_off(self, **kwargs: Any) -> None:
        self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_off, is_extended_id=False), timeout=.1)

    def on_can_message_received(self, msg: can.Message):
        # Reply to SET, this we can filter because data contains data from.
        if msg.arbitration_id == 0x2 and msg.data[0] == 2 and msg.data[1] == self._relay:
            self.is_on = msg.data[2] != 0x00 or msg.data[3] != 0x00
        # Reply to GET, this we can only filter by _knowing_ that we are waiting on an update.
        if msg.arbitration_id == 0x2 and self._awaiting_update:
            self.is_on = msg.data[2] != 0x00 or msg.data[3] != 0x00
        self._event_update.set()

    async def async_update(self):
        # The update cycle must be blocked on the CAN bus lock.
        async with self._lock:
            try:
                # Inform handler that we expect an update.
                self._awaiting_update = True
                # Small delay, otherwise we overload the CAN module.
                await asyncio.sleep(.01)
                # Ask CAN module for an update
                self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_status, is_extended_id=False), timeout=.1)
                # Wait for reply to come
                await asyncio.wait_for(self._event_update.wait(), 0.5)
                # Small delay, otherwise we overload the CAN module.
                await asyncio.sleep(.01)
            finally:
                # In all cases, no matter how we get out of this, we must unset the _awaiting_update flag.
                self._awaiting_update = False

Please, let me know if you have any success creating unit tests.

I have a forgotten topic about this subject.

in the meantime I made it work that I got the correct dimvalue out. But still the light only appears as an on/off light…
What do I have to do so that I can adjust the brightness via a slider?
Thank you :slight_smile:

import asyncio
import logging
from typing import Any

import can
#from homeassistant.components.light import LightEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_LIGHTS, CONF_NAME, EVENT_HOMEASSISTANT_STOP
from homeassistant.components.light import (ATTR_BRIGHTNESS, PLATFORM_SCHEMA, LightEntity, SUPPORT_BRIGHTNESS,)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback

from .const import *


_LOGGER = logging.getLogger(DOMAIN)
_LOGGER.setLevel(logging.DEBUG)

#SUPPORT_anlox_can_led = SUPPORT_BRIGHTNESS


async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback):
    data = hass.data[DOMAIN][config_entry.entry_id]
    _LOGGER.debug('async_setup_entry %r %r', data, config_entry)

    # Load CAN bus. Must be operational already (done by external network tool).
    # Setting bitrate might work, but ideally that should also be set already.
    # We only care about messages related to feedback from a SET command or the reply from a GET command.
    bus = can.Bus(bustype=data[CONF_INTERFACE], channel=data[CONF_CHANNEL], bitrate=125000, receive_own_messages=True, can_filters=[
        {"can_id": 0x1, "can_mask": 0x700, "extended": False},  # Reply
        {"can_id": 0x100, "can_mask": 0x700, "extended": False},  # Reply
    ])

    # Global CAN bus lock, required since the reply to a GET does not include any differentiator.
    # This means we must lock, then send out a GET request.
    # The reply will then only be acked by the entity that holds the lock.
    # I don't like this, it smells, but it works and IDK how to do it better.
    lock = asyncio.Lock()

    entities = [anloxLight(bus, e, lock, config_entry.entry_id) for e in data[CONF_LIGHTS]]
    notifier = can.Notifier(bus, [e.on_can_message_received for e in entities], loop=asyncio.get_running_loop())
    async_add_entities(entities)

    @callback
    def stop(event):
        notifier.stop()
        bus.shutdown()

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop)


# noinspection PyAbstractClass
class anloxLight(LightEntity):
    _attr_has_entity_name = True
    _attr_name = None

    @property
    def device_info(self):
        return {
            "identifiers": {
                (DOMAIN, self.unique_id)
            },
            "name": self._name,
        }

    def __init__(self, bus: can.BusABC, o: dict, lock: asyncio.Lock, prefix: str= None): 
        self._bus = bus
        self._lock = lock
        # Unpack some config values
        self._name: str = o[CONF_NAME]
        self._module = o[CONF_MODULE]
        self._relay = o[CONF_RELAY]
        # Prepare fixed ids & payloads
        self._set_id: int = self._module
        self._bytes_off: bytes = bytes((1, self._relay, 0x00, 0x00))
        self._bytes_on: bytes = bytes((1, self._relay, 0xFF, 0xFF))
        self._bytes_status: bytes = bytes((3, self._relay))
        # Internal state of light
        self._is_on: bool = False
        self._brightness = 0
        #self._light.brightness = None
        self._MSB = 0
        self._LSB = 0 
        self._bytes_dimmvalue: bytes = bytes((1, self._relay, self._MSB, self._LSB)) #wie es soll
        # Internals to do locking & support GET operation
        self._awaiting_update = False
        self._event_update = asyncio.Event()
        # Logger
        self._log = _LOGGER.getChild(self._name)
        self._attr_unique_id = f"anlox.{prefix}.{self._module}.{self._relay}"

    @property
    def is_on(self) -> bool:
        return self._is_on
    
    @is_on.setter
    def is_on(self, value: bool):
        self._is_on = value
        # This call makes HA update the internal state after getting an update via CAN.
        self.hass.async_add_job(self.async_update_ha_state)

    @property
    def brightness(self):
    
        """Return the brightness of the light.
        This method is optional. Removing it indicates to Home Assistant
        that brightness is not supported for this light.
        """
        _LOGGER.info("Argumente brightness: %i", self._brightness)
        self.async_write_ha_state()
        return self._brightness
    
    @property
    def unique_id(self) -> str:
        return self._attr_unique_id
    

    async def async_turn_on(self, **kwargs: Any) -> None:
        self._brightness = kwargs.get(ATTR_BRIGHTNESS, 255) * 257
        self._MSB = (self._brightness >> 8)
        self._LSB = (self._brightness & 0b11111111) 
        self._bytes_dimmvalue: bytes = bytes((1, self._relay, self._MSB, self._LSB))
        self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_dimmvalue, is_extended_id=False), timeout=.1)
        _LOGGER.info(f"Argumente turn_on: {kwargs}")
        _LOGGER.info("Argumente MSB: %i", self._MSB)
        _LOGGER.info("Argumente LSB: %i", self._LSB)
        _LOGGER.info("Argumente dimmvalue: %i", self._bytes_dimmvalue)

    async def async_turn_off(self, **kwargs: Any) -> None:
        self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_off, is_extended_id=False), timeout=.1)

    def on_can_message_received(self, msg: can.Message):
        # Reply to SET, this we can filter because data contains data from.
        if msg.arbitration_id == 0x2 and msg.data[0] == 2 and msg.data[1] == self._relay:
            self.is_on = msg.data[2] != 0x00 or msg.data[3] != 0x00
        # Reply to GET, this we can only filter by _knowing_ that we are waiting on an update.
        if msg.arbitration_id == 0x2 and self._awaiting_update:
            self.is_on = msg.data[2] != 0x00 or msg.data[3] != 0x00
        self._event_update.set()

    async def async_update(self):
        # The update cycle must be blocked on the CAN bus lock.
        async with self._lock:
            try:
                # Inform handler that we expect an update.
                self._awaiting_update = True
                # Small delay, otherwise we overload the CAN module.
                await asyncio.sleep(.01)
                # Ask CAN module for an update
                self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_status, is_extended_id=False), timeout=.1)
                # Wait for reply to come
                await asyncio.wait_for(self._event_update.wait(), 0.5)
                # Small delay, otherwise we overload the CAN module.
                await asyncio.sleep(.01)
            finally:
                # In all cases, no matter how we get out of this, we must unset the _awaiting_update flag.
                self._awaiting_update = False

Hi All,

I have done some more research. I think I have been able to locate the error.
I guess that I still have to define the Color Mode so that HA knows that it is a dimmable lamp.
Unfortunately, I do not understand the documentation correctly. Can anyone tell me how to add the correct color mode here?

My Supporten Colormode:

  • If SUPPORT_BRIGHTNESS is set and no color modes have yet been added, add ColorMode.BRIGHTNESS

Thanks for your help.
Best
Marvin