I’m working on a custom component and have my device and entities discovered, now working on fetching data. I’d like to have a function that monitors the serial port and pushing data to my entities. The doc says I should be using a DataUpdateCoordinator, but I’m a little fuzzy on how it would work with local_push. Also, can the python function that monitors the serial port be setup to run by HA? Any help would be appreciated. Thanks
Not sure if you ever figured this out or not, but I am making a custom component that gets data from a socketio connection…so local_push for me as well. In the DataUpdateCoordinator you call async_set_updated_data(). For me it looks like:
@self.sio.on("chlorinator")
async def handle_chlorinator(data):
data["event"] = EVENT_CHLORINATOR
self.async_set_updated_data(data)
Then for your entities you have a def _handle_coordinator_update(self). This is the data that gets sent from the DataUpdateCoordinator. Mine looks like this:
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
if (
self.coordinator.data["event"] == EVENT_CHLORINATOR
and self.coordinator.data["id"] == self._chlorinator["id"]
):
self._value = self.coordinator.data[self._type]
self.async_write_ha_state()
elif self.coordinator.data["event"] == EVENT_AVAILABILITY:
self._available = self.coordinator.data["available"]
self.async_write_ha_state()
I’m still a little confused as to how the data gets into the coordinator. Did you write code that monitors the socket and pushes the data by calling handle_chlorinator()? Does this “push” code run inside or outside or HASS? Thanks
@PaulStew67 take a look at my njsPC-HA components (https://github.com/Crewski/njsPC-HA/) to see how it all works…mainly the init.py and one of the entities. When setting up the entities, create and instance of the DataCoordinator (NjsPCHAdata). Then I call the connect function (for you it would be your serial monitoring function. In the socketio callbacks, the data gets “pushed” via the async_set_updated_data() method. For the entity, the data is received in the _handle_coordinator_update() function. I hope this helps some. I’m not sure on the monitoring of the serial port though. I found some monitoring examples online that print out the data. Just replace the print(data) line with the async_set_updated_data() call to push it to your entities. Below are the relevant sections.
init.py:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up njsPC-HA from a config entry."""
# TODO Store an API object for your platforms to access
api = NjsPCHAapi(hass, entry.data)
await api.get_config()
coordinator = NjsPCHAdata(hass, api)
await coordinator.sio_connect()
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
async def _async_sio_close(_: Event) -> None:
await coordinator.sio_close()
entry.async_on_unload(
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_sio_close)
)
return True
class NjsPCHAdata(DataUpdateCoordinator):
"""Data coordinator for receiving from nodejs-PoolController"""
def __init__(self, hass: HomeAssistant, api: NjsPCHAapi):
"""Initialize data coordinator."""
super().__init__(
hass,
_LOGGER,
# Name of the data. For logging purposes.
name=DOMAIN,
)
self.api = api
self.sio = None
async def sio_connect(self):
"""Method to connect to nodejs-PoolController"""
self.sio = socketio.AsyncClient(
reconnection=True,
reconnection_attempts=0,
reconnection_delay=1,
reconnection_delay_max=10,
logger=False,
engineio_logger=False,
)
@self.sio.on("chlorinator")
async def handle_chlorinator(data):
data["event"] = EVENT_CHLORINATOR
self.async_set_updated_data(data)
@self.sio.event
async def connect():
print("I'm connected!")
avail = {"event": EVENT_AVAILABILITY, "available": True}
self.async_set_updated_data(avail)
self.logger.debug(f"SocketIO connect to {self.api._base_url}")
@self.sio.event
async def connect_error(data):
avail = {"event": EVENT_AVAILABILITY, "available": False}
self.async_set_updated_data(avail)
self.logger.error(f"SocketIO connection error: {data}")
print("The connection failed!")
@self.sio.event
async def disconnect():
avail = {"event": EVENT_AVAILABILITY, "available": False}
self.async_set_updated_data(avail)
self.logger.debug(f"SocketIO disconnect to {self.api._base_url}")
print("I'm disconnected!")
await self.sio.connect(self.api._base_url)
async def sio_close(self):
await self.sio.disconnect()
sensor.py:
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Add sensors for passed config_entry in HA."""
coordinator = hass.data[DOMAIN][config_entry.entry_id]
new_devices = []
for chlorinator in coordinator.api._config["chlorinators"]:
new_devices.append(SaltSensor(coordinator, chlorinator))
if new_devices:
async_add_entities(new_devices)
class SaltSensor(CoordinatorEntity, SensorEntity):
"""SWG Salt Sensor for njsPC-HA"""
def __init__(self, coordinator, chlorinator):
"""Initialize the sensor."""
super().__init__(coordinator)
self._chlorinator = chlorinator
self._value = chlorinator["saltLevel"]
self._available = True
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
if (
self.coordinator.data["event"] == EVENT_CHLORINATOR
and self.coordinator.data["id"] == self._chlorinator["id"]
):
self._chlorinator = self.coordinator.data
self.async_write_ha_state()
elif self.coordinator.data["event"] == EVENT_AVAILABILITY:
self._available = self.coordinator.data["available"]
self.async_write_ha_state()
@property
def should_poll(self):
return False
@property
def available(self):
return self._available
@property
def name(self):
"""Name of the sensor"""
return self._chlorinator["name"] + " Salt Level"
@property
def unique_id(self):
"""ID of the sensor"""
return self.coordinator.api.get_unique_id(
f'saltlevel_{self._chlorinator["id"]}'
)
@property
def state_class(self):
"""State class of the sensor"""
return STATE_CLASS_MEASUREMENT
@property
def native_value(self):
"""Raw value of the sensor"""
return self._chlorinator["saltLevel"]
@property
def icon(self):
return "mdi:shaker-outline"
@property
def native_unit_of_measurement(self) -> str:
return "PPM"
@property
def extra_state_attributes(self):
"""Return entity specific state attributes."""
return {
"salt_target": self._chlorinator["saltTarget"],
"salt_required": self._chlorinator["saltRequired"],
}