@winter I tried to get a push for that PR to merge but no luck will hopefully get battery reporting soon.
@Bigrob8181 man thats some awesome research! Hopefully we can get that merged in too
@winter I tried to get a push for that PR to merge but no luck will hopefully get battery reporting soon.
@Bigrob8181 man thats some awesome research! Hopefully we can get that merged in too
I got something good…
Hi,
I am just trying to move a few of my zigbee devices from my SmartThings hub over to HA. I first did my SmartThings brand outlet and it worked flawlessly. I then tried my Lightify dimmer button and based on the lights of the switch it appeared to pair but I couldn’t locate it in my entity list. Is there a log file or something I can look for for zigbee?
I also have the Hampton Bay ceiling fan controller (https://www.homedepot.com/p/Hampton-Bay-Universal-Wink-Enabled-White-Ceiling-Fan-Premier-Remote-Control-99432/206591100). I am hesitant to try moving it as I am guessing it won’t work. Has anyone tried it?
Thanks!
Edit: Here is the bellows output from the switch if that helps:
Device:
NWK: 0x0b38
IEEE: 84:18:26:00:00:e8:fa:90
Endpoints:
1: profile=0x104, device_type=DeviceType.LEVEL_CONTROL_SWITCH
Input Clusters:
Basic (0)
Power Configuration (1)
Identify (3)
Poll Control (32)
Temperature Measurement (1026)
Diagnostic (2821)
Output Clusters:
Identify (3)
On/Off (6)
Level control (8)
Ota (25)
for this switch to work, you need to have Pull Request #12528, which conflicts with current 0.67.1 I have tested it in my private branch and have it working, although currently i’m using it for just on/off controlling the lights through the automation.
Regarding the FAN control, I have not tried it personally, but Zigbee FAN pull request #12289 was merged into HA so I’d expect it to work
Thanks! What do you mean by it conflicts with 0.67.1? Does that mean if I am running that version (which I am), I can’t use it?
All I need is the on/off control so this would be perfect!
And then for the real noob question, how does one apply the pull request?
I mean since that pull request was created, the original source base have diverged and now changes made by RCloran don’t apply cleanly to the existing 0.67.1 source.
I won’t be able to explain in a single post how to apply a pull request, but can provide the “direction” to what to google. Pretty much it is all about “git” and mastering git. I can only recommend to start with some git tutorials and once you comprehend the basics, articles like best way to merge a github pull request start making sense I know, i’ve been there
i’ve tried a rebase of that request against 0.67.1 and if you are willing to experiment, you could try to replace the following files in your 0.67.1 hass installation, assuming you are not running a “docker” installation.
./homeassistant/components/binary_sensor/zha.py
"""
Binary sensors on Zigbee Home Automation networks.
For more details on this platform, please refer to the documentation
at https://home-assistant.io/components/binary_sensor.zha/
"""
import logging
from homeassistant.components.binary_sensor import DOMAIN, BinarySensorDevice
from homeassistant.components import zha
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['zha']
# ZigBee Cluster Library Zone Type to Home Assistant device class
CLASS_MAPPING = {
0x000d: 'motion',
0x0015: 'opening',
0x0028: 'smoke',
0x002a: 'moisture',
0x002b: 'gas',
0x002d: 'vibration',
}
async def async_setup_platform(hass, config, async_add_devices,
discovery_info=None):
"""Set up the Zigbee Home Automation binary sensors."""
discovery_info = zha.get_discovery_info(hass, discovery_info)
if discovery_info is None:
return
from zigpy.zcl.clusters.general import OnOff
from zigpy.zcl.clusters.security import IasZone
if IasZone.cluster_id in discovery_info['in_clusters']:
await _async_setup_iaszone(hass, config, async_add_devices,
discovery_info)
elif OnOff.cluster_id in discovery_info['out_clusters']:
await _async_setup_remote(hass, config, async_add_devices,
discovery_info)
async def _async_setup_iaszone(hass, config, async_add_devices, discovery_info=None):
device_class = None
from zigpy.zcl.clusters.security import IasZone
cluster = discovery_info['in_clusters'][IasZone.cluster_id]
if discovery_info['new_join']:
await cluster.bind()
ieee = cluster.endpoint.device.application.ieee
await cluster.write_attributes({'cie_addr': ieee})
try:
zone_type = await cluster['zone_type']
device_class = CLASS_MAPPING.get(zone_type, None)
except Exception: # pylint: disable=broad-except
# If we fail to read from the device, use a non-specific class
pass
sensor = BinarySensor(device_class, **discovery_info)
async_add_devices([sensor], update_before_add=True)
async def _async_setup_remote(hass, config, async_add_devices, discovery_info):
async def safe(coro):
"""Run coro, catching ZigBee delivery errors, and ignoring them."""
import zigpy.exceptions
try:
await coro
except zigpy.exceptions.DeliveryError as exc:
_LOGGER.info("Ignoring error during setup: %s", exc)
if discovery_info['new_join']:
from zigpy.zcl.clusters.general import OnOff, LevelControl
out_clusters = discovery_info['out_clusters']
if OnOff.cluster_id in out_clusters:
cluster = out_clusters[OnOff.cluster_id]
await safe(cluster.bind())
await safe(cluster.configure_reporting(0, 0, 600, 1))
if LevelControl.cluster_id in out_clusters:
cluster = out_clusters[LevelControl.cluster_id]
await safe(cluster.bind())
await safe(cluster.configure_reporting(0, 1, 600, 1))
sensor = Switch(**discovery_info)
async_add_devices([sensor])
class BinarySensor(zha.Entity, BinarySensorDevice):
"""The ZHA Binary Sensor."""
_domain = DOMAIN
def __init__(self, device_class, **kwargs):
"""Initialize the ZHA binary sensor."""
super().__init__(**kwargs)
self._device_class = device_class
from zigpy.zcl.clusters.security import IasZone
self._ias_zone_cluster = self._in_clusters[IasZone.cluster_id]
@property
def should_poll(self) -> bool:
"""Let zha handle polling."""
return False
@property
def is_on(self) -> bool:
"""Return True if entity is on."""
if self._state == 'unknown':
return False
return bool(self._state)
@property
def device_class(self):
"""Return the class of this device, from component DEVICE_CLASSES."""
return self._device_class
def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
if command_id == 0:
self._state = args[0] & 3
_LOGGER.debug("Updated alarm state: %s", self._state)
self.async_schedule_update_ha_state()
elif command_id == 1:
_LOGGER.debug("Enroll requested")
res = self._ias_zone_cluster.enroll_response(0, 0)
self.hass.async_add_job(res)
async def async_update(self):
"""Retrieve latest state."""
from bellows.types.basic import uint16_t
result = await zha.safe_read(self._endpoint.ias_zone,
['zone_status'])
state = result.get('zone_status', self._state)
if isinstance(state, (int, uint16_t)):
self._state = result.get('zone_status', self._state) & 3
class Switch(zha.Entity, BinarySensorDevice):
"""ZHA switch/remote controller/button."""
_domain = DOMAIN
class OnOffListener:
"""Listener for the OnOff ZigBee cluster."""
def __init__(self, entity):
"""Initialize OnOffListener."""
self._entity = entity
def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
if command_id in (0x0000, 0x0040):
self._entity._set_state(False)
elif command_id in (0x0001, 0x0041, 0x0042):
self._entity._set_state(True)
elif command_id == 0x0002:
self._entity._set_state(not self._entity._state)
def attribute_updated(self, attrid, value):
"""Handle attribute updates on this cluster."""
if attrid == 0:
self._entity._state = value
self._entity.schedule_update_ha_state()
class LevelListener:
"""Listener for the LevelControl ZigBee cluster."""
def __init__(self, entity):
"""Initialize LevelListener."""
self._entity = entity
def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
if command_id == 0x0001: # move_to_level
self._entity._set_level(args[0])
elif command_id == 0x0002: # step
# Step (technically shouldn't change on/off)
if args[0] == 0:
self._entity._move_level(args[1])
else:
self._entity._move_level(-args[1])
elif command_id == 0x0004: # move_to_level_with_on_off
self._entity._set_level(args[0])
elif command_id == 0x0005: # move_with_on_off
# We should dim slowly -- for now, just step once
if args[0] == 0:
self._entity._move_level(10)
else:
self._entity._move_level(-10)
def attribute_update(self, attrid, value):
"""Handle attribute updates on this cluster."""
if attrid == 0:
self._entity._set_level(value)
def __init__(self, **kwargs):
"""Initialize Switch."""
self._state = True
self._level = 255
from zigpy.zcl.clusters import general
self._out_listeners = {
general.OnOff.cluster_id: self.OnOffListener(self),
general.LevelControl.cluster_id: self.LevelListener(self),
}
super().__init__(**kwargs)
@property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self._state
@property
def device_state_attributes(self):
"""Return the device state attributes."""
return {'level': self._state and self._level or 0}
def _move_level(self, change):
"""Increment the level, setting state if appropriate."""
if not self._state and change > 0:
self._level = 0
self._level += min(255, max(0, self._level + change))
self._state = bool(self._level)
self.schedule_update_ha_state()
def _set_level(self, level):
"""Set the level, setting state if appropriate."""
self._level = level
self._state = bool(self._level)
self.schedule_update_ha_state()
def _set_state(self, state):
"""Set the state."""
self._state = state
self.schedule_update_ha_state()
./homeassistant/components/zha/__init__.py
"""
Support for ZigBee Home Automation devices.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/zha/
"""
import collections
import enum
import logging
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant import const as ha_const
from homeassistant.helpers import discovery, entity
from homeassistant.util import slugify
REQUIREMENTS = [
'bellows==0.5.1',
'zigpy==0.0.3',
'zigpy-xbee==0.0.2',
]
DOMAIN = 'zha'
class RadioType(enum.Enum):
"""Possible options for radio type in config."""
ezsp = 'ezsp'
xbee = 'xbee'
CONF_BAUDRATE = 'baudrate'
CONF_DATABASE = 'database_path'
CONF_DEVICE_CONFIG = 'device_config'
CONF_RADIO_TYPE = 'radio_type'
CONF_USB_PATH = 'usb_path'
DATA_DEVICE_CONFIG = 'zha_device_config'
DEVICE_CONFIG_SCHEMA_ENTRY = vol.Schema({
vol.Optional(ha_const.CONF_TYPE): cv.string,
})
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_RADIO_TYPE, default='ezsp'): cv.enum(RadioType),
CONF_USB_PATH: cv.string,
vol.Optional(CONF_BAUDRATE, default=57600): cv.positive_int,
CONF_DATABASE: cv.string,
vol.Optional(CONF_DEVICE_CONFIG, default={}):
vol.Schema({cv.string: DEVICE_CONFIG_SCHEMA_ENTRY}),
})
}, extra=vol.ALLOW_EXTRA)
ATTR_DURATION = 'duration'
ATTR_IEEE = 'ieee_address'
SERVICE_PERMIT = 'permit'
SERVICE_REMOVE = 'remove'
SERVICE_SCHEMAS = {
SERVICE_PERMIT: vol.Schema({
vol.Optional(ATTR_DURATION, default=60):
vol.All(vol.Coerce(int), vol.Range(1, 254)),
}),
SERVICE_REMOVE: vol.Schema({
vol.Required(ATTR_IEEE): cv.string,
}),
}
# ZigBee definitions
CENTICELSIUS = 'C-100'
# Key in hass.data dict containing discovery info
DISCOVERY_KEY = 'zha_discovery_info'
# Internal definitions
APPLICATION_CONTROLLER = None
_LOGGER = logging.getLogger(__name__)
async def async_setup(hass, config):
"""Set up ZHA.
Will automatically load components to support devices found on the network.
"""
global APPLICATION_CONTROLLER
usb_path = config[DOMAIN].get(CONF_USB_PATH)
baudrate = config[DOMAIN].get(CONF_BAUDRATE)
radio_type = config[DOMAIN].get(CONF_RADIO_TYPE)
if radio_type == RadioType.ezsp:
import bellows.ezsp
from bellows.zigbee.application import ControllerApplication
radio = bellows.ezsp.EZSP()
elif radio_type == RadioType.xbee:
import zigpy_xbee.api
from zigpy_xbee.zigbee.application import ControllerApplication
radio = zigpy_xbee.api.XBee()
await radio.connect(usb_path, baudrate)
database = config[DOMAIN].get(CONF_DATABASE)
APPLICATION_CONTROLLER = ControllerApplication(radio, database)
listener = ApplicationListener(hass, config)
APPLICATION_CONTROLLER.add_listener(listener)
await APPLICATION_CONTROLLER.startup(auto_form=True)
for device in APPLICATION_CONTROLLER.devices.values():
hass.async_add_job(listener.async_device_initialized(device, False))
async def permit(service):
"""Allow devices to join this network."""
duration = service.data.get(ATTR_DURATION)
_LOGGER.info("Permitting joins for %ss", duration)
await APPLICATION_CONTROLLER.permit(duration)
hass.services.async_register(DOMAIN, SERVICE_PERMIT, permit,
schema=SERVICE_SCHEMAS[SERVICE_PERMIT])
async def remove(service):
"""Remove a node from the network."""
from bellows.types import EmberEUI64, uint8_t
ieee = service.data.get(ATTR_IEEE)
ieee = EmberEUI64([uint8_t(p, base=16) for p in ieee.split(':')])
_LOGGER.info("Removing node %s", ieee)
await APPLICATION_CONTROLLER.remove(ieee)
hass.services.async_register(DOMAIN, SERVICE_REMOVE, remove,
schema=SERVICE_SCHEMAS[SERVICE_REMOVE])
return True
class ApplicationListener:
"""All handlers for events that happen on the ZigBee application."""
def __init__(self, hass, config):
"""Initialize the listener."""
self._hass = hass
self._config = config
self._device_registry = collections.defaultdict(list)
hass.data[DISCOVERY_KEY] = hass.data.get(DISCOVERY_KEY, {})
def device_joined(self, device):
"""Handle device joined.
At this point, no information about the device is known other than its
address
"""
# Wait for device_initialized, instead
pass
def device_initialized(self, device):
"""Handle device joined and basic information discovered."""
self._hass.async_add_job(self.async_device_initialized(device, True))
def device_left(self, device):
"""Handle device leaving the network."""
pass
def device_removed(self, device):
"""Handle device being removed from the network."""
for device_entity in self._device_registry[device.ieee]:
self._hass.async_add_job(device_entity.async_remove())
async def async_device_initialized(self, device, join):
"""Handle device joined and basic information discovered (async)."""
import zigpy.profiles
import homeassistant.components.zha.const as zha_const
zha_const.populate_data()
for endpoint_id, endpoint in device.endpoints.items():
if endpoint_id == 0: # ZDO
continue
discovered_info = await _discover_endpoint_info(endpoint)
component = None
profile_clusters = ([], [])
device_key = "{}-{}".format(device.ieee, endpoint_id)
node_config = self._config[DOMAIN][CONF_DEVICE_CONFIG].get(
device_key, {})
if endpoint.profile_id in zigpy.profiles.PROFILES:
profile = zigpy.profiles.PROFILES[endpoint.profile_id]
if zha_const.DEVICE_CLASS.get(endpoint.profile_id,
{}).get(endpoint.device_type,
None):
profile_clusters = profile.CLUSTERS[endpoint.device_type]
profile_info = zha_const.DEVICE_CLASS[endpoint.profile_id]
component = profile_info[endpoint.device_type]
if ha_const.CONF_TYPE in node_config:
component = node_config[ha_const.CONF_TYPE]
profile_clusters = zha_const.COMPONENT_CLUSTERS[component]
if component:
in_clusters = [endpoint.in_clusters[c]
for c in profile_clusters[0]
if c in endpoint.in_clusters]
out_clusters = [endpoint.out_clusters[c]
for c in profile_clusters[1]
if c in endpoint.out_clusters]
discovery_info = {
'application_listener': self,
'endpoint': endpoint,
'in_clusters': {c.cluster_id: c for c in in_clusters},
'out_clusters': {c.cluster_id: c for c in out_clusters},
'new_join': join,
'unique_id': device_key,
}
discovery_info.update(discovered_info)
self._hass.data[DISCOVERY_KEY][device_key] = discovery_info
await discovery.async_load_platform(
self._hass,
component,
DOMAIN,
{'discovery_key': device_key},
self._config,
)
async def attempt_single_cluster_device(cluster, profile_clusters,
device_classes, discovery_attr):
if cluster.cluster_id in profile_clusters:
return
if type(cluster) not in device_classes:
return
component = device_classes[type(cluster)]
cluster_key = "{}-{}".format(device_key, cluster_id)
discovery_info = {
'application_listener': self,
'endpoint': endpoint,
'in_clusters': {},
'out_clusters': {},
'new_join': join,
'unique_id': cluster_key,
'entity_suffix': '_{}'.format(cluster_id),
}
discovery_info[discovery_attr] = {cluster.cluster_id: cluster}
discovery_info.update(discovered_info)
self._hass.data[DISCOVERY_KEY][cluster_key] = discovery_info
await discovery.async_load_platform(
self._hass,
component,
DOMAIN,
{'discovery_key': cluster_key},
self._config,
)
for cluster_id, cluster in endpoint.in_clusters.items():
await attempt_single_cluster_device(
cluster,
profile_clusters[0],
zha_const.SINGLE_INPUT_CLUSTER_DEVICE_CLASS,
'in_clusters',
)
for cluster_id, cluster in endpoint.out_clusters.items():
await attempt_single_cluster_device(
cluster,
profile_clusters[1],
zha_const.SINGLE_OUTPUT_CLUSTER_DEVICE_CLASS,
'out_clusters',
)
def register_entity(self, ieee, entity_obj):
"""Record the creation of a hass entity associated with ieee."""
self._device_registry[ieee].append(entity_obj)
class Entity(entity.Entity):
"""A base class for ZHA entities."""
_domain = None # Must be overridden by subclasses
# Normally the entity itself is the listener. Base classes may set this to
# a dict of cluster ID -> listener to receive messages for specific
# clusters separately
_in_listeners = {}
_out_listeners = {}
def __init__(self, endpoint, in_clusters, out_clusters, manufacturer,
model, application_listener, unique_id, **kwargs):
"""Init ZHA entity."""
self._device_state_attributes = {}
ieee = endpoint.device.ieee
ieeetail = ''.join(['%02x' % (o, ) for o in ieee[-4:]])
if manufacturer and model is not None:
self.entity_id = "{}.{}_{}_{}_{}{}".format(
self._domain,
slugify(manufacturer),
slugify(model),
ieeetail,
endpoint.endpoint_id,
kwargs.get('entity_suffix', ''),
)
self._device_state_attributes['friendly_name'] = "{} {}".format(
manufacturer,
model,
)
else:
self.entity_id = "{}.zha_{}_{}{}".format(
self._domain,
ieeetail,
endpoint.endpoint_id,
kwargs.get('entity_suffix', ''),
)
for cluster_id, cluster in in_clusters.items():
cluster.add_listener(self._in_listeners.get(cluster_id, self))
for cluster_id, cluster in out_clusters.items():
cluster.add_listener(self._out_listeners.get(cluster_id, self))
self._endpoint = endpoint
self._in_clusters = in_clusters
self._out_clusters = out_clusters
self._state = ha_const.STATE_UNKNOWN
self._unique_id = unique_id
application_listener.register_entity(ieee, self)
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return self._unique_id
@property
def device_state_attributes(self):
"""Return device specific state attributes."""
return self._device_state_attributes
def attribute_updated(self, attribute, value):
"""Handle an attribute updated on this cluster."""
pass
def zdo_command(self, tsn, command_id, args):
"""Handle a ZDO command received on this cluster."""
pass
async def _discover_endpoint_info(endpoint):
"""Find some basic information about an endpoint."""
extra_info = {
'manufacturer': None,
'model': None,
}
if 0 not in endpoint.in_clusters:
return extra_info
async def read(attributes):
"""Read attributes and update extra_info convenience function."""
result, _ = await endpoint.in_clusters[0].read_attributes(
attributes,
allow_cache=True,
)
extra_info.update(result)
await read(['manufacturer', 'model'])
if extra_info['manufacturer'] is None or extra_info['model'] is None:
# Some devices fail at returning multiple results. Attempt separately.
await read(['manufacturer'])
await read(['model'])
for key, value in extra_info.items():
if isinstance(value, bytes):
try:
extra_info[key] = value.decode('ascii').strip()
except UnicodeDecodeError:
# Unsure what the best behaviour here is. Unset the key?
pass
return extra_info
def get_discovery_info(hass, discovery_info):
"""Get the full discovery info for a device.
Some of the info that needs to be passed to platforms is not JSON
serializable, so it cannot be put in the discovery_info dictionary. This
component places that info we need to pass to the platform in hass.data,
and this function is a helper for platforms to retrieve the complete
discovery info.
"""
if discovery_info is None:
return
discovery_key = discovery_info.get('discovery_key', None)
all_discovery_info = hass.data.get(DISCOVERY_KEY, {})
return all_discovery_info.get(discovery_key, None)
async def safe_read(cluster, attributes):
"""Swallow all exceptions from network read.
If we throw during initialization, setup fails. Rather have an entity that
exists, but is in a maybe wrong state, than no entity. This method should
probably only be used during initialization.
"""
try:
result, _ = await cluster.read_attributes(
attributes,
allow_cache=False,
)
return result
except Exception: # pylint: disable=broad-except
return {}
./homeassistant/components/zha/const.py
"""All constants related to the ZHA component."""
DEVICE_CLASS = {}
SINGLE_INPUT_CLUSTER_DEVICE_CLASS = {}
SINGLE_OUTPUT_CLUSTER_DEVICE_CLASS = {}
COMPONENT_CLUSTERS = {}
def populate_data():
"""Populate data using constants from bellows.
These cannot be module level, as importing bellows must be done in a
in a function.
"""
from zigpy import zcl
from zigpy.profiles import PROFILES, zha, zll
DEVICE_CLASS[zha.PROFILE_ID] = {
zha.DeviceType.ON_OFF_SWITCH: 'binary_sensor',
zha.DeviceType.LEVEL_CONTROL_SWITCH: 'binary_sensor',
zha.DeviceType.REMOTE_CONTROL: 'binary_sensor',
zha.DeviceType.SMART_PLUG: 'switch',
zha.DeviceType.ON_OFF_LIGHT: 'light',
zha.DeviceType.DIMMABLE_LIGHT: 'light',
zha.DeviceType.COLOR_DIMMABLE_LIGHT: 'light',
zha.DeviceType.ON_OFF_LIGHT_SWITCH: 'binary_sensor',
zha.DeviceType.DIMMER_SWITCH: 'binary_sensor',
zha.DeviceType.COLOR_DIMMER_SWITCH: 'binary_sensor',
}
DEVICE_CLASS[zll.PROFILE_ID] = {
zll.DeviceType.ON_OFF_LIGHT: 'light',
zll.DeviceType.ON_OFF_PLUGIN_UNIT: 'switch',
zll.DeviceType.DIMMABLE_LIGHT: 'light',
zll.DeviceType.DIMMABLE_PLUGIN_UNIT: 'light',
zll.DeviceType.COLOR_LIGHT: 'light',
zll.DeviceType.EXTENDED_COLOR_LIGHT: 'light',
zll.DeviceType.COLOR_TEMPERATURE_LIGHT: 'light',
zll.DeviceType.COLOR_CONTROLLER: 'binary_sensor',
zll.DeviceType.COLOR_SCENE_CONTROLLER: 'binary_sensor',
zll.DeviceType.CONTROLLER: 'binary_sensor',
zll.DeviceType.SCENE_CONTROLLER: 'binary_sensor',
zll.DeviceType.ON_OFF_SENSOR: 'binary_sensor',
}
SINGLE_INPUT_CLUSTER_DEVICE_CLASS.update({
zcl.clusters.general.OnOff: 'switch',
zcl.clusters.measurement.RelativeHumidity: 'sensor',
zcl.clusters.measurement.TemperatureMeasurement: 'sensor',
zcl.clusters.security.IasZone: 'binary_sensor',
zcl.clusters.hvac.Fan: 'fan',
})
SINGLE_OUTPUT_CLUSTER_DEVICE_CLASS.update({
zcl.clusters.general.OnOff: 'binary_sensor',
})
# A map of hass components to all Zigbee clusters it could use
for profile_id, classes in DEVICE_CLASS.items():
profile = PROFILES[profile_id]
for device_type, component in classes.items():
if component not in COMPONENT_CLUSTERS:
COMPONENT_CLUSTERS[component] = (set(), set())
clusters = profile.CLUSTERS[device_type]
COMPONENT_CLUSTERS[component][0].update(clusters[0])
COMPONENT_CLUSTERS[component][1].update(clusters[1])
before you replace the files above, you need to make sure to exclude the existing switch from zigbee network, because there’s some configuration done to the device only what a device joins the network, so it won’t work if you just replace the files with device already existing in zigbee.db .
You can exclude the dimmer switch by calling “zha.remove” with {“ieee_address”: “84:18:26:00:00:e8:fa:90”} JSON data. Confirm the device was excluded with bellows devices -D ~/.homeassistant/zigbee.db
(adjust for your specific file path and zigbee database filename). After that, replaces the files above, start homeassistant, call service zha.permit and join the switch. If everything successful, you should get
Good luck! And yep, things may break, but this is what makes it more fun, isn’t it
Can Ikea Trådfri remotes be used alone as “physical buttons” to activate different automations? If I understand correctly, this would require them to be Zigbee binary sensors.
The temp is still not rounded on this sensor.
Any chance someone could take a look at it.
in components/sensor/zha.py the temperature sensor 1st rounds sensor data and then converts the result from Celsius to Fahrenheit which may produces a non round result, see around line 92
celsius = round(float(self._state) / 100, 1)
return convert_temperature(
celsius, TEMP_CELSIUS, self.unit_of_measurement)
as a quick fix I just round the result of convert_temperature
one more time:
celsius = round(float(self._state) / 100, 1)
return round(convert_temperature(celsius, TEMP_CELSIUS,
self.unit_of_measurement), 1)
I am running hassio and can not find a /components directory to edit the zha.py file.
Submit a PR so we can get this in and hopefully it will get merged. There are a few zha
items that have been pending for a couple of months now so maybe they will all merge one day lol.
Submit it as a Feature request?
If you updated the code and you verified it is working submit a Pull Request in Github so the project can get the updated code and everyone can use it instead of users needing to edit the files the way it was described. Anyone can submit a PR.
My comment was actually directed to @Quatuor who provided the code changes.
Got it.
Until it gets merged is their any way to do this with a value_template
in the configuration.yaml or entity_registry.yaml for now?
you can do a custom component with the folder and changes mentioned above.
Did you get dimming to work for the Osram Lightify switch in your code above?
Nope. After a closer look I found some issues with the original PR. So I was working on a few fixes and some improvements (at least I think so added state restore upon hass restart). Can you give it a try and let me know? here’s a new homeassistant/components/binary_sensor/zha.py
which I was working on. You still are going to need the __init__.py
and const.py
from my previous post.
"""
Binary sensors on Zigbee Home Automation networks.
For more details on this platform, please refer to the documentation
at https://home-assistant.io/components/binary_sensor.zha/
"""
import logging
import time
from enum import IntEnum
from homeassistant.components.binary_sensor import DOMAIN, BinarySensorDevice
from homeassistant.components import zha
from homeassistant.const import STATE_OFF
from homeassistant.helpers.event import async_track_time_change
from homeassistant.helpers.restore_state import async_get_last_state
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['zha']
# ZigBee Cluster Library Zone Type to Home Assistant device class
CLASS_MAPPING = {
0x000d: 'motion',
0x0015: 'opening',
0x0028: 'smoke',
0x002a: 'moisture',
0x002b: 'gas',
0x002d: 'vibration',
}
MOVING_STEP = 15
class OnOffCommands(IntEnum):
OFF = 0x00
ON = 0x01
TOGGLE = 0x02
OFF_WITH_EFFECT = 0x40
ON_WITH_RECALL_GLOBAL_SCENE = 0x41
ON_WITH_TIMED_OFF = 0x42
class LevelCommands(IntEnum):
MOVE_TO_LEVEL = 0x00
MOVE = 0x01
STEP = 0x02
STOP = 0x03
MOVE_TO_LEVEL_WITH_ON_OFF = 0x04
MOVE_WITH_ON_OFF = 0x05
STEP_WITH_ON_OFF = 0x06
STOP_WITH_ON_OFF = 0x07
async def async_setup_platform(hass, config, async_add_devices,
discovery_info=None):
"""Set up the Zigbee Home Automation binary sensors."""
discovery_info = zha.get_discovery_info(hass, discovery_info)
if discovery_info is None:
return
from zigpy.zcl.clusters.general import OnOff
from zigpy.zcl.clusters.security import IasZone
if IasZone.cluster_id in discovery_info['in_clusters']:
await _async_setup_iaszone(hass, config, async_add_devices,
discovery_info)
elif OnOff.cluster_id in discovery_info['out_clusters']:
await _async_setup_remote(hass, config, async_add_devices,
discovery_info)
async def _async_setup_iaszone(hass, config, async_add_devices,
discovery_info=None):
device_class = None
from zigpy.zcl.clusters.security import IasZone
cluster = discovery_info['in_clusters'][IasZone.cluster_id]
if discovery_info['new_join']:
await cluster.bind()
ieee = cluster.endpoint.device.application.ieee
await cluster.write_attributes({'cie_addr': ieee})
try:
zone_type = await cluster['zone_type']
device_class = CLASS_MAPPING.get(zone_type, None)
except Exception: # pylint: disable=broad-except
# If we fail to read from the device, use a non-specific class
pass
sensor = BinarySensor(device_class, **discovery_info)
async_add_devices([sensor], update_before_add=True)
async def _async_setup_remote(hass, config, async_add_devices, discovery_info):
async def safe(coro):
"""Run coro, catching ZigBee delivery errors, and ignoring them."""
import zigpy.exceptions
try:
await coro
except zigpy.exceptions.DeliveryError as exc:
_LOGGER.info("Ignoring error during setup: %s", exc)
if discovery_info['new_join']:
from zigpy.zcl.clusters.general import OnOff, LevelControl
out_clusters = discovery_info['out_clusters']
if OnOff.cluster_id in out_clusters:
cluster = out_clusters[OnOff.cluster_id]
await safe(cluster.bind())
await safe(cluster.configure_reporting(0, 0, 600, 1))
if LevelControl.cluster_id in out_clusters:
cluster = out_clusters[LevelControl.cluster_id]
await safe(cluster.bind())
await safe(cluster.configure_reporting(0, 1, 600, 1))
sensor = Switch(**discovery_info)
async_add_devices([sensor])
class BinarySensor(zha.Entity, BinarySensorDevice):
"""The ZHA Binary Sensor."""
_domain = DOMAIN
def __init__(self, device_class, **kwargs):
"""Initialize the ZHA binary sensor."""
super().__init__(**kwargs)
self._device_class = device_class
from zigpy.zcl.clusters.security import IasZone
self._ias_zone_cluster = self._in_clusters[IasZone.cluster_id]
@property
def should_poll(self) -> bool:
"""Let zha handle polling."""
return False
@property
def is_on(self) -> bool:
"""Return True if entity is on."""
if self._state == 'unknown':
return False
return bool(self._state)
@property
def device_class(self):
"""Return the class of this device, from component DEVICE_CLASSES."""
return self._device_class
def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
if command_id == 0:
self._state = args[0] & 3
_LOGGER.debug("Updated alarm state: %s", self._state)
self.async_schedule_update_ha_state()
elif command_id == 1:
_LOGGER.debug("Enroll requested")
res = self._ias_zone_cluster.enroll_response(0, 0)
self.hass.async_add_job(res)
async def async_update(self):
"""Retrieve latest state."""
from bellows.types.basic import uint16_t
result = await zha.safe_read(self._endpoint.ias_zone,
['zone_status'])
state = result.get('zone_status', self._state)
if isinstance(state, (int, uint16_t)):
self._state = result.get('zone_status', self._state) & 3
class Switch(zha.Entity, BinarySensorDevice):
"""ZHA switch/remote controller/button."""
_domain = DOMAIN
class OnOffListener:
"""Listener for the OnOff ZigBee cluster."""
def __init__(self, entity):
"""Initialize OnOffListener."""
self._entity = entity
self._state = True
def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
cmds = OnOffCommands
_LOGGER.debug('{}: rx {} command with {} args on OnOff cluster'.
format(self._entity.entity_id, command_id, args))
if command_id in (cmds.OFF, cmds.OFF_WITH_EFFECT):
self.set_state(False)
elif command_id in (cmds.ON, cmds.ON_WITH_RECALL_GLOBAL_SCENE,
cmds.ON_WITH_TIMED_OFF):
self.set_state(True)
elif command_id == cmds.TOGGLE:
self.toggle()
def attribute_updated(self, attrid, value):
"""Handle attribute updates on this cluster."""
_LOGGER.debug('{}: reports {}/{} attr_id/value on OnOff cluster'.
format(self._entity.entity_id, attrid, value))
if attrid == 0:
self._state = value
self._entity.schedule_update_ha_state()
@property
def is_on(self) -> bool:
"""state of the switch"""
return self._state
def set_state(self, state):
"""Set the state."""
self._state = state
if self._state and not bool(self._entity.level.level):
self._entity.level.set_level(1, True)
self._entity.schedule_update_ha_state()
def toggle(self):
"""Toggle the state"""
self._state = not self._state
if self._state and not bool(self._entity.level.level):
self._entity.level.set_level(1, True)
self._entity.schedule_update_ha_state()
class LevelListener:
"""Listener for the LevelControl ZigBee cluster."""
class LvlChange:
"""ammount of Level change"""
def __init__(self, direction=0, rate=0, delta=MOVING_STEP,
start_moving=True):
self._direction = direction
if direction == 0x00:
self._delta = delta
else:
self._delta = -delta
self.still_moving = start_moving
if start_moving:
self.when_started = time.time()
self._rate = rate
@property
def delta(self):
"""amount of level change"""
return self._delta
@property
def delta_since_start(self):
"""return amount of change since start of level move"""
dt = time.time() - self.when_started
return int(dt * self.delta)
def stop(self):
"""Stop moving and returns delta since the start"""
self.still_moving = False
return self.delta_since_start
def __radd__(self, other):
return other + self._delta
def __rmul__(self, other):
return other * self._delta
def __init__(self, entity):
"""Initialize LevelListener."""
self._entity = entity
self._level = 255
self._orig_level = self._level
self.delta = self.LvlChange(start_moving=False)
def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
_LOGGER.debug('{}: rx {} command with {} args on Level cluster'.
format(self._entity.entity_id, command_id, args))
if command_id == LevelCommands.MOVE_TO_LEVEL: # move_to_level
self.set_level(args[0])
elif command_id == LevelCommands.MOVE: # move
# We should dim slowly -- for now, just step once
self.delta = self.LvlChange(*args)
self._orig_level = self._level
self.move_level(self.delta)
elif command_id == LevelCommands.STEP: # step
# Step (technically shouldn't change on/off)
delta = self.LvlChange(*args)
self.move_level(delta)
elif command_id in (LevelCommands.STOP,
LevelCommands.STOP_WITH_ON_OFF):
stop_delta = self.delta.stop()
self.set_level(self._orig_level + stop_delta)
elif command_id == LevelCommands.MOVE_TO_LEVEL_WITH_ON_OFF:
# move_to_level_with_on_off
self.set_level(args[0], True)
elif command_id == LevelCommands.MOVE_WITH_ON_OFF: # move_with_on_off
# We should dim slowly -- for now, just step once
self.delta = self.LvlChange(*args)
self._orig_level = self._level
self.move_level(self.delta, True)
elif command_id == LevelCommands.STEP_WITH_ON_OFF: # step_with_on_off
delta = self.LvlChange(*args)
self.move_level(delta, True)
def attribute_update(self, attrid, value):
"""Handle attribute updates on this cluster."""
_LOGGER.debug('{}: reports {}/{} attr_id/value on level cluster'.
format(self._entity.entity_id, attrid, value))
if attrid == 0:
self.set_level(value)
@property
def level(self):
"""current level"""
return self._level
def move_level(self, change, onoff=False):
"""Increment the level, setting state if appropriate."""
self._level = min(255, max(0, self._level + change))
if onoff:
self._entity.onoff.set_state(bool(self._level))
self._entity.schedule_update_ha_state()
def set_level(self, level, onoff=False):
"""Set the level, setting state if appropriate."""
self._level = min(255, max(0, level))
if onoff:
self._entity.onoff.set_state(bool(self._level))
self._entity.schedule_update_ha_state()
async def async_time_interval(self, tracked_time):
"""if button is held, estimate level change"""
if not self.delta.still_moving:
return
delta = self.delta.delta_since_start
self.set_level(self._level + delta)
async def async_added_to_hass(self):
"""Run when about to be added to Hass"""
# Check if we have an old state
old_state = await async_get_last_state(self.hass, self.entity_id)
if old_state is not None:
_LOGGER.debug('{}: restoring old state {}'.format(self.entity_id,
old_state))
if old_state.attributes.get('level') is not None:
self.level.set_level(old_state.attributes['level'])
if old_state.state == STATE_OFF:
self.onoff.set_state(False)
async_track_time_change(self.hass, self.level.async_time_interval)
def __init__(self, **kwargs):
"""Initialize Switch."""
self.onoff = self.OnOffListener(self)
self.level = self.LevelListener(self)
from zigpy.zcl.clusters import general
self._out_listeners = {
general.OnOff.cluster_id: self.onoff,
general.LevelControl.cluster_id: self.level,
}
super().__init__(**kwargs)
@property
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
return self.onoff.is_on and bool(self.level.level)
@property
def device_state_attributes(self):
"""Return the device state attributes."""
attrs = {}
if self.is_on:
attrs['level'] = self.level.level
return attrs
with the above binary_sensor
I’m using the following automation to control the lights. The idea is: triggered any time the state changes (since we want to catch level change) and if the state of the binary_switch is on then we turn on the lights to the level corresponding to binary_switch attributes[‘level’]
- id: floor_light_switch_on
alias: Turn on floor lamp when switched on
trigger:
- platform: state
entity_id: binary_sensor.dimming_switch
condition:
- condition: state
entity_id: binary_sensor.dimming_switch
state: 'on'
action:
- service: light.turn_on
data_template:
entity_id: light.floor_lamp
brightness: "{{ trigger.to_state.attributes.level }}"
- id: light_switch_off
alias: Turn off floor lamp when switched off
trigger:
- platform: state
entity_id: binary_sensor.dimming_switch
to: 'off'
action:
- service: light.turn_off
entity_id:
- light.floor_lamp
** edit: fixed set level to stay within 0…255
*** edit: added automation to link binary_switch level change to light control
**** edit: update to turn switch off when level is 0
PS: I’m just starting with python so I understand some things i’m doing could be very stupid. Any constructive dialog is welcome
It works well, however dimming is a little slow. One suggestion would be to toggle the state of the switch to off when the level gets to 0.
I am running Hassio on a raspberry pi and ssh into edit config files
can you tell me where to find the components/sensor/zha.py file
you are suggesting to edit.
Started having weird behavior. I have 3 iris open close sensors and 3 iris motion sensors. I’ve been moving over my devices this week, and finally decided to move everything else over. So before I started 2 motion and one open close were added. I have not been able to get the other ones to add at all. It’s like zha permit is not even working.
I even went and deleted the zigbee.db, I was able to add one of the previous devices, but still not the other devices. I then removed the db one more time, and now nothing will add.
I’ve spent all week on this stuff, and not the last few devices I expected to be easy are causing my greif. Any suggestions, or places to look for what may be going on? Thanks!