D0 Sensor with schedule/timeout

Hi, i have this python code to scan D0 smart meter.

But the Smartmeter pushes so much times, that there are so much informations coming in every seconds…

I want to make a “timeout” or something, so it just scans or accept data all 1 minute or something

Can anyone help me to implement that i just get info all 1 minute and not every second?

thats init.py

"""Support for D0 smart meters."""
import asyncio
from homeassistant.core import callback
import logging
import serial_asyncio
import re
import time

import voluptuous as vol

from homeassistant.const import CONF_HOST
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv

_LOGGER = logging.getLogger(__name__)

DOMAIN = "smartmeter"
CONF_PORT = "port"
CONF_OBIS = "obis"

CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Optional(CONF_PORT, default="/dev/ttyUSB0"): cv.string,
                vol.Optional(CONF_OBIS, default=[]): vol.All(
                    cv.ensure_list, vol.Length(min=1), [cv.string]
                )
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)

START_CHAR = b'/'
END_CHAR = b'!'

async def async_setup(hass, config):
    """Check connectivity and version of smartmeter."""
    port = config[DOMAIN][CONF_PORT]
    obis = config[DOMAIN][CONF_OBIS]
    meter = Meter(hass, port, obis)
    hass.data[DOMAIN] = meter

    # Wait for smartmeter setup complete (initial values loaded)
    if not await meter.setup():
        _LOGGER.error("Could not find a meter device at %s", port)
        return False

    # Load components
    hass.async_create_task(
        discovery.async_load_platform(hass, "sensor", DOMAIN, {}, config)
    )

    return True


class Meter():
    """Representation of a smartmeter connection."""

    def __init__(self, hass, port, list_of_sensors):
        """Initialize charging station connection."""

        self._list_of_sensors = list_of_sensors
        self._update_listeners = []
        self._hass = hass
        self._port = port
        self._values = {}
        self._units = {}
        self.device_name = "smartmeter"  # correct device name will be set in setup()
        self.device_id = "smartmeter_"  # correct device id will be set in setup()

        self.regex_data_set = re.compile(r"^(.+)\((.*)\)")
        self.regex_data_set_data = re.compile(r"^(.*)\*(.*)")

    def get_list_of_sensors(self):
        return self._list_of_sensors

    async def setup(self, loop=None):
        """Initialize smartmeter object."""

        # start serial connection
        loop = asyncio.get_event_loop() if loop is None else loop
        transport, protocol = await serial_asyncio.create_serial_connection(loop, D0Reader, self._port, baudrate=9600)
        protocol.set_callback(self.hass_callback)
    
        return True

    def get_value(self, address):

        value = self._values.get(address, None)
        unit = self._units.get(address, None)
        return value, unit

    def hass_callback(self, data):
        """Handle component notification via callback."""

        # Analyse and preprocess received data blob
        for line in data.splitlines(True):
            
            first_match = self.regex_data_set.search(line)
            if first_match:
                address = first_match.group(1)
                second_match = self.regex_data_set_data.search(first_match.group(2))
                if second_match:
                    self._values[address] = float(second_match.group(1))
                    self._units[address] = second_match.group(2)
                    _LOGGER.debug("extracted %s with value %s with unit %s", address, self._values[address], self._units[address])

        # Inform entities about updated values
        for listener in self._update_listeners:
            listener()

        _LOGGER.debug("Notifying %d listeners", len(self._update_listeners))

    def add_update_listener(self, listener):
        """Add a listener for update notifications."""
        self._update_listeners.append(listener)

        # initial data is already loaded, thus update the component
        listener()

class D0Reader(asyncio.Protocol):

    def __init__(self) -> None:
        super().__init__()
        self._data = ""

    def set_callback(self, callback):
        self._callback = callback

    def connection_made(self, transport):
        self.transport = transport
        _LOGGER.debug("port opened %s", transport)

    def data_received(self, data):
        # Reset buffer if new start byte received
        if START_CHAR in data:
            _LOGGER.debug("new start found")
            self._data = ""
        
        # Buffer content and fetch data
        # TODO: change data type to bytearray and convert to string afterwards
        self._data += data.decode("latin-1")

        # Callback if ending byte found(complete message received)
        if END_CHAR in data:
            _LOGGER.debug("end found, run callback")
          #   _LOGGER.debug("data: %s", self._data)
            # TODO: check CRC
            self._callback(self._data)

    def connection_lost(self, exc):
        _LOGGER.debug("serial connection closed: %s", exc)
        self.transport.loop.stop()

and thats the other file

"""Support for smartmeter sensors."""
import logging

from homeassistant.helpers.entity import Entity

from . import DOMAIN

_LOGGER = logging.getLogger(__name__)


async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
    """Set up the smart meter platform."""
    if discovery_info is None:
        return

    smartmeter = hass.data[DOMAIN]

    obis = smartmeter.get_list_of_sensors()
    _LOGGER.debug("Setting up sensors: %s", repr(obis))


    sensors = []
    for sensor_obis in obis:
        sensors.append(MeterSensor(smartmeter, sensor_obis, "Verbrauch", "mdi:flash"))
    
    async_add_entities(sensors)


class MeterSensor(Entity):
    """The entity class for smart meter sensors."""

    def __init__(self, smartmeter, obis, name, icon, device_class=None):
        """Initialize the smartmeter sensor."""
        self._smartmeter = smartmeter
        self._obis = obis
        self._name = name
        self._icon = icon
        self._unit = None
        self._device_class = device_class

        self._state = None
        self._attributes = {}

    @property
    def should_poll(self):
        """Deactivate polling. Data updated by smartmeter."""
        return False

    @property
    def unique_id(self):
        """Return the unique ID of the binary sensor."""
        return f"{self._smartmeter.device_id}_{self._obis}"

    @property
    def name(self):
        """Return the name of the device."""
        names={
                '1-0:1.8.0*255':'Consumption',
                '1-0:2.8.0*255':'Generation'
             }
        name = names.get(self._obis,"unknown")

        return f"{self._smartmeter.device_name} {name}"

    @property
    def device_class(self):
        """Return the class of this sensor."""
        return self._device_class

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return self._icon

    @property
    def state(self):
        """Return the state of the sensor."""
        return self._state

    @property
    def unit_of_measurement(self):
        """Get the unit of measurement."""
        return self._unit

    @property
    def device_state_attributes(self):
        """Return the state attributes of the binary sensor."""
        return self._attributes

    async def async_update(self):
        """Get latest cached states from the device."""
        self._state, self._unit = self._smartmeter.get_value(self._obis)

    def update_callback(self):
        """Schedule a state update."""
        self.async_schedule_update_ha_state(True)

    async def async_added_to_hass(self):
        """Add update callback after being added to hass."""
        self._smartmeter.add_update_listener(self.update_callback)

Would be great if someone can tell me, where i can put like a timeout or that it just looks all 1 minute and not every second… thats to much data, so i cannot check the course anymore (loads to long)

1 Like

You could do something I used to do for trading systems on wall street during very frantic busy IPO’S, which is this strategy (it was receiving THOUSANDS of price ‘ticks’ as a stock is frantically traded, much faster than any human can act upon them, so I only let some prices through). Update your code with two parameters (say, “X” milliseconds as one parameter to specify how often you want the update to be sent, and the second parameter regarding how much different one message would have to be from the previous message so that it should not be disregarded but passed on) so both parameters can be updated easily later, on the fly (if you want to get fancy, you can update that parameter on the fly at runtime if the CPU gets overloaded or whatever!). Then, don’t just pass the message received along - instead, add all the messages into a thread-safe hash table. Then a separate process can every “X” seconds, send out the most recent record in the hashtable, and then truncate the hashtable so it is empty. Voila! That’s just for the first parameter, the “X” milliseconds - and the second parameter - you’ll need to decide how to handle that one - What do you think?