Hey devs, wondering if you can help me with this as I can seem to write the core features of the filter but having some issues with a few edge cases
The filter assumes it is only going UP by the radius, never down unless its a reset
The point of the filter is to filter out any weird reading from an energy sensor. It will replace any readings above the radius and all readings below the previous value as consumption should never go down unless its a reset.
Custom Filter Integration for Home Assistant
File: custom_components/energy_outlier_filter/manifest.json
{
"domain": "energy_outlier_filter",
"name": "Energy Outlier Filter",
"documentation": "https://yourdocs.local",
"dependencies": [],
"codeowners": [],
"version": "1.0.0",
"requirements": []
}
File: custom_components/energy_outlier_filter/__init__.py
# Empty for now — needed to mark this as a Python package
File: custom_components/energy_outlier_filter/sensor.py
from homeassistant.components.sensor import SensorEntity
from homeassistant.const import STATE_UNKNOWN
from homeassistant.helpers.event import async_track_state_change_event
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import HomeAssistantType, ConfigType, DiscoveryInfoType
import logging
_LOGGER = logging.getLogger(__name__)
CONF_SOURCE = "source"
CONF_RADIUS = "radius"
CONF_NAME = "name"
CONF_WINDOW_SIZE = "window_size"
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend({
vol.Required(CONF_SOURCE): cv.entity_id,
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_RADIUS, default=0.1): vol.Coerce(float),
vol.Optional(CONF_WINDOW_SIZE, default=1): vol.Coerce(int),
})
def setup_platform(
hass: HomeAssistantType,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType = None,
) -> None:
source = config[CONF_SOURCE]
name = config[CONF_NAME]
radius = config[CONF_RADIUS]
window_size = config[CONF_WINDOW_SIZE]
add_entities([EnergyOutlierFilterSensor(hass, name, source, radius, window_size)], True)
class EnergyOutlierFilterSensor(SensorEntity):
_attr_should_poll = False
def __init__(self, hass, name, source_entity_id, radius, window_size):
self._hass = hass
self._name = name
self._source_entity_id = source_entity_id
self._radius = radius
self._window_size = window_size
self._state = None
self._history = []
@property
def name(self):
return self._name
@property
def state(self):
return self._state
async def async_added_to_hass(self):
async_track_state_change_event(
self._hass, self._source_entity_id, self._handle_source_event
)
async def _handle_source_event(self, event):
new_state = event.data.get("new_state")
if new_state is None or new_state.state in [STATE_UNKNOWN, "unavailable"]:
return
try:
current = float(new_state.state)
except ValueError:
return
if len(self._history) == 0:
self._state = current
else:
last = self._history[-1]
# Accept only if it's a reset or a valid increase
if current < last:
_LOGGER.debug("Reset detected: %s < %s — accepted", current, last)
self._state = current
elif abs(current - last) <= self._radius:
_LOGGER.debug("Accepted increase: %s (last: %s)", current, last)
self._state = current
else:
_LOGGER.debug("Outlier rejected: %s (last: %s)", current, last)
return
# Update history
if len(self._history) >= self._window_size:
self._history.pop(0)
self._history.append(self._state)
self.async_write_ha_state()
Example configuration.yaml usage
sensor:
- platform: energy_outlier_filter
name: "Freezer - Energy Consumed [kWh Filtered]"
source: sensor.freezer_plug_energy_consumed_kwh
radius: 0.1
window_size: 1
If you take the above sensor I created based on my code, if any reading is above 0.1 it will ignore it and replace it with the old reading. Unless the source resets to 0 then it will accept that change that is outside the radius. But is there a way to test for a false reading to unknow/unavailable which is also 0… so just not sure how to handle that issue.
Also another issue I am thinking is if for some reason a legit sensor reading happens that just happens to be a bit outside the radius, this filter sensor will go basically dead because the original reading will never be in range again! any idea how to program for this? I am thinking of taking the last 10 readings always (no window) and then average the change and see if the new change is off by more then the radius so that way it will keep going?