Hi all,
What is the fix for this at the moment? Is it best to use the fork mentioned above?
You need to create a new local user without 2FA and use that instead.
From reading the thread, 2FA isn’t a problem if you create a local admin (not on ui.com) with read-only access.
The issue, if I understand correctly, is that UniFi OS has broken the integration?
Hi, sorry for picking up an old thread, but I seem to have hit a wall at this point and wondered if there were any pointers you could recall!
From Home Assistant I can ping my USG, but this integration throws connection errors on startup. Manually navigating to the URL (unifi:8843/api/login, and I also tried the IP instead of the hostname) gets a “connection isn’t private…” type response, which I suspect is my problem.
I’ve never done anything with SSL certs or the like and don’t want to run the risk of breaking other things, so I just wondered if there was a quick way around this I could investigate (I have tried the config with verify_ssl both true and false!). I started down the road of researching certs, but it seems like something I really need to understand before attempting!
Thanks in advance for any suggestions!
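For what it’s worth, a “connection isn’t private” page is what a browser typically shows for a self-signed certificate, and verify_ssl only decides whether the client checks that certificate. Here is a minimal sketch of the three settings the option accepts (true, false, or a path to a certificate file). It uses Python’s standard ssl module rather than the requests session the component itself uses, and build_ssl_context is a made-up helper name, not part of the integration:

```python
import ssl


def build_ssl_context(verify_ssl):
    """Map a verify_ssl-style setting to an SSLContext (illustrative helper).

    verify_ssl may be True (verify against the system CAs), False (skip
    verification, e.g. for a self-signed controller certificate), or a
    path to a PEM file containing the certificate to trust.
    """
    if verify_ssl is False:
        ctx = ssl.create_default_context()
        # Order matters: hostname checking must be off before CERT_NONE is set.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        return ctx
    if verify_ssl is True:
        return ssl.create_default_context()
    # Anything else is treated as a path to a trusted certificate file.
    return ssl.create_default_context(cafile=verify_ssl)
```

With verification switched off, the certificate warning should no longer be the thing blocking the connection, so any remaining error would point somewhere else (host, port, or login path).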
Yeah, I actually gave up on it with the self-signed cert and just went with the controller integration which is now actually living in NodeRed and using the Unifi NPM package.
Was able to get the firmware sensor working, but still cannot get alerts working.
2021-02-08 12:38:45 ERROR (SyncWorker_32) [custom_components.unifigateway.sensor] Failed to access alerts info: Expecting value: line 1 column 1 (char 0)
No doubt something very simple to resolve for someone with a bit more knowledge than myself. I have simply removed the “alerts” line from the yaml file for now.
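A note on that log line: “Expecting value: line 1 column 1 (char 0)” is the message json.loads produces when it is handed an empty or non-JSON body, so the controller presumably answered the alerts request with something other than JSON. Why it does so is not established in this thread. A minimal sketch of a decoder that would at least report the HTTP status and the start of the body; decode_response is a hypothetical stand-in for the _jsondec helper in the controller file, not the component’s actual code:

```python
import json


class APIError(Exception):
    """Raised when the controller reply cannot be used."""


def decode_response(status_code, text):
    """Decode a controller reply, reporting non-JSON bodies clearly."""
    try:
        obj = json.loads(text)
    except ValueError as err:  # json.JSONDecodeError is a ValueError subclass
        raise APIError(
            "HTTP %s returned a non-JSON body %r (%s)"
            % (status_code, text[:80], err)
        ) from err
    if not isinstance(obj, dict):
        return obj
    meta = obj.get("meta", {})
    if meta.get("rc", "ok") != "ok":
        raise APIError(meta.get("msg", "unknown error"))
    # Same convention as the posted code: unwrap "data" when present.
    return obj.get("data", obj)
```

Logging the status code next to the decode error would show whether the alerts call is being rejected outright or just returning an empty reply.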
Sensor.py
import logging
import voluptuous as vol
from datetime import timedelta

from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
    CONF_NAME, CONF_HOST, CONF_USERNAME, CONF_PASSWORD,
    CONF_MONITORED_CONDITIONS, CONF_VERIFY_SSL)

# __version__ = '0.2.3'

_LOGGER = logging.getLogger(__name__)

CONF_PORT = 'port'
CONF_SITE_ID = 'site_id'

DEFAULT_NAME = 'UniFi Gateway'
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 8443
DEFAULT_SITE = 'default'
DEFAULT_VERIFY_SSL = False

MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=30)

SENSOR_VPN = 'vpn'
SENSOR_WWW = 'www'
SENSOR_WAN = 'wan'
SENSOR_LAN = 'lan'
SENSOR_WLAN = 'wlan'
SENSOR_ALERTS = 'alerts'
SENSOR_FIRMWARE = 'firmware'

USG_SENSORS = {
    SENSOR_VPN: ['VPN', '', 'mdi:folder-key-network'],
    SENSOR_WWW: ['WWW', '', 'mdi:web'],
    SENSOR_WAN: ['WAN', '', 'mdi:shield-outline'],
    SENSOR_LAN: ['LAN', '', 'mdi:lan'],
    SENSOR_WLAN: ['WLAN', '', 'mdi:wifi'],
    SENSOR_ALERTS: ['Alerts', '', 'mdi:information-outline'],
    SENSOR_FIRMWARE: ['Firmware Upgradable', '', 'mdi:database-plus']
}

POSSIBLE_MONITORED = [SENSOR_VPN, SENSOR_WWW, SENSOR_WAN, SENSOR_LAN,
                      SENSOR_WLAN, SENSOR_ALERTS, SENSOR_FIRMWARE]
DEFAULT_MONITORED = POSSIBLE_MONITORED

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
    vol.Optional(CONF_SITE_ID, default=DEFAULT_SITE): cv.string,
    vol.Required(CONF_PASSWORD): cv.string,
    vol.Required(CONF_USERNAME): cv.string,
    vol.Required(CONF_PORT, default=DEFAULT_PORT): cv.port,
    vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL):
        vol.Any(cv.boolean, cv.isfile),
    vol.Optional(CONF_MONITORED_CONDITIONS, default=DEFAULT_MONITORED):
        vol.All(cv.ensure_list, [vol.In(POSSIBLE_MONITORED)])
})
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Set up the Unifi sensor."""
    from .controller import Controller, APIError

    name = config.get(CONF_NAME)
    host = config.get(CONF_HOST)
    username = config.get(CONF_USERNAME)
    password = config.get(CONF_PASSWORD)
    site_id = config.get(CONF_SITE_ID)
    port = config.get(CONF_PORT)
    verify_ssl = config.get(CONF_VERIFY_SSL)

    try:
        ctrl = Controller(host, username, password, port, version='unifiOS',
                          site_id=site_id, ssl_verify=verify_ssl)
    except APIError as ex:
        _LOGGER.error("Failed to connect to Unifi Security Gateway: %s", ex)
        return False

    for sensor in config.get(CONF_MONITORED_CONDITIONS):
        add_entities([UnifiGatewaySensor(hass, ctrl, name, sensor)], True)
class UnifiGatewaySensor(Entity):
    """Implementation of a UniFi Gateway sensor."""

    def __init__(self, hass, ctrl, name, sensor):
        """Initialize the sensor."""
        self._hass = hass
        self._ctrl = ctrl
        self._name = name + ' ' + USG_SENSORS[sensor][0]
        self._sensor = sensor
        self._state = None
        self._alldata = None
        self._data = None
        self._attributes = {}

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return USG_SENSORS[self._sensor][2]

    @property
    def state(self):
        """Return the state of the device."""
        return self._state

    @property
    def state_attributes(self):
        """Return the device state attributes."""
        return self._attributes

    @Throttle(MIN_TIME_BETWEEN_UPDATES)
    def update(self):
        """Set up the sensor."""
        from .controller import APIError

        if self._sensor == SENSOR_ALERTS:
            self._attributes = {}

            try:
                unarchived_alerts = self._ctrl.get_alerts()
            except APIError as ex:
                _LOGGER.error("Failed to access alerts info: %s", ex)
            else:
                for index, alert in enumerate(unarchived_alerts, start=1):
                    if not alert['archived']:
                        self._attributes[str(index)] = alert

                self._state = len(self._attributes)

        elif self._sensor == SENSOR_FIRMWARE:
            self._attributes = {}
            self._state = 0

            try:
                aps = self._ctrl.get_aps()
            except APIError as ex:
                _LOGGER.error("Failed to scan aps: %s", ex)
            else:
                # Set the attributes based on device name - this may not be unique
                # but is user-readability preferred
                for devices in aps:
                    if devices.get('upgradable'):
                        self._attributes[devices['name']] = devices['upgradable']
                        self._state += 1

        else:
            # get_healthinfo() call made for each of 4 sensors - should only be for 1
            try:
                # Check that function exists...potential errors on startup otherwise
                if hasattr(self._ctrl, 'get_healthinfo'):
                    self._alldata = self._ctrl.get_healthinfo()

                    for sub in self._alldata:
                        if sub['subsystem'] == self._sensor:
                            self._data = sub
                            self._state = sub['status'].upper()
                            for attr in sub:
                                self._attributes[attr] = sub[attr]
                else:
                    _LOGGER.error("no healthinfo attribute for controller")
            except APIError as ex:
                _LOGGER.error("Failed to access health info: %s", ex)
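The comment in update() notes that get_healthinfo() is called once per health sensor when a single call would do. One way that could be handled is a small shared cache, sketched below. HealthCache and its fetch argument are hypothetical names for illustration only; nothing like this exists in the posted files:

```python
import time


class HealthCache:
    """Share one health reply between several sensors (illustrative)."""

    def __init__(self, fetch, max_age=30.0, clock=time.monotonic):
        self._fetch = fetch      # callable returning the list of subsystems
        self._max_age = max_age  # seconds a reply is considered fresh
        self._clock = clock      # injectable so the cache can be tested
        self._data = None
        self._stamp = None

    def get(self):
        """Return the cached reply, refetching only when it is stale."""
        now = self._clock()
        if self._stamp is None or now - self._stamp >= self._max_age:
            self._data = self._fetch()
            self._stamp = now
        return self._data

    def subsystem(self, name):
        """Return the entry for one subsystem ('wan', 'lan', ...) or None."""
        for sub in self.get():
            if sub.get("subsystem") == name:
                return sub
        return None
```

Each sensor would then ask the one cache for its own subsystem, for example HealthCache(ctrl.get_healthinfo).subsystem('wan'), instead of calling the controller itself.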
Controller.py
import json
import logging
import requests
import shutil
import time
import warnings

"""For testing purposes:
logging.basicConfig(filename='pyunifi.log', level=logging.WARN,
                    format='%(asctime)s %(message)s')
"""
log = logging.getLogger(__name__)


class APIError(Exception):
    pass


def retry_login(func, *args, **kwargs):
    """To reattempt login if requests exception(s) occur at time of call"""
    def wrapper(*args, **kwargs):
        try:
            try:
                return func(*args, **kwargs)
            except (requests.exceptions.RequestException,
                    APIError) as err:
                log.warning("Failed to perform %s due to %s" % (func, err))
                controller = args[0]
                controller._login()
                return func(*args, **kwargs)
        except Exception as err:
            raise APIError(err)
    return wrapper
class Controller(object):
    """Interact with a UniFi controller.

    Uses the JSON interface on port 8443 (HTTPS) to communicate with a UniFi
    controller. Operations will raise unifi.controller.APIError on obvious
    problems (such as login failure), but many errors (such as disconnecting a
    nonexistent client) will go unreported.

    >>> from unifi.controller import Controller
    >>> c = Controller('192.168.1.99', 'admin', 'p4ssw0rd')
    >>> for ap in c.get_aps():
    ...     print('AP named %s with MAC %s' % (ap.get('name'), ap['mac']))
    ...
    AP named Study with MAC dc:9f:db:1a:59:07
    AP named Living Room with MAC dc:9f:db:1a:59:08
    AP named Garage with MAC dc:9f:db:1a:59:0b
    """

    def __init__(self, host, username, password, port=8443,
                 version='unifiOS', site_id='default', ssl_verify=True):
        """
        :param host: the address of the controller host; IP or name
        :param username: the username to log in with
        :param password: the password to log in with
        :param port: the port of the controller host
        :param version: the base version of the controller API [v4|v5]
        :param site_id: the site ID to connect to
        :param ssl_verify: Verify the controllers SSL certificate,
            can also be "path/to/custom_cert.pem"
        """
        self.log = logging.getLogger(__name__ + ".Controller")

        self.host = host
        self.username = username
        self.password = password
        self.site_id = site_id
        self.ssl_verify = ssl_verify
        # NOTE: port and version are accepted but not used below; the URL
        # is built from the host alone, with the /proxy/network/ prefix.
        self.url = 'https://' + host + '/proxy/network/'

        self.session = requests.Session()
        self.session.verify = ssl_verify

        self.log.debug('Controller for %s', self.url)
        self._login()
    @staticmethod
    def _jsondec(data):
        obj = json.loads(data)
        if 'meta' in obj:
            if obj['meta']['rc'] != 'ok':
                raise APIError(obj['meta']['msg'])
        if 'data' in obj:
            return obj['data']
        else:
            return obj

    def _api_url(self):
        return self.url + 'api/s/' + self.site_id + '/'

    @retry_login
    def _read(self, url, params=None):
        # Try block to handle the unifi server being offline.
        r = self.session.get(url, params=params)
        return self._jsondec(r.text)

    def _api_read(self, url, params=None):
        return self._read(self._api_url() + url, params)

    @retry_login
    def _write(self, url, params=None):
        r = self.session.post(url, json=params)
        return self._jsondec(r.text)

    def _api_write(self, url, params=None):
        return self._write(self._api_url() + url, params)

    @retry_login
    def _update(self, url, params=None):
        r = self.session.put(url, json=params)
        return self._jsondec(r.text)

    def _api_update(self, url, params=None):
        return self._update(self._api_url() + url, params)

    def _login(self):
        log.debug('login() as %s', self.username)

        # XXX Why doesn't passing in the dict work?
        params = {'username': self.username, 'password': self.password}
        login_url = 'https://' + self.host + '/api/auth/login'

        r = self.session.post(login_url, json=params)
        if r.status_code != 200:
            raise APIError("Login failed - status code: %i" % r.status_code)

    def _logout(self):
        log.debug('logout()')
        self._api_write('logout')
    def switch_site(self, name):
        """
        Switch to another site

        :param name: Site Name
        :return: True or APIError
        """
        for site in self.get_sites():
            if site['desc'] == name:
                self.site_id = site['name']
                return True
        raise APIError("No site %s found" % name)

    def get_alerts(self):
        # Return a list of all Alerts.
        return self._api_write('stat/alarm')

    def get_alerts_unarchived(self):
        # Return a list of Alerts unarchived.
        return self._api_write('stat/alarm', params={'archived': False})

    def get_statistics_last_24h(self):
        """Returns statistical data of the last 24h"""
        return self.get_statistics_24h(time.time())

    def get_statistics_24h(self, endtime):
        """Return statistical data last 24h from time"""
        params = {
            'attrs': ["bytes", "num_sta", "time"],
            'start': int(endtime - 86400) * 1000,
            'end': int(endtime - 3600) * 1000}
        return self._write(self._api_url() + 'stat/report/hourly.site', params)

    def get_events(self):
        """Return a list of all Events."""
        return self._api_read('stat/event')

    def get_aps(self):
        """Return a list of all APs,
        with significant information about each.
        """
        # Set test to 0 instead of NULL
        params = {'_depth': 2, 'test': 0}
        return self._api_read('stat/device', params)

    def get_client(self, mac):
        """Get details about a specific client"""
        # stat/user/<mac> works better than stat/sta/<mac>
        # stat/sta seems to be only active clients
        # stat/user includes known but offline clients
        return self._api_read('stat/user/' + mac)[0]

    def get_clients(self):
        """Return a list of all active clients,
        with significant information about each.
        """
        return self._api_read('stat/sta')

    def get_users(self):
        """Return a list of all known clients,
        with significant information about each.
        """
        return self._api_read('list/user')

    def get_user_groups(self):
        """Return a list of user groups with their rate limiting settings."""
        return self._api_read('list/usergroup')

    def get_sysinfo(self):
        """Return basic system information."""
        return self._api_read('stat/sysinfo')

    def get_healthinfo(self):
        """Return health information."""
        return self._api_read('stat/health')

    def get_sites(self):
        """Return a list of all sites,
        with their UID and description"""
        return self._read(self.url + 'api/self/sites')

    def get_wlan_conf(self):
        """Return a list of configured WLANs
        with their configuration parameters.
        """
        return self._api_read('list/wlanconf')
    def _run_command(self, command, params=None, mgr='stamgr'):
        log.debug('_run_command(%s)', command)
        # Copy so neither a shared default nor the caller's dict is mutated.
        params = dict(params or {})
        params.update({'cmd': command})
        return self._write(self._api_url() + 'cmd/' + mgr, params=params)

    def _mac_cmd(self, target_mac, command, mgr='stamgr', params=None):
        log.debug('_mac_cmd(%s, %s)', target_mac, command)
        params = dict(params or {})
        params['mac'] = target_mac
        return self._run_command(command, params, mgr)
    def get_device_stat(self, target_mac):
        """Gets the current state & configuration of
        the given device based on its MAC Address.

        :param target_mac: MAC address of the device.
        :type target_mac: str
        :returns: Dictionary containing metadata, state,
            capabilities and configuration of the device
        :rtype: dict()
        """
        log.debug('get_device_stat(%s)', target_mac)
        params = {"macs": [target_mac]}
        return self._api_read('stat/device/' + target_mac, params)[0]

    def get_switch_port_overrides(self, target_mac):
        """Gets a list of port overrides, in dictionary
        format, for the given target MAC address. The
        dictionary contains the port_idx, portconf_id,
        poe_mode, & name.

        :param target_mac: MAC address of the device.
        :type target_mac: str
        :returns: [ { 'port_idx': int(), 'portconf': str,
            'poe_mode': str, 'name': str } ]
        :rtype: list( dict() )
        """
        log.debug('get_switch_port_overrides(%s)', target_mac)
        return self.get_device_stat(target_mac)['port_overrides']

    def _switch_port_power(self, target_mac, port_idx, mode):
        """Helper method to set the given PoE mode the port/switch.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to target
        :type port_idx: int
        :param mode: PoE mode to set. ie. auto, on, off.
        :type mode: str
        :returns: { 'port_overrides': [ { 'port_idx': int(),
            'portconf': str, 'poe_mode': str, 'name': str } ] }
        :rtype: dict( list( dict() ) )
        """
        # TODO: Switch operations should most likely happen in a
        # different Class, Switch.
        log.debug('_switch_port_power(%s, %s, %s)', target_mac, port_idx, mode)

        device_stat = self.get_device_stat(target_mac)
        device_id = device_stat['_id']
        overrides = device_stat['port_overrides']
        found = False

        for i in range(0, len(overrides)):
            if overrides[i]['port_idx'] == port_idx:
                # Override already exists, update..
                overrides[i]['poe_mode'] = mode
                found = True
                break

        if not found:
            # Retrieve portconf
            portconf_id = None
            for port in device_stat['port_table']:
                if port['port_idx'] == port_idx:
                    portconf_id = port['portconf_id']
                    break

            if portconf_id is None:
                # The original call had a %s placeholder but no argument.
                log.error("Port ID %s could not be found in the port_table.",
                          port_idx)
                raise APIError(
                    'Port ID %s not found in port_table' % str(port_idx)
                )

            overrides.append({
                "port_idx": port_idx,
                "portconf_id": portconf_id,
                "poe_mode": mode
            })

        # We return the device_id as it's needed by the parent method
        return {"port_overrides": overrides, "device_id": device_id}
    def switch_port_power_off(self, target_mac, port_idx):
        """Powers Off the given port on the Switch identified
        by the given MAC Address.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to power off
        :type port_idx: int
        :returns: API Response which is the resulting complete port overrides
        :rtype: list( dict() )
        """
        log.debug('switch_port_power_off(%s, %s)', target_mac, port_idx)
        params = self._switch_port_power(target_mac, port_idx, "off")
        device_id = params['device_id']
        del params['device_id']
        return self._api_update('rest/device/' + device_id, params)

    def switch_port_power_on(self, target_mac, port_idx):
        """Powers On the given port on the Switch identified
        by the given MAC Address.

        :param target_mac: MAC address of the Switch.
        :type target_mac: str
        :param port_idx: Port ID to power on
        :type port_idx: int
        :returns: API Response which is the resulting complete port overrides
        :rtype: list( dict() )
        """
        log.debug('switch_port_power_on(%s, %s)', target_mac, port_idx)
        params = self._switch_port_power(target_mac, port_idx, "auto")
        device_id = params['device_id']
        del params['device_id']
        return self._api_update('rest/device/' + device_id, params)

    def create_site(self, desc='desc'):
        """Create a new site.

        :param desc: Name of the site to be created.
        """
        return self._run_command('add-site', params={"desc": desc},
                                 mgr='sitemgr')

    def block_client(self, mac):
        """Add a client to the block list.

        :param mac: the MAC address of the client to block.
        """
        return self._mac_cmd(mac, 'block-sta')

    def unblock_client(self, mac):
        """Remove a client from the block list.

        :param mac: the MAC address of the client to unblock.
        """
        return self._mac_cmd(mac, 'unblock-sta')

    def disconnect_client(self, mac):
        """Disconnect a client.

        Disconnects a client, forcing them to reassociate. Useful when the
        connection is of bad quality to force a rescan.

        :param mac: the MAC address of the client to disconnect.
        """
        return self._mac_cmd(mac, 'kick-sta')

    def restart_ap(self, mac):
        """Restart an access point (by MAC).

        :param mac: the MAC address of the AP to restart.
        """
        return self._mac_cmd(mac, 'restart', 'devmgr')

    def restart_ap_name(self, name):
        """Restart an access point (by name).

        :param name: the name address of the AP to restart.
        """
        if not name:
            raise APIError('%s is not a valid name' % str(name))
        for ap in self.get_aps():
            if ap.get('state', 0) == 1 and ap.get('name', None) == name:
                return self.restart_ap(ap['mac'])

    def archive_all_alerts(self):
        # Archive all Alerts
        return self._run_command('archive-all-alarms', mgr='evtmgr')

    def create_backup(self, days='0'):
        """Ask controller to create a backup archive file

        ..warning:
            This process puts significant load on the controller
            and may render it partially unresponsive for other requests.

        :param days: metrics of the last x days will be added to the backup.
            '-1' backup all metrics. '0' backup only the configuration.
        :return: URL path to backup file
        """
        res = self._run_command('backup', mgr='system', params={'days': days})
        return res[0]['url']

    def get_backup(self, download_path=None, target_file='unifi-backup.unf'):
        """
        :param download_path: path to backup; if None is given
            one will be created
        :param target_file: Filename or full path to download the
            backup archive to, should have .unf extension for restore.
        """
        if not download_path:
            download_path = self.create_backup()

        r = self.session.get(self.url + download_path, stream=True)
        with open(target_file, 'wb') as _backfh:
            return shutil.copyfileobj(r.raw, _backfh)
    def authorize_guest(self, guest_mac, minutes, up_bandwidth=None,
                        down_bandwidth=None, byte_quota=None, ap_mac=None):
        """
        Authorize a guest based on his MAC address.

        :param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
        :param minutes: duration of the authorization in minutes
        :param up_bandwidth: up speed allowed in kbps
        :param down_bandwidth: down speed allowed in kbps
        :param byte_quota: quantity of bytes allowed in MB
        :param ap_mac: access point MAC address
        """
        cmd = 'authorize-guest'
        params = {'mac': guest_mac, 'minutes': minutes}

        if up_bandwidth:
            params['up'] = up_bandwidth
        if down_bandwidth:
            params['down'] = down_bandwidth
        if byte_quota:
            params['bytes'] = byte_quota
        if ap_mac:
            params['ap_mac'] = ap_mac
        return self._run_command(cmd, params=params)

    def unauthorize_guest(self, guest_mac):
        """
        Unauthorize a guest based on his MAC address.

        :param guest_mac: the guest MAC address: 'aa:bb:cc:dd:ee:ff'
        """
        cmd = 'unauthorize-guest'
        params = {'mac': guest_mac}
        return self._run_command(cmd, params=params)

    def get_firmware(self, cached=True, available=True,
                     known=False, site=False):
        """
        Return a list of available/cached firmware versions

        :param cached: Return cached firmwares
        :param available: Return available (and not cached) firmwares
        :param known: Return only firmwares for known devices
        :param site: Return only firmwares for on-site devices
        :return: List of firmware dicts
        """
        res = []
        if cached:
            res.extend(self._run_command('list-cached', mgr='firmware'))
        if available:
            res.extend(self._run_command('list-available', mgr='firmware'))

        if known:
            res = [fw for fw in res if fw['knownDevice']]
        if site:
            res = [fw for fw in res if fw['siteDevice']]
        return res

    def cache_firmware(self, version, device):
        """
        Cache the firmware on the UniFi Controller

        .. warning:: Caching one device might very well cache others,
            as they're on shared platforms

        :param version: version to cache
        :param device: device model to cache (e.g. BZ2)
        :return: True/False
        """
        return self._run_command(
            'download', mgr='firmware',
            params={'device': device, 'version': version})[0]['result']

    def remove_firmware(self, version, device):
        """
        Remove cached firmware from the UniFi Controller

        .. warning:: Removing one device's firmware might very well remove
            others, as they're on shared platforms

        :param version: version to remove
        :param device: device model to remove (e.g. BZ2)
        :return: True/False
        """
        return self._run_command(
            'remove', mgr='firmware',
            params={'device': device, 'version': version})[0]['result']

    def get_tag(self):
        """Get all tags and their member MACs"""
        return self._api_read('rest/tag')

    def upgrade_device(self, mac, version):
        """
        Upgrade a device's firmware to version

        :param mac: MAC of dev
        :param version: version to upgrade to
        """
        self._mac_cmd(mac, 'upgrade', mgr='devmgr',
                      params={'upgrade_to_firmware': version})

    def provision(self, mac):
        """
        Force provisioning of a device

        :param mac: MAC of device
        """
        self._mac_cmd(mac, 'force-provision', mgr='devmgr')
    def get_setting(self, section=None, super=False):
        """
        Return settings for this site or controller

        :param super: Return only controller-wide settings
        :param section: Only return this/these section(s)
        :return: {section:settings}
        """
        res = {}
        settings = self._api_read('get/setting')
        if section and not isinstance(section, (list, tuple)):
            section = [section]

        for s in settings:
            s_sect = s['key']
            if (super and 'site_id' in s) or \
               (not super and 'site_id' not in s) or \
               (section and s_sect not in section):
                continue
            for k in ('_id', 'site_id', 'key'):
                s.pop(k, None)
            res[s_sect] = s
        return res

    def update_setting(self, settings):
        """
        Update settings

        :param settings: {section:{settings}}
        :return: resulting settings
        """
        res = []
        for sect, setting in settings.items():
            res.extend(self._api_write('set/setting/' + sect, setting))
        return res

    def update_user_group(self, group_id, down_kbps=-1, up_kbps=-1):
        """
        Update user group bandwidth settings

        :param group_id: Group ID to modify
        :param down_kbps: New bandwidth in KBPS for download
        :param up_kbps: New bandwidth in KBPS for upload
        """
        res = None
        groups = self.get_user_groups()

        for group in groups:
            if group["_id"] == group_id:
                # Apply setting change
                res = self._api_update("rest/usergroup/{0}".format(group_id), {
                    "qos_rate_max_down": down_kbps,
                    "qos_rate_max_up": up_kbps,
                    "name": group["name"],
                    "_id": group_id,
                    "site_id": self.site_id
                })
                return res

        raise ValueError("Group ID {0} is not valid.".format(group_id))

    def set_client_alias(self, mac, alias):
        """
        Set the client alias. Set to "" to reset to default

        :param mac: The MAC of the client to rename
        :param alias: The alias to set
        """
        client = self.get_client(mac)['_id']
        return self._api_update('rest/user/' + client, {'name': alias})

    def create_voucher(self, number, quota, expire, up_bandwidth=None,
                       down_bandwidth=None, byte_quota=None, note=None):
        """
        Create voucher for guests.

        :param number: number of vouchers
        :param quota: number of using; 0 = unlimited
        :param expire: expiration of voucher in minutes
        :param up_bandwidth: up speed allowed in kbps
        :param down_bandwidth: down speed allowed in kbps
        :param byte_quota: quantity of bytes allowed in MB
        :param note: description
        """
        cmd = 'create-voucher'
        params = {'n': number, 'quota': quota, 'expire': 'custom',
                  'expire_number': expire, 'expire_unit': 1}

        if up_bandwidth:
            params['up'] = up_bandwidth
        if down_bandwidth:
            params['down'] = down_bandwidth
        if byte_quota:
            params['bytes'] = byte_quota
        if note:
            params['note'] = note
        res = self._run_command(cmd, mgr='hotspot', params=params)
        return self.list_vouchers(create_time=res[0]['create_time'])

    def list_vouchers(self, **filter):
        """
        Get list of vouchers

        :param filter: Filter vouchers by create_time, code, quota,
            used, note, status_expires, status, ...
        """
        if 'code' in filter:
            filter['code'] = filter['code'].replace('-', '')
        vouchers = []
        for voucher in self._api_read('stat/voucher'):
            voucher_match = True
            for key, val in filter.items():
                voucher_match &= voucher.get(key) == val
            if voucher_match:
                vouchers.append(voucher)
        return vouchers

    def delete_voucher(self, id):
        """
        Delete / revoke voucher

        :param id: id of voucher
        """
        cmd = 'delete-voucher'
        params = {'_id': id}
        self._run_command(cmd, mgr='hotspot', params=params)
May be deleted
Bump… is anyone else looking at getting this resolved?
So I haven’t had much time to tinker on this until today. I wanted to force the version in the config and when I ran the “check configuration” script I got this error…
Invalid config for [sensor.unifigateway]: [version] is an invalid option for [sensor.unifigateway]. Check: sensor.unifigateway->version. (See ?, line ?).
Here is my config…
sensor:
  - platform: unifigateway
    host: <unifi ip address>
    port: 443
    version: unifiOS
    username: <unifi local account>
    password: <unifi local password>
    monitored_conditions:
      - www
      - lan
      - wan
      - wlan
      - alerts
      - firmware
Shouldn’t the version be something like v5 for example?
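On the “invalid option” error: the schema in the sensor.py posted earlier in this thread does not declare a version key, and that file passes version='unifiOS' to the controller itself, so the option appears to be rejected regardless of its value. A simplified sketch of that check in plain Python rather than voluptuous; DECLARED_KEYS is copied by hand from the posted PLATFORM_SCHEMA (plus platform), and unknown_options is a made-up helper name:

```python
# Keys declared by the PLATFORM_SCHEMA in the sensor.py posted in this
# thread, plus "platform". Simplified: the base schema that Home Assistant
# extends allows a few more generic keys that are left out here.
DECLARED_KEYS = {
    "platform", "name", "host", "site_id", "username", "password",
    "port", "verify_ssl", "monitored_conditions",
}


def unknown_options(config):
    """Return the config keys the schema above does not declare, sorted."""
    return sorted(set(config) - DECLARED_KEYS)


config = {
    "platform": "unifigateway",
    "host": "192.168.1.254",
    "port": 443,
    "version": "unifiOS",
    "username": "ha",
    "password": "secret",
    "monitored_conditions": ["www", "wan"],
}
print(unknown_options(config))  # ['version']
```

Dropping the version line from the YAML should therefore clear this particular validation error with that sensor.py.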
I have a UDM-pro and want to include it in home assistant.
This is my config:
### Unifi ###
- platform: unifigateway
  host: 10.0.1.254
  username: !secret unifi_username
  password: !secret unifi_password
  monitored_conditions:
    - www
    - wlan
    - alerts
    - firmware
this is the error message I get.
Logger: custom_components.unifigateway.sensor
Source: custom_components/unifigateway/sensor.py:87
Integration: unifigateway ([documentation](https://github.com/custom-components/sensor.unifigateway))
First occurred: 11:51:07 AM (1 occurrences)
Last logged: 11:51:07 AM
Failed to connect to Unifi Security Gateway: Login failed - status code: 404
and also a warning
Logger: homeassistant.loader
Source: loader.py:802
First occurred: 11:51:03 AM (1 occurrences)
Last logged: 11:51:03 AM
No 'version' key in the manifest file for custom integration 'unifigateway'. This will not be allowed in a future version of Home Assistant. Please report this to the maintainer of 'unifigateway'
Above I saw something about this project not being compatible with the UDM and UDM-Pro?
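A 404 on login would be consistent with the client posting to a login path that the UDM-Pro does not serve. The controller.py posted earlier in this thread logs in at /api/auth/login and prefixes every API call with /proxy/network/, whereas the URL tried earlier in the thread was /api/login on a separate port. A minimal sketch of the two layouts; controller_urls is a hypothetical helper, and the classic branch is an assumption rather than something taken from the posted files:

```python
def controller_urls(host, site_id="default", unifi_os=True, port=8443):
    """Return (login_url, site_api_base) for the two URL layouts."""
    if unifi_os:
        # Layout used by the controller.py posted in this thread.
        base = "https://%s" % host
        return (base + "/api/auth/login",
                base + "/proxy/network/api/s/%s/" % site_id)
    # Assumed classic layout: controller on its own port, no proxy prefix.
    base = "https://%s:%d" % (host, port)
    return base + "/api/login", base + "/api/s/%s/" % site_id


login_url, api_base = controller_urls("10.0.1.254")
print(login_url)  # https://10.0.1.254/api/auth/login
print(api_base)   # https://10.0.1.254/proxy/network/api/s/default/
```

If a client built for the classic layout is pointed at a UDM-Pro, the login request may simply land on a path that does not exist there, which would match the status code in the log above.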
@JeroenDeckers dude, this was an awesome custom component before UnifiOS. Since then, limited effort has gone into the changes needed to make things work again. There is a new version that is not exposed to HACS, and several PRs are waiting to be merged, including the fix for the manifest version issue. Perhaps the developers will find time to work on it and perhaps not.
I don’t believe the main developer @jchasey even has access to a UDM-Pro. Please remember the UDM-Pro and UnifiOS are different from the Unifi system we used previously. And please don’t take this as a knock in any way on @jchasey, because it’s not at all.
The other possibility is that the built-in integration, which provides other elements, extends its sensors to include the www, wlan, vpn, firmware, etc. elements we previously received from this custom component.
As for your auth issue, in Unifi you need to go to /users/<your_ha_user>/profile and set up a local username and password. I have a user dedicated to Home Assistant. This works just fine. I am able to authenticate to UnifiOS and Protect with no issues.
I have created a local user and I’m using those credentials.
I’m sure the credentials work because I tried logging in with them.
Is this config block sufficient?
### Unifi ###
- platform: unifigateway
  host: 10.0.1.254
  username: !secret unifi_username
  password: !secret unifi_password
  monitored_conditions:
    - www
    - wlan
    - alerts
    - firmware
@JeroenDeckers your config looks correct, but it still does not work because there are changes that need to be added to support the UDM-Pro. Here is my config just to illustrate. This config worked before I upgraded my router to the UDM-Pro. The only change I have made is adding the version option, but that doesn’t work on the older version that is in HACS. I am running version 0.3.0 with no success…
### Unifi Sensors ###
- platform: unifigateway
  host: 192.168.1.254
  port: 443
  version: unifiOS
  username: <username>
  password: <password>
  monitored_conditions:
    - www
    - wan
    - wlan
    - lan
    - alerts
    - firmware
Hi, just grab the two files above instead and place them in your CC folder. It works for me without issue on the UDM Pro, apart from the alerts sensor.
Which 2 files? I’ve installed the component with HACS.
Did you need to configure the version in your config files?
Can you post your code?
Scroll up and you will see them… don’t worry about the version number.
The files are listed here - UniFi Security Gateway.
I have the component installed via HACS, so I just overwrote the contents of the existing sensor.py, but had to add a new file for controller.py.
After restarting Core, I’m delighted to report that HA can now connect to my UDM-Pro.
Thanks for the code, @grantc!
All good, the only sensor not working was “alerts” - just remove it from the yaml file if you don’t want errors in the log
Good shout. I can confirm that it also works with the latest 1.9.0 firmware.