Use of the update coordinator with pushing endpoint

I have some threads listening for incoming data. The data is processed and returned as mydata from a function. What I can’t quite figure out now is how the data update coordinator should be put together. The documentation describes it as “Do this by not passing polling parameters update_method and update_interval to the constructor”, but that doesn’t give me any results, so I think I’m missing something. It looks like this now, a bit weird in my opinion but this does work.

> async def do_mydata_update():
>         while not mydata:
>             await asyncio.sleep(10)  # Check every 10 second for filled dict
>             #_LOGGER.warning("Waiting for data...")
>         return mydata
> 
>     coordinator = DataUpdateCoordinator(
>         hass,
>         _LOGGER,
>         name=DOMAIN,
>         update_method=do_mydata_update,
>         update_interval=timedelta(seconds=10) #omitting this won't update the entities
>     )
>     await coordinator.async_config_entry_first_refresh()

Although your threads are listening for data, what you have here is really polling: the coordinator checks the dictionary on a timer. Setting update_method and update_interval is therefore correct for this approach, but it is not really a push API.

See this example for how a push api would work.

It's not exactly the same, as it initially reads the API to get its data and then listens for pushed updates.

Thanks for the quick reply Mark. They do look very similar. I don’t know when the dictionary will be provided with new data, it could be somewhere between 3 and 5 minutes, sometimes even faster. Currently the entities will be refreshed every 10 seconds even if the dict has not changed. Is there a solution for that (I mean update the entities only if the dictionary has been changed)?

It really depends on the mechanism that receives this data. If you can show me the code, I can point you in the right direction.

I have a function where I start the proxyserver

listener_1 = socketserver.TCPServer((host, port1), PROXYSERVER)
thread_1 = threading.Thread(target=listener_1.serve_forever)
listener_2 = socketserver.TCPServer((host, port2), PROXYSERVER)
thread_2 = threading.Thread(target=listener_2.serve_forever)
listener_3 = socketserver.TCPServer((host, port3), PROXYSERVER)
thread_3 = threading.Thread(target=listener_3.serve_forever)
for thread in (thread_1, thread_2, thread_3):
    thread.start()

It calls the following class

class PROXYSERVER(BaseRequestHandler):
    """ Class provides data dictionary """
    def handle(self):
        rec = self.request.recv(1024)
        #build mydata dictionary based on recv data
        return mydata

I have no control over when data arrives (and when mydata is updated); it runs continuously in the background.

OK, what I don’t get from your code snippets is how the data gets into HA.

Are your socketserver threads and the handler part of your integration code? And what is the handle method of your PROXYSERVER class returning data to?

Fundamentally, I would think it is best to use a callback function with your socketserver instance. Below is how I would do it. It is not fully tested and may need tweaking to suit your data. First, create subclasses so that your server and handler accept a callback function:

"""Socket server classes with callback support (sockets.py)."""

import socketserver


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Threaded TCP Server with callback."""

    def __init__(self, host_port_tuple, streamhandler, callback) -> None:
        """Initialise."""
        super().__init__(host_port_tuple, streamhandler)
        # Stored on the server so every request handler can reach it.
        self.callback = callback


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Subclassed handler to enable callback."""

    def handle(self):
        """Handle received data."""
        callback_func = self.server.callback
        rec = self.request.recv(1024)
        # Pass the raw bytes on; parse them in the callback as needed.
        callback_func(rec)
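
To try the callback pattern above outside Home Assistant, here is a minimal standalone sketch using only the standard library. The names CallbackTCPServer, CallbackHandler and on_data are made up for this illustration, and the loopback address with an OS-assigned port is an assumption to keep the demo self-contained. Note that serve_forever() is started in its own thread because it blocks until shutdown() is called.

```python
import socket
import socketserver
import threading


class CallbackTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that hands every received payload to a callback."""

    allow_reuse_address = True
    daemon_threads = True  # per-request threads will not block interpreter exit

    def __init__(self, address, handler, callback):
        super().__init__(address, handler)
        self.callback = callback


class CallbackHandler(socketserver.BaseRequestHandler):
    """Reads one chunk from the connection and forwards it."""

    def handle(self):
        payload = self.request.recv(1024)
        if payload:
            self.server.callback(payload)


received = []
got_data = threading.Event()


def on_data(payload: bytes) -> None:
    """Hypothetical callback: just record what arrived."""
    received.append(payload)
    got_data.set()


# Port 0 lets the OS pick a free port (assumption for this demo only).
server = CallbackTCPServer(("127.0.0.1", 0), CallbackHandler, on_data)
port = server.server_address[1]

# serve_forever() blocks, so it runs in a background thread.
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()

# Simulate a device pushing data to the listener.
with socket.create_connection(("127.0.0.1", port)) as client:
    client.sendall(b"hello")

got_data.wait(timeout=5)
server.shutdown()      # stops the serve_forever() loop
server.server_close()  # releases the listening socket
print(received)
```

In an integration the callback would be the coordinator method rather than a module-level function, but the wiring is the same.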

Then your coordinator should look something like this.

Notes:

  • The subclasses use ThreadingMixIn, so each incoming connection is handled in its own thread without you having to manage those threads yourself.
  • The callback function calls async_set_updated_data. This updates self.data and tells the entities to update their values, i.e. it calls _handle_coordinator_update on your entities.
  • I am adding each listener instance to a list as it is created, and iterating over this list to shut the listeners down when HA calls the coordinator's shutdown function.
  • As I say, it may need some tweaking for your specific data and how it needs to update the entities. That should be done in the update_callback method.

"""Example integration using DataUpdateCoordinator."""

import logging
import threading

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import DOMAIN, HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator

# Relative import, assuming sockets.py sits next to this file
# in the integration folder.
from .sockets import ThreadedTCPRequestHandler, ThreadedTCPServer

_LOGGER = logging.getLogger(__name__)

SOCKET_HOST = "192.168.1.1"
SOCKET_PORTS = [65001, 65002, 65003]


class ExampleCoordinator(DataUpdateCoordinator):
    """My example coordinator."""

    def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
        """Initialize coordinator."""

        self.socket_servers: list[ThreadedTCPServer] = []

        # Initialise DataUpdateCoordinator
        super().__init__(
            hass,
            _LOGGER,
            name=f"{DOMAIN} ({config_entry.unique_id})",
            # Set update method to None.
            update_method=None,
            # Do not set a polling interval as data will be pushed.
            # You can remove this line but left here for explanatory purposes.
            update_interval=None,
        )

        self.setup_socket_servers()

    async def async_shutdown(self) -> None:
        """Run shutdown clean up."""
        for socket_server in self.socket_servers:
            # shutdown() blocks until serve_forever() has returned, so run
            # it in the executor instead of in the event loop.
            await self.hass.async_add_executor_job(socket_server.shutdown)
            socket_server.server_close()
        await super().async_shutdown()

    def update_callback(self, data) -> None:
        """Socket data callback.

        This is called from a socket server thread, so it is not
        decorated with @callback. hass.add_job is thread safe and
        schedules async_set_updated_data in the event loop.
        """
        self.hass.add_job(self.async_set_updated_data, data)

    def setup_socket_servers(self) -> None:
        """Start one socket server per port, each in its own thread."""

        for port in SOCKET_PORTS:
            socket_server = ThreadedTCPServer(
                (SOCKET_HOST, port),
                ThreadedTCPRequestHandler,
                self.update_callback,
            )
            # serve_forever() blocks until shutdown() is called, so it must
            # run in a background thread and not in the event loop.
            threading.Thread(
                target=socket_server.serve_forever,
                name=f"socket-server-{port}",
                daemon=True,
            ).start()
            self.socket_servers.append(socket_server)
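
On the earlier question of refreshing entities only when the data has actually changed: the sketch below shows the general idea with plain asyncio, outside Home Assistant. PushCoordinator is a hypothetical stand-in written for this example, not the real DataUpdateCoordinator, and the temperature payloads are invented. A worker thread hands data to the event loop with call_soon_threadsafe, and listeners are only notified when the new payload differs from the stored one.

```python
import asyncio
import threading


class PushCoordinator:
    """Hypothetical stand-in for a push-style coordinator (not the HA class)."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self.data = None
        self._listeners = []

    def add_listener(self, listener) -> None:
        self._listeners.append(listener)

    def set_updated_data(self, data) -> None:
        """Runs in the event loop: store data, notify only on change."""
        if data == self.data:
            return  # identical payload, nothing to refresh
        self.data = data
        for listener in self._listeners:
            listener()

    def update_callback(self, data) -> None:
        """Called from a worker thread: hand the data over to the loop."""
        self._loop.call_soon_threadsafe(self.set_updated_data, data)


async def main():
    loop = asyncio.get_running_loop()
    coordinator = PushCoordinator(loop)
    seen = []
    coordinator.add_listener(lambda: seen.append(coordinator.data))

    def worker():
        # Simulated pushes: the second payload repeats the first.
        for payload in ({"temp": 20}, {"temp": 20}, {"temp": 21}):
            coordinator.update_callback(payload)

    thread = threading.Thread(target=worker)
    thread.start()
    # Wait for the worker without blocking the event loop.
    await loop.run_in_executor(None, thread.join)
    await asyncio.sleep(0)  # give any queued callbacks a chance to run
    return seen


result = asyncio.run(main())
print(result)
```

In the coordinator above, the same comparison could go into update_callback before handing the data to async_set_updated_data. The check only works if each push delivers a fresh object; a single dictionary that is mutated in place always compares equal to itself.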

Hope that helps.