UniFi Security Gateway

I was able to get the firmware sensor working, but I still cannot get alerts working.

2021-02-08 12:38:45 ERROR (SyncWorker_32) [custom_components.unifigateway.sensor] Failed to access alerts info: Expecting value: line 1 column 1 (char 0)

No doubt this is something very simple to resolve for someone with a bit more knowledge than me. I have simply removed the “alerts” line from the YAML file for now.

Sensor.py

import logging
import voluptuous as vol
from datetime import timedelta

from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
    CONF_NAME, CONF_HOST, CONF_USERNAME, CONF_PASSWORD,
    CONF_MONITORED_CONDITIONS, CONF_VERIFY_SSL)

# __version__ = '0.2.3'

# Logger for this sensor platform module.
_LOGGER = logging.getLogger(__name__)

# Configuration keys that are defined locally rather than imported from
# homeassistant.const.
CONF_PORT = 'port'
CONF_SITE_ID = 'site_id'

# Defaults applied by PLATFORM_SCHEMA when a key is omitted from the YAML.
DEFAULT_NAME = 'UniFi Gateway'
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 8443
DEFAULT_SITE = 'default'
DEFAULT_VERIFY_SSL = False

# Throttle interval applied to UnifiGatewaySensor.update().
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=30)

# Keys a user may list under `monitored_conditions`.
SENSOR_VPN = 'vpn'
SENSOR_WWW = 'www'
SENSOR_WAN = 'wan'
SENSOR_LAN = 'lan'
SENSOR_WLAN = 'wlan'
SENSOR_ALERTS = 'alerts'
SENSOR_FIRMWARE = 'firmware'


# Per-sensor metadata: [display-name suffix, <unused here>, frontend icon].
# Index 0 is appended to the configured name and index 2 is returned as the
# icon; index 1 is always '' in this file (presumably a unit of measurement
# slot -- TODO confirm).
USG_SENSORS = {
    SENSOR_VPN:     ['VPN', '', 'mdi:folder-key-network'],
    SENSOR_WWW:     ['WWW', '', 'mdi:web'],
    SENSOR_WAN:     ['WAN', '', 'mdi:shield-outline'],
    SENSOR_LAN:     ['LAN', '', 'mdi:lan'],
    SENSOR_WLAN:    ['WLAN','', 'mdi:wifi'],
    SENSOR_ALERTS:  ['Alerts', '', 'mdi:information-outline'],
    SENSOR_FIRMWARE:['Firmware Upgradable', '', 'mdi:database-plus']
}

# Every condition is both allowed and enabled by default. Note that
# DEFAULT_MONITORED is an alias of the same list object, not a copy.
POSSIBLE_MONITORED = [ SENSOR_VPN, SENSOR_WWW, SENSOR_WAN, SENSOR_LAN,
                        SENSOR_WLAN, SENSOR_ALERTS, SENSOR_FIRMWARE ]
DEFAULT_MONITORED = POSSIBLE_MONITORED

# Platform configuration schema. Only the keys listed here are declared;
# in particular there is no `version` key, and the controller version is
# hard-coded in setup_platform().
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
    vol.Optional(CONF_SITE_ID, default=DEFAULT_SITE): cv.string,
    vol.Required(CONF_PASSWORD): cv.string,
    vol.Required(CONF_USERNAME): cv.string,
    # Marked Required but carries a default, so it may be omitted in YAML.
    vol.Required(CONF_PORT, default=DEFAULT_PORT): cv.port,
    # Either a boolean or a path to a certificate file.
    vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL):
        vol.Any(cv.boolean, cv.isfile),
    vol.Optional(CONF_MONITORED_CONDITIONS, default=DEFAULT_MONITORED):
        vol.All(cv.ensure_list, [vol.In(POSSIBLE_MONITORED)])
})

def setup_platform(hass, config, add_entities, discovery_info=None):
    """Create one UnifiGatewaySensor per monitored condition.

    A single Controller is built from the platform configuration and shared
    by every sensor. If the controller raises APIError while being
    constructed, the error is logged and False is returned without adding
    any entity.
    """
    from .controller import APIError, Controller

    try:
        ctrl = Controller(
            config.get(CONF_HOST),
            config.get(CONF_USERNAME),
            config.get(CONF_PASSWORD),
            config.get(CONF_PORT),
            version='unifiOS',
            site_id=config.get(CONF_SITE_ID),
            ssl_verify=config.get(CONF_VERIFY_SSL),
        )
    except APIError as ex:
        _LOGGER.error("Failed to connect to Unifi Security Gateway: %s", ex)
        return False

    name = config.get(CONF_NAME)
    for condition in config.get(CONF_MONITORED_CONDITIONS):
        entity = UnifiGatewaySensor(hass, ctrl, name, condition)
        # Second argument asks Home Assistant to update before adding.
        add_entities([entity], True)

class UnifiGatewaySensor(Entity):
    """Sensor exposing one monitored condition of a UniFi gateway.

    The condition is one of the SENSOR_* keys. Depending on it, the state is:

    * ``alerts``   -- number of unarchived alerts; each alert is an attribute
      keyed by its 1-based position in the controller's list.
    * ``firmware`` -- number of devices flagged ``upgradable``; attributes
      map a device label to that flag.
    * anything else -- upper-cased ``status`` of the matching health
      subsystem, with every field of that subsystem as an attribute.
    """

    def __init__(self, hass, ctrl, name, sensor):
        """Initialize the sensor.

        :param hass: Home Assistant instance (stored, not otherwise used here)
        :param ctrl: controller object used to query the UniFi API
        :param name: user-configured prefix for the entity name
        :param sensor: one of the SENSOR_* keys of USG_SENSORS
        """
        self._hass = hass
        self._ctrl = ctrl
        self._name = name + ' ' + USG_SENSORS[sensor][0]
        self._sensor = sensor
        self._state = None
        self._alldata = None
        self._data = None
        self._attributes = {}

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return USG_SENSORS[self._sensor][2]

    @property
    def state(self):
        """Return the state of the device."""
        return self._state

    @property
    def state_attributes(self):
        """Return the device state attributes."""
        return self._attributes

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    def update(self):
        """Refresh state and attributes from the controller.

        APIError raised by the controller is logged and the update is
        abandoned; it is never propagated to the caller.
        """
        from .controller import APIError

        if self._sensor == SENSOR_ALERTS:
            try:
                alerts = self._ctrl.get_alerts()
            except APIError as ex:
                # Keep the previous state AND attributes so the two never
                # disagree after a failed poll.
                _LOGGER.error("Failed to access alerts info: %s", ex)
                return

            # .get(): an alert without an 'archived' flag counts as active
            # instead of raising KeyError.
            self._attributes = {
                str(index): alert
                for index, alert in enumerate(alerts, start=1)
                if not alert.get('archived')
            }
            self._state = len(self._attributes)

        elif self._sensor == SENSOR_FIRMWARE:
            self._attributes = {}
            self._state = 0

            try:
                aps = self._ctrl.get_aps()
            except APIError as ex:
                _LOGGER.error("Failed to scan aps: %s", ex)
                return

            # Attributes are keyed by device name for readability; names may
            # not be unique. Devices without a name fall back to their MAC so
            # that a missing key cannot abort the whole update.
            for device in aps:
                if device.get('upgradable'):
                    label = device.get('name') or device.get('mac', 'unknown')
                    self._attributes[label] = device['upgradable']
                    self._state += 1

        else:
            # get_healthinfo() is called once per health sensor; ideally the
            # result would be shared between them.
            # Check that the method exists: avoids errors on startup.
            if not hasattr(self._ctrl, 'get_healthinfo'):
                _LOGGER.error("no healthinfo attribute for controller")
                return

            try:
                self._alldata = self._ctrl.get_healthinfo()
            except APIError as ex:
                _LOGGER.error("Failed to access health info: %s", ex)
                return

            for sub in self._alldata:
                if sub.get('subsystem') == self._sensor:
                    self._data = sub
                    self._state = sub['status'].upper()
                    self._attributes.update(sub)

Controller.py

import json
import logging
import requests
import shutil
import time
import warnings


"""For testing purposes:
logging.basicConfig(filename='pyunifi.log', level=logging.WARN,
                    format='%(asctime)s %(message)s')
"""
# Module-level logger used by retry_login and the Controller methods.
log = logging.getLogger(__name__)


class APIError(Exception):
    """Error raised for failures while talking to the UniFi controller."""


def retry_login(func, *args, **kwargs):
    """Decorate a controller method so it logs in again and retries once.

    If the wrapped call raises a requests exception or APIError, the first
    positional argument (the controller instance) is asked to ``_login()``
    and the call is repeated once. Any exception that still escapes,
    including ones raised on the first attempt that are not retried, is
    re-raised as APIError with the original exception chained as its cause.

    :param func: the method to wrap
    :param args: ignored; kept for backward compatibility
    :param kwargs: ignored; kept for backward compatibility
    :returns: the wrapping function, carrying ``func``'s name and docstring
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            try:
                return func(*args, **kwargs)
            except (requests.exceptions.RequestException,
                    APIError) as err:
                log.warning("Failed to perform %s due to %s", func, err)
                controller = args[0]
                controller._login()
                return func(*args, **kwargs)
        except Exception as err:
            # Callers only ever need to catch APIError.
            raise APIError(err) from err
    return wrapper


class Controller(object):

    """Interact with a UniFi controller running on UniFi OS.

    Talks to the controller's JSON API over HTTPS, using the
    ``/proxy/network/`` prefix for API calls and ``/api/auth/login`` for
    authentication. Operations will raise unifi.controller.APIError on
    obvious problems (such as login failure), but many errors (such as
    disconnecting a nonexistent client) will go unreported.

    >>> from unifi.controller import Controller
    >>> c = Controller('192.168.1.99', 'admin', 'p4ssw0rd')
    >>> for ap in c.get_aps():
    ...     print('AP named %s with MAC %s' % (ap.get('name'), ap['mac']))
    ...
    AP named Study with MAC dc:9f:db:1a:59:07
    AP named Living Room with MAC dc:9f:db:1a:59:08
    AP named Garage with MAC dc:9f:db:1a:59:0b

    """

    def __init__(self, host, username, password, port=8443,
                 version='unifiOS', site_id='default', ssl_verify=True):
        """
        Create the HTTP session and log in immediately.

        :param host: the address of the controller host; IP or name
        :param username: the username to log in with
        :param password: the password to log in with
        :param port: the port of the controller host. Stored on the
            instance, but the URLs built here do not include it.
        :param version: the base version of the controller API. Stored on
            the instance; this implementation always uses the UniFi OS
            paths regardless of its value.
        :param site_id: the site ID to connect to
        :param ssl_verify: Verify the controllers SSL certificate,
            can also be "path/to/custom_cert.pem"
        :raises APIError: if the login request does not return HTTP 200
        """

        self.log = logging.getLogger(__name__ + ".Controller")

        self.host = host
        self.username = username
        self.password = password
        # Kept so the values passed by callers are not silently dropped.
        self.port = port
        self.version = version
        self.site_id = site_id
        self.ssl_verify = ssl_verify
        self.url = 'https://' + host + '/proxy/network/'

        self.session = requests.Session()
        self.session.verify = ssl_verify

        self.log.debug('Controller for %s', self.url)
        self._login()

    @staticmethod
    def _jsondec(data):
        """Decode a JSON reply and unwrap its ``data`` member.

        :param data: response body as text
        :returns: ``obj['data']`` when present, otherwise the decoded object
        :raises APIError: if the reply has a ``meta`` section whose ``rc``
            is not ``'ok'``
        :raises ValueError: if ``data`` is not valid JSON
        """
        obj = json.loads(data)
        if 'meta' in obj:
            meta = obj['meta']
            if meta['rc'] != 'ok':
                # An error reply without 'msg' must still surface as APIError.
                raise APIError(meta.get('msg', 'unknown error'))
        if 'data' in obj:
            return obj['data']
        return obj

    def _api_url(self):
        """Return the base URL of the site-scoped API."""
        return self.url + 'api/s/' + self.site_id + '/'

    @retry_login
    def _read(self, url, params=None):
        """GET ``url`` with query ``params`` and return the decoded reply."""
        r = self.session.get(url, params=params)
        return self._jsondec(r.text)

    def _api_read(self, url, params=None):
        """GET a path relative to the site-scoped API."""
        return self._read(self._api_url() + url, params)

    @retry_login
    def _write(self, url, params=None):
        """POST ``params`` as JSON to ``url`` and return the decoded reply."""
        r = self.session.post(url, json=params)
        return self._jsondec(r.text)

    def _api_write(self, url, params=None):
        """POST to a path relative to the site-scoped API."""
        return self._write(self._api_url() + url, params)

    @retry_login
    def _update(self, url, params=None):
        """PUT ``params`` as JSON to ``url`` and return the decoded reply."""
        r = self.session.put(url, json=params)
        return self._jsondec(r.text)

    def _api_update(self, url, params=None):
        """PUT to a path relative to the site-scoped API."""
        return self._update(self._api_url() + url, params)

    def _login(self):
        """Authenticate the session against ``/api/auth/login``.

        :raises APIError: if the response status code is not 200
        """
        log.debug('login() as %s', self.username)

        params = {'username': self.username, 'password': self.password}
        login_url = 'https://' + self.host + '/api/auth/login'

        r = self.session.post(login_url, json=params)
        if r.status_code != 200:
            raise APIError("Login failed - status code: %i" % r.status_code)

    def _logout(self):
        """POST to the site-scoped ``logout`` path."""
        log.debug('logout()')
        self._api_write('logout')

    def switch_site(self, name):
        """
        Switch to another site

        :param name: Site Name
        :return: True or APIError
        """
        for site in self.get_sites():
            if site['desc'] == name:
                self.site_id = site['name']
                return True
        raise APIError("No site %s found" % name)

    def get_alerts(self):
        """Return a list of all Alerts."""
        return self._api_write('stat/alarm')

    def get_alerts_unarchived(self):
        """Return a list of Alerts unarchived."""
        return self._api_write('stat/alarm', params={'archived': False})

    def get_statistics_last_24h(self):
        """Returns statistical data of the last 24h"""
        return self.get_statistics_24h(time.time())

    def get_statistics_24h(self, endtime):
        """Return statistical data last 24h from time

        :param endtime: end of the window as a Unix timestamp in seconds
        """

        params = {
            'attrs': ["bytes", "num_sta", "time"],
            'start': int(endtime - 86400) * 1000,
            'end': int(endtime - 3600) * 1000}
        return self._write(self._api_url() + 'stat/report/hourly.site', params)

    def get_events(self):
        """Return a list of all Events."""
        return self._api_read('stat/event')

    def get_aps(self):
        """Return a list of all APs,
        with significant information about each.
        """
        # Set test to 0 instead of NULL
        params = {'_depth': 2, 'test': 0}
        return self._api_read('stat/device', params)

    def get_client(self, mac):
        """Get details about a specific client"""

        # stat/user/<mac> works better than stat/sta/<mac>
        # stat/sta seems to be only active clients
        # stat/user includes known but offline clients
        return self._api_read('stat/user/' + mac)[0]

    def get_clients(self):
        """Return a list of all active clients,
        with significant information about each.
        """
        return self._api_read('stat/sta')

    def get_users(self):
        """Return a list of all known clients,
        with significant information about each.
        """
        return self._api_read('list/user')

    def get_user_groups(self):
        """Return a list of user groups with its rate limiting settings."""
        return self._api_read('list/usergroup')

    def get_sysinfo(self):
        """Return basic system informations."""
        return self._api_read('stat/sysinfo')

    def get_healthinfo(self):
        """Return health information."""
        return self._api_read('stat/health')

    def get_sites(self):
        """Return a list of all sites,
        with their UID and description"""
        return self._read(self.url + 'api/self/sites')

    def get_wlan_conf(self):
        """Return a list of configured WLANs
        with their configuration parameters.
        """
        return self._api_read('list/wlanconf')

    def _run_command(self, command, params=None, mgr='stamgr'):
        """POST ``command`` to the ``cmd/<mgr>`` endpoint.

        :param command: value sent as the ``cmd`` field
        :param params: extra fields for the request body; not modified
        :param mgr: manager segment of the endpoint path
        """
        log.debug('_run_command(%s)', command)
        # Work on a copy: a shared default or the caller's dict must not be
        # mutated.
        payload = dict(params) if params else {}
        payload['cmd'] = command
        return self._write(self._api_url() + 'cmd/' + mgr, params=payload)

    def _mac_cmd(self, target_mac, command, mgr='stamgr', params=None):
        """Run ``command`` with ``target_mac`` sent as the ``mac`` field.

        :param params: extra fields for the request body; not modified
        """
        log.debug('_mac_cmd(%s, %s)', target_mac, command)
        payload = dict(params) if params else {}
        payload['mac'] = target_mac
        return self._run_command(command, payload, mgr)

    def get_device_stat(self, target_mac):
        """Gets the current state & configuration of
        the given device based on its MAC Address.
        :param target_mac: MAC address of the device.
        :type target_mac: str
        :returns: Dictionary containing metadata, state,
            capabilities and configuration of the device
        :rtype: dict()
        """
        log.debug('get_device_stat(%s)', target_mac)
        params = {"macs": [target_mac]}
        return self._api_read('stat/device/' + target_mac, params)[0]

    def get_switch_port_overrides(self, target_mac):
        """Gets a list of port overrides, in dictionary
        format, for the given target MAC address. The
        dictionary contains the port_idx, portconf_id,
        poe_mode, & name.

        :param target_mac: MAC address of the device.
        :type target_mac: str
        :returns: [ { 'port_idx': int(), 'portconf': str,
            'poe_mode': str, 'name': str } ]
        :rtype: list( dict() )
        """
        log.debug('get_switch_port_overrides(%s)', target_mac)
        return self.get_device_stat(target_mac)['port_overrides']

    def _switch_port_power(self, target_mac, port_idx, mode):
        """Helper method to set the given PoE mode the port/switch.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to target
        :type port_idx: int
        :param mode: PoE mode to set. ie. auto, on, off.
        :type mode: str
        :returns: { 'port_overrides': [ { 'port_idx': int(),
            'portconf': str, 'poe_mode': str, 'name': str } ],
            'device_id': str }
        :rtype: dict
        :raises APIError: if no override exists for the port and it is not
            listed in the device's port_table
        """
        # TODO: Switch operations should most likely happen in a
        # different Class, Switch.
        log.debug('_switch_port_power(%s, %s, %s)', target_mac, port_idx, mode)
        device_stat = self.get_device_stat(target_mac)
        device_id = device_stat['_id']
        overrides = device_stat['port_overrides']

        for override in overrides:
            if override['port_idx'] == port_idx:
                # Override already exists, update..
                override['poe_mode'] = mode
                break
        else:
            # No existing override: look up the port's current portconf.
            portconf_id = None
            for port in device_stat['port_table']:
                if port['port_idx'] == port_idx:
                    portconf_id = port['portconf_id']
                    break
            if portconf_id is None:
                log.error("Port ID %s could not be found in the port_table.",
                          port_idx)
                raise APIError(
                    'Port ID %s not found in port_table' % str(port_idx)
                )
            overrides.append({
                "port_idx": port_idx,
                "portconf_id": portconf_id,
                "poe_mode": mode
            })
        # We return the device_id as it's needed by the parent method
        return {"port_overrides": overrides, "device_id": device_id}

    def switch_port_power_off(self, target_mac, port_idx):
        """Powers Off the given port on the Switch identified
        by the given MAC Address.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to power off
        :type port_idx: int
        :returns: API Response which is the resulting complete port overrides
        :rtype: list( dict() )
        """
        log.debug('switch_port_power_off(%s, %s)', target_mac, port_idx)
        params = self._switch_port_power(target_mac, port_idx, "off")
        device_id = params.pop('device_id')
        return self._api_update('rest/device/' + device_id, params)

    def switch_port_power_on(self, target_mac, port_idx):
        """Powers On the given port on the Switch identified
        by the given MAC Address.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to power on
        :type port_idx: int
        :returns: API Response which is the resulting complete port overrides
        :rtype: list( dict() )
        """
        log.debug('switch_port_power_on(%s, %s)', target_mac, port_idx)
        params = self._switch_port_power(target_mac, port_idx, "auto")
        device_id = params.pop('device_id')
        return self._api_update('rest/device/' + device_id, params)

    def create_site(self, desc='desc'):
        """Create a new site.

        :param desc: Name of the site to be created.
        """
        return self._run_command('add-site', params={"desc": desc},
                                 mgr='sitemgr')

    def block_client(self, mac):
        """Add a client to the block list.

        :param mac: the MAC address of the client to block.
        """
        return self._mac_cmd(mac, 'block-sta')

    def unblock_client(self, mac):
        """Remove a client from the block list.

        :param mac: the MAC address of the client to unblock.
        """
        return self._mac_cmd(mac, 'unblock-sta')

    def disconnect_client(self, mac):
        """Disconnect a client.

        Disconnects a client, forcing them to reassociate. Useful when the
        connection is of bad quality to force a rescan.

        :param mac: the MAC address of the client to disconnect.
        """
        return self._mac_cmd(mac, 'kick-sta')

    def restart_ap(self, mac):
        """Restart an access point (by MAC).

        :param mac: the MAC address of the AP to restart.
        """
        return self._mac_cmd(mac, 'restart', 'devmgr')

    def restart_ap_name(self, name):
        """Restart an access point (by name).

        Only an AP whose ``state`` is 1 and whose name matches is restarted;
        if none matches, nothing happens and None is returned.

        :param name: the name address of the AP to restart.
        :raises APIError: if ``name`` is empty
        """
        if not name:
            raise APIError('%s is not a valid name' % str(name))
        for ap in self.get_aps():
            if ap.get('state', 0) == 1 and ap.get('name', None) == name:
                return self.restart_ap(ap['mac'])

    def archive_all_alerts(self):
        """Archive all Alerts."""
        return self._run_command('archive-all-alarms', mgr='evtmgr')

    def create_backup(self, days='0'):
        """Ask controller to create a backup archive file

        ..warning:
            This process puts significant load on the controller
            and may render it partially unresponsive for other requests.

        :param days: metrics of the last x days will be added to the backup.
            '-1' backup all metrics. '0' backup only the configuration.
        :return: URL path to backup file
        """
        res = self._run_command('backup', mgr='system', params={'days': days})
        return res[0]['url']

    def get_backup(self, download_path=None, target_file='unifi-backup.unf'):
        """
        Download a backup archive to ``target_file``.

        :param download_path: path to backup; if None is given
            one will be created
        :param target_file: Filename or full path to download the
            backup archive to, should have .unf extension for restore.
        """
        if not download_path:
            download_path = self.create_backup()

        # Context manager releases the streamed connection when done.
        with self.session.get(self.url + download_path, stream=True) as r:
            with open(target_file, 'wb') as _backfh:
                return shutil.copyfileobj(r.raw, _backfh)

    def authorize_guest(self, guest_mac, minutes, up_bandwidth=None,
                        down_bandwidth=None, byte_quota=None, ap_mac=None):
        """
        Authorize a guest based on his MAC address.

        :param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
        :param minutes: duration of the authorization in minutes
        :param up_bandwidth: up speed allowed in kbps
        :param down_bandwidth: down speed allowed in kbps
        :param byte_quota: quantity of bytes allowed in MB
        :param ap_mac: access point MAC address
        """
        cmd = 'authorize-guest'
        params = {'mac': guest_mac, 'minutes': minutes}

        if up_bandwidth:
            params['up'] = up_bandwidth
        if down_bandwidth:
            params['down'] = down_bandwidth
        if byte_quota:
            params['bytes'] = byte_quota
        if ap_mac:
            params['ap_mac'] = ap_mac
        return self._run_command(cmd, params=params)

    def unauthorize_guest(self, guest_mac):
        """
        Unauthorize a guest based on his MAC address.

        :param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
        """
        cmd = 'unauthorize-guest'
        params = {'mac': guest_mac}
        return self._run_command(cmd, params=params)

    def get_firmware(self, cached=True, available=True,
                     known=False, site=False):
        """
        Return a list of available/cached firmware versions

        :param cached: Return cached firmwares
        :param available: Return available (and not cached) firmwares
        :param known: Return only firmwares for known devices
        :param site: Return only firmwares for on-site devices
        :return: List of firmware dicts
        """
        res = []
        if cached:
            res.extend(self._run_command('list-cached', mgr='firmware'))
        if available:
            res.extend(self._run_command('list-available', mgr='firmware'))

        if known:
            res = [fw for fw in res if fw['knownDevice']]
        if site:
            res = [fw for fw in res if fw['siteDevice']]
        return res

    def cache_firmware(self, version, device):
        """
        Cache the firmware on the UniFi Controller

        .. warning:: Caching one device might very well cache others,
            as they're on shared platforms

        :param version: version to cache
        :param device: device model to cache (e.g. BZ2)
        :return: True/False
        """
        return self._run_command(
            'download', mgr='firmware',
            params={'device': device, 'version': version})[0]['result']

    def remove_firmware(self, version, device):
        """
        Remove cached firmware from the UniFi Controller

        .. warning:: Removing one device's firmware might very well remove
            others, as they're on shared platforms

        :param version: version to remove
        :param device: device model to remove (e.g. BZ2)
        :return: True/false
        """
        return self._run_command(
            'remove', mgr='firmware',
            params={'device': device, 'version': version})[0]['result']

    def get_tag(self):
        """Get all tags and their member MACs"""
        return self._api_read('rest/tag')

    def upgrade_device(self, mac, version):
        """
        Upgrade a device's firmware to version
        :param mac: MAC of dev
        :param version: version to upgrade to
        """
        self._mac_cmd(mac, 'upgrade', mgr='devmgr',
                      params={'upgrade_to_firmware': version})

    def provision(self, mac):
        """
        Force provisioning of a device
        :param mac: MAC of device
        """
        self._mac_cmd(mac, 'force-provision', mgr='devmgr')

    def get_setting(self, section=None, super=False):
        """
        Return settings for this site or controller

        :param super: Return only controller-wide settings
        :param section: Only return this/these section(s)
        :return: {section:settings}
        """
        res = {}
        settings = self._api_read('get/setting')
        if section and not isinstance(section, (list, tuple)):
            section = [section]

        for s in settings:
            s_sect = s['key']
            # Skip entries of the wrong scope or outside the requested
            # sections.
            if (super and 'site_id' in s) or \
               (not super and 'site_id' not in s) or \
               (section and s_sect not in section):
                continue
            # Strip bookkeeping fields from the returned settings.
            for k in ('_id', 'site_id', 'key'):
                s.pop(k, None)
            res[s_sect] = s
        return res

    def update_setting(self, settings):
        """
        Update settings

        :param settings: {section:{settings}}
        :return: resulting settings
        """
        res = []
        for sect, setting in settings.items():
            res.extend(self._api_write('set/setting/' + sect, setting))
        return res

    def update_user_group(self, group_id, down_kbps=-1, up_kbps=-1):
        """
        Update user group bandwidth settings

        :param group_id: Group ID to modify
        :param down_kbps: New bandwidth in KBPS for download
        :param up_kbps: New bandwidth in KBPS for upload
        :raises ValueError: if no group has the given ID
        """
        for group in self.get_user_groups():
            if group["_id"] == group_id:
                # Apply setting change
                return self._api_update(
                    "rest/usergroup/{0}".format(group_id), {
                        "qos_rate_max_down": down_kbps,
                        "qos_rate_max_up": up_kbps,
                        "name": group["name"],
                        "_id": group_id,
                        "site_id": self.site_id
                    })

        raise ValueError("Group ID {0} is not valid.".format(group_id))

    def set_client_alias(self, mac, alias):
        """
        Set the client alias. Set to "" to reset to default
        :param mac: The MAC of the client to rename
        :param alias: The alias to set
        """
        client = self.get_client(mac)['_id']
        return self._api_update('rest/user/' + client, {'name': alias})

    def create_voucher(self, number, quota, expire, up_bandwidth=None,
                       down_bandwidth=None, byte_quota=None, note=None):
        """
        Create voucher for guests.

        :param number: number of vouchers
        :param quota: number of using; 0 = unlimited
        :param expire: expiration of voucher in minutes
        :param up_bandwidth: up speed allowed in kbps
        :param down_bandwidth: down speed allowed in kbps
        :param byte_quota: quantity of bytes allowed in MB
        :param note: description
        :return: the vouchers whose create_time matches the new batch
        """
        cmd = 'create-voucher'
        params = {'n': number, 'quota': quota, 'expire': 'custom',
                  'expire_number': expire, 'expire_unit': 1}

        if up_bandwidth:
            params['up'] = up_bandwidth
        if down_bandwidth:
            params['down'] = down_bandwidth
        if byte_quota:
            params['bytes'] = byte_quota
        if note:
            params['note'] = note
        res = self._run_command(cmd, mgr='hotspot', params=params)
        return self.list_vouchers(create_time=res[0]['create_time'])

    def list_vouchers(self, **filter):
        """
        Get list of vouchers

        :param filter:  Filter vouchers by create_time, code, quota,
                        used, note, status_expires, status, ...

        """
        if 'code' in filter:
            # Dashes are removed from the requested code before comparing.
            filter['code'] = filter['code'].replace('-', '')

        return [
            voucher for voucher in self._api_read('stat/voucher')
            if all(voucher.get(key) == val for key, val in filter.items())
        ]

    def delete_voucher(self, id):
        """
        Delete / revoke voucher

        :param id: id of voucher
        """
        cmd = 'delete-voucher'
        params = {'_id': id}
        self._run_command(cmd, mgr='hotspot', params=params)
1 Like

May be deleted

Bump… is anyone else looking at getting this resolved?

So I haven’t had much time to tinker on this until today. I wanted to force the version in the config and when I ran the “check configuration” script I got this error…

Invalid config for [sensor.unifigateway]: [version] is an invalid option for [sensor.unifigateway]. Check: sensor.unifigateway->version. (See ?, line ?).

Here is my config…

sensor:
  - platform: unifigateway
    host: <unifi ip address>
    port: 443
    version: unifiOS
    username: <unifi local account>
    password: <unifi local password>
    monitored_conditions:
      - www
      - lan
      - wan
      - wlan
      - alerts
      - firmware

Shouldn’t the version be something like v5 for example?

I have a UDM-pro and want to include it in home assistant.
This is my config:

### Unifi ###
- platform: unifigateway
  host: 10.0.1.254
  username: !secret unifi_username
  password: !secret unifi_password
  monitored_conditions:
    - www
    - wlan
    - alerts
    - firmware

this is the error message I get.

Logger: custom_components.unifigateway.sensor
Source: custom_components/unifigateway/sensor.py:87
Integration: unifigateway ([documentation](https://github.com/custom-components/sensor.unifigateway))
First occurred: 11:51:07 AM (1 occurrences)
Last logged: 11:51:07 AM

Failed to connect to Unifi Security Gateway: Login failed - status code: 404

and also a warning

Logger: homeassistant.loader
Source: loader.py:802
First occurred: 11:51:03 AM (1 occurrences)
Last logged: 11:51:03 AM

No 'version' key in the manifest file for custom integration 'unifigateway'. This will not be allowed in a future version of Home Assistant. Please report this to the maintainer of 'unifigateway'

Above I saw something about this project not being compatible with the UDM and UDM-Pro?

@JeroenDeckers dude, this was an awesome custom component before UnifiOS. Since that time there has been limited effort to make the changes needed to get things working again. There is a new version that is not exposed to HACS and several PRs that are waiting to be merged, including the fix for the manifest version issue. Perhaps the developers will find time to work on it and perhaps not.

I don’t believe the main developer @jchasey even has access to a UDM-Pro. Please remember the UDM-Pro and UnifiOS are different to the Unifi system we used previously. And please don’t take this as a knock in any way on @jchasey, because it’s not at all.

The other possibility is that the built-in integration, which provides other elements, extends its sensors to include the www, wlan, vpn, firmware, etc. elements we previously received from this custom component.

As far as your auth issue, in Unifi you need to go to /users/<your_ha_user>/profile and set up a local username and password. I have a user dedicated to Home Assistant. This works just fine. I am able to authenticate to UnifiOS and Protect with no issues.

I have created a local user and I’m using those credentials.
I’m sure the credentials work because I tried logging in.

Is this config block sufficient?

### Unifi ###
- platform: unifigateway
  host: 10.0.1.254
  username: !secret unifi_username
  password: !secret unifi_password
  monitored_conditions:
    - www
    - wlan
    - alerts
    - firmware

@JeroenDeckers your config looks correct, but it still does not work because there are changes that need to be added to support the UDM-Pro. Here is my config just to illustrate. This config worked before I upgraded my router to the UDM-Pro. The only change I have made is adding the version option, but that doesn’t work on the older version that is in HACS. I am running version 0.3.0 with no success…

### Unifi Sensors ###
  - platform: unifigateway
    host: 192.168.1.254
    port: 443
    version: unifiOS
    username: <username>
    password: <password>
    monitored_conditions:
      - www
      - wan
      - wlan
      - lan
      - alerts
      - firmware 

Hi, just grab the two files above instead and place them in your CC folder. It works for me without issue on the UDM Pro, apart from the alerts sensor.

Which 2 files? I’ve installed the component with HACS.
Did you need to configure the version in your config files?
Can you post your code?

Scroll up and you will see them… don’t worry about the version number.

The files are listed here - UniFi Security Gateway.

I have the component installed via HACS, so I just overwrote the contents of the existing sensor.py, but had to add a new file for the controller.py.

After restarting Core, I’m delighted to report that HA can now connect to my UDM-Pro.

Thanks for the code, @grantc!

All good, the only sensor not working was “alerts” - just remove it from the yaml file if you don’t want errors in the log

Good shout. I can confirm that it also works with the latest 1.9.0 firmware.

Working for me too, thank you very much.

Has anyone got any examples of how to display the attributes of the sensors, please?

regards

  - platform: template
    sensors:
      unifi_gateway_cpu:
        friendly_name: 'CPU'
        value_template: "{{ states.sensor.unifi_gateway_wan.attributes['gw_system-stats']['cpu'] }}"
        icon_template: mdi:cpu-64-bit

      unifi_gateway_mem:
        friendly_name: 'Memory'
        value_template: "{{ states.sensor.unifi_gateway_wan.attributes['gw_system-stats']['mem'] }}"
        icon_template: mdi:memory

      unifi_gateway_wan_ip:
        friendly_name: 'WAN IP'
        value_template: '{{ states.sensor.unifi_gateway_wan.attributes.wan_ip }}'
        icon_template: mdi:wan

      unifi_gateway_wired_clients:
        friendly_name: 'UDM-Pro Wired Clients'
        value_template: "{{ states.sensor.unifi_gateway_lan.attributes.num_user }}"
        icon_template: mdi:lan

      unifi_gateway_wireless_clients:
        friendly_name: 'UDM-Pro Wireless Clients'
        value_template: "{{ states.sensor.unifi_gateway_wlan.attributes.num_user }}"
        icon_template: mdi:wifi

      unifi_gateway_www_xput_up:
        friendly_name: 'UDM-Pro Upload'
        unit_of_measurement: Mbps
        value_template: "{{ states.sensor.unifi_gateway_www.attributes.xput_up }}"
        icon_template: mdi:upload

      unifi_gateway_www_xput_down:
        friendly_name: 'UDM-Pro Download'
        unit_of_measurement: Mbps
        value_template: "{{ states.sensor.unifi_gateway_www.attributes.xput_down }}"
        icon_template: mdi:download

      unifi_gateway_www_speedtest_ping:
        friendly_name: 'UDM-Pro Ping'
        unit_of_measurement: ms
        value_template: "{{ states.sensor.unifi_gateway_www.attributes.speedtest_ping }}"
        icon_template: mdi:run-fast

      unifi_gateway_firmware:
        friendly_name: 'UDM-Firmware Version'
        value_template: "{{ states.sensor.unifi_gateway_wan.attributes.gw_version }}"
        icon_template: mdi:counter

  - platform: template
    sensors:
      unifi_uptime_templated:
        icon_template: mdi:av-timer
        friendly_name: UDM-Pro Uptime
        value_template: >-
              {% set time = (states.sensor.unifi_gateway_wan.attributes['gw_system-stats']['uptime'] | int) | int %}
              {% set minutes = ((time % 3600) / 60) | int %}
              {% set hours = ((time % 86400) / 3600) | int %}
              {% set days = (time / 86400) | int %}

              {%- if time < 60 -%}
                Less than a minute
                {%- else -%}
                {%- if days > 0 -%}
                  {{ days }}d
                {%- endif -%}
                {%- if hours > 0 -%}
                  {%- if days > 0 -%}
                    {{ ' ' }}
                  {%- endif -%}
                  {{ hours }}h
                 {%- endif -%}
                 {%- if minutes > 0 -%}
                   {%- if days > 0 or hours > 0 -%}
                     {{ ' ' }}
                   {%- endif -%}
                   {{ minutes }}m
                 {%- endif -%}
               {%- endif -%}
2 Likes

wow! thank you very much.

much appreciated

@grantc, Thanks for the updated .py files. I am up and running for the first time since I switched from a USG to my UDM-P! This is very much appreciated … :grin:

So I’m a bit new to the Home Assistant group, but I’ve been working on implementing this and I keep receiving a “Failed to connect to Unifi Security Gateway: Login failed - status code: 412” error. I’ve been searching around and have verified that the password in my config is correct. Any suggestions? I’m using a USG with a US8-150 switch and AP.