For context, I want to build an integration that relies on polling an API. The API endpoint only returns devices that are physically turned on.
I used the following code from the "Fetching Data" page of the Home Assistant Developer Docs:
"""Example integration using DataUpdateCoordinator."""
from datetime import timedelta
import logging
import async_timeout
from homeassistant.components.light import LightEntity
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, entry, async_add_entities):
    """Config entry example.

    Looks up the API object stored for this config entry, wraps it in a
    DataUpdateCoordinator that polls every 30 seconds, performs the first
    refresh, and then adds one MyEntity per item in the coordinator data.
    """
    # assuming API object stored here by __init__.py
    api = hass.data[DOMAIN][entry.entry_id]

    async def async_update_data():
        """Fetch data from API endpoint.

        This is the place to pre-process the data to lookup tables
        so entities can quickly look up their data.
        """
        # NOTE(review): ApiAuthError and ApiError are not imported in the
        # visible part of this file; presumably they come from the API
        # client library - confirm and import them.
        try:
            # Note: asyncio.TimeoutError and aiohttp.ClientError are already
            # handled by the data update coordinator.
            async with async_timeout.timeout(10):
                return await api.fetch_data()
        except ApiAuthError as err:
            # Raising ConfigEntryAuthFailed will cancel future updates
            # and start a config flow with SOURCE_REAUTH (async_step_reauth)
            raise ConfigEntryAuthFailed from err
        except ApiError as err:
            # Chain the original error explicitly so the traceback shows it
            # as the direct cause, matching the auth-failure branch above.
            raise UpdateFailed(f"Error communicating with API: {err}") from err

    coordinator = DataUpdateCoordinator(
        hass,
        _LOGGER,
        # Name of the data. For logging purposes.
        name="sensor",
        update_method=async_update_data,
        # Polling interval. Will only be polled if there are subscribers.
        update_interval=timedelta(seconds=30),
    )

    #
    # Fetch initial data so we have data when entities subscribe
    #
    # If the refresh fails, async_config_entry_first_refresh will
    # raise ConfigEntryNotReady and setup will try again later
    #
    # If you do not want to retry setup on failure, use
    # coordinator.async_refresh() instead
    #
    await coordinator.async_config_entry_first_refresh()

    # Only the position of each item is needed: entities look their data up
    # in coordinator.data by index.
    async_add_entities(
        MyEntity(coordinator, idx) for idx, _ in enumerate(coordinator.data)
    )
class MyEntity(CoordinatorEntity, LightEntity):
    """An entity using CoordinatorEntity.

    The CoordinatorEntity class provides:
      should_poll
      async_update
      async_added_to_hass
      available
    """

    def __init__(self, coordinator, idx):
        """Pass coordinator to CoordinatorEntity.

        coordinator: the coordinator whose ``data`` this entity reads.
        idx: index of this entity's record within ``coordinator.data``.
        """
        super().__init__(coordinator)
        self.idx = idx

    @property
    def is_on(self):
        """Return entity state.

        Example to show how we fetch data from coordinator.
        """
        # The value must be returned; without ``return`` the property
        # always evaluates to None regardless of the fetched data.
        return self.coordinator.data[self.idx]["state"]

    async def async_turn_on(self, **kwargs):
        """Turn the light on.

        Example method how to request data updates.
        """
        # Do the turning on.
        # ...

        # Update the data
        await self.coordinator.async_request_refresh()
However, this example will only poll when there are subscribers (registered entities). So when I start HA and my API does not return any device, the polling won’t start.
How would I go about changing this example code so that it always polls, even when the API returns no devices?
Thanks