Hi All,
I am currently trying to write my first custom component. Luckily I found a similar project from @dries007. This one I was able to customize
My first step was to get a light.py running. The nice thing is that it already works to create and control an ON/OFF lamp.
Now I’m trying to rewrite the whole thing into the dimmable version.
Unfortunately I have the problem that the lamp is always displayed in the HA interface only as ON/OFF lamp (I would rather expect a slider here).
If I then switch on the lamp via the HA interface is in the log only a 0 for the brightness to read…
Does anyone have an idea what I am missing here ?
Thanks for your help
For info I am running on a Pi4 with HA OS
Cheers
Marvin
here once the log when switching on the lamp:
2023-04-28 20:34:49.110 INFO (MainThread) [anlox_can] Argumente brightness: 0
2023-04-28 20:34:50.444 INFO (MainThread) [anlox_can] Argumente turn_on: {}
And here the code of my light.py
import asyncio
import logging
from typing import Any
import can
#from homeassistant.components.light import LightEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_LIGHTS, CONF_NAME, EVENT_HOMEASSISTANT_STOP
from homeassistant.components.light import (ATTR_BRIGHTNESS, PLATFORM_SCHEMA, LightEntity)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import *
_LOGGER = logging.getLogger(DOMAIN)
_LOGGER.setLevel(logging.DEBUG)
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: AddEntitiesCallback):
data = hass.data[DOMAIN][config_entry.entry_id]
_LOGGER.debug('async_setup_entry %r %r', data, config_entry)
# Load CAN bus. Must be operational already (done by external network tool).
# Setting bitrate might work, but ideally that should also be set already.
# We only care about messages related to feedback from a SET command or the reply from a GET command.
bus = can.Bus(bustype=data[CONF_INTERFACE], channel=data[CONF_CHANNEL], bitrate=125000, receive_own_messages=True, can_filters=[
{"can_id": 0x1, "can_mask": 0x700, "extended": False}, # Reply
{"can_id": 0x100, "can_mask": 0x700, "extended": False}, # Reply
])
# Global CAN bus lock, required since the reply to a GET does not include any differentiator.
# This means we must lock, then send out a GET request.
# The reply will then only be acked by the entity that holds the lock.
# I don't like this, it smells, but it works and IDK how to do it better.
lock = asyncio.Lock()
entities = [anloxLight(bus, e, lock, config_entry.entry_id) for e in data[CONF_LIGHTS]]
notifier = can.Notifier(bus, [e.on_can_message_received for e in entities], loop=asyncio.get_running_loop())
async_add_entities(entities)
@callback
def stop(event):
notifier.stop()
bus.shutdown()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop)
# noinspection PyAbstractClass
class anloxLight(LightEntity):
_attr_has_entity_name = True
_attr_name = None
@property
def device_info(self):
return {
"identifiers": {
(DOMAIN, self.unique_id)
},
"name": self._name,
}
def __init__(self, bus: can.BusABC, o: dict, lock: asyncio.Lock, prefix: str= None):
self._bus = bus
self._lock = lock
# Unpack some config values
self._name: str = o[CONF_NAME]
self._module = o[CONF_MODULE]
self._relay = o[CONF_RELAY]
# Prepare fixed ids & payloads
self._set_id: int = self._module
self._bytes_off: bytes = bytes((1, self._relay, 0x00, 0x00))
self._bytes_on: bytes = bytes((1, self._relay, 0xFF, 0xFF))
self._bytes_status: bytes = bytes((3, self._relay))
# Internal state of light
self._is_on: bool = False
self._brightness = 0 #None
#self._light.brightness = None
self._MSB = 0 #>> 8
self._LSB = 0 #& 0b11111111
self._bytes_dimmvalue: bytes = bytes((1, self._relay, self._MSB, self._LSB)) #wie es soll
#self._bytes_dimmvalue: bytes = bytes((1, self._relay, self._brightness, self._brightness)) #test
#self._dimmvalue = self._MSB, self._LSB
# Internals to do locking & support GET operation
self._awaiting_update = False
self._event_update = asyncio.Event()
# Logger
self._log = _LOGGER.getChild(self._name)
self._attr_unique_id = f"anlox.{prefix}.{self._module}.{self._relay}"
@property
def is_on(self) -> bool:
return self._is_on
@is_on.setter
def is_on(self, value: bool):
self._is_on = value
# This call makes HA update the internal state after getting an update via CAN.
self.hass.async_add_job(self.async_update_ha_state)
@property
def brightness(self):
"""Return the brightness of the light.
This method is optional. Removing it indicates to Home Assistant
that brightness is not supported for this light.
"""
_LOGGER.info("Argumente brightness: %i", self._brightness)
return self._brightness
@brightness.setter
def brightness(self, value: bool):
_LOGGER.info(f"Argumente brightnesssetter : ")
self._attr_brightness = value
# This call makes HA update the internal state after getting an update via CAN.
self.hass.async_add_job(self.async_update_ha_state)
@property
def unique_id(self) -> str:
return self._attr_unique_id
async def async_turn_on(self, **kwargs: Any) -> None:
_LOGGER.info(f"Argumente turn_on: {kwargs}")
self.brightness = kwargs.get(ATTR_BRIGHTNESS, 255) * 257
self._MSB = (self.brightness >> 8)
self._LSB = (self.brightness & 0x11111111)
self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_dimmvalue, is_extended_id=False), timeout=.1)
#_LOGGER.info("Argumente turn_on; MSB: %i LSB: %i \n dimmvalue: %i", self._MSB, self._MSB, self._dimmvalue)
#self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_on, is_extended_id=False), timeout=.1) #on/off
async def async_turn_off(self, **kwargs: Any) -> None:
self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_off, is_extended_id=False), timeout=.1)
def on_can_message_received(self, msg: can.Message):
# Reply to SET, this we can filter because data contains data from.
if msg.arbitration_id == 0x2 and msg.data[0] == 2 and msg.data[1] == self._relay:
self.is_on = msg.data[2] != 0x00 or msg.data[3] != 0x00
# Reply to GET, this we can only filter by _knowing_ that we are waiting on an update.
if msg.arbitration_id == 0x2 and self._awaiting_update:
self.is_on = msg.data[2] != 0x00 or msg.data[3] != 0x00
self._event_update.set()
async def async_update(self):
# The update cycle must be blocked on the CAN bus lock.
async with self._lock:
try:
# Inform handler that we expect an update.
self._awaiting_update = True
# Small delay, otherwise we overload the CAN module.
await asyncio.sleep(.01)
# Ask CAN module for an update
self._bus.send(can.Message(arbitration_id=self._set_id, data=self._bytes_status, is_extended_id=False), timeout=.1)
# Wait for reply to come
await asyncio.wait_for(self._event_update.wait(), 0.5)
# Small delay, otherwise we overload the CAN module.
await asyncio.sleep(.01)
finally:
# In all cases, no matter how we get out of this, we must unset the _awaiting_update flag.
self._awaiting_update = False