With this fork I get most sensors work, I don’t know what he does different, but maybe you can figure it out with him
You could rework to use aiounifi rather than pyunifi or even move the custom component pieces over to core integration
I see that you are the creator of that library, do you have any documentation on how to use it? I’d like to play around with it a bit. But as far is my google-fu goes, I couldn’t find any documentation unfortunately.
Documentation? Sorry, no documentation. You can find how it is used by looking at main.py or through the home assistant integration. The tests are quite comprehensive as well.
@Alwin_Hummels - were you able to get the sensors for the Firmware and Alerts working at all? Using the fork you mention?
I just saw this thread but had already posted a similar issue here: Unifi OS 6.0.43/Cloud Key 2.0.24 not compatible with Unifi Integration?
While I don’t use the USG, I do run my network off a Cloud Key Gen2 and found the same issue you did.
Is this integration the same supported Unifi integration that comes native with HomeAssistant?
No sorry that are the only sensors that doesn’t work. Thats why I was hopefull for the updated intergration.
No this only applies to a USG, UDM or UDM Pro.
Great component, thank you very much for all this hard work.
Since because of the breach in Unifi, it requested us to use 2FA and i started using it. However now the component is not working. Is there any plans to include 2FA in your component?
Thanks again…
Hi all,
What is the fix for this at the moment best to use the fork mentioned above?
You need to create a new local user without 2FA and use that instead.
from reading the thread 2FA isn’t a problem if you create a local admin (not on ui.com) with read only access.
The issue if i understand correctly is the unifiOS has broken the integration?
Hi, sorry picking up an old thread but seemed to have hit a wall at this point and wondered if there were any pointers you could recall!
From Home Assistant can ping my USG but this integration throws connection errors on start up. Manually navigating to the URL (unifi:8843/api/login and also tried IP instead of hostname) and get a “connection isn’t private…” type response which I suspect in my problem.
I’ve never done anything with SSL certs or the like and don’t want to run the risk of breaking other things, so just wondered if there was a quick skip around this I could investigate (have tried the config with verify_ssl true and false!). Started down the road of research on certs but seems like something I really need to understand before attempting!
Thanks in advance for any suggestions!
Yeah, I actually gave up on it with the self-signed cert and just went with the controller integration which is now actually living in NodeRed and using the Unifi NPM package.
Was able to get the firmware sensor working, but still cannot get alerts working.
2021-02-08 12:38:45 ERROR (SyncWorker_32) [custom_components.unifigateway.sensor] Failed to access alerts info: Expecting value: line 1 column 1 (char 0)
No doubt something very simple to resolve for someone with a bit more knowledge than myself. I have simply remove the “alerts” line from the yaml file for now.
Sensor.py
import logging
import voluptuous as vol
from datetime import timedelta
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_HOST, CONF_USERNAME, CONF_PASSWORD,
CONF_MONITORED_CONDITIONS, CONF_VERIFY_SSL)
# __version__ = '0.2.3'
_LOGGER = logging.getLogger(__name__)
CONF_PORT = 'port'
CONF_SITE_ID = 'site_id'
DEFAULT_NAME = 'UniFi Gateway'
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 8443
DEFAULT_SITE = 'default'
DEFAULT_VERIFY_SSL = False
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=30)
SENSOR_VPN = 'vpn'
SENSOR_WWW = 'www'
SENSOR_WAN = 'wan'
SENSOR_LAN = 'lan'
SENSOR_WLAN = 'wlan'
SENSOR_ALERTS = 'alerts'
SENSOR_FIRMWARE = 'firmware'
USG_SENSORS = {
SENSOR_VPN: ['VPN', '', 'mdi:folder-key-network'],
SENSOR_WWW: ['WWW', '', 'mdi:web'],
SENSOR_WAN: ['WAN', '', 'mdi:shield-outline'],
SENSOR_LAN: ['LAN', '', 'mdi:lan'],
SENSOR_WLAN: ['WLAN','', 'mdi:wifi'],
SENSOR_ALERTS: ['Alerts', '', 'mdi:information-outline'],
SENSOR_FIRMWARE:['Firmware Upgradable', '', 'mdi:database-plus']
}
POSSIBLE_MONITORED = [ SENSOR_VPN, SENSOR_WWW, SENSOR_WAN, SENSOR_LAN,
SENSOR_WLAN, SENSOR_ALERTS, SENSOR_FIRMWARE ]
DEFAULT_MONITORED = POSSIBLE_MONITORED
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
vol.Optional(CONF_SITE_ID, default=DEFAULT_SITE): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL):
vol.Any(cv.boolean, cv.isfile),
vol.Optional(CONF_MONITORED_CONDITIONS, default=DEFAULT_MONITORED):
vol.All(cv.ensure_list, [vol.In(POSSIBLE_MONITORED)])
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Unifi sensor."""
from .controller import Controller, APIError
name = config.get(CONF_NAME)
host = config.get(CONF_HOST)
username = config.get(CONF_USERNAME)
password = config.get(CONF_PASSWORD)
site_id = config.get(CONF_SITE_ID)
port = config.get(CONF_PORT)
verify_ssl = config.get(CONF_VERIFY_SSL)
try:
ctrl = Controller(host, username, password, port, version='unifiOS',
site_id=site_id, ssl_verify=verify_ssl)
except APIError as ex:
_LOGGER.error("Failed to connect to Unifi Security Gateway: %s", ex)
return False
for sensor in config.get(CONF_MONITORED_CONDITIONS):
add_entities([UnifiGatewaySensor(hass, ctrl, name, sensor)], True)
class UnifiGatewaySensor(Entity):
"""Implementation of a UniFi Gateway sensor."""
def __init__(self, hass, ctrl, name, sensor):
"""Initialize the sensor."""
self._hass = hass
self._ctrl = ctrl
self._name = name + ' ' + USG_SENSORS[sensor][0]
self._sensor = sensor
self._state = None
self._alldata = None
self._data = None
self._attributes = {}
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def icon(self):
"""Icon to use in the frontend, if any."""
return USG_SENSORS[self._sensor][2]
@property
def state(self):
"""Return the state of the device."""
return self._state
@property
def state_attributes(self):
"""Return the device state attributes."""
return self._attributes
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Set up the sensor."""
from .controller import APIError
if self._sensor == SENSOR_ALERTS:
self._attributes = {}
try:
unarchived_alerts = self._ctrl.get_alerts()
except APIError as ex:
_LOGGER.error("Failed to access alerts info: %s", ex)
else:
for index, alert in enumerate(unarchived_alerts,start=1):
if not alert['archived']:
self._attributes[str(index)] = alert
self._state = len(self._attributes)
elif self._sensor == SENSOR_FIRMWARE:
self._attributes = {}
self._state = 0
try:
aps = self._ctrl.get_aps()
except APIError as ex:
_LOGGER.error("Failed to scan aps: %s", ex)
else:
# Set the attributes based on device name - this may not be unique
# but is user-readability preferred
for devices in aps:
if devices.get('upgradable'):
self._attributes[devices['name']] = devices['upgradable']
self._state += 1
else:
# get_healthinfo() call made for each of 4 sensors - should only be for 1
try:
# Check that function exists...potential errors on startup otherwise
if hasattr(self._ctrl,'get_healthinfo'):
self._alldata = self._ctrl.get_healthinfo()
for sub in self._alldata:
if sub['subsystem'] == self._sensor:
self._data = sub
self._state = sub['status'].upper()
for attr in sub:
self._attributes[attr] = sub[attr]
else:
_LOGGER.error("no healthinfo attribute for controller")
except APIError as ex:
_LOGGER.error("Failed to access health info: %s", ex)
Controller.py
import json
import logging
import requests
import shutil
import time
import warnings
"""For testing purposes:
logging.basicConfig(filename='pyunifi.log', level=logging.WARN,
format='%(asctime)s %(message)s')
"""
log = logging.getLogger(__name__)
class APIError(Exception):
pass
def retry_login(func, *args, **kwargs):
"""To reattempt login if requests exception(s) occur at time of call"""
def wrapper(*args, **kwargs):
try:
try:
return func(*args, **kwargs)
except (requests.exceptions.RequestException,
APIError) as err:
log.warning("Failed to perform %s due to %s" % (func, err))
controller = args[0]
controller._login()
return func(*args, **kwargs)
except Exception as err:
raise APIError(err)
return wrapper
class Controller(object):
"""Interact with a UniFi controller.
Uses the JSON interface on port 8443 (HTTPS) to communicate with a UniFi
controller. Operations will raise unifi.controller.APIError on obvious
problems (such as login failure), but many errors (such as disconnecting a
nonexistant client) will go unreported.
>>> from unifi.controller import Controller
>>> c = Controller('192.168.1.99', 'admin', 'p4ssw0rd')
>>> for ap in c.get_aps():
... print 'AP named %s with MAC %s' % (ap.get('name'), ap['mac'])
...
AP named Study with MAC dc:9f:db:1a:59:07
AP named Living Room with MAC dc:9f:db:1a:59:08
AP named Garage with MAC dc:9f:db:1a:59:0b
"""
def __init__(self, host, username, password, port=8443,
version='unifiOS', site_id='default', ssl_verify=True):
"""
:param host: the address of the controller host; IP or name
:param username: the username to log in with
:param password: the password to log in with
:param port: the port of the controller host
:param version: the base version of the controller API [v4|v5]
:param site_id: the site ID to connect to
:param ssl_verify: Verify the controllers SSL certificate,
can also be "path/to/custom_cert.pem"
"""
self.log = logging.getLogger(__name__ + ".Controller")
self.host = host
self.username = username
self.password = password
self.site_id = site_id
self.ssl_verify = ssl_verify
self.url = 'https://' + host + '/proxy/network/'
self.session = requests.Session()
self.session.verify = ssl_verify
self.log.debug('Controller for %s', self.url)
self._login()
@staticmethod
def _jsondec(data):
obj = json.loads(data)
if 'meta' in obj:
if obj['meta']['rc'] != 'ok':
raise APIError(obj['meta']['msg'])
if 'data' in obj:
return obj['data']
else:
return obj
def _api_url(self):
return self.url + 'api/s/' + self.site_id + '/'
@retry_login
def _read(self, url, params=None):
# Try block to handle the unifi server being offline.
r = self.session.get(url, params=params)
return self._jsondec(r.text)
def _api_read(self, url, params=None):
return self._read(self._api_url() + url, params)
@retry_login
def _write(self, url, params=None):
r = self.session.post(url, json=params)
return self._jsondec(r.text)
def _api_write(self, url, params=None):
return self._write(self._api_url() + url, params)
@retry_login
def _update(self, url, params=None):
r = self.session.put(url, json=params)
return self._jsondec(r.text)
def _api_update(self, url, params=None):
return self._update(self._api_url() + url, params)
def _login(self):
log.debug('login() as %s', self.username)
# XXX Why doesn't passing in the dict work?
params = {'username': self.username, 'password': self.password}
login_url = 'https://' + self.host + '/api/auth/login'
r = self.session.post(login_url, json=params)
if r.status_code != 200:
raise APIError("Login failed - status code: %i" % r.status_code)
def _logout(self):
log.debug('logout()')
self._api_write('logout')
def switch_site(self, name):
"""
Switch to another site
:param name: Site Name
:return: True or APIError
"""
for site in self.get_sites():
if site['desc'] == name:
self.site_id = site['name']
return True
raise APIError("No site %s found" % name)
def get_alerts(self):
# Return a list of all Alerts.
return self._api_write('stat/alarm')
def get_alerts_unarchived(self):
# Return a list of Alerts unarchived.
return self._api_write('stat/alarm', params={'archived': False})
def get_statistics_last_24h(self):
"""Returns statistical data of the last 24h"""
return self.get_statistics_24h(time.time())
def get_statistics_24h(self, endtime):
"""Return statistical data last 24h from time"""
params = {
'attrs': ["bytes", "num_sta", "time"],
'start': int(endtime - 86400) * 1000,
'end': int(endtime - 3600) * 1000}
return self._write(self._api_url() + 'stat/report/hourly.site', params)
def get_events(self):
"""Return a list of all Events."""
return self._api_read('stat/event')
def get_aps(self):
"""Return a list of all APs,
with significant information about each.
"""
# Set test to 0 instead of NULL
params = {'_depth': 2, 'test': 0}
return self._api_read('stat/device', params)
def get_client(self, mac):
"""Get details about a specific client"""
# stat/user/<mac> works better than stat/sta/<mac>
# stat/sta seems to be only active clients
# stat/user includes known but offline clients
return self._api_read('stat/user/' + mac)[0]
def get_clients(self):
"""Return a list of all active clients,
with significant information about each.
"""
return self._api_read('stat/sta')
def get_users(self):
"""Return a list of all known clients,
with significant information about each.
"""
return self._api_read('list/user')
def get_user_groups(self):
"""Return a list of user groups with its rate limiting settings."""
return self._api_read('list/usergroup')
def get_sysinfo(self):
"""Return basic system informations."""
return self._api_read('stat/sysinfo')
def get_healthinfo(self):
"""Return health information."""
return self._api_read('stat/health')
def get_sites(self):
"""Return a list of all sites,
with their UID and description"""
return self._read(self.url + 'api/self/sites')
def get_wlan_conf(self):
"""Return a list of configured WLANs
with their configuration parameters.
"""
return self._api_read('list/wlanconf')
def _run_command(self, command, params={}, mgr='stamgr'):
log.debug('_run_command(%s)', command)
params.update({'cmd': command})
return self._write(self._api_url() + 'cmd/' + mgr, params=params)
def _mac_cmd(self, target_mac, command, mgr='stamgr', params={}):
log.debug('_mac_cmd(%s, %s)', target_mac, command)
params['mac'] = target_mac
return self._run_command(command, params, mgr)
def get_device_stat(self, target_mac):
"""Gets the current state & configuration of
the given device based on its MAC Address.
:param target_mac: MAC address of the device.
:type target_mac: str
:returns: Dictionary containing metadata, state,
capabilities and configuration of the device
:rtype: dict()
"""
log.debug('get_device_stat(%s)', target_mac)
params = {"macs": [target_mac]}
return self._api_read('stat/device/' + target_mac, params)[0]
def get_switch_port_overrides(self, target_mac):
"""Gets a list of port overrides, in dictionary
format, for the given target MAC address. The
dictionary contains the port_idx, portconf_id,
poe_mode, & name.
:param target_mac: MAC address of the device.
:type target_mac: str
:returns: [ { 'port_idx': int(), 'portconf': str,
'poe_mode': str, 'name': str } ]
:rtype: list( dict() )
"""
log.debug('get_switch_port_overrides(%s)', target_mac)
return self.get_device_stat(target_mac)['port_overrides']
def _switch_port_power(self, target_mac, port_idx, mode):
"""Helper method to set the given PoE mode the port/switch.
:param target_mac: MAC address of the Switch.
:type target_mac: str
:param port_idx: Port ID to target
:type port_idx: int
:param mode: PoE mode to set. ie. auto, on, off.
:type mode: str
:returns: { 'port_overrides': [ { 'port_idx': int(),
'portconf': str, 'poe_mode': str, 'name': str } ] }
:rtype: dict( list( dict() ) )
"""
# TODO: Switch operations should most likely happen in a
# different Class, Switch.
log.debug('_switch_port_power(%s, %s, %s)', target_mac, port_idx, mode)
device_stat = self.get_device_stat(target_mac)
device_id = device_stat['_id']
overrides = device_stat['port_overrides']
found = False
for i in range(0, len(overrides)):
if overrides[i]['port_idx'] == port_idx:
# Override already exists, update..
overrides[i]['poe_mode'] = mode
found = True
break
if not found:
# Retrieve portconf
portconf_id = None
for port in device_stat['port_table']:
if port['port_idx'] == port_idx:
portconf_id = port['portconf_id']
break
if portconf_id is None:
log.error("Port ID %s could not be found in the port_table.")
raise APIError(
'Port ID %s not found in port_table' % str(port_idx)
)
overrides.append({
"port_idx": port_idx,
"portconf_id": portconf_id,
"poe_mode": mode
})
# We return the device_id as it's needed by the parent method
return {"port_overrides": overrides, "device_id": device_id}
def switch_port_power_off(self, target_mac, port_idx):
"""Powers Off the given port on the Switch identified
by the given MAC Address.
:param target_mac: MAC address of the Switch.
:type target_mac: str
:param port_idx: Port ID to power off
:type port_idx: int
:returns: API Response which is the resulting complete port overrides
:rtype: list( dict() )
"""
log.debug('switch_port_power_off(%s, %s)', target_mac, port_idx)
params = self._switch_port_power(target_mac, port_idx, "off")
device_id = params['device_id']
del params['device_id']
return self._api_update('rest/device/' + device_id, params)
def switch_port_power_on(self, target_mac, port_idx):
"""Powers On the given port on the Switch identified
by the given MAC Address.
:param target_mac: MAC address of the Switch.
:type target_mac: str
:param port_idx: Port ID to power on
:type port_idx: int
:returns: API Response which is the resulting complete port overrides
:rtype: list( dict() )
"""
log.debug('switch_port_power_on(%s, %s)', target_mac, port_idx)
params = self._switch_port_power(target_mac, port_idx, "auto")
device_id = params['device_id']
del params['device_id']
return self._api_update('rest/device/' + device_id, params)
def create_site(self, desc='desc'):
"""Create a new site.
:param desc: Name of the site to be created.
"""
return self._run_command('add-site', params={"desc": desc},
mgr='sitemgr')
def block_client(self, mac):
"""Add a client to the block list.
:param mac: the MAC address of the client to block.
"""
return self._mac_cmd(mac, 'block-sta')
def unblock_client(self, mac):
"""Remove a client from the block list.
:param mac: the MAC address of the client to unblock.
"""
return self._mac_cmd(mac, 'unblock-sta')
def disconnect_client(self, mac):
"""Disconnect a client.
Disconnects a client, forcing them to reassociate. Useful when the
connection is of bad quality to force a rescan.
:param mac: the MAC address of the client to disconnect.
"""
return self._mac_cmd(mac, 'kick-sta')
def restart_ap(self, mac):
"""Restart an access point (by MAC).
:param mac: the MAC address of the AP to restart.
"""
return self._mac_cmd(mac, 'restart', 'devmgr')
def restart_ap_name(self, name):
"""Restart an access point (by name).
:param name: the name address of the AP to restart.
"""
if not name:
raise APIError('%s is not a valid name' % str(name))
for ap in self.get_aps():
if ap.get('state', 0) == 1 and ap.get('name', None) == name:
return self.restart_ap(ap['mac'])
def archive_all_alerts(self):
# Archive all Alerts
return self._run_command('archive-all-alarms', mgr='evtmgr')
def create_backup(self, days='0'):
"""Ask controller to create a backup archive file
..warning:
This process puts significant load on the controller
and may render it partially unresponsive for other requests.
:param days: metrics of the last x days will be added to the backup.
'-1' backup all metrics. '0' backup only the configuration.
:return: URL path to backup file
"""
res = self._run_command('backup', mgr='system', params={'days': days})
return res[0]['url']
def get_backup(self, download_path=None, target_file='unifi-backup.unf'):
"""
:param download_path: path to backup; if None is given
one will be created
:param target_file: Filename or full path to download the
backup archive to, should have .unf extension for restore.
"""
if not download_path:
download_path = self.create_backup()
r = self.session.get(self.url + download_path, stream=True)
with open(target_file, 'wb') as _backfh:
return shutil.copyfileobj(r.raw, _backfh)
def authorize_guest(self, guest_mac, minutes, up_bandwidth=None,
down_bandwidth=None, byte_quota=None, ap_mac=None):
"""
Authorize a guest based on his MAC address.
:param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
:param minutes: duration of the authorization in minutes
:param up_bandwidth: up speed allowed in kbps
:param down_bandwidth: down speed allowed in kbps
:param byte_quota: quantity of bytes allowed in MB
:param ap_mac: access point MAC address
"""
cmd = 'authorize-guest'
params = {'mac': guest_mac, 'minutes': minutes}
if up_bandwidth:
params['up'] = up_bandwidth
if down_bandwidth:
params['down'] = down_bandwidth
if byte_quota:
params['bytes'] = byte_quota
if ap_mac:
params['ap_mac'] = ap_mac
return self._run_command(cmd, params=params)
def unauthorize_guest(self, guest_mac):
"""
Unauthorize a guest based on his MAC address.
:param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
"""
cmd = 'unauthorize-guest'
params = {'mac': guest_mac}
return self._run_command(cmd, params=params)
def get_firmware(self, cached=True, available=True,
known=False, site=False):
"""
Return a list of available/cached firmware versions
:param cached: Return cached firmwares
:param available: Return available (and not cached) firmwares
:param known: Return only firmwares for known devices
:param site: Return only firmwares for on-site devices
:return: List of firmware dicts
"""
res = []
if cached:
res.extend(self._run_command('list-cached', mgr='firmware'))
if available:
res.extend(self._run_command('list-available', mgr='firmware'))
if known:
res = [fw for fw in res if fw['knownDevice']]
if site:
res = [fw for fw in res if fw['siteDevice']]
return res
def cache_firmware(self, version, device):
"""
Cache the firmware on the UniFi Controller
.. warning:: Caching one device might very well cache others,
as they're on shared platforms
:param version: version to cache
:param device: device model to cache (e.g. BZ2)
:return: True/False
"""
return self._run_command(
'download', mgr='firmware',
params={'device': device, 'version': version})[0]['result']
def remove_firmware(self, version, device):
"""
Remove cached firmware from the UniFi Controller
.. warning:: Removing one device's firmware might very well remove
others, as they're on shared platforms
:param version: version to cache
:param device: device model to cache (e.g. BZ2)
:return: True/false
"""
return self._run_command(
'remove', mgr='firmware',
params={'device': device, 'version': version})[0]['result']
def get_tag(self):
"""Get all tags and their member MACs"""
return self._api_read('rest/tag')
def upgrade_device(self, mac, version):
"""
Upgrade a device's firmware to verion
:param mac: MAC of dev
:param version: version to upgrade to
"""
self._mac_cmd(mac, 'upgrade', mgr='devmgr',
params={'upgrade_to_firmware': version})
def provision(self, mac):
"""
Force provisioning of a device
:param mac: MAC of device
"""
self._mac_cmd(mac, 'force-provision', mgr='devmgr')
def get_setting(self, section=None, super=False):
"""
Return settings for this site or controller
:param super: Return only controller-wide settings
:param section: Only return this/these section(s)
:return: {section:settings}
"""
res = {}
settings = self._api_read('get/setting')
if section and not isinstance(section, (list, tuple)):
section = [section]
for s in settings:
s_sect = s['key']
if (super and 'site_id' in s) or \
(not super and 'site_id' not in s) or \
(section and s_sect not in section):
continue
for k in ('_id', 'site_id', 'key'):
s.pop(k, None)
res[s_sect] = s
return res
def update_setting(self, settings):
"""
Update settings
:param settings: {section:{settings}}
:return: resulting settings
"""
res = []
for sect, setting in settings.items():
res.extend(self._api_write('set/setting/' + sect, setting))
return res
def update_user_group(self, group_id, down_kbps=-1, up_kbps=-1):
"""
Update user group bandwidth settings
:param group_id: Group ID to modify
:param down_kbps: New bandwidth in KBPS for download
:param up_kbps: New bandwidth in KBPS for upload
"""
res = None
groups = self.get_user_groups()
for group in groups:
if group["_id"] == group_id:
# Apply setting change
res = self._api_update("rest/usergroup/{0}".format(group_id), {
"qos_rate_max_down": down_kbps,
"qos_rate_max_up": up_kbps,
"name": group["name"],
"_id": group_id,
"site_id": self.site_id
})
return res
raise ValueError("Group ID {0} is not valid.".format(group_id))
def set_client_alias(self, mac, alias):
"""
Set the client alias. Set to "" to reset to default
:param mac: The MAC of the client to rename
:param alias: The alias to set
"""
client = self.get_client(mac)['_id']
return self._api_update('rest/user/' + client, {'name': alias})
def create_voucher(self, number, quota, expire, up_bandwidth=None,
down_bandwidth=None, byte_quota=None, note=None):
"""
Create voucher for guests.
:param number: number of vouchers
:param quota: number of using; 0 = unlimited
:param expire: expiration of voucher in minutes
:param up_bandwidth: up speed allowed in kbps
:param down_bandwidth: down speed allowed in kbps
:param byte_quota: quantity of bytes allowed in MB
:param note: description
"""
cmd = 'create-voucher'
params = {'n': number, 'quota': quota, 'expire': 'custom',
'expire_number': expire, 'expire_unit': 1}
if up_bandwidth:
params['up'] = up_bandwidth
if down_bandwidth:
params['down'] = down_bandwidth
if byte_quota:
params['bytes'] = byte_quota
if note:
params['note'] = note
res = self._run_command(cmd, mgr='hotspot', params=params)
return self.list_vouchers(create_time=res[0]['create_time'])
def list_vouchers(self, **filter):
"""
Get list of vouchers
:param filter: Filter vouchers by create_time, code, quota,
used, note, status_expires, status, ...
"""
if 'code' in filter:
filter['code'] = filter['code'].replace('-', '')
vouchers = []
for voucher in self._api_read('stat/voucher'):
voucher_match = True
for key, val in filter.items():
voucher_match &= voucher.get(key) == val
if voucher_match:
vouchers.append(voucher)
return vouchers
def delete_voucher(self, id):
"""
Delete / revoke voucher
:param id: id of voucher
"""
cmd = 'delete-voucher'
params = {'_id': id}
self._run_command(cmd, mgr='hotspot', params=params)
May be deleted
Bump… is anyone else looking at getting this resolved?
So I haven’t had much time to tinker on this until today. I wanted to force the version in the config and when I ran the “check configuration” script I got this error…
Invalid config for [sensor.unifigateway]: [version] is an invalid option for [sensor.unifigateway]. Check: sensor.unifigateway->version. (See ?, line ?).
Here is my config…
sensor:
- platform: unifigateway
host: <unifi ip address>
port: 443
version: unifiOS
username: <unifi local account>
password: <unifi local password>
monitored_conditions:
- www
- lan
- wan
- wlan
- alerts
- firmware
Shouldn’t the version be something like v5 for example?
I have a UDM-pro and want to include it in home assistant.
This is my config:
### Unifi ###
- platform: unifigateway
host: 10.0.1.254
username: !secret unifi_username
password: !secret unifi_password
monitored_conditions:
- www
- wlan
- alerts
- firmware
this is the error message I get.
Logger: custom_components.unifigateway.sensor
Source: custom_components/unifigateway/sensor.py:87
Integration: unifigateway ([documentation](https://github.com/custom-components/sensor.unifigateway))
First occurred: 11:51:07 AM (1 occurrences)
Last logged: 11:51:07 AM
Failed to connect to Unifi Security Gateway: Login failed - status code: 404
and also a warning
Logger: homeassistant.loader
Source: loader.py:802
First occurred: 11:51:03 AM (1 occurrences)
Last logged: 11:51:03 AM
No 'version' key in the manifest file for custom integration 'unifigateway'. This will not be allowed in a future version of Home Assistant. Please report this to the maintainer of 'unifigateway'
Above I saw something about this project not being compatible with UDM an UDM-pro?