Custom component with local_push for serial device

I’m working on a custom component and have my device and entities discovered; now I’m working on fetching data. I’d like to have a function that monitors the serial port and pushes data to my entities. The docs say I should be using a DataUpdateCoordinator, but I’m a little fuzzy on how it would work with local_push. Also, can the Python function that monitors the serial port be set up to be run by HA? Any help would be appreciated. Thanks

Not sure if you ever figured this out or not, but I am making a custom component that gets data from a socketio connection…so local_push for me as well. In the DataUpdateCoordinator you call async_set_updated_data(). For me it looks like:

@self.sio.on("chlorinator")
async def handle_chlorinator(data):
    """Tag an incoming "chlorinator" payload and push it to the listeners."""
    # The "event" key is what the receiving side checks to tell payloads apart.
    data["event"] = EVENT_CHLORINATOR
    self.async_set_updated_data(data)

Then, in your entities, you define _handle_coordinator_update(self). This is where the entity receives the data sent from the DataUpdateCoordinator. Mine looks like this:

def _handle_coordinator_update(self) -> None:
    """React to a payload pushed by the coordinator.

    A chlorinator event whose ``id`` matches this entity's chlorinator
    updates the stored value; an availability event updates the
    availability flag. In both cases ``async_write_ha_state()`` is called
    afterwards. Any other payload is ignored.
    """
    payload = self.coordinator.data
    event = payload["event"]

    if (
        event == EVENT_CHLORINATOR
        and payload["id"] == self._chlorinator["id"]
    ):
        self._value = payload[self._type]
    elif event == EVENT_AVAILABILITY:
        self._available = payload["available"]
    else:
        # Not addressed to this entity: leave the state untouched.
        return

    self.async_write_ha_state()

I’m still a little confused as to how the data gets into the coordinator. Did you write code that monitors the socket and pushes the data by calling handle_chlorinator()? Does this “push” code run inside or outside of HASS? Thanks

@PaulStew67 take a look at my njsPC-HA components (https://github.com/Crewski/njsPC-HA/) to see how it all works…mainly the __init__.py and one of the entities. When setting up the entities, create an instance of the DataCoordinator (NjsPCHAdata). Then I call the connect function (for you it would be your serial monitoring function). In the socketio callbacks, the data gets “pushed” via the async_set_updated_data() method. For the entity, the data is received in the _handle_coordinator_update() function. I hope this helps some. I’m not sure about the monitoring of the serial port though. I found some monitoring examples online that print out the data. Just replace the print(data) line with the async_set_updated_data() call to push it to your entities. Below are the relevant sections.

__init__.py:

async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up njsPC-HA from a config entry.

    Builds the API object and awaits ``api.get_config()``, creates the
    coordinator and awaits its socket.io connection, stores the coordinator
    under ``hass.data[DOMAIN][entry.entry_id]`` and forwards the entry to
    every platform in ``PLATFORMS``.

    Returns:
        True once all of the above has completed.
    """
    api = NjsPCHAapi(hass, entry.data)
    await api.get_config()

    coordinator = NjsPCHAdata(hass, api)
    await coordinator.sio_connect()

    # Store the coordinator before forwarding so the platforms can find it.
    hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator

    # async_setup_platforms() was deprecated and later removed from Home
    # Assistant; async_forward_entry_setups() is its awaitable replacement.
    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

    async def _async_sio_close(_: Event) -> None:
        """Disconnect the socket when Home Assistant stops."""
        await coordinator.sio_close()

    # async_listen_once() returns the listener's remover; handing it to
    # async_on_unload() drops the stop listener when the entry is unloaded.
    entry.async_on_unload(
        hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_sio_close)
    )

    return True


class NjsPCHAdata(DataUpdateCoordinator):
    """Data coordinator for receiving from nodejs-PoolController.

    No update interval is passed to the base class. Within this class, data
    reaches the listeners only through ``async_set_updated_data()``, which
    the socket.io handlers registered in ``sio_connect()`` call.
    """

    def __init__(self, hass: HomeAssistant, api: NjsPCHAapi):
        """Initialize data coordinator."""
        super().__init__(
            hass,
            _LOGGER,
            # Name of the data. For logging purposes.
            name=DOMAIN,
        )
        self.api = api
        # The socket.io client is created in sio_connect(); None until then.
        self.sio = None

    def _push_availability(self, available: bool) -> None:
        """Push an availability event to the coordinator's listeners."""
        self.async_set_updated_data(
            {"event": EVENT_AVAILABILITY, "available": available}
        )

    async def sio_connect(self):
        """Create the socket.io client, register its handlers and connect."""
        self.sio = socketio.AsyncClient(
            reconnection=True,
            # In python-socketio, 0 attempts means "keep retrying forever".
            reconnection_attempts=0,
            reconnection_delay=1,
            reconnection_delay_max=10,
            logger=False,
            engineio_logger=False,
        )

        @self.sio.on("chlorinator")
        async def handle_chlorinator(data):
            # Tag the payload so receivers can tell which event it carries.
            data["event"] = EVENT_CHLORINATOR
            self.async_set_updated_data(data)

        @self.sio.event
        async def connect():
            self._push_availability(True)
            self.logger.debug("SocketIO connect to %s", self.api._base_url)

        @self.sio.event
        async def connect_error(data):
            self._push_availability(False)
            self.logger.error("SocketIO connection error: %s", data)

        @self.sio.event
        async def disconnect():
            self._push_availability(False)
            self.logger.debug("SocketIO disconnect to %s", self.api._base_url)

        await self.sio.connect(self.api._base_url)

    async def sio_close(self):
        """Disconnect the socket.io client, if one has been created."""
        # Guard against being called before sio_connect() ever ran.
        if self.sio is not None:
            await self.sio.disconnect()

sensor.py:

async def async_setup_entry(hass, config_entry, async_add_entities):
    """Create one salt sensor per configured chlorinator and register them."""
    coordinator = hass.data[DOMAIN][config_entry.entry_id]

    salt_sensors = [
        SaltSensor(coordinator, chlorinator)
        for chlorinator in coordinator.api._config["chlorinators"]
    ]
    # Nothing is registered when the config lists no chlorinators.
    if salt_sensors:
        async_add_entities(salt_sensors)



class SaltSensor(CoordinatorEntity, SensorEntity):
    """Salt-level sensor for a single chlorinator (SWG) in njsPC-HA."""

    def __init__(self, coordinator, chlorinator):
        """Keep the chlorinator dict and start out marked as available."""
        super().__init__(coordinator)
        self._chlorinator = chlorinator
        self._value = chlorinator["saltLevel"]
        self._available = True

    def _handle_coordinator_update(self) -> None:
        """React to a payload pushed by the coordinator.

        A chlorinator event whose ``id`` matches this sensor replaces the
        stored chlorinator dict; an availability event updates the
        availability flag. In both cases ``async_write_ha_state()`` is
        called afterwards. Any other payload is ignored.
        """
        payload = self.coordinator.data
        event = payload["event"]

        if (
            event == EVENT_CHLORINATOR
            and payload["id"] == self._chlorinator["id"]
        ):
            self._chlorinator = payload
        elif event == EVENT_AVAILABILITY:
            self._available = payload["available"]
        else:
            # Not addressed to this sensor: leave the state untouched.
            return

        self.async_write_ha_state()

    @property
    def should_poll(self):
        """Return False: this sensor is never polled."""
        return False

    @property
    def available(self):
        """Return the availability flag last set for this sensor."""
        return self._available

    @property
    def name(self):
        """Return the chlorinator's name followed by " Salt Level"."""
        chlorinator_name = self._chlorinator["name"]
        return chlorinator_name + " Salt Level"

    @property
    def unique_id(self):
        """Return the unique ID the API derives from the chlorinator id."""
        id_suffix = f'saltlevel_{self._chlorinator["id"]}'
        return self.coordinator.api.get_unique_id(id_suffix)

    @property
    def state_class(self):
        """Return the measurement state class."""
        return STATE_CLASS_MEASUREMENT

    @property
    def native_value(self):
        """Return the salt level from the stored chlorinator dict."""
        return self._chlorinator["saltLevel"]

    @property
    def icon(self):
        """Return the icon shown for this sensor."""
        return "mdi:shaker-outline"

    @property
    def native_unit_of_measurement(self) -> str:
        """Return the unit string for the salt level."""
        return "PPM"

    @property
    def extra_state_attributes(self):
        """Return the salt target and salt-required values as attributes."""
        chlorinator = self._chlorinator
        return {
            "salt_target": chlorinator["saltTarget"],
            "salt_required": chlorinator["saltRequired"],
        }