OK, what I don’t get from your code snippets is how the data gets into HA.
Are your socketserver threads and the handler part of your integration code, and what is the handle method of your PROXYSERVER class returning data to?
Fundamentally, I would think it is best to use a callback function with your socketserver instance. Below is how I would do it. It is not fully tested and may need tweaking to suit your data etc. First, create subclasses so that your server and handler can accept a callback function:
import socketserver


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Threaded TCP Server with callback."""

    def __init__(self, host_port_tuple, streamhandler, callback) -> None:
        """Initialise."""
        super().__init__(host_port_tuple, streamhandler)
        # Keep a reference so the request handler can reach it via self.server.
        self.callback = callback


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Subclassed handler to enable callback."""

    def handle(self):
        """Handle received data."""
        callback_func = self.server.callback
        rec = self.request.recv(1024)
        # recv() returns b"" when the client closes without sending anything.
        if rec:
            callback_func(rec)
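If you want to sanity-check the callback wiring outside of HA first, something like the sketch below should do it. It repeats the two subclasses so it runs on its own, binds to 127.0.0.1 on a port picked by the OS, and sends one message from a throwaway client. `run_demo` and `on_data` are names I made up for the example, and `allow_reuse_address` / `daemon_threads` are optional extras I added, not something your code needs.

```python
import socket
import socketserver
import threading


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Threaded TCP server that hands received data to a callback."""

    allow_reuse_address = True  # optional: makes quick restarts easier
    daemon_threads = True  # optional: handler threads won't block exit

    def __init__(self, host_port_tuple, streamhandler, callback) -> None:
        super().__init__(host_port_tuple, streamhandler)
        self.callback = callback


class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Pass whatever is received to the server's callback."""

    def handle(self) -> None:
        rec = self.request.recv(1024)
        if rec:
            self.server.callback(rec)


def run_demo() -> bytes:
    """Start a server, send it one message, return what the callback saw."""
    received = []
    got_data = threading.Event()

    def on_data(data: bytes) -> None:
        # Runs in the handler thread, not the main thread.
        received.append(data)
        got_data.set()

    # Port 0 asks the OS for any free port.
    server = ThreadedTCPServer(("127.0.0.1", 0), ThreadedTCPRequestHandler, on_data)
    # serve_forever() blocks, so it gets its own thread.
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        with socket.create_connection(server.server_address) as client:
            client.sendall(b"hello")
        got_data.wait(timeout=5)
    finally:
        server.shutdown()
        server.server_close()
    return received[0] if received else b""


if __name__ == "__main__":
    print(run_demo())
```

The point to notice is that the callback fires in a worker thread, which is why the coordinator has to hand the data over to the event loop rather than touch HA state directly.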
Then your coordinator should look something like this.
Notes:
- The ThreadingMixIn subclass handles each incoming connection in its own thread, so you do not have to manage per-connection threads yourself. Be aware that serve_forever() itself still blocks, so it must not be called directly on the event loop.
- The callback function calls async_set_updated_data. This updates self.data and tells the entities to update their values, i.e. it calls _handle_coordinator_update on your entities.
- I am adding each listener instance to a list as it is created, and iterating over this list to shut the listeners down when HA calls the coordinator's shutdown function.
- As I say, it may need some tweaking for your specific data and how it needs to update the entities. That should be done in the update_callback method.
"""Example integration using DataUpdateCoordinator."""
import logging
import threading

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import DOMAIN, HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator

# Relative import, assuming sockets.py sits next to this file in the integration.
from .sockets import ThreadedTCPRequestHandler, ThreadedTCPServer

_LOGGER = logging.getLogger(__name__)

SOCKET_HOST = "192.168.1.1"
SOCKET_PORTS = [65001, 65002, 65003]


class ExampleCoordinator(DataUpdateCoordinator):
    """My example coordinator."""

    def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
        """Initialize coordinator."""
        self.socket_servers: list[ThreadedTCPServer] = []

        # Initialise DataUpdateCoordinator
        super().__init__(
            hass,
            _LOGGER,
            # Note: homeassistant.core.DOMAIN is "homeassistant"; you probably
            # want your integration's own DOMAIN constant here.
            name=f"{DOMAIN} ({config_entry.unique_id})",
            # Set update method to None.
            update_method=None,
            # Do not set a polling interval as data will be pushed.
            # You can remove this line but left here for explanatory purposes.
            update_interval=None,
        )
        self.setup_socket_servers()

    async def async_shutdown(self) -> None:
        """Run shutdown clean up."""
        for socket_server in self.socket_servers:
            # shutdown() blocks until serve_forever() has returned, so run it
            # in the executor. Pass the method itself, do not call it here.
            await self.hass.async_add_executor_job(socket_server.shutdown)
            # Release the listening socket.
            socket_server.server_close()
        await super().async_shutdown()

    def update_callback(self, data):
        """Socket data callback.

        This will be called from a socket thread (so it is not decorated
        with @callback). Add a hass job to the loop to set the updated data.
        """
        self.hass.add_job(self.async_set_updated_data, data)

    def setup_socket_servers(self) -> None:
        """Create a socket server per port and start each in its own thread."""
        for port in SOCKET_PORTS:
            socket_server = ThreadedTCPServer(
                (SOCKET_HOST, port),
                ThreadedTCPRequestHandler,
                self.update_callback,
            )
            # serve_forever() blocks and returns None, so run it in a daemon
            # thread and keep the server object itself for shutdown.
            threading.Thread(
                target=socket_server.serve_forever, daemon=True
            ).start()
            self.socket_servers.append(socket_server)
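To show the thread to event loop hop on its own, here is a small sketch with no HA imports at all. `FakeCoordinator` and `set_updated_data` are hypothetical stand-ins I made up (they are not HA classes), and it uses plain asyncio `loop.call_soon_threadsafe`, which as far as I understand is the same kind of handoff that `hass.add_job` gives you in the coordinator.

```python
import asyncio
import threading


class FakeCoordinator:
    """Hypothetical stand-in for a coordinator (not Home Assistant code)."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self.loop = loop
        self.data = None
        self.updated = asyncio.Event()

    def set_updated_data(self, data: bytes) -> None:
        """Store the data. Must only run on the event loop thread."""
        self.data = data
        self.updated.set()

    def update_callback(self, data: bytes) -> None:
        """Called from a worker thread: schedule the update on the loop."""
        self.loop.call_soon_threadsafe(self.set_updated_data, data)


async def main() -> bytes:
    coordinator = FakeCoordinator(asyncio.get_running_loop())
    # Simulate the socket handler thread pushing data in.
    worker = threading.Thread(target=coordinator.update_callback, args=(b"42",))
    worker.start()
    # Wait until the loop has processed the scheduled update.
    await asyncio.wait_for(coordinator.updated.wait(), timeout=5)
    worker.join()
    return coordinator.data


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The worker thread never touches `data` directly. It only schedules the update, and the loop applies it, which is the pattern you want between the socket handler and your entities.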
Hope that helps.