Nope. After a closer look I found some issues with the original PR, so I have been working on a few fixes and some improvements (at least I think so; I added state restore upon hass restart). Can you give it a try and let me know? Here's a new homeassistant/components/binary_sensor/zha.py
which I was working on. You are still going to need the __init__.py
and const.py
from my previous post.
"""
Binary sensors on Zigbee Home Automation networks.
For more details on this platform, please refer to the documentation
at https://home-assistant.io/components/binary_sensor.zha/
"""
import logging
import time
from enum import IntEnum
from homeassistant.components.binary_sensor import DOMAIN, BinarySensorDevice
from homeassistant.components import zha
from homeassistant.const import STATE_OFF
from homeassistant.helpers.event import async_track_time_change
from homeassistant.helpers.restore_state import async_get_last_state
# Logger for this platform module
_LOGGER = logging.getLogger(__name__)

# This platform declares the 'zha' component as its dependency
DEPENDENCIES = ['zha']

# Lookup table: ZigBee Cluster Library IAS zone type -> Home Assistant
# binary sensor device class
CLASS_MAPPING = {
    0x000d: 'motion',
    0x0015: 'opening',
    0x0028: 'smoke',
    0x002a: 'moisture',
    0x002b: 'gas',
    0x002d: 'vibration',
}

# Default magnitude of a level change; used as the default ``delta`` of
# Switch.LevelListener.LvlChange
MOVING_STEP = 15
class OnOffCommands(IntEnum):
    """Command ids compared against ``command_id`` by the OnOff listener.

    Being an IntEnum, members compare equal to the raw integer command id
    passed to ``cluster_command``.
    """

    OFF = 0x00
    ON = 0x01
    TOGGLE = 0x02
    OFF_WITH_EFFECT = 0x40
    ON_WITH_RECALL_GLOBAL_SCENE = 0x41
    ON_WITH_TIMED_OFF = 0x42
class LevelCommands(IntEnum):
    """Command ids compared against ``command_id`` by the level listener.

    Being an IntEnum, members compare equal to the raw integer command id
    passed to ``cluster_command``.
    """

    MOVE_TO_LEVEL = 0x00
    MOVE = 0x01
    STEP = 0x02
    STOP = 0x03
    MOVE_TO_LEVEL_WITH_ON_OFF = 0x04
    MOVE_WITH_ON_OFF = 0x05
    STEP_WITH_ON_OFF = 0x06
    STOP_WITH_ON_OFF = 0x07
async def async_setup_platform(hass, config, async_add_devices,
                               discovery_info=None):
    """Set up the Zigbee Home Automation binary sensors."""
    discovery_info = zha.get_discovery_info(hass, discovery_info)
    if discovery_info is None:
        return

    from zigpy.zcl.clusters.general import OnOff
    from zigpy.zcl.clusters.security import IasZone

    # Pick the setup routine first: an IasZone input cluster takes
    # precedence over an OnOff output cluster; anything else is ignored.
    setup = None
    if IasZone.cluster_id in discovery_info['in_clusters']:
        setup = _async_setup_iaszone
    elif OnOff.cluster_id in discovery_info['out_clusters']:
        setup = _async_setup_remote

    if setup is not None:
        await setup(hass, config, async_add_devices, discovery_info)
async def _async_setup_iaszone(hass, config, async_add_devices,
                               discovery_info=None):
    """Create a BinarySensor for an endpoint with an IasZone input cluster.

    On a new join the cluster is bound and its ``cie_addr`` attribute is
    written with the application's IEEE address. The sensor's device class
    is looked up in CLASS_MAPPING from the cluster's ``zone_type``; when
    that read fails the sensor is still added, with no device class.
    """
    from zigpy.zcl.clusters.security import IasZone

    cluster = discovery_info['in_clusters'][IasZone.cluster_id]
    if discovery_info['new_join']:
        await cluster.bind()
        ieee = cluster.endpoint.device.application.ieee
        await cluster.write_attributes({'cie_addr': ieee})

    device_class = None
    try:
        zone_type = await cluster['zone_type']
        device_class = CLASS_MAPPING.get(zone_type, None)
    except Exception as exc:  # pylint: disable=broad-except
        # Best effort: if we fail to read from the device, fall back to a
        # non-specific class, but leave a trace instead of failing silently.
        _LOGGER.debug("Unable to read zone_type, using no device class: %s",
                      exc)

    sensor = BinarySensor(device_class, **discovery_info)
    async_add_devices([sensor], update_before_add=True)
async def _async_setup_remote(hass, config, async_add_devices, discovery_info):
    """Create a Switch entity for an endpoint with an OnOff output cluster.

    On a new join, the OnOff and LevelControl output clusters (whichever
    are present) are bound and configured for reporting first.
    """
    async def _best_effort(awaitable):
        """Await the given awaitable, logging and ignoring DeliveryError."""
        import zigpy.exceptions
        try:
            await awaitable
        except zigpy.exceptions.DeliveryError as exc:
            _LOGGER.info("Ignoring error during setup: %s", exc)

    if discovery_info['new_join']:
        from zigpy.zcl.clusters.general import OnOff, LevelControl
        out_clusters = discovery_info['out_clusters']
        # Each pair is (cluster class, second positional argument passed to
        # configure_reporting); the other arguments are the same for both.
        for cluster_cls, report_arg in ((OnOff, 0), (LevelControl, 1)):
            if cluster_cls.cluster_id not in out_clusters:
                continue
            cluster = out_clusters[cluster_cls.cluster_id]
            await _best_effort(cluster.bind())
            await _best_effort(
                cluster.configure_reporting(0, report_arg, 600, 1))

    async_add_devices([Switch(**discovery_info)])
class BinarySensor(zha.Entity, BinarySensorDevice):
    """Binary sensor entity backed by a ZigBee IasZone input cluster."""

    _domain = DOMAIN

    def __init__(self, device_class, **kwargs):
        """Store the device class and look up the IasZone input cluster."""
        super().__init__(**kwargs)
        self._device_class = device_class
        from zigpy.zcl.clusters.security import IasZone
        self._ias_zone_cluster = self._in_clusters[IasZone.cluster_id]

    @property
    def should_poll(self) -> bool:
        """Disable Home Assistant polling; zha takes care of it."""
        return False

    @property
    def is_on(self) -> bool:
        """Report whether the sensor is on; an 'unknown' state counts as off."""
        return self._state != 'unknown' and bool(self._state)

    @property
    def device_class(self):
        """Return the device class handed to the constructor."""
        return self._device_class

    def cluster_command(self, tsn, command_id, args):
        """React to a command received on the IasZone cluster.

        Command 1 is answered with an enroll response; command 0 updates
        the state from the two lowest bits of its first argument. Any other
        command is ignored.
        """
        if command_id == 1:
            _LOGGER.debug("Enroll requested")
            response = self._ias_zone_cluster.enroll_response(0, 0)
            self.hass.async_add_job(response)
            return
        if command_id != 0:
            return
        self._state = args[0] & 3
        _LOGGER.debug("Updated alarm state: %s", self._state)
        self.async_schedule_update_ha_state()

    async def async_update(self):
        """Refresh the state from the cluster's ``zone_status`` attribute."""
        from bellows.types.basic import uint16_t
        result = await zha.safe_read(self._endpoint.ias_zone,
                                     ['zone_status'])
        zone_status = result.get('zone_status', self._state)
        # Leave the state untouched unless an integer status is available
        if not isinstance(zone_status, (int, uint16_t)):
            return
        self._state = zone_status & 3
class Switch(zha.Entity, BinarySensorDevice):
    """ZHA switch/remote controller/button.

    Mirrors the commands a remote sends on its OnOff and LevelControl
    output clusters as an on/off state plus a ``level`` state attribute
    (0..255). The entity is reported off whenever the level is 0.
    """

    _domain = DOMAIN

    class OnOffListener:
        """Listener for the OnOff ZigBee cluster."""

        def __init__(self, entity):
            """Initialize OnOffListener; the switch starts out on."""
            self._entity = entity
            self._state = True

        def cluster_command(self, tsn, command_id, args):
            """Handle commands received to this cluster."""
            cmds = OnOffCommands
            _LOGGER.debug('%s: rx %s command with %s args on OnOff cluster',
                          self._entity.entity_id, command_id, args)
            if command_id in (cmds.OFF, cmds.OFF_WITH_EFFECT):
                self.set_state(False)
            elif command_id in (cmds.ON, cmds.ON_WITH_RECALL_GLOBAL_SCENE,
                                cmds.ON_WITH_TIMED_OFF):
                self.set_state(True)
            elif command_id == cmds.TOGGLE:
                self.toggle()

        def attribute_updated(self, attrid, value):
            """Handle attribute updates on this cluster."""
            _LOGGER.debug('%s: reports %s/%s attr_id/value on OnOff cluster',
                          self._entity.entity_id, attrid, value)
            if attrid == 0:
                self._state = bool(value)
                self._entity.schedule_update_ha_state()

        @property
        def is_on(self) -> bool:
            """Return the state of the switch."""
            return self._state

        def set_state(self, state):
            """Set the on/off state and schedule a state update.

            Turning on while the level is 0 raises the level to 1,
            otherwise the entity would still be reported as off.
            """
            self._state = state
            if self._state and not self._entity.level.level:
                # set_level() schedules the state update itself
                self._entity.level.set_level(1)
            else:
                self._entity.schedule_update_ha_state()

        def toggle(self):
            """Toggle the state."""
            self.set_state(not self._state)

    class LevelListener:
        """Listener for the LevelControl ZigBee cluster."""

        class LvlChange:
            """Signed amount of a level change, optionally timed.

            ``direction`` 0x00 yields a positive delta, anything else a
            negative one. When ``start_moving`` is true the creation time
            is recorded so the change accumulated so far can be estimated.
            """

            def __init__(self, direction=0, rate=0, delta=MOVING_STEP,
                         start_moving=True):
                """Initialize the level change."""
                self._direction = direction
                self._delta = delta if direction == 0x00 else -delta
                # Stored for reference only; the estimate below uses delta
                self._rate = rate
                self.still_moving = start_moving
                # None means no timed move was ever started
                self.when_started = time.time() if start_moving else None

            @property
            def delta(self):
                """Return the signed amount of level change."""
                return self._delta

            @property
            def delta_since_start(self):
                """Return the change since the move started.

                It is ``delta`` per elapsed second, truncated to an int,
                and 0 when no timed move was started.
                """
                if self.when_started is None:
                    return 0
                elapsed = time.time() - self.when_started
                return int(elapsed * self._delta)

            def stop(self):
                """Stop moving and return the delta since the start."""
                self.still_moving = False
                return self.delta_since_start

            def __radd__(self, other):
                return other + self._delta

            def __rmul__(self, other):
                return other * self._delta

        def __init__(self, entity):
            """Initialize LevelListener; the level starts at 255."""
            self._entity = entity
            self._level = 255
            # Level at the moment the current/last move started
            self._orig_level = self._level
            self.delta = self.LvlChange(start_moving=False)

        def cluster_command(self, tsn, command_id, args):
            """Handle commands received to this cluster."""
            cmds = LevelCommands
            _LOGGER.debug('%s: rx %s command with %s args on Level cluster',
                          self._entity.entity_id, command_id, args)
            if command_id == cmds.MOVE_TO_LEVEL:
                self.set_level(args[0])
            elif command_id == cmds.MOVE_TO_LEVEL_WITH_ON_OFF:
                self.set_level(args[0], True)
            elif command_id in (cmds.MOVE, cmds.MOVE_WITH_ON_OFF):
                # Step once right away; async_time_interval() and STOP then
                # estimate the level from the time the button is held.
                # Only the first two arguments (direction, rate) are used.
                self.delta = self.LvlChange(*args[:2])
                self._orig_level = self._level
                self.move_level(self.delta,
                                command_id == cmds.MOVE_WITH_ON_OFF)
            elif command_id in (cmds.STEP, cmds.STEP_WITH_ON_OFF):
                # NOTE(review): assumes args are (step mode, step size,
                # transition time) as in the ZCL Step command -- confirm.
                # A step is a one-off change, never a timed move.
                step = self.LvlChange(direction=args[0], delta=args[1],
                                      start_moving=False)
                self.move_level(step, command_id == cmds.STEP_WITH_ON_OFF)
            elif command_id in (cmds.STOP, cmds.STOP_WITH_ON_OFF):
                # Ignore a STOP that does not end a move in progress, or a
                # repeated STOP would recompute from a stale start time.
                if self.delta.still_moving:
                    self.set_level(self._orig_level + self.delta.stop())

        def attribute_updated(self, attrid, value):
            """Handle attribute updates on this cluster."""
            _LOGGER.debug('%s: reports %s/%s attr_id/value on level cluster',
                          self._entity.entity_id, attrid, value)
            if attrid == 0:
                self.set_level(value)

        # Former (misspelled) name, kept so existing callers still work
        attribute_update = attribute_updated

        @property
        def level(self):
            """Return the current level."""
            return self._level

        def move_level(self, change, onoff=False):
            """Increment the level, setting state if appropriate."""
            self._level = min(255, max(0, self._level + change))
            if onoff:
                self._entity.onoff.set_state(bool(self._level))
            self._entity.schedule_update_ha_state()

        def set_level(self, level, onoff=False):
            """Set the level (clamped to 0..255), setting state if asked."""
            self._level = min(255, max(0, level))
            if onoff:
                self._entity.onoff.set_state(bool(self._level))
            self._entity.schedule_update_ha_state()

        async def async_time_interval(self, tracked_time):
            """Estimate the level change while the button is held."""
            if not self.delta.still_moving:
                return
            # delta_since_start is the total since the move began, so it
            # is applied to the level the move started from (as STOP does).
            self.set_level(self._orig_level + self.delta.delta_since_start)

    async def async_added_to_hass(self):
        """Restore the previous state and start the periodic level update."""
        # NOTE(review): if zha.Entity defines async_added_to_hass, it
        # should probably be awaited here as well -- confirm.
        old_state = await async_get_last_state(self.hass, self.entity_id)
        if old_state is not None:
            _LOGGER.debug('%s: restoring old state %s', self.entity_id,
                          old_state)
            level = old_state.attributes.get('level')
            if level is not None:
                self.level.set_level(level)
            if old_state.state == STATE_OFF:
                self.onoff.set_state(False)
        async_track_time_change(self.hass, self.level.async_time_interval)

    def __init__(self, **kwargs):
        """Initialize Switch."""
        # The listeners are created before the base class is initialized,
        # exactly as the original code did.
        self.onoff = self.OnOffListener(self)
        self.level = self.LevelListener(self)
        from zigpy.zcl.clusters import general
        self._out_listeners = {
            general.OnOff.cluster_id: self.onoff,
            general.LevelControl.cluster_id: self.level,
        }
        super().__init__(**kwargs)

    @property
    def is_on(self) -> bool:
        """Return true if the binary sensor is on and its level is not 0."""
        return bool(self.onoff.is_on and self.level.level)

    @property
    def device_state_attributes(self):
        """Return the device state attributes (``level`` only while on)."""
        attrs = {}
        if self.is_on:
            attrs['level'] = self.level.level
        return attrs
With the above binary_sensor,
I'm using the following automation to control the lights. The idea is: it is triggered any time the state changes (since we want to catch level changes), and if the state of the binary_switch is on, we turn on the lights to the level given by the binary_switch attributes['level'].
- id: floor_light_switch_on
  alias: Turn on floor lamp when switched on
  trigger:
    - platform: state
      entity_id: binary_sensor.dimming_switch
  condition:
    - condition: state
      entity_id: binary_sensor.dimming_switch
      state: 'on'
  action:
    - service: light.turn_on
      data_template:
        entity_id: light.floor_lamp
        brightness: "{{ trigger.to_state.attributes.level }}"
- id: light_switch_off
  alias: Turn off floor lamp when switched off
  trigger:
    - platform: state
      entity_id: binary_sensor.dimming_switch
      to: 'off'
  action:
    - service: light.turn_off
      entity_id:
        - light.floor_lamp
** edit: fixed set level to stay within 0…255
*** edit: added automation to link binary_switch level change to light control
**** edit: updated to turn the switch off when level is 0
PS: I'm just starting with Python, so I understand some things I'm doing could be very stupid. Any constructive dialog is welcome.