Was able to get the firmware sensor working, but still cannot get alerts working.
2021-02-08 12:38:45 ERROR (SyncWorker_32) [custom_components.unifigateway.sensor] Failed to access alerts info: Expecting value: line 1 column 1 (char 0)
No doubt this is something very simple to resolve for someone with a bit more knowledge than myself. For now I have simply removed the “alerts” line from the YAML file.
Sensor.py
import logging
import voluptuous as vol
from datetime import timedelta
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_HOST, CONF_USERNAME, CONF_PASSWORD,
CONF_MONITORED_CONDITIONS, CONF_VERIFY_SSL)
# __version__ = '0.2.3'
# Module-level logger for this sensor platform.
_LOGGER = logging.getLogger(__name__)
# Configuration keys defined locally rather than imported from
# homeassistant.const.
CONF_PORT = 'port'
CONF_SITE_ID = 'site_id'
# Defaults applied by PLATFORM_SCHEMA when an option is omitted.
DEFAULT_NAME = 'UniFi Gateway'
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 8443
DEFAULT_SITE = 'default'
DEFAULT_VERIFY_SSL = False
# Passed to the Throttle decorator on UnifiGatewaySensor.update().
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=30)
# Sensor identifiers. The first five are compared against the 'subsystem'
# field of the health entries in UnifiGatewaySensor.update(); 'alerts' and
# 'firmware' have dedicated branches there.
SENSOR_VPN = 'vpn'
SENSOR_WWW = 'www'
SENSOR_WAN = 'wan'
SENSOR_LAN = 'lan'
SENSOR_WLAN = 'wlan'
SENSOR_ALERTS = 'alerts'
SENSOR_FIRMWARE = 'firmware'
# Per-sensor metadata: [name suffix, <second field>, icon].
# Only index 0 (name suffix) and index 2 (icon) are read in this file.
USG_SENSORS = {
SENSOR_VPN: ['VPN', '', 'mdi:folder-key-network'],
SENSOR_WWW: ['WWW', '', 'mdi:web'],
SENSOR_WAN: ['WAN', '', 'mdi:shield-outline'],
SENSOR_LAN: ['LAN', '', 'mdi:lan'],
SENSOR_WLAN: ['WLAN','', 'mdi:wifi'],
SENSOR_ALERTS: ['Alerts', '', 'mdi:information-outline'],
SENSOR_FIRMWARE:['Firmware Upgradable', '', 'mdi:database-plus']
}
# Every value accepted under monitored_conditions.
POSSIBLE_MONITORED = [ SENSOR_VPN, SENSOR_WWW, SENSOR_WAN, SENSOR_LAN,
SENSOR_WLAN, SENSOR_ALERTS, SENSOR_FIRMWARE ]
# NOTE: this is the same list object as POSSIBLE_MONITORED, not a copy.
DEFAULT_MONITORED = POSSIBLE_MONITORED
# Platform configuration schema: username and password are mandatory,
# everything else falls back to the DEFAULT_* values above.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_SITE_ID, default=DEFAULT_SITE): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string,
# NOTE(review): declared Required but also given a default - confirm
# whether Optional was intended.
vol.Required(CONF_PORT, default=DEFAULT_PORT): cv.port,
# Either a boolean or a path to an existing file.
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL):
vol.Any(cv.boolean, cv.isfile),
# A single value is wrapped into a list; each entry must be a known sensor.
vol.Optional(CONF_MONITORED_CONDITIONS, default=DEFAULT_MONITORED):
vol.All(cv.ensure_list, [vol.In(POSSIBLE_MONITORED)])
})
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Set up the UniFi Gateway sensors.

    Opens one Controller connection from the platform configuration and
    registers one UnifiGatewaySensor per monitored condition.

    :param hass: Home Assistant instance, handed on to every sensor
    :param config: platform configuration (see PLATFORM_SCHEMA)
    :param add_entities: callback used to register the created entities
    :param discovery_info: not used
    :return: False when the controller raises APIError during construction,
        otherwise None
    """
    from .controller import Controller, APIError

    try:
        ctrl = Controller(
            config.get(CONF_HOST),
            config.get(CONF_USERNAME),
            config.get(CONF_PASSWORD),
            config.get(CONF_PORT),
            version='unifiOS',
            site_id=config.get(CONF_SITE_ID),
            ssl_verify=config.get(CONF_VERIFY_SSL))
    except APIError as ex:
        _LOGGER.error("Failed to connect to Unifi Security Gateway: %s", ex)
        return False

    name = config.get(CONF_NAME)
    # Register all sensors with a single call instead of one call per sensor.
    # The second argument (True) is kept from the original call.
    add_entities(
        [UnifiGatewaySensor(hass, ctrl, name, sensor)
         for sensor in config.get(CONF_MONITORED_CONDITIONS)],
        True)
class UnifiGatewaySensor(Entity):
    """Implementation of a UniFi Gateway sensor.

    Each instance reports one entry of USG_SENSORS: the number of unarchived
    alerts, the number of devices flagged as upgradable, or the status of one
    health subsystem (vpn, www, wan, lan, wlan).
    """

    def __init__(self, hass, ctrl, name, sensor):
        """Initialize the sensor.

        :param hass: Home Assistant instance (stored only)
        :param ctrl: controller object; update() calls its get_alerts(),
            get_aps() and get_healthinfo() methods
        :param name: base name; the sensor's label is appended to it
        :param sensor: key into USG_SENSORS selecting what is reported
        """
        self._hass = hass
        self._ctrl = ctrl
        self._name = name + ' ' + USG_SENSORS[sensor][0]
        self._sensor = sensor
        self._state = None
        self._alldata = None
        self._data = None
        self._attributes = {}

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return USG_SENSORS[self._sensor][2]

    @property
    def state(self):
        """Return the state of the device."""
        return self._state

    @property
    def state_attributes(self):
        """Return the device state attributes."""
        return self._attributes

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    def update(self):
        """Refresh state and attributes from the controller.

        Controller failures (APIError) are logged and never propagated.
        """
        from .controller import APIError

        if self._sensor == SENSOR_ALERTS:
            self._update_alerts(APIError)
        elif self._sensor == SENSOR_FIRMWARE:
            self._update_firmware(APIError)
        else:
            self._update_health(APIError)

    def _update_alerts(self, api_error):
        """Set state to the number of unarchived alerts, one attribute each."""
        try:
            alerts = self._ctrl.get_alerts()
        except api_error as ex:
            # Keep the last known state AND attributes so they stay consistent.
            _LOGGER.error("Failed to access alerts info: %s", ex)
            return
        # Keys are the 1-based position in the full alert list, so they may
        # have gaps where archived alerts were skipped. A missing 'archived'
        # key is treated as "not archived" instead of raising KeyError.
        self._attributes = {
            str(index): alert
            for index, alert in enumerate(alerts, start=1)
            if not alert.get('archived')
        }
        self._state = len(self._attributes)

    def _update_firmware(self, api_error):
        """Set state to the number of devices reporting 'upgradable'."""
        self._attributes = {}
        self._state = 0
        try:
            devices = self._ctrl.get_aps()
        except api_error as ex:
            _LOGGER.error("Failed to scan aps: %s", ex)
            return
        for device in devices:
            upgradable = device.get('upgradable')
            if not upgradable:
                continue
            self._state += 1
            # Attributes are keyed by device name for readability; names may
            # not be unique. Devices without a name fall back to their MAC
            # (or a counter) instead of raising KeyError.
            label = (device.get('name') or device.get('mac')
                     or 'device %d' % self._state)
            self._attributes[label] = upgradable

    def _update_health(self, api_error):
        """Copy the health entry of this sensor's subsystem into the entity."""
        # get_healthinfo() is called once per health sensor; ideally the
        # result would be shared between them.
        # Check that the function exists - potential errors on startup
        # otherwise.
        if not hasattr(self._ctrl, 'get_healthinfo'):
            _LOGGER.error("no healthinfo attribute for controller")
            return
        try:
            self._alldata = self._ctrl.get_healthinfo()
        except api_error as ex:
            _LOGGER.error("Failed to access health info: %s", ex)
            return
        for sub in self._alldata:
            if sub.get('subsystem') != self._sensor:
                continue
            self._data = sub
            status = sub.get('status')
            self._state = status.upper() if isinstance(status, str) else status
            # Replace rather than merge so keys that disappeared from the
            # controller's answer do not linger from earlier refreshes.
            self._attributes = dict(sub)
Controller.py
import json
import logging
import requests
import shutil
import time
import warnings
# NOTE: the bare string below follows the imports, so it is not the module
# docstring; it is an unused expression kept as a snippet showing how to
# enable file logging while testing.
"""For testing purposes:
logging.basicConfig(filename='pyunifi.log', level=logging.WARN,
format='%(asctime)s %(message)s')
"""
# Module-level logger used by retry_login and by Controller's debug output.
log = logging.getLogger(__name__)
class APIError(Exception):
    """Error raised by this module when talking to the controller fails."""
def retry_login(func, *args, **kwargs):
    """Decorator: log in again and retry once when a call fails.

    If the wrapped callable raises a requests exception or APIError, the
    first positional argument of the call (the Controller instance when used
    on a method) has its ``_login()`` invoked and the call is repeated once.
    Any exception that still escapes is re-raised as APIError, chained to the
    original error.

    :param func: callable to wrap
    :param args: accepted for backward compatibility; not used
    :param kwargs: accepted for backward compatibility; not used
    :return: the wrapping function
    """
    import functools

    @functools.wraps(func)  # keep the wrapped method's name and docstring
    def wrapper(*args, **kwargs):
        try:
            try:
                return func(*args, **kwargs)
            except (requests.exceptions.RequestException,
                    APIError) as err:
                log.warning("Failed to perform %s due to %s", func, err)
                controller = args[0]
                controller._login()
                return func(*args, **kwargs)
        except Exception as err:
            # Callers only ever need to catch APIError.
            raise APIError(err) from err
    return wrapper
class Controller(object):
    """Interact with a UniFi controller.

    Logs in through ``/api/auth/login`` and talks to the controller's JSON
    interface below ``/proxy/network/`` over HTTPS. Operations raise APIError
    on obvious problems (such as login failure), but many errors (such as
    disconnecting a nonexistent client) will go unreported.

    >>> from unifi.controller import Controller
    >>> c = Controller('192.168.1.99', 'admin', 'p4ssw0rd')
    >>> for ap in c.get_aps():
    ...     print('AP named %s with MAC %s' % (ap.get('name'), ap['mac']))
    ...
    AP named Study with MAC dc:9f:db:1a:59:07
    AP named Living Room with MAC dc:9f:db:1a:59:08
    AP named Garage with MAC dc:9f:db:1a:59:0b
    """

    def __init__(self, host, username, password, port=8443,
                 version='unifiOS', site_id='default', ssl_verify=True):
        """Create the session and log in immediately.

        :param host: the address of the controller host; IP or name
        :param username: the username to log in with
        :param password: the password to log in with
        :param port: the port of the controller host (stored only; the URLs
            built by this class do not include it)
        :param version: the base version of the controller API (stored only)
        :param site_id: the site ID to connect to
        :param ssl_verify: Verify the controllers SSL certificate,
            can also be "path/to/custom_cert.pem"
        :raises APIError: when the login request does not return status 200
        """
        self.log = logging.getLogger(__name__ + ".Controller")
        self.host = host
        self.port = port
        self.version = version
        self.username = username
        self.password = password
        self.site_id = site_id
        self.ssl_verify = ssl_verify
        self.url = 'https://' + host + '/proxy/network/'
        self.session = requests.Session()
        self.session.verify = ssl_verify
        self.log.debug('Controller for %s', self.url)
        self._login()

    @staticmethod
    def _jsondec(data):
        """Decode a JSON response body and unwrap its ``data`` member.

        :param data: response body text
        :return: ``obj['data']`` when present, otherwise the decoded object
        :raises APIError: when the body is not valid JSON (for example an
            empty body) or when ``meta.rc`` is not ``'ok'``
        """
        try:
            obj = json.loads(data)
        except ValueError as err:
            # Without this the caller only sees "Expecting value: line 1
            # column 1 (char 0)", which hides what actually went wrong.
            raise APIError(
                'Controller returned a non-JSON response: %s' % err) from err
        if 'meta' in obj:
            if obj['meta']['rc'] != 'ok':
                raise APIError(obj['meta']['msg'])
        if 'data' in obj:
            return obj['data']
        return obj

    def _api_url(self):
        """Return the base URL of the API for the current site."""
        return self.url + 'api/s/' + self.site_id + '/'

    def _store_csrf_token(self, response):
        """Remember the CSRF token sent by the controller, if any.

        The token is added to the session headers so that it accompanies
        every subsequent request.
        NOTE(review): UniFi OS is understood to reject POST/PUT requests
        that lack this header - confirm against the target controller.
        """
        token = response.headers.get('X-CSRF-Token')
        if token:
            self.session.headers['X-CSRF-Token'] = token

    @retry_login
    def _read(self, url, params=None):
        """GET ``url`` and return the decoded payload."""
        r = self.session.get(url, params=params)
        self._store_csrf_token(r)
        return self._jsondec(r.text)

    def _api_read(self, url, params=None):
        """GET a path relative to the site API URL."""
        return self._read(self._api_url() + url, params)

    @retry_login
    def _write(self, url, params=None):
        """POST ``params`` as JSON to ``url`` and return the decoded payload."""
        r = self.session.post(url, json=params)
        self._store_csrf_token(r)
        return self._jsondec(r.text)

    def _api_write(self, url, params=None):
        """POST to a path relative to the site API URL."""
        return self._write(self._api_url() + url, params)

    @retry_login
    def _update(self, url, params=None):
        """PUT ``params`` as JSON to ``url`` and return the decoded payload."""
        r = self.session.put(url, json=params)
        self._store_csrf_token(r)
        return self._jsondec(r.text)

    def _api_update(self, url, params=None):
        """PUT to a path relative to the site API URL."""
        return self._update(self._api_url() + url, params)

    def _login(self):
        """Log in with the stored credentials.

        :raises APIError: when the response status is not 200
        """
        log.debug('login() as %s', self.username)
        params = {'username': self.username, 'password': self.password}
        login_url = 'https://' + self.host + '/api/auth/login'
        r = self.session.post(login_url, json=params)
        if r.status_code != 200:
            raise APIError("Login failed - status code: %i" % r.status_code)
        self._store_csrf_token(r)

    def _logout(self):
        """Post a logout request to the site API."""
        log.debug('logout()')
        self._api_write('logout')

    def switch_site(self, name):
        """
        Switch to another site

        :param name: Site Name
        :return: True or APIError
        """
        for site in self.get_sites():
            if site['desc'] == name:
                self.site_id = site['name']
                return True
        raise APIError("No site %s found" % name)

    def get_alerts(self):
        """Return a list of all Alerts."""
        return self._api_write('stat/alarm')

    def get_alerts_unarchived(self):
        """Return a list of Alerts unarchived."""
        return self._api_write('stat/alarm', params={'archived': False})

    def get_statistics_last_24h(self):
        """Returns statistical data of the last 24h"""
        return self.get_statistics_24h(time.time())

    def get_statistics_24h(self, endtime):
        """Return statistical data last 24h from time

        :param endtime: end of the period as a Unix timestamp in seconds
        """
        params = {
            'attrs': ["bytes", "num_sta", "time"],
            'start': int(endtime - 86400) * 1000,
            'end': int(endtime - 3600) * 1000}
        return self._api_write('stat/report/hourly.site', params)

    def get_events(self):
        """Return a list of all Events."""
        return self._api_read('stat/event')

    def get_aps(self):
        """Return a list of all APs,
        with significant information about each.
        """
        # Set test to 0 instead of NULL
        params = {'_depth': 2, 'test': 0}
        return self._api_read('stat/device', params)

    def get_client(self, mac):
        """Get details about a specific client"""
        # stat/user/<mac> works better than stat/sta/<mac>
        # stat/sta seems to be only active clients
        # stat/user includes known but offline clients
        return self._api_read('stat/user/' + mac)[0]

    def get_clients(self):
        """Return a list of all active clients,
        with significant information about each.
        """
        return self._api_read('stat/sta')

    def get_users(self):
        """Return a list of all known clients,
        with significant information about each.
        """
        return self._api_read('list/user')

    def get_user_groups(self):
        """Return a list of user groups with its rate limiting settings."""
        return self._api_read('list/usergroup')

    def get_sysinfo(self):
        """Return basic system informations."""
        return self._api_read('stat/sysinfo')

    def get_healthinfo(self):
        """Return health information."""
        return self._api_read('stat/health')

    def get_sites(self):
        """Return a list of all sites,
        with their UID and description"""
        return self._read(self.url + 'api/self/sites')

    def get_wlan_conf(self):
        """Return a list of configured WLANs
        with their configuration parameters.
        """
        return self._api_read('list/wlanconf')

    def _run_command(self, command, params=None, mgr='stamgr'):
        """Post ``command`` to the ``cmd/<mgr>`` endpoint.

        :param command: value sent as ``cmd``
        :param params: extra payload entries; the mapping is copied, never
            modified
        :param mgr: manager part of the endpoint path
        """
        log.debug('_run_command(%s)', command)
        # Work on a copy: a shared default dict or the caller's dict would
        # otherwise carry keys over from one call to the next.
        payload = dict(params) if params else {}
        payload['cmd'] = command
        return self._api_write('cmd/' + mgr, payload)

    def _mac_cmd(self, target_mac, command, mgr='stamgr', params=None):
        """Run ``command`` against the device or client ``target_mac``.

        :param target_mac: value sent as ``mac``
        :param command: value sent as ``cmd``
        :param mgr: manager part of the endpoint path
        :param params: extra payload entries; the mapping is copied, never
            modified
        """
        log.debug('_mac_cmd(%s, %s)', target_mac, command)
        payload = dict(params) if params else {}
        payload['mac'] = target_mac
        return self._run_command(command, payload, mgr)

    def get_device_stat(self, target_mac):
        """Gets the current state & configuration of
        the given device based on its MAC Address.

        :param target_mac: MAC address of the device.
        :type target_mac: str
        :returns: Dictionary containing metadata, state,
            capabilities and configuration of the device
        :rtype: dict()
        """
        log.debug('get_device_stat(%s)', target_mac)
        params = {"macs": [target_mac]}
        return self._api_read('stat/device/' + target_mac, params)[0]

    def get_switch_port_overrides(self, target_mac):
        """Gets a list of port overrides, in dictionary
        format, for the given target MAC address. The
        dictionary contains the port_idx, portconf_id,
        poe_mode, & name.

        :param target_mac: MAC address of the device.
        :type target_mac: str
        :returns: [ { 'port_idx': int(), 'portconf': str,
            'poe_mode': str, 'name': str } ]
        :rtype: list( dict() )
        """
        log.debug('get_switch_port_overrides(%s)', target_mac)
        return self.get_device_stat(target_mac)['port_overrides']

    def _switch_port_power(self, target_mac, port_idx, mode):
        """Helper method to set the given PoE mode the port/switch.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to target
        :type port_idx: int
        :param mode: PoE mode to set. ie. auto, on, off.
        :type mode: str
        :returns: { 'port_overrides': [ { 'port_idx': int(),
            'portconf': str, 'poe_mode': str, 'name': str } ],
            'device_id': str }
        :rtype: dict
        :raises APIError: when port_idx is in neither the existing overrides
            nor the device's port_table
        """
        # TODO: Switch operations should most likely happen in a
        # different Class, Switch.
        log.debug('_switch_port_power(%s, %s, %s)', target_mac, port_idx, mode)
        device_stat = self.get_device_stat(target_mac)
        device_id = device_stat['_id']
        overrides = device_stat['port_overrides']
        found = False
        for override in overrides:
            if override['port_idx'] == port_idx:
                # Override already exists, update..
                override['poe_mode'] = mode
                found = True
                break
        if not found:
            # Retrieve portconf
            portconf_id = None
            for port in device_stat['port_table']:
                if port['port_idx'] == port_idx:
                    portconf_id = port['portconf_id']
                    break
            if portconf_id is None:
                log.error("Port ID %s could not be found in the port_table.",
                          port_idx)
                raise APIError(
                    'Port ID %s not found in port_table' % str(port_idx)
                )
            overrides.append({
                "port_idx": port_idx,
                "portconf_id": portconf_id,
                "poe_mode": mode
            })
        # We return the device_id as it's needed by the parent method
        return {"port_overrides": overrides, "device_id": device_id}

    def switch_port_power_off(self, target_mac, port_idx):
        """Powers Off the given port on the Switch identified
        by the given MAC Address.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to power off
        :type port_idx: int
        :returns: API Response which is the resulting complete port overrides
        :rtype: list( dict() )
        """
        log.debug('switch_port_power_off(%s, %s)', target_mac, port_idx)
        params = self._switch_port_power(target_mac, port_idx, "off")
        device_id = params.pop('device_id')
        return self._api_update('rest/device/' + device_id, params)

    def switch_port_power_on(self, target_mac, port_idx):
        """Powers On the given port on the Switch identified
        by the given MAC Address.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to power on
        :type port_idx: int
        :returns: API Response which is the resulting complete port overrides
        :rtype: list( dict() )
        """
        log.debug('switch_port_power_on(%s, %s)', target_mac, port_idx)
        params = self._switch_port_power(target_mac, port_idx, "auto")
        device_id = params.pop('device_id')
        return self._api_update('rest/device/' + device_id, params)

    def create_site(self, desc='desc'):
        """Create a new site.

        :param desc: Name of the site to be created.
        """
        return self._run_command('add-site', params={"desc": desc},
                                 mgr='sitemgr')

    def block_client(self, mac):
        """Add a client to the block list.

        :param mac: the MAC address of the client to block.
        """
        return self._mac_cmd(mac, 'block-sta')

    def unblock_client(self, mac):
        """Remove a client from the block list.

        :param mac: the MAC address of the client to unblock.
        """
        return self._mac_cmd(mac, 'unblock-sta')

    def disconnect_client(self, mac):
        """Disconnect a client.

        Disconnects a client, forcing them to reassociate. Useful when the
        connection is of bad quality to force a rescan.

        :param mac: the MAC address of the client to disconnect.
        """
        return self._mac_cmd(mac, 'kick-sta')

    def restart_ap(self, mac):
        """Restart an access point (by MAC).

        :param mac: the MAC address of the AP to restart.
        """
        return self._mac_cmd(mac, 'restart', 'devmgr')

    def restart_ap_name(self, name):
        """Restart an access point (by name).

        Only an AP whose ``state`` is 1 is restarted; when no such AP has
        the given name, nothing happens and None is returned.

        :param name: the name address of the AP to restart.
        :raises APIError: when name is empty
        """
        if not name:
            raise APIError('%s is not a valid name' % str(name))
        for ap in self.get_aps():
            if ap.get('state', 0) == 1 and ap.get('name', None) == name:
                return self.restart_ap(ap['mac'])

    def archive_all_alerts(self):
        """Archive all Alerts."""
        return self._run_command('archive-all-alarms', mgr='evtmgr')

    def create_backup(self, days='0'):
        """Ask controller to create a backup archive file

        ..warning:
            This process puts significant load on the controller
            and may render it partially unresponsive for other requests.

        :param days: metrics of the last x days will be added to the backup.
            '-1' backup all metrics. '0' backup only the configuration.
        :return: URL path to backup file
        """
        res = self._run_command('backup', mgr='system', params={'days': days})
        return res[0]['url']

    def get_backup(self, download_path=None, target_file='unifi-backup.unf'):
        """Download a backup archive to a local file.

        :param download_path: path to backup; if None is given
            one will be created
        :param target_file: Filename or full path to download the
            backup archive to, should have .unf extension for restore.
        """
        if not download_path:
            download_path = self.create_backup()
        r = self.session.get(self.url + download_path, stream=True)
        with open(target_file, 'wb') as _backfh:
            return shutil.copyfileobj(r.raw, _backfh)

    def authorize_guest(self, guest_mac, minutes, up_bandwidth=None,
                        down_bandwidth=None, byte_quota=None, ap_mac=None):
        """
        Authorize a guest based on his MAC address.

        :param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
        :param minutes: duration of the authorization in minutes
        :param up_bandwidth: up speed allowed in kbps
        :param down_bandwidth: down speed allowed in kbps
        :param byte_quota: quantity of bytes allowed in MB
        :param ap_mac: access point MAC address
        """
        cmd = 'authorize-guest'
        params = {'mac': guest_mac, 'minutes': minutes}
        if up_bandwidth:
            params['up'] = up_bandwidth
        if down_bandwidth:
            params['down'] = down_bandwidth
        if byte_quota:
            params['bytes'] = byte_quota
        if ap_mac:
            params['ap_mac'] = ap_mac
        return self._run_command(cmd, params=params)

    def unauthorize_guest(self, guest_mac):
        """
        Unauthorize a guest based on his MAC address.

        :param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
        """
        cmd = 'unauthorize-guest'
        params = {'mac': guest_mac}
        return self._run_command(cmd, params=params)

    def get_firmware(self, cached=True, available=True,
                     known=False, site=False):
        """
        Return a list of available/cached firmware versions

        :param cached: Return cached firmwares
        :param available: Return available (and not cached) firmwares
        :param known: Return only firmwares for known devices
        :param site: Return only firmwares for on-site devices
        :return: List of firmware dicts
        """
        res = []
        if cached:
            res.extend(self._run_command('list-cached', mgr='firmware'))
        if available:
            res.extend(self._run_command('list-available', mgr='firmware'))
        if known:
            res = [fw for fw in res if fw['knownDevice']]
        if site:
            res = [fw for fw in res if fw['siteDevice']]
        return res

    def cache_firmware(self, version, device):
        """
        Cache the firmware on the UniFi Controller

        .. warning:: Caching one device might very well cache others,
            as they're on shared platforms

        :param version: version to cache
        :param device: device model to cache (e.g. BZ2)
        :return: True/False
        """
        return self._run_command(
            'download', mgr='firmware',
            params={'device': device, 'version': version})[0]['result']

    def remove_firmware(self, version, device):
        """
        Remove cached firmware from the UniFi Controller

        .. warning:: Removing one device's firmware might very well remove
            others, as they're on shared platforms

        :param version: version to remove
        :param device: device model to remove (e.g. BZ2)
        :return: True/false
        """
        return self._run_command(
            'remove', mgr='firmware',
            params={'device': device, 'version': version})[0]['result']

    def get_tag(self):
        """Get all tags and their member MACs"""
        return self._api_read('rest/tag')

    def upgrade_device(self, mac, version):
        """
        Upgrade a device's firmware to version

        :param mac: MAC of dev
        :param version: version to upgrade to
        """
        self._mac_cmd(mac, 'upgrade', mgr='devmgr',
                      params={'upgrade_to_firmware': version})

    def provision(self, mac):
        """
        Force provisioning of a device

        :param mac: MAC of device
        """
        self._mac_cmd(mac, 'force-provision', mgr='devmgr')

    def get_setting(self, section=None, super=False):
        """
        Return settings for this site or controller

        :param super: Return only controller-wide settings
        :param section: Only return this/these section(s)
        :return: {section:settings}
        """
        res = {}
        settings = self._api_read('get/setting')
        if section and not isinstance(section, (list, tuple)):
            section = [section]
        for s in settings:
            s_sect = s['key']
            if (super and 'site_id' in s) or \
                    (not super and 'site_id' not in s) or \
                    (section and s_sect not in section):
                continue
            for k in ('_id', 'site_id', 'key'):
                s.pop(k, None)
            res[s_sect] = s
        return res

    def update_setting(self, settings):
        """
        Update settings

        :param settings: {section:{settings}}
        :return: resulting settings
        """
        res = []
        for sect, setting in settings.items():
            res.extend(self._api_write('set/setting/' + sect, setting))
        return res

    def update_user_group(self, group_id, down_kbps=-1, up_kbps=-1):
        """
        Update user group bandwidth settings

        :param group_id: Group ID to modify
        :param down_kbps: New bandwidth in KBPS for download
        :param up_kbps: New bandwidth in KBPS for upload
        :raises ValueError: when no user group has the given ID
        """
        for group in self.get_user_groups():
            if group["_id"] == group_id:
                # Apply setting change
                return self._api_update(
                    "rest/usergroup/{0}".format(group_id), {
                        "qos_rate_max_down": down_kbps,
                        "qos_rate_max_up": up_kbps,
                        "name": group["name"],
                        "_id": group_id,
                        "site_id": self.site_id
                    })
        raise ValueError("Group ID {0} is not valid.".format(group_id))

    def set_client_alias(self, mac, alias):
        """
        Set the client alias. Set to "" to reset to default

        :param mac: The MAC of the client to rename
        :param alias: The alias to set
        """
        client = self.get_client(mac)['_id']
        return self._api_update('rest/user/' + client, {'name': alias})

    def create_voucher(self, number, quota, expire, up_bandwidth=None,
                       down_bandwidth=None, byte_quota=None, note=None):
        """
        Create voucher for guests.

        :param number: number of vouchers
        :param quota: number of using; 0 = unlimited
        :param expire: expiration of voucher in minutes
        :param up_bandwidth: up speed allowed in kbps
        :param down_bandwidth: down speed allowed in kbps
        :param byte_quota: quantity of bytes allowed in MB
        :param note: description
        :return: the vouchers sharing the create_time reported for the command
        """
        cmd = 'create-voucher'
        params = {'n': number, 'quota': quota, 'expire': 'custom',
                  'expire_number': expire, 'expire_unit': 1}
        if up_bandwidth:
            params['up'] = up_bandwidth
        if down_bandwidth:
            params['down'] = down_bandwidth
        if byte_quota:
            params['bytes'] = byte_quota
        if note:
            params['note'] = note
        res = self._run_command(cmd, mgr='hotspot', params=params)
        return self.list_vouchers(create_time=res[0]['create_time'])

    def list_vouchers(self, **filter):
        """
        Get list of vouchers

        :param filter: Filter vouchers by create_time, code, quota,
            used, note, status_expires, status, ...
        """
        if 'code' in filter:
            # Dashes are removed from the requested code before comparing.
            filter['code'] = filter['code'].replace('-', '')
        return [
            voucher for voucher in self._api_read('stat/voucher')
            if all(voucher.get(key) == val for key, val in filter.items())
        ]

    def delete_voucher(self, id):
        """
        Delete / revoke voucher

        :param id: id of voucher
        """
        cmd = 'delete-voucher'
        params = {'_id': id}
        self._run_command(cmd, mgr='hotspot', params=params)