Custom Filter Help

Hey devs, wondering if you can help me with this. I can write the core features of the filter, but I'm having issues with a few edge cases.

The filter assumes the value only goes UP, by at most the radius per reading, and never goes down unless it's a reset.

The point of the filter is to filter out any weird readings from an energy sensor. It replaces any reading that jumps by more than the radius, and any reading below the previous value, with the previous value, since consumption should never go down unless it's a reset.

Custom Filter Integration for Home Assistant

File: custom_components/energy_outlier_filter/manifest.json

{
  "domain": "energy_outlier_filter",
  "name": "Energy Outlier Filter",
  "documentation": "https://yourdocs.local",
  "dependencies": [],
  "codeowners": [],
  "version": "1.0.0",
  "requirements": []
}

File: custom_components/energy_outlier_filter/__init__.py

# Empty for now — needed to mark this as a Python package

File: custom_components/energy_outlier_filter/sensor.py

from homeassistant.components.sensor import SensorEntity
from homeassistant.const import STATE_UNKNOWN
from homeassistant.helpers.event import async_track_state_change_event
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
# HomeAssistantType was a deprecated alias; HomeAssistant from core is the current type
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
import logging

_LOGGER = logging.getLogger(__name__)

CONF_SOURCE = "source"
CONF_RADIUS = "radius"
CONF_NAME = "name"
CONF_WINDOW_SIZE = "window_size"

PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend({
    vol.Required(CONF_SOURCE): cv.entity_id,
    vol.Required(CONF_NAME): cv.string,
    vol.Optional(CONF_RADIUS, default=0.1): vol.Coerce(float),
    vol.Optional(CONF_WINDOW_SIZE, default=1): vol.Coerce(int),
})

def setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType = None,
) -> None:
    source = config[CONF_SOURCE]
    name = config[CONF_NAME]
    radius = config[CONF_RADIUS]
    window_size = config[CONF_WINDOW_SIZE]
    add_entities([EnergyOutlierFilterSensor(hass, name, source, radius, window_size)], True)


class EnergyOutlierFilterSensor(SensorEntity):
    _attr_should_poll = False

    def __init__(self, hass, name, source_entity_id, radius, window_size):
        self._hass = hass
        self._name = name
        self._source_entity_id = source_entity_id
        self._radius = radius
        self._window_size = window_size
        self._state = None
        self._history = []

    @property
    def name(self):
        return self._name

    @property
    def native_value(self):
        # SensorEntity exposes its reading through native_value rather than state
        return self._state

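    async def async_added_to_hass(self):
        # Register the unsubscribe callback so the listener is removed with the entity
        self.async_on_remove(
            async_track_state_change_event(
                self._hass, self._source_entity_id, self._handle_source_event
            )
        )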

    async def _handle_source_event(self, event):
        new_state = event.data.get("new_state")
        if new_state is None or new_state.state in [STATE_UNKNOWN, "unavailable"]:
            return

        try:
            current = float(new_state.state)
        except ValueError:
            return

        if len(self._history) == 0:
            self._state = current
        else:
            last = self._history[-1]
            # Any decrease is treated as a reset; an increase must be within the radius
            if current < last:
                _LOGGER.debug("Reset detected: %s < %s — accepted", current, last)
                self._state = current
            elif abs(current - last) <= self._radius:
                _LOGGER.debug("Accepted increase: %s (last: %s)", current, last)
                self._state = current
            else:
                _LOGGER.debug("Outlier rejected: %s (last: %s)", current, last)
                return

        # Update history
        if len(self._history) >= self._window_size:
            self._history.pop(0)
        self._history.append(self._state)

        self.async_write_ha_state()

Example configuration.yaml usage

sensor:
  - platform: energy_outlier_filter
    name: "Freezer - Energy Consumed [kWh Filtered]"
    source: sensor.freezer_plug_energy_consumed_kwh
    radius: 0.1
    window_size: 1

With the sensor above, any reading that is more than 0.1 above the previous one is ignored and the old reading is kept. The exception is when the source resets to 0: that change is accepted even though it is outside the radius. But is there a way to tell a real reset apart from a false reading, where the source goes unknown/unavailable and that also shows up as 0? I'm just not sure how to handle that issue.
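One possible way to handle the reset-versus-false-zero case is to hold a drop as "pending" and only accept it once later readings confirm it. The sketch below is plain Python, independent of Home Assistant; the class name ResetGuard and the confirm parameter are made up for illustration and are not part of the integration above.

```python
class ResetGuard:
    """Accept a drop only after later readings confirm it (illustrative sketch)."""

    def __init__(self, radius, confirm=2):
        self.radius = radius
        self.confirm = confirm   # hypothetical: low readings needed to confirm a reset
        self.value = None        # last accepted value
        self._pending = []       # low readings seen since the drop started

    def update(self, reading):
        if self.value is None:
            # First reading: nothing to compare against yet.
            self.value = reading
            return self.value
        if reading < self.value:
            # Possible reset: remember it, but keep reporting the old value
            # until enough consecutive low readings have arrived.
            self._pending.append(reading)
            if len(self._pending) >= self.confirm:
                self.value = reading
                self._pending.clear()
            return self.value
        # Reading is back at or above the accepted value, so any pending
        # drop was a glitch and is discarded.
        self._pending.clear()
        if reading - self.value <= self.radius:
            self.value = reading
        return self.value
```

With confirm=2, a single 0 followed by a normal reading is dropped, while a real reset is accepted one reading late. The cost is that delay, so the right confirm value depends on how often the source reports.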

Another issue I can think of: if a legit sensor reading happens to land a bit outside the radius, this filter sensor will basically go dead, because the source will never be back within range of the last accepted value. Any idea how to program for this? I am thinking of always keeping the last 10 readings (no configurable window), averaging the change between them, and checking whether the new change is off from that average by more than the radius, so that the filter keeps going.
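The averaging idea could look roughly like the sketch below. It is one interpretation, with some assumptions: each change is measured against the previous raw reading (accepted or not) so the filter cannot stall, and only accepted changes feed the average so a single spike does not skew it. The name DeltaAverageFilter and the history parameter are hypothetical.

```python
from collections import deque


class DeltaAverageFilter:
    """Compare each change against the average of recent accepted changes."""

    def __init__(self, radius, history=10):
        self.radius = radius
        self.deltas = deque(maxlen=history)  # recent accepted changes
        self.prev_raw = None                 # last raw reading, accepted or not
        self.value = None                    # last reported value

    def update(self, raw):
        if self.prev_raw is None:
            # First reading: accept it as the starting point.
            self.prev_raw = self.value = raw
            return self.value
        delta = raw - self.prev_raw
        # Always follow the raw source, so one rejected reading
        # does not put every later reading out of range.
        self.prev_raw = raw
        expected = sum(self.deltas) / len(self.deltas) if self.deltas else 0.0
        if abs(delta - expected) <= self.radius:
            self.deltas.append(delta)
            self.value = raw
        return self.value
```

With this rule a legit step is rejected once and then followed from the next reading on, and a one-off spike is rejected both on the way up and on the way back down. The trade-off is that a wrong value that persists for two readings in a row is accepted as well, since it looks the same as a legit step.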

NM, I have decided to just go with a Template sensor that does everything I need.