Since the REST API doesn't allow inserting backdated state entries, I decided to just manipulate the SQLite state database directly.
This does make the code database-dependent, and it is potentially brittle if the database is locked or written to concurrently, but since nothing else writes to this sensor's state rows, I think it should be "relatively" safe.
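To give a sense of what's going on under the hood, here is a minimal sketch of a single backdated insert, run directly against 'home-assistant_v2.db' with the recorder stopped. The entity name, timestamp, and reading are hypothetical; the full script below additionally backs up the db, chains each row's 'old_state_id' to the previous row, and reuses the sensor's existing 'attributes_id'.

import sqlite3, uuid

conn = sqlite3.connect('/homeassistant/home-assistant_v2.db')
cur = conn.cursor()
# Look up the sensor's metadata_id in the states_meta table
cur.execute("SELECT metadata_id FROM states_meta WHERE entity_id = ?",
            ('sensor.meter_water',))
metadata_id = cur.fetchone()[0]
ts = 1719792000.0  # hypothetical backdated timestamp (UTC epoch seconds)
cur.execute("INSERT INTO states (state, last_updated_ts, last_reported_ts,"
            " metadata_id, origin_idx, context_id_bin)"
            " VALUES (?, ?, ?, ?, 0, ?)",
            ('123456', ts, ts, metadata_id, uuid.uuid4().bytes))
conn.commit()
conn.close()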
Here is my initial code that can read in either a CSV or SQLite file.
Enjoy!
#!/bin/sh
"exec" "$(dirname $(readlink -f $0))/venv/bin/python3" "$0" "$@"
#===============================================================================
# NOTE on python virtual environment setup and invocation
# Above shebang used to invoke venv relative to directory in which script is stored
# See: https://stackoverflow.com/questions/20095351/shebang-use-interpreter-relative-to-the-script-path
# Alternatively, substitute below shebang line if all python libraries available in
# the system or if invoking script from within an already activated virtual environment:
# #!/usr/bin/env python3
#
# Note: I use a 'venv' to avoid having to add additional libraries to my system
#
# To set up 'venv'
# python3 -m venv --system-site-packages venv
# source venv/bin/activate
# pip3 install pip --upgrade
# pip3 install pandas paramiko tzlocal
# deactivate
#
#===============================================================================
# DESCRIPTION:
# Batch insert meter entries into the HA 'states' database table with
# 'last_updated_ts' time backdated to the time the entry was initially recorded
#
# The input data can be either a SQLite database or a CSV file.
# The data should be in two columns with one column containing the UTC timestamp
# of when data was recorded and the other column containing the meter reading.
# Extra columns are ignored
#
# Note if using a CSV file, the column names should be in the first row
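# e.g. (hypothetical values):
#   timestamp,reading
#   1719792000,123456
#   1719795600,123460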
#
# Note: Assumes you have ssh access with root privileges to the HA server
# I use the "Advanced SSH & Web Terminal" HA Add-on
#
#===============================================================================
# USAGE:
# add_water.py
#
# Key user variables (in all-caps)
#  METER_DB_PATH              File containing the meter data -- may be SQLite or CSV
#  METER_TABLE                Name of the data table if input file is SQLite
#  METER_COLUMNS              Names of the 2 columns containing data
#                             First is the UTC timestamp, second is the corresponding data value
#  ENTITY_ID                  Name of the sensor for which you want to enter the data
#  STOP_START_HA              Stop HA before accessing the HA db and restart it after, if set to True
#  BACKUP_HA_DB               Backup the HA db before accessing it, if set to True
#  DELETE_BACKUP_ON_SUCCESS   Delete the backup if the program completes successfully, if set to True
#  BACKUP_HA_DIR              Name of the directory in which to store HA db backups
#                             Created if it doesn't exist
#  SSH_HOST                   Name or IP address of the HA server
#  SSH_USER                   Username on the HA server
#  SSH_PASSWD                 SSH password to access SSH_USER@SSH_HOST
#                             Not needed if using passphrases and ssh-agent
#
#===============================================================================
# VERSION: 0.5.0
#
#===============================================================================
# CHANGELOG
# 0.5.0 (July 2024)
# - First official release
#===============================================================================
# AUTHOR:
# Jeff Kosowsky
# Copyright July 2024
#
#===============================================================================
#### NOTES:
# States are inserted into the HA 'states' table using manual SQL manipulations
# of the 'home-assistant_v2.db' database since the HA UI and REST API
# don't allow for inserting backdated state entries
#
# This is potentially "fragile" since you are manipulating the database directly,
# which could lead to lock or race conditions, though it should generally be pretty
# safe since nothing else should be writing to that metadata_id/entity_id
#
# If you are concerned about fragility, do one or more of the following:
# 1. Set the variable 'STOP_START_HA' to 'True' which will stop Home Assistant
#    before accessing the SQLite database and restart it at the end of the program
#
# 2. Backup just the 'home-assistant_v2.db' database either by setting 'BACKUP_HA_DB'
#    to 'True' or copying it manually yourself (see the example below)
#    If using 'BACKUP_HA_DB', then setting 'DELETE_BACKUP_ON_SUCCESS' to 'True' will
#    delete the backup if the routine completes successfully
#
# 3. Create a full backup of HA using the UI
#
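# For example, a manual backup (destination path is up to you) could be as simple as:
#   cp /homeassistant/home-assistant_v2.db /tmp/backup/home-assistant_v2.db-$(date +%Y%m%d.%H%M%S)
#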
# Note: my SQLite METER_TABLE is called 'water' and is of the form:
#   CREATE TABLE IF NOT EXISTS TABLE_NAME (
#       timestamp   INTEGER PRIMARY KEY,
#       consumption REAL,
#       reading     INTEGER
#   )
# Note that only 'timestamp' and 'reading' are used since I want total readings entered
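# e.g., a sample row (hypothetical values):
#   INSERT INTO water (timestamp, consumption, reading) VALUES (1719792000, 3.2, 123456);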
#
#===============================================================================
#### IMPORTS
import os
import sys
import warnings
import uuid
import csv
import sqlite3
from datetime import datetime
from tzlocal import get_localzone
import pandas as pd
with warnings.catch_warnings(): #Otherwise get deprecation warnings on Ubuntu 18.04
    warnings.filterwarnings("ignore")
    import paramiko
#===============================================================================
#### GLOBAL VARIABLES (Note: user-changeable variables are in all-caps)
##Local meter data database variables (paths refer to paths on the local machine)
METER_TABLE = 'water' #Only needed for SQLite database inputs
METER_DB_PATH = 'water.db' #Name of local db used to store meter data
#METER_DB_PATH = 'water.csv' #Name of local db used to store meter data
#NOTE: Assumed to be relative to the current working directory if not an absolute path
if not METER_DB_PATH.startswith('/'):
    METER_DB_PATH = os.getcwd() + '/' + METER_DB_PATH
meter_db = os.path.basename(METER_DB_PATH)
#First column should be the name of the time-stamp column, second column should be the name of the data column
METER_COLUMNS = ['timestamp', 'reading'] #Names of data columns
ha_state_columns = ['last_updated_ts', 'state'] #Don't change
## HomeAssistant variables (paths refer to paths on the remote HA machine)
ha_db_path = '/homeassistant/home-assistant_v2.db' #Path to 'home-assistant_v2.db' database
ENTITY_ID = f'sensor.meter_{METER_TABLE}' #Change this to name of sensor you use
STOP_START_HA = False #Stop and later restart HA if set to True
BACKUP_HA_DB = True #If True then backup 'home-assistant_v2.db' before editing database
DELETE_BACKUP_ON_SUCCESS = True #If True, then remove backup if routine completes successfully
BACKUP_HA_DIR = "/tmp/backup" #Note directory is created if it doesn't exist
## SSH credentials and settings (note no password required if using ssh-agent)
SSH_HOST = 'homeassistant'
SSH_USER = '<YOUR_USER_NAME>' #Change to your user name on the HA server
SSH_PASSWD = '<YOUR_PASSWORD>' #Not necessary if using ssh passphrases and ssh-agent
# Debugging variables
#For debugging, allow all rows & columns to print
#pd.set_option('display.max_rows', None) # None means unlimited rows #DEBUG
#pd.set_option('display.max_columns', None) # None means unlimited columns #DEBUG
#pd.set_option('display.width', 180) # Adjust to your terminal width #DEBUG
#pd.set_option('display.max_colwidth', None) #DEBUG
#===============================================================================
#### MAIN
def main():
    localtz = get_localzone()
    restart_ha = False
    try:
        #Create ssh connection to HA
        ssh = ssh_connect(SSH_HOST, SSH_USER, SSH_PASSWD)
        if STOP_START_HA is True and ssh_ha_running(ssh):
            restart_ha = True
            ssh_ha_stop(ssh)
        #Retrieve metadata_id corresponding to ENTITY_ID
        command = f"SELECT metadata_id FROM states_meta WHERE entity_id = '{ENTITY_ID}' LIMIT 1"
        metadata_id = int(ssh_execute_sql_first(ssh, command))
        # print(metadata_id) #DEBUG
        #Retrieve time stamp of last valid HA state (i.e., not 'unknown' or 'unavailable')
        command = f"SELECT MAX(last_updated_ts) FROM states WHERE metadata_id = {metadata_id} AND state != 'unknown' AND state != 'unavailable'"
        last_ts_ha = ssh_execute_sql_first(ssh, command)
        # print(last_ts_ha) #DEBUG
        #Retrieve last valid state value (or "" if no valid states exist)
        if last_ts_ha != "":
            command = f"SELECT state FROM states WHERE metadata_id = {metadata_id} AND last_updated_ts == {last_ts_ha} LIMIT 1"
            last_state_ha = ssh_execute_sql_first(ssh, command)
            if last_state_ha != "":
                last_state_ha = float(last_state_ha)
        else: #No valid states
            last_state_ha = ""
        # print(last_state_ha) #DEBUG
        #Retrieve last attributes_id for sensor
        if last_state_ha != "": #If valid state exists, retrieve its attributes_id
            command = f"SELECT attributes_id FROM states WHERE metadata_id = {metadata_id} AND last_updated_ts == {last_ts_ha} LIMIT 1" #Find attributes_id of last valid state
        else: #Find last attributes_id even if no valid states exist
            command = f"SELECT attributes_id FROM states WHERE metadata_id = {metadata_id} ORDER BY state_id DESC LIMIT 1"
        attributes_id = ssh_execute_sql_first(ssh, command)
        if attributes_id == "": #No attributes_id found!
            raise Exception(f"Can't find existing 'attributes_id' for '{ENTITY_ID}'")
        # print(attributes_id) #DEBUG
        #Retrieve state_id of last sensor entry (whether valid or not) -- used to populate 'old_state_id', which points to the last sensor entry whether valid or not
        command = f"SELECT state_id FROM states WHERE metadata_id = {metadata_id} ORDER BY state_id DESC LIMIT 1"
        old_state_id = ssh_execute_sql_first(ssh, command)
        # print(old_state_id) #DEBUG
        last_ts_ha = float(last_ts_ha or 0)
        #Ingest meter data into a Pandas DataFrame with columns: ha_state_columns = ['last_updated_ts', 'state']
        db_type = database_type(METER_DB_PATH)
        if db_type == 'sqlitedb':
            df = read_sql(METER_DB_PATH, last_ts_ha)
        elif db_type == 'csv':
            df = read_csv(METER_DB_PATH, last_ts_ha)
        else:
            raise Exception(f"Error: [{METER_DB_PATH}] Database type must be 'SQLite' or 'CSV'")
        if df is None:
            print("Nothing to do")
            return
        df = df.sort_values(by='last_updated_ts').reset_index(drop=True) #Generally should already be sorted (at least if reading in a SQLite db)
        if df['last_updated_ts'].duplicated().any(): #This shouldn't happen since 'timestamp' is a PRIMARY KEY
            raise Exception(f"'{METER_TABLE}' table in {meter_db} has duplicate timestamps")
        #Comment out the following 2 statements if you want to include entries where 'reading' is unchanged (i.e., no new consumption)
        df = df[df['state'] != df['state'].shift()] #Eliminate rows whose state is the same as the previous row's
        if df.iloc[0]['state'] == last_state_ha:
            df = df.iloc[1:].reset_index(drop=True) #Drop first row if equal to last saved state
        entries_posted = 0
        #Add the remaining (non-blank) columns of the HA 'states' table to the DataFrame
        if not df.empty: #Rows to add
            df['attributes_id'] = attributes_id
            df['origin_idx'] = 0 #Always 0 it seems
            df['context_id_bin'] = [uuid.uuid4().bytes for _ in range(len(df))]
            df['metadata_id'] = metadata_id
            df['last_reported_ts'] = df['last_updated_ts'] #Always equal it seems
            df['old_state_id'] = old_state_id #Initially set to the last saved HA state_id; updated below as rows are added to the HA db
            #Note: other columns are left blank, other than the index 'state_id' which gets added automatically when a row is inserted into SQL
            # print(df) #DEBUG
            df_columns = df.columns.tolist()
            # print(df_columns) #DEBUG
            if BACKUP_HA_DB is True: #Backup HA database
                ssh_ha_execute_cmd_error(ssh, f'mkdir -p {BACKUP_HA_DIR}', f"Couldn't create backup dir on HA: {BACKUP_HA_DIR}")
                backup_path = BACKUP_HA_DIR + '/' + os.path.basename(ha_db_path) + '-' + datetime.now().strftime("%Y%m%d.%H%M%S")
                backup_path = backup_path.replace('//', '/')
                # print(backup_path) #DEBUG
                ssh_ha_execute_cmd_error(ssh, f'cp -f {ha_db_path} {backup_path}', f"Couldn't copy '{ha_db_path}' to '{backup_path}'")
            last_state_old = last_state_ha
            last_ts_old = last_ts_ha
            for index, row in df.iterrows(): #Insert rows into SQL one-by-one
                # if index > 50: break #DEBUG: Loop through 50 rows and then stop
                if last_state_old != "" and float(row['state']) < float(last_state_old): #Warn on negative consumption
                    olddate = datetime.fromtimestamp(last_ts_old).replace(tzinfo=localtz).strftime('%m/%d/%y %H:%M:%S')
                    currdate = datetime.fromtimestamp(row['last_updated_ts']).replace(tzinfo=localtz).strftime('%m/%d/%y %H:%M:%S')
                    print(f"Warning - Negative consumption: {last_state_old} ({olddate}) -> {row['state']} ({currdate}) [{row['state'] - last_state_old}]", file=sys.stderr)
                if index > 0: #Set 'old_state_id' to previous 'state_id' (Note: the first 'old_state_id' is set already since we initialize df['old_state_id'] = old_state_id)
                    #Assign old_state_id = state_id at time last_ts_old
                    command = f"SELECT state_id FROM states WHERE metadata_id = {metadata_id} AND last_updated_ts == {last_ts_old} LIMIT 1"
                    row['old_state_id'] = int(ssh_execute_sql_first(ssh, command))
                    # print(f"{index}: {row['old_state_id']}") #DEBUG
                last_state_old = row['state']
                last_ts_old = row['last_updated_ts']
                # Prepare values for SQL command with single quotes around each value
                values = []
                for value in row.tolist():
                    if isinstance(value, str):
                        values.append(f"'{value}'") #Single-quote strings
                    elif isinstance(value, bytes): #Handle BLOB data (bytes)
                        values.append(f"x'{value.hex()}'") #SQLite BLOB literal format (convert to hex and write as x'<hex_string>')
                    else:
                        values.append(str(value)) #No quotes if not string or BLOB
                # Construct SQL command dynamically by joining df_columns and values, respectively
                command = f"INSERT OR IGNORE INTO states " \
                          f"({', '.join(df_columns)}) " \
                          f"VALUES ({', '.join(values)})"
                # print(command) #DEBUG
                ssh_execute_sql(ssh, command) #Insert row
                entries_posted += 1
            if BACKUP_HA_DB is True and DELETE_BACKUP_ON_SUCCESS is True:
                try:
                    ssh_ha_execute_cmd_error(ssh, f'rm -f {backup_path}', f"Couldn't delete temporary backup: {backup_path}")
                except Exception as e:
                    print(e, file=sys.stderr)
        #End of stanza: if not df.empty (rows to add)
        print(f"{entries_posted} new state entries for '{ENTITY_ID}' in: {ha_db_path}")
    except Exception as e:
        print(f'Error: {e}', file=sys.stderr)
    finally: #Cleanup
        if restart_ha is True:
            print("Restarting HA...", file=sys.stderr)
            try:
                ssh_ha_start(ssh)
            except Exception as e:
                print(f'Error: {e}', file=sys.stderr)
#===============================================================================
#### FUNCTIONS
def ssh_connect(ssh_host, ssh_user, ssh_passwd):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(ssh_host, username=ssh_user, password=ssh_passwd)
    return ssh

def ssh_ha_running(ssh):
    stdin, stdout, stderr = ssh.exec_command('SUPERVISOR_TOKEN=$(sudo cut -d= -f2 /data/.ssh/environment) ha core status')
    if stdout.channel.recv_exit_status() == 0:
        return True
    return False

def ssh_ha_stop(ssh):
    stdin, stdout, stderr = ssh.exec_command('SUPERVISOR_TOKEN=$(sudo cut -d= -f2 /data/.ssh/environment) ha core stop')
    if stdout.channel.recv_exit_status() != 0:
        raise Exception("[ssh_ha_stop] Couldn't stop HA...")
    return True

def ssh_ha_start(ssh):
    stdin, stdout, stderr = ssh.exec_command('SUPERVISOR_TOKEN=$(sudo cut -d= -f2 /data/.ssh/environment) ha core start')
    if stdout.channel.recv_exit_status() != 0:
        raise Exception("[ssh_ha_start] Couldn't start HA...")
    return True

def ssh_ha_execute_cmd_error(ssh, command, error_msg):
    stdin, stdout, stderr = ssh.exec_command(command)
    if stdout.channel.recv_exit_status() != 0:
        raise Exception(f'Error: {error_msg}')

def ssh_ha_execute_cmd_sudo_error(ssh, command, error_msg):
    stdin, stdout, stderr = ssh.exec_command(f'sudo {command}')
    if stdout.channel.recv_exit_status() != 0:
        raise Exception(f'Error: {error_msg}')

def ssh_execute_sql(ssh, command):
    stdin, stdout, stderr = ssh.exec_command(f'sudo sqlite3 {ha_db_path} "{command}"') #NOTE: 'sudo' since must be root to write HA db
    error = "".join(stderr).strip()
    if error:
        raise Exception(f'[ssh_execute_sql] {error}')
    return stdout

def ssh_execute_sql_first(ssh, command): #Return first row as string
    stdout = ssh_execute_sql(ssh, command)
    return stdout.readline().strip()

def ssh_execute_sql_string(ssh, command): #Join as string, similar to how sqlite3 would display from the command line
    stdout = ssh_execute_sql(ssh, command)
    return "".join(stdout).strip()

def ssh_execute_sql_list(ssh, command): #Return as list with one entry per row
    stdout = ssh_execute_sql(ssh, command)
    return [line.strip() for line in stdout]

def ssh_close(ssh):
    ssh.close()
def read_sql(meter_db_path, last_ts_ha):
    conn = sqlite3.connect(meter_db_path)
    cursor = conn.cursor()
    #Retrieve last timestamp for meter db
    cursor.execute(f"SELECT MAX({METER_COLUMNS[0]}) FROM {METER_TABLE}")
    last_ts_meter = cursor.fetchone()[0]
    # print(last_ts_meter) #DEBUG
    if last_ts_meter is None:
        raise Exception(f"[read_sql] Can't retrieve {METER_COLUMNS[0]} from '{METER_TABLE}' table in '{METER_DB_PATH}'") #METER_COLUMNS[0] = timestamp
    if last_ts_ha >= last_ts_meter: #No new data
        return None
    cursor.execute(f"SELECT {METER_COLUMNS[0]}, {METER_COLUMNS[1]} FROM {METER_TABLE} WHERE {METER_COLUMNS[0]} > {last_ts_ha} ORDER BY {METER_COLUMNS[0]} ASC")
    df = pd.DataFrame(cursor.fetchall(), columns=ha_state_columns)
    conn.close()
    return df
def read_csv(file_name, last_ts_ha): #Assume first column is the UTC 'timestamp', second column is the 'state'
    #Timestamps are seconds since the epoch - 1/1/1970 00:00:00 UTC
    try:
        df = pd.read_csv(
            file_name,
            usecols=METER_COLUMNS,
            na_values=['N/A', ''] # Treat 'N/A' and blanks as NaN
        )
        df.rename(columns=dict(zip(METER_COLUMNS, ha_state_columns)), inplace=True) #Rename columns to the corresponding HA 'states' table names
    except Exception as e:
        raise Exception(f"Can't read CSV file '{file_name}': {e}")
    if not df.apply(pd.to_numeric, errors='coerce').notna().all().all():
        raise ValueError(f"Non-numeric entries found in the CSV file '{file_name}'")
    if (df['last_updated_ts'] <= 0).any():
        raise ValueError(f"Timestamps in '{file_name}' must be greater than zero")
    if last_ts_ha >= df['last_updated_ts'].max(): #No new data
        return None
    df = df[df['last_updated_ts'] > last_ts_ha]
    return df
def database_type(file_path):
    if not os.path.isfile(file_path):
        raise Exception(f"'{file_path}' does not exist")
    # Check if the file is an SQLite database
    try:
        with sqlite3.connect(f'file:{file_path}?mode=ro', uri=True) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            if cursor.fetchall():
                return 'sqlitedb'
    except sqlite3.Error:
        pass # Silently pass if it's not an SQLite database
    # Check if the file is a CSV
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            try:
                dialect = csv.Sniffer().sniff(f.read(1024))
                f.seek(0)
                reader = csv.reader(f, dialect)
                for row in reader:
                    pass # If we can read rows, it's likely a CSV file
                return 'csv'
            except csv.Error:
                pass # Silently pass if it's not a CSV file
    except Exception as e:
        print(f"Error reading as CSV: {e}")
    return None #Neither
#===============================================================================
if __name__ == "__main__":
    main()
# Local Variables:
# mode: Python;
# tab-width: 2
# End: