I’ve not been able to reproduce the problem of big jumps in the integration.
What I can share now is how my wife and I fixed the broken data. The following Python script needs to be run inside the “config” directory which contains home-assistant_v2.db. It has to be edited to suit the local configuration (list of sensors) and the time of the incident.
Fixing the data wasn’t a big priority, so when we finally started to address it in earnest, the broken data had made it all the way back into the long-term statistics: all data in the short-term statistics and the current states was already from after the jump.
Therefore the script determines the undesirable delta based on the long-term statistics. For more recent incidents where the long-term statistics have not been updated yet, further changes are needed. I am not sure whether that is actually necessary, since I don’t have such a database to test with. Perhaps long-term statistics get created immediately and will always contain the delta?
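As an illustration of the approach (with made-up numbers): if the last long-term statistics row before the outage shows 1200.0 kWh and the first row after it shows 1500.0 kWh, the script treats the 300 kWh difference as the undesirable delta and subtracts it from every later row in statistics, statistics_short_term and states (scaled to Wh where necessary), as well as from the restored state of the integration sensor.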
#!/usr/bin/env python3
import sqlite3
import datetime
import json
import sys
# This is the list of integration sensors with bad data.
sensors = [
"sensor.energy_from_grid",
"sensor.energy_to_grid",
"sensor.solar_energy",
"sensor.battery_charging_energy",
"sensor.battery_discharging_energy",
"sensor.other_energy",
"sensor.house_consumption_energy",
"sensor.svenja_energy",
"sensor.merle_energy",
"sensor.bad_energy",
"sensor.dach_energy",
"sensor.schlafzimmer_energy",
"sensor.luftraum_energy",
"sensor.sat_energy",
"sensor.technikraum_energy",
"sensor.batterie_energieverbrauch",
# "sensor.wohnzimmer_verbrauch",
# "sensor.dunstabzug_verbrauch",
# "sensor.garten_verbrauch"
]
# The date and time when HomeAssistant was offline, in UTC. Data prior to this
# is assumed to be correct and data after it will get corrected. The explicit
# "+00:00" offset is needed because timestamp() would interpret a naive
# datetime as local time.
T = datetime.datetime.fromisoformat("2024-09-14T11:30+00:00").timestamp()
print(f"Checking for changes around {T}.")
# Only sensors whose change exceeds this threshold in percent get updated.
# It does not matter whether they were increased or decreased.
threshold_percent = 5
# This script must be run in the "config" directory which contains the SQLite
# database.
conn = sqlite3.connect('home-assistant_v2.db')
cursor = conn.cursor()
def fetchone(msg):
    result = cursor.fetchone()
    if result:
        return result[0]
    else:
        print(msg)
        sys.exit(1)
# SQL queries are composed using string formatting because that allows
# replacing e.g. the table name. SQL placeholders are only supported
# where values are expected.
# What was the last correct value, based on "statistics" or "statistics_short_term"?
query_Wpre = '''
SELECT state
FROM %(table)s
WHERE metadata_id = %(id)d AND start_ts < %(ts)f AND state IS NOT NULL
ORDER BY created_ts DESC
LIMIT 1;
'''
# Same for first incorrect value.
query_Wpost = '''
SELECT state
FROM %(table)s
WHERE metadata_id = %(id)d AND start_ts >= %(ts)f AND state IS NOT NULL
ORDER BY created_ts ASC
LIMIT 1;
'''
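# For illustration, with hypothetical values (metadata_id 113 and the
# timestamp configured above), query_Wpre renders to:
#
#   SELECT state
#   FROM statistics
#   WHERE metadata_id = 113 AND start_ts < 1726313400.000000 AND state IS NOT NULL
#   ORDER BY created_ts DESC
#   LIMIT 1;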
# Which metadata_id corresponds to a sensor ID?
#
# The states and statistics tables use different metadata_id values,
# and each has its own _meta table with the mapping.
query_id = {
"states": '''
SELECT metadata_id
FROM states_meta
WHERE entity_id = "%(sensor)s"
LIMIT 1;
''',
"statistics": '''
SELECT id
FROM statistics_meta
WHERE statistic_id = "%(sensor)s"
LIMIT 1;
''',
}
# Query for the scale factor.
query_unit = '''
SELECT statistic_id, unit_of_measurement
FROM statistics_meta;
'''
# Update all tables by subtracting a delta.
query_update_stats = '''
UPDATE %(table)s
SET %(column)s = %(column)s - %(delta)f
WHERE metadata_id = %(id)d AND created_ts >= %(ts)f AND state IS NOT NULL;
'''
query_update = {
"states": {
"state": '''
UPDATE states
SET state = CAST((CAST(state AS REAL) - %(scale)d * %(delta)f) AS TEXT)
WHERE metadata_id = %(id)d AND last_updated_ts >= %(ts)f AND state IS NOT NULL AND state != "unavailable";
''',
},
"statistics": {
"state": query_update_stats,
"sum": query_update_stats,
},
"statistics_short_term": {
"state": query_update_stats,
"sum": query_update_stats,
},
}
deltas = {}
for sensor in sensors:
    print()
    ids = {}
    for table in ("states", "statistics"):
        cursor.execute(query_id[table] % {"sensor": sensor})
        ids[table] = fetchone(f"{table}: no ID found for {sensor}")
    print(f"{sensor} IDs: {ids}")
    # statistics and statistics_short_term share the same metadata ID.
    ids["statistics_short_term"] = ids["statistics"]
    # Some statistics are recorded as Wh, others as kWh. In both cases,
    # states are in Wh. Not sure why. The implication is that a delta
    # determined below based on statistics must get scaled when subtracting
    # it from states.
    #
    # The "unit_of_measurement" in statistics_meta apparently can be used
    # to determine that scale factor.
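    #
    # For example (made-up numbers): statistics in kWh with a delta of 0.3
    # mean that 0.3 * 1000 = 300 has to be subtracted from the Wh values
    # in the states table.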
    cursor.execute(query_unit)
    scale = {}
    for statistic_id, unit in cursor.fetchall():
        factor = 1
        if unit.startswith("k"):
            factor = 1000
        elif unit.startswith("M"):
            factor = 1000000
        scale[statistic_id] = factor
    # This code assumes that the time is so far in the past that the jump
    # has already been recorded in the long-term statistics and can no
    # longer be found in the short-term statistics or the states.
    #
    # This needs to be updated to use the other tables when the jump
    # occurred more recently.
    if True:
        table = "statistics"
        cursor.execute(query_Wpre % {"table": table, "id": ids[table], "ts": T})
        pre = fetchone(f"{sensor} has no old value.")
        cursor.execute(query_Wpost % {"table": table, "id": ids[table], "ts": T})
        post = fetchone(f"{sensor} has no new value.")
    delta = post - pre
    percent = int(abs((post - pre) * 100 / pre))
    print(f"{sensor} delta: {post} - {pre} = {delta} ({percent}%)")
    if percent < threshold_percent:
        print(f"Not updating {sensor}.")
        continue
    print(f"Updating {sensor}.")
    deltas[sensor] = scale[sensor] * delta
    for table, columns in query_update.items():
        for column, query in columns.items():
            query = query % {"table": table, "column": column, "id": ids[table], "ts": T, "scale": scale[sensor], "delta": delta}
            print(query)
            cursor.execute(query)
# Commit only after all updates succeeded.
conn.commit()
conn.close()
# Update the integration state.
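# Without this, the integration sensors would restore their last, uncorrected
# value from .storage/core.restore_state after the next restart.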
restore_state_file = ".storage/core.restore_state"
with open(restore_state_file) as input:
    data = json.load(input)
for object in data["data"]:
    delta = deltas.get(object["state"]["entity_id"], None)
    if delta is not None:
        state = object["state"]["state"]
        if state != "unknown":
            object["state"]["state"] = str(float(state) - delta)
            object["extra_data"]["native_value"]["decimal_str"] = str(float(object["extra_data"]["native_value"]["decimal_str"]) - delta)
            object["extra_data"]["last_valid_state"] = str(float(object["extra_data"]["last_valid_state"]) - delta)
with open(restore_state_file, "w") as output:
    json.dump(data, output)
# sqlbrowser or any other sqlite client can be used to verify changes to the data:
#
# - select DATETIME(start_ts, 'unixepoch') AS time , * from statistics where metadata_id = 113 AND start_ts > unixepoch("2024-09-14T06:00")
# - select DATETIME(start_ts, 'unixepoch') AS time , * from statistics_short_term where metadata_id = 113 AND start_ts > unixepoch("2024-09-14T06:00")
# - select DATETIME(last_updated_ts, 'unixepoch') AS time , * from states where metadata_id = 75 AND last_updated_ts > unixepoch("2024-09-14T06:00")
#
# This seems to print time in local time, though, not UTC.
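#
# Note that DATETIME(ts, 'unixepoch') by itself should return UTC; the
# 'localtime' modifier, e.g. DATETIME(start_ts, 'unixepoch', 'localtime'),
# makes the conversion to local time explicit if that is what the client shows.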