I wrote a version of the Statistics sensor that is time-weighted. It uses the built-in scan_interval
parameter to determine how often the source sensor is sampled. It is going to take me a while to write the tests and docs needed to make this production-ready (and maybe it should be merged with the original Statistics sensor), but in the meantime, here’s the code in case anyone wants to pick it up:
"""
Support for statistics for sensor values.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.statistics/
"""
import logging
import statistics
from datetime import timedelta
from collections import deque
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
CONF_NAME, CONF_ENTITY_ID, STATE_UNKNOWN, ATTR_UNIT_OF_MEASUREMENT)
from homeassistant.helpers.entity import Entity
# Module-level logger for this platform.
_LOGGER = logging.getLogger(__name__)
# Keys of the state attributes exposed by the sensor. ATTR_MEAN and
# ATTR_COUNT are also appended to the configured name to label the entity.
ATTR_AVERAGE_CHANGE = 'average_change'
ATTR_CHANGE = 'change'
ATTR_COUNT = 'count'
ATTR_MAX_VALUE = 'max_value'
ATTR_MIN_VALUE = 'min_value'
ATTR_MEAN = 'mean'
ATTR_MEDIAN = 'median'
ATTR_VARIANCE = 'variance'
ATTR_STANDARD_DEVIATION = 'standard_deviation'
ATTR_SAMPLING_SIZE = 'sampling_size'
ATTR_TOTAL = 'total'
ATTR_SOURCE = 'source'
# Configuration key for the maximum number of samples kept by the sensor.
CONF_SAMPLING_SIZE = 'sampling_size'
# Defaults applied by PLATFORM_SCHEMA when the options are omitted.
DEFAULT_NAME = 'MyStats'
DEFAULT_SIZE = 20
# NOTE(review): not referenced elsewhere in this file; presumably read by the
# platform loader as the default polling interval (60 seconds) - confirm.
SCAN_INTERVAL = timedelta(seconds=60)
# Icon returned by the sensor's ``icon`` property.
ICON = 'mdi:calculator'
# Platform options on top of the base sensor schema: a mandatory source
# entity id, an optional display name, and an optional sampling size that is
# coerced to an integer and must be at least 1.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_ENTITY_ID): cv.entity_id,
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(CONF_SAMPLING_SIZE, default=DEFAULT_SIZE): vol.All(
        vol.Coerce(int),
        vol.Range(min=1),
    ),
})
# pylint: disable=unused-argument
async def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
    """Set up the statistics sensor from its platform configuration.

    Reads the source entity id, the display name and the sampling size from
    ``config``, builds a single ``MyStatisticsSensor`` from them and passes
    it to ``async_add_devices`` together with ``True`` as the second
    positional argument.  ``discovery_info`` is accepted but not used.
    """
    sensor = MyStatisticsSensor(
        hass,
        config.get(CONF_ENTITY_ID),
        config.get(CONF_NAME),
        config.get(CONF_SAMPLING_SIZE),
    )
    async_add_devices([sensor], True)
class MyStatisticsSensor(Entity):
    """Sensor reporting statistics over polled samples of a source entity.

    Each call to ``async_update`` reads the current state of the source
    entity from the state machine.  For numeric sources the value is stored
    in a bounded window of the most recent ``sampling_size`` samples and the
    statistics are recomputed; the entity state is the mean.  For sources in
    the ``binary_sensor`` domain the entity state is the number of states
    that have been polled.
    """

    def __init__(self, hass, entity_id, name, sampling_size):
        """Initialize the sensor.

        Args:
            hass: Object whose ``states.get(entity_id)`` is polled on update.
            entity_id: Entity id of the source; its domain decides whether
                the sensor runs in binary (count) or numeric (mean) mode.
            name: Base name; the reported statistic is appended to it.
            sampling_size: Maximum number of samples kept in the window.
        """
        self._hass = hass
        self._entity_id = entity_id
        self.is_binary = self._entity_id.split('.')[0] == 'binary_sensor'
        suffix = ATTR_COUNT if self.is_binary else ATTR_MEAN
        self._name = '{} {}'.format(name, suffix)
        self._sampling_size = sampling_size
        self._unit_of_measurement = None
        # Bounded window: appending to a full deque drops the oldest sample.
        self.states = deque(maxlen=self._sampling_size)
        self.median = self.mean = self.variance = self.stdev = 0
        self.min = self.max = self.total = self.count = 0
        self.average_change = self.change = 0

    def _add_state_to_queue(self, new_state):
        """Record one polled state of the source entity.

        Binary sources only count polled states.  Numeric sources append the
        value to the window; states that cannot be converted to ``float``
        are skipped, and ``count`` always equals the number of samples
        actually held in the window.
        """
        if self.is_binary:
            self.count += 1
            return
        try:
            value = float(new_state.state)
        except (TypeError, ValueError):
            # Skipped on purpose: an unparsable state must not be reported
            # as a sample.
            _LOGGER.debug(
                "Ignoring non-numeric state %r of %s",
                new_state.state, self._entity_id)
        else:
            self.states.append(value)
        self.count = len(self.states)

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def state(self):
        """Return the count for binary sources, otherwise the mean."""
        return self.count if self.is_binary else self.mean

    @property
    def unit_of_measurement(self):
        """Return the source's unit, or None for binary sources."""
        return None if self.is_binary else self._unit_of_measurement

    @property
    def device_state_attributes(self):
        """Return the statistics as attributes; None for binary sources."""
        if self.is_binary:
            return None
        return {
            ATTR_MEAN: self.mean,
            ATTR_COUNT: self.count,
            ATTR_MAX_VALUE: self.max,
            ATTR_MEDIAN: self.median,
            ATTR_MIN_VALUE: self.min,
            ATTR_SAMPLING_SIZE: self._sampling_size,
            ATTR_STANDARD_DEVIATION: self.stdev,
            ATTR_TOTAL: self.total,
            ATTR_VARIANCE: self.variance,
            ATTR_CHANGE: self.change,
            ATTR_AVERAGE_CHANGE: self.average_change,
            ATTR_SOURCE: self._entity_id,
        }

    @property
    def icon(self):
        """Return the icon of the sensor."""
        return ICON

    async def async_update(self):
        """Sample the source entity and recompute the statistics.

        Values that cannot be computed from the current window (everything
        when it is empty, standard deviation and variance when it holds a
        single sample) are set to ``STATE_UNKNOWN``.
        """
        new_state = self._hass.states.get(self._entity_id)
        if new_state is not None:
            self._unit_of_measurement = new_state.attributes.get(
                ATTR_UNIT_OF_MEASUREMENT)
            self._add_state_to_queue(new_state)

        if self.is_binary:
            return

        samples = self.states
        self.count = len(samples)

        if not samples:
            self.mean = self.median = STATE_UNKNOWN
            self.stdev = self.variance = STATE_UNKNOWN
            self.min = self.max = self.total = STATE_UNKNOWN
            self.average_change = self.change = STATE_UNKNOWN
            return

        self.mean = round(statistics.mean(samples), 2)
        self.median = round(statistics.median(samples), 2)

        # Sample standard deviation and variance need at least two points;
        # checking the length avoids raising and logging on every poll
        # while the window is still filling.
        if len(samples) > 1:
            self.stdev = round(statistics.stdev(samples), 2)
            self.variance = round(statistics.variance(samples), 2)
        else:
            self.stdev = self.variance = STATE_UNKNOWN

        self.total = round(sum(samples), 2)
        self.min = min(samples)
        self.max = max(samples)
        self.change = samples[-1] - samples[0]
        self.average_change = self.change
        if len(samples) > 1:
            # Average step between consecutive samples in the window.
            self.average_change /= len(samples) - 1