Enter delayed (aka backdated) MQTT data asynchronously based on attached timestamp

My water meter sends hourly consumption data to a cloud website once a day.
I can in turn scrape that website every day and get the hourly usage data.

I would like to use MQTT to publish the hourly data to Home Assistant.
If I just do that “normally” then all the packets will arrive in a bunch, delayed by multiple hours.

Can I attach a timestamp to each MQTT packet so that, when the corresponding state is entered in the states database, ‘last_updated_ts’ and (where appropriate) ‘last_changed_ts’ are set from the time the data was recorded rather than the time it arrives at the MQTT server?

Alternatively, can I do this with the REST API?
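
For reference, here is a minimal sketch of what a REST post would look like, assuming the standard `POST /api/states/<entity_id>` endpoint with a long-lived access token. The host name, token and entity id are placeholders, not values from my setup. Note that the recording time can only travel as an ordinary attribute; as far as I can tell HA still stamps the state with the arrival time.

```python
import json
import urllib.request

# Hypothetical values for illustration only
HA_URL = "http://homeassistant.local:8123"
TOKEN = "YOUR_LONG_LIVED_ACCESS_TOKEN"
ENTITY_ID = "sensor.meter_water"


def build_state_request(reading, recorded_ts):
    """Build (but do not send) a POST to /api/states/<entity_id>."""
    payload = {
        "state": str(reading),
        # The original recording time rides along as a plain attribute;
        # it does not change the timestamps HA assigns to the state.
        "attributes": {"timestamp": recorded_ts},
    }
    return urllib.request.Request(
        f"{HA_URL}/api/states/{ENTITY_ID}",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {TOKEN}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_state_request(123456, 1720000800)
# urllib.request.urlopen(req) would send it to a live HA instance
```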

Have a read of this: Is there any way to send backdated sensor data to HA?

Thanks.
From reading the thread, it seems like there is no good API method that works reliably and that the recommendation is just to manually insert the new rows into the home-assistant_v2.db SQL database – which I can do easily.

However:

  1. I fear that could be fragile so long as HA is running and also potentially writing to the DB
  2. Not sure how to update the corresponding statistics in the statistics and statistics_short_term tables
    (of course, since my water data is hourly, it wouldn’t technically be difficult to create my own 5 minute and hourly statistics – but again that is kludgy/fragile and doesn’t generalize to cases where data has more general arrival patterns)
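
To illustrate what "creating my own hourly statistics" would involve, here is a rough sketch. It assumes a total-style sensor whose statistics rows carry a `state` (last reading in the period) and a running `sum` (consumption since the first reading), and that periods are aligned to multiples of the interval. The function name and row layout are made up for illustration and are not HA's actual code.

```python
def hourly_statistics(readings, interval=3600):
    """Bucket (utc_ts, cumulative_reading) pairs, sorted by time, into periods.

    Returns one dict per period that contains at least one reading.
    """
    rows = []
    prev = None
    running_sum = 0.0
    for ts, value in readings:
        if prev is not None:
            running_sum += value - prev  # consumption since previous reading
        prev = value
        start_ts = ts - ts % interval  # start of the period containing ts
        if rows and rows[-1]["start_ts"] == start_ts:
            # Later reading in the same period overrides the earlier one
            rows[-1].update(state=value, sum=running_sum)
        else:
            rows.append({"start_ts": start_ts, "state": value, "sum": running_sum})
    return rows


stats = hourly_statistics([(3600, 100.0), (5400, 101.5), (7200, 103.0)])
```

Passing `interval=300` would give the 5-minute variant.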

I would think that this would be a common enough use case – i.e. where people need to batch enter and/or backdate sensor entries.
But it doesn’t seem that there are well-structured or documented methods for doing so…

The devs are aware the need exists. I haven’t heard of any actual move to implement anything though.

There’s also an open feature request you can vote for: Add historic data to Energy dashboard

I am thinking for now that the easiest way may be to manually create/publish a MQTT device & entity.

Technically I had previously created a MQTT device that I named “Meter” that has 2 entities called “Gas” and “Electric”. I then publish (real-time) MQTT updates from my meters to feed these 2 entities.

I can then create a new “fake” MQTT entity called say “Water” for that device but not feed it any MQTT messages.

Instead, I will manually update the SQL state database to feed it state data.
Regarding statistics, there seems to be some code out there that allows one to feed batch csv data which I will look at and modify as needed.

Current proposed approach:

  1. Run a script regularly (using cron) to scrape the water provider website and add new entries to a separate, local SQLite database, e.g., water.db

  2. Manually create a water sensor in HA - I use MQTT since I want it to be tied to my other energy sensors (e.g., gas, electricity) as a single energy device. This sensor won’t actually be updated via MQTT

  3. Send a REST API call to exclude the water sensor from recorder (this way its statistics won’t be messed up)

  4. Use the Python REST API to batch add new entries for the water sensor. Read from local water.db and only add values with timestamps newer than last entry in the HA sensors table.
    Add an attribute entry called ‘timestamp’ when posting (the timestamp should be in UTC).

  5. When all new entries are posted, run SQL on any state entries for the sensor whose corresponding attribute row has a ‘timestamp’ entry and:
    • Update the last_updated_ts and last_changed_ts fields of the sensor state entry with the timestamp value from its corresponding attribute
    • Replace the attribute with an existing attribute for the same sensor that has no timestamp field
    • Delete the now unused attribute row with the timestamp
      NOTE: these SQL calls should be relatively safe since HA doesn’t generally change any of these entries

  6. Update/upload statistics and statistics_short_term table entries for the statistic (not sure yet how best to do that)

  7. Send REST API to re-include the water sensor in recorder
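
Step 5 above could look roughly like the following. This is only a sketch against a toy in-memory database: the two tables are heavily simplified stand-ins for HA's `states` and `state_attributes` tables, and the assumption that attributes are stored as a JSON string in a `shared_attrs` column should be checked against your own schema before trying anything like this on a real database.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE state_attributes (attributes_id INTEGER PRIMARY KEY, shared_attrs TEXT);
    CREATE TABLE states (state_id INTEGER PRIMARY KEY, state TEXT,
                         last_updated_ts REAL, last_changed_ts REAL,
                         attributes_id INTEGER);
""")
# attributes_id 1 = normal attributes, 2 = attributes carrying a 'timestamp'
conn.execute("INSERT INTO state_attributes VALUES (1, ?)",
             (json.dumps({"unit_of_measurement": "gal"}),))
conn.execute("INSERT INTO state_attributes VALUES (2, ?)",
             (json.dumps({"unit_of_measurement": "gal", "timestamp": 1720000800}),))
conn.execute("INSERT INTO states VALUES (1, '100.0', 1720090000.0, 1720090000.0, 1)")
conn.execute("INSERT INTO states VALUES (2, '101.5', 1720090001.0, 1720090001.0, 2)")
conn.commit()


def backdate(conn, clean_attributes_id):
    """Backdate states whose attributes carry a 'timestamp'; return how many were fixed."""
    rows = conn.execute(
        "SELECT s.state_id, a.attributes_id, a.shared_attrs "
        "FROM states s JOIN state_attributes a ON s.attributes_id = a.attributes_id"
    ).fetchall()
    stale = set()
    fixed = 0
    with conn:  # single transaction: commit on success, roll back on error
        for state_id, attributes_id, shared_attrs in rows:
            recorded_ts = json.loads(shared_attrs).get("timestamp")
            if recorded_ts is None:
                continue  # ordinary state, leave untouched
            conn.execute(
                "UPDATE states SET last_updated_ts = ?, last_changed_ts = ?, "
                "attributes_id = ? WHERE state_id = ?",
                (recorded_ts, recorded_ts, clean_attributes_id, state_id),
            )
            stale.add(attributes_id)
            fixed += 1
        # Remove the attribute rows that only existed to carry the timestamp
        conn.executemany("DELETE FROM state_attributes WHERE attributes_id = ?",
                         [(a,) for a in stale])
    return fixed


fixed = backdate(conn, clean_attributes_id=1)
```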

Since the REST API is lacking, I decided to just manipulate the SQL state database directly.
This does make the code db dependent and it is potentially brittle if the database is locked or contended during writes, but since nothing else writes to this state variable, I think it should be “relatively” safe.

Here is my initial code that can read in either a CSV or SQLite file.
Enjoy!

#!/bin/sh
"exec" "$(dirname $(readlink -f $0))/venv/bin/python3" "$0" "$@"
#===============================================================================
# NOTE on python virtual environment setup and invocation
# Above shebang used to invoke venv relative to directory in which script is stored
# See: https://stackoverflow.com/questions/20095351/shebang-use-interpreter-relative-to-the-script-path

# Alternatively, substitute below shebang line if all python libraries available in
# the system or if invoking script from within an already activated virtual environment:
#   #!/usr/bin/env python3
#
# Note: I use a 'venv' to avoid having to add additional libraries to my system
#
# To set up 'venv'
#   python3 -m venv --system-site-packages venv
#   source venv/bin/activate
#   pip3 install pip --upgrade
#   pip3 install pandas paramiko tzlocal
#   deactivate
#
#===============================================================================
# DESCRIPTION:
# Batch insert meter entries into the HA 'states' database table with
# 'last_updated_ts' time backdated to the time the entry was initially recorded
#
# The input data can be either a SQLite database or a CSV file.
# The data should be in two columns with one column containing the UTC timestamp
# of when data was recorded and the other column containing the meter reading.
# Extra columns are ignored
#
# Note if using a CSV file, the column names should be in the first row
#
# Note: Assumes you have ssh access with root privileges to the HA server
#       I use the "Advanced SSH & Web Terminal" HA Add-on
#
#===============================================================================
# USAGE:
# add_water.py
#
# Key user variables (in all-caps)
#    METER_DB_PATH    File containing the meter data -- may be SQLite or CSV
#    METER_TABLE      Name of the data table if input file is SQLite
#    METER_COLUMNS    Names of the 2 columns containing data
#                     First is for UTC timestamp, second for the corresponding data value
#    ENTITY_ID        Name of the sensor for which you want to enter the data
#    STOP_START_HA    Stop HA before accessing HA db and restart after if set to True
#    BACKUP_HA_DB     Backup the HA db before accessing HA db if set to True
#    DELETE_BACKUP_ON_SUCCESS  Delete the backup if program completes successfully if set to True
#    BACKUP_HA_DIR    Name of the directory to store HA db backups if set to True
#                     Created if doesn't exist
#    SSH_HOST         Name or ip address of the HA server
#    SSH_USER         Username on the HA server
#    SSH_PASSWD       SSH password to access SSH_USER@SSH_HOST
#                     Not needed if using passphrases and ssh-agent
#
#===============================================================================
# VERSION: 0.5.0
#
#===============================================================================
# CHANGELOG
#     0.5.0 (July 2024)
#       - First official release
#===============================================================================
# AUTHOR:
#    Jeff Kosowsky
#    Copyright July 2024
#
#===============================================================================
#### NOTES:
# States are inserted into the HA 'states' table using  manual SQL manipulations
# of the 'home-assistant_v2.db' database since the HA UI and REST API
# don't allow for inserting backdated state entries
#
# This is potentially "fragile" since you are manipulating the database directly
# which could lead to lock or race conditions, though it should generally be pretty
# safe since nothing else should be writing to that metadata_id/entity_id
#
# If you are concerned about fragility, do one or more of the following:
# 1. Set the variable 'STOP_START_HA' to 'True' which will stop home assistant
#    before accessing the SQLite database and restart it at the end of the program
#
# 2. Backup just the 'home-assistant_v2.db' database either by setting 'BACKUP_HA_DB'
#    to 'True' or copying it manually yourself
#    If using 'BACKUP_HA_DB', then setting 'DELETE_BACKUP_ON_SUCCESS' to 'True' will
#    delete the backup if routine completes successfully
#
# 3. Create a full backup of HA using the UI
#
# Note my SQLite METER_TABLE is called 'water' and is of form:
#    CREATE TABLE IF NOT EXISTS TABLE_NAME (
#    timestamp INTEGER PRIMARY KEY,
#    consumption REAL,
#    reading INTEGER
#    )
# Note that only 'timestamp' and 'reading' are used since I want total readings entered
#
#===============================================================================
##IMPORTS
import os
import sys
import warnings

import uuid
import csv
import sqlite3

from datetime import datetime
from tzlocal import get_localzone

import pandas as pd

with warnings.catch_warnings(): #Otherwise get deprecation warnings on Ubuntu 18.04
    warnings.filterwarnings("ignore")
    import paramiko

#===============================================================================
#### GLOBAL VARIABLES (Note user-changeable variables are in all-caps)

##Local meter data database variables (paths refer to paths on the local machine)
METER_TABLE = 'water' #Only needed for SQLite database inputs
METER_DB_PATH = 'water.db' #Name of local db used to store meter data
#METER_DB_PATH = 'water.csv' #Name of local db used to store meter data

#NOTE: Assumed to be relative to script directory if not an absolute path
if not os.path.isabs(METER_DB_PATH):
    METER_DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), METER_DB_PATH)
meter_db = os.path.basename(METER_DB_PATH)

#First column should be the name of the time-stamp column, second column should be the name of the data column
METER_COLUMNS = ['timestamp', 'reading'] #Names of data columns
ha_state_columns = ['last_updated_ts', 'state'] #Don't change

## HomeAssistant variables (paths refer to paths on the remote HA machine)
ha_db_path = '/homeassistant/home-assistant_v2.db' #Path to 'home-assistant_v2.db' database

ENTITY_ID = f'sensor.meter_{METER_TABLE}' #Change this to name of sensor you use

STOP_START_HA = False #Stop and later restart HA if set to True

BACKUP_HA_DB = True   #If True then backup 'home-assistant_v2.db' before editing database
DELETE_BACKUP_ON_SUCCESS = True #If True, then remove backup if routine completes successfully
BACKUP_HA_DIR = "/tmp/backup" #Note directory is created if it doesn't exist

## SSH credentials and settings (note no password required if using ssh-agent)
SSH_HOST = 'homeassistant'
SSH_USER = '<YOUR_USER_NAME>' #Change to your user name on the HA server
SSH_PASSWD = '<YOUR_PASSWORD>' #Not necessary if using ssh passphrases and ssh user-agent

# Debugging variables
#For debugging, allow all rows & columns to print
#pd.set_option('display.max_rows', None)  # None means unlimited rows #DEBUG
#pd.set_option('display.max_columns', None)  # None means unlimited columns #DEBUG
#pd.set_option('display.width', 180)  # Adjust to your terminal width #DEBUG
#pd.set_option('display.max_colwidth', None) #DEBUG

#===============================================================================
#### MAIN
def main():
    localtz = get_localzone()

    restart_ha = False #Set before 'try' so the 'finally' block can always read it
    try:
        #Create ssh connection to HA
        ssh = ssh_connect(SSH_HOST, SSH_USER, SSH_PASSWD)

        if STOP_START_HA is True and ssh_ha_running(ssh):
            restart_ha = True
            ssh_ha_stop(ssh)

        command = f"SELECT metadata_id FROM states_meta WHERE entity_id = '{ENTITY_ID}' LIMIT 1"
        metadata_id = int(ssh_execute_sql_first(ssh, command))
#        print(metadata_id) #DEBUG

        #Retrieve time stamp of last valid HA state (i.e., not 'unknown' or 'unavailable')
        command = f"SELECT MAX(last_updated_ts) FROM states WHERE metadata_id = {metadata_id} AND state != 'unknown' AND state != 'unavailable'"
        last_ts_ha = ssh_execute_sql_first(ssh, command)
#        print(last_ts_ha) #DEBUG

        #Retrieve last valid state value (or "" if no valid states exist)
        if last_ts_ha != "":
            command = f"SELECT state FROM states WHERE metadata_id = {metadata_id} AND last_updated_ts == {last_ts_ha} LIMIT 1"
            last_state_ha = ssh_execute_sql_first(ssh, command)
            if last_state_ha != "":
                last_state_ha = float(last_state_ha)
        else: #No valid states
            last_state_ha = ""

#        print(last_state_ha) #DEBUG

        #Retrieve last attributes_id for sensor
        if last_state_ha != "": #If valid state exists, retrieve its  attributes_id
            command = f"SELECT attributes_id FROM states WHERE metadata_id = {metadata_id} AND last_updated_ts == {last_ts_ha} LIMIT 1" #Find attribute_id of last valid state
        else: #Find last attributes_id even if no valid states exist
            command = f"SELECT attributes_id FROM states WHERE metadata_id = {metadata_id} ORDER BY state_id DESC LIMIT 1"
        attributes_id = ssh_execute_sql_first(ssh, command)

        if attributes_id == "": #No attribute_id found!
            raise Exception(f"Can't find existing 'attributes_id' for '{ENTITY_ID}'")
#        print(attributes_id) #DEBUG

        #Retrieve state_id of last sensor entry (whether valid or not) -- used to populate old_state_id, which points to the last sensor entry whether valid or not
        command = f"SELECT state_id FROM states WHERE metadata_id = {metadata_id} ORDER BY state_id DESC LIMIT 1"
        old_state_id = ssh_execute_sql_first(ssh, command)
#        print(old_state_id) #DEBUG

        last_ts_ha = float(last_ts_ha or 0)
        #Ingest meter data Panda DataFrame with columns: ha_state_columns = ['last_updated_ts', 'state']
        db_type = database_type(METER_DB_PATH)

        if db_type == 'sqlitedb':
            df = read_sql(METER_DB_PATH, last_ts_ha)
        elif db_type == 'csv':
            df = read_csv(METER_DB_PATH, last_ts_ha)
        else:
            raise Exception(f"Error: [{METER_DB_PATH}] Database type must be 'SQLite' or 'CSV'")

        if df is None:
            print("Nothing to do")
            return

        df = df.sort_values(by='last_updated_ts').reset_index(drop=True) #Generally should already be sorted (at least if reading in a SQLite db)
        if df['last_updated_ts'].duplicated().any(): #This shouldn't happen since 'timestamps' is a primary KEY
            raise Exception(f"'{METER_TABLE}' table in {meter_db} has duplicate timestamps")

        #Comment out following 3 lines if you want to include entries where 'reading' is unchanged (i.e. no new consumption)
        df = df[df['state'] != df['state'].shift()] #Eliminate where next row has same state as previous
        if df.iloc[0]['state'] == last_state_ha:
            df = df.iloc[1:].reset_index(drop=True) #Drop first line if equal to last saved state

        entries_posted = 0
        #Add remaining (non-blank) rows of the 'states' HA table to the DataFrame
        if not df.empty: #Rows to add
            df['attributes_id'] = attributes_id
            df['origin_idx'] = 0 #Always 0 it seems
            df['context_id_bin'] = [uuid.uuid4().bytes for _ in range(len(df))]
            df['metadata_id'] = metadata_id
            df['last_reported_ts'] = df['last_updated_ts'] #Always equal it seems
            df['old_state_id'] = old_state_id #Initially set them to the last valid saved HA state_id, we will later update as they are added to HA db
            #Note: other states are 'blank', other than the index 'state_id' which gets added automatically when row inserted into SQL
#            print(df) #DEBUG

            df_columns = df.columns.tolist()
#            print(df_columns) #DEBUG

            if BACKUP_HA_DB is True: #Backup HA database
                ssh_ha_execute_cmd_error(ssh, f'mkdir -p {BACKUP_HA_DIR}', f"Couldn't create backup dir on HA: {BACKUP_HA_DIR}")
                backup_path = BACKUP_HA_DIR + '/' + os.path.basename(ha_db_path) + '-' + datetime.now().strftime("%Y%m%d.%H%M%S")
                backup_path = backup_path.replace('//', '/')
#                print(backup_path) #DEBUG
                ssh_ha_execute_cmd_error(ssh, f'cp -f {ha_db_path} {backup_path}', f"Couldn't copy '{ha_db_path}' to '{backup_path}")

            last_state_old = last_state_ha
            last_ts_old = last_ts_ha
            for index, row in df.iterrows(): #Insert rows into SQL one-by-one
#                if index > 50: break #DEBUG: Look through 50 rows and then stop

                if last_state_old != "" and float(row['state']) < float(last_state_old): #Warn on negative consumption
                    olddate = datetime.fromtimestamp(last_ts_old).replace(tzinfo=localtz).strftime('%m/%d/%y %H:%M:%S')
                    currdate = datetime.fromtimestamp(row['last_updated_ts']).replace(tzinfo=localtz).strftime('%m/%d/%y %H:%M:%S')
                    print(f"Warning - Negative consumption: {last_state_old} ({olddate}) -> {row['state']} ({currdate}) [{row['state'] - last_state_old}]", file=sys.stderr)

                if index > 0: #Set 'old_state_id' to previous 'state_id' (Note: the first 'old_state_id' is set already since we initialize df['old_state_id'] = old_state_id
                    #Assign old_state_id = state_id  at time last_ts_old
                    command = f"SELECT state_id FROM states WHERE metadata_id = {metadata_id} AND last_updated_ts == {last_ts_old} LIMIT 1"
                    row['old_state_id'] = float(ssh_execute_sql_first(ssh, command))
#                    print(f"{index}: {row['old_state_id']}") #DEBUG

                last_state_old = row['state']
                last_ts_old = row['last_updated_ts']

                # Prepare values for SQL command with single quotes around each value
                values = []
                for value in row.tolist():
                    if isinstance(value, str):
                        values.append(f"'{value}'") #Single quote strings
                    elif isinstance(value, bytes):  # Handle BLOB data (bytes)
                        values.append(f"x'{value.hex()}'")  # SQLite BLOB literal format (convert to hex and write as hex string x'<hex_string>'
                    else:
                        values.append(str(value)) #No quotes if not string or BLOB

                # Construct SQL command dynamically by joining df_columns and values, respectively
                command = f"INSERT OR IGNORE INTO states " \
                          f"({', '.join(df_columns)}) " \
                          f"VALUES ({', '.join(values)})"
#                print(command) #DEBUG
                ssh_execute_sql(ssh, command) #Insert row
                entries_posted += 1

            if BACKUP_HA_DB is True and DELETE_BACKUP_ON_SUCCESS is True: #Only delete a backup that was actually created
                try:
                    ssh_ha_execute_cmd_error(ssh, f'rm -f {backup_path}', f"Couldn't delete temporary backup: {backup_path}")
                except Exception as e:
                    print(e, file=sys.stderr)


        #End of stanza: if not df.empty: #Rows to add
        print(f"{entries_posted} new state entries for '{ENTITY_ID}' in: {ha_db_path}")

    except Exception as e:
        print(f'Error: {e}', file=sys.stderr)

    finally: #Cleanup
        if restart_ha is True:
            print("Restarting HA...", file=sys.stderr)
            try:
                ssh_ha_start(ssh)
            except Exception as e:
                print(f'Error: {e}', file=sys.stderr)


#===============================================================================
#### FUNCTIONS

def ssh_connect(ssh_host, ssh_user, ssh_passwd):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(ssh_host, username=ssh_user, password=ssh_passwd)
    return ssh

def ssh_ha_running(ssh):
    stdin, stdout, stderr = ssh.exec_command('SUPERVISOR_TOKEN=$(sudo cut -d= -f2 /data/.ssh/environment) ha core status')
    if stdout.channel.recv_exit_status() == 0:
        return True
    return False

def ssh_ha_stop(ssh):
    stdin, stdout, stderr = ssh.exec_command('SUPERVISOR_TOKEN=$(sudo cut -d= -f2 /data/.ssh/environment) ha core stop')
    if stdout.channel.recv_exit_status() != 0:
        raise Exception(f"[ssh_ha_stop] Couldn't stop HA...")
    return True

def ssh_ha_start(ssh):
    stdin, stdout, stderr = ssh.exec_command('SUPERVISOR_TOKEN=$(sudo cut -d= -f2 /data/.ssh/environment) ha core start')
    if stdout.channel.recv_exit_status() != 0:
        raise Exception(f"[ssh_ha_start] Couldn't start HA...")
    return True

def ssh_ha_execute_cmd_error(ssh, command, error_msg):
    stdin, stdout, stderr = ssh.exec_command(command)
    if stdout.channel.recv_exit_status() != 0:
        raise Exception(f'Error: {error_msg}')

def ssh_ha_execute_cmd_sudo_error(ssh, command, error_msg):
    stdin, stdout, stderr = ssh.exec_command(f'sudo {command}')
    if stdout.channel.recv_exit_status() != 0:
        raise Exception(f'Error: {error_msg}')

def ssh_execute_sql(ssh, command):
    stdin, stdout, stderr = ssh.exec_command(f'sudo sqlite3 {ha_db_path} "{command}"') #NOTE: 'sudo' since must be root to write HA db
    error = "".join(stderr).strip()
    if error:
        raise Exception(f'[ssh_execute_sql] {error}')
    return stdout

def ssh_execute_sql_first(ssh, command): #Return first row as string
    stdout = ssh_execute_sql(ssh, command)
    return stdout.readline().strip()

def ssh_execute_sql_string(ssh, command): #Join as string similar to how sqlite3 would display from command line
    stdout = ssh_execute_sql(ssh, command)
    return "".join(stdout).strip()

def ssh_execute_sql_list(ssh, command): #Return as list with one string entry per row
    stdout = ssh_execute_sql(ssh, command)
    return [line.strip() for line in stdout]

def ssh_close(ssh):
    ssh.close()

def read_sql(meter_db_path, last_ts_ha):
    ts_col, data_col = METER_COLUMNS #Names of the timestamp and data columns
    conn = sqlite3.connect(meter_db_path)
    try:
        cursor = conn.cursor()

        #Retrieve last timestamp for meter db
        cursor.execute(f"SELECT MAX({ts_col}) FROM {METER_TABLE}")
        last_ts_meter = cursor.fetchone()[0]
        #print(last_ts_meter) #DEBUG
        if last_ts_meter is None:
            raise Exception(f"[read_sql] Can't retrieve {ts_col} from '{METER_TABLE}' table in '{meter_db_path}'")

        if last_ts_ha >= last_ts_meter: #No new data
            return None

        cursor.execute(f"SELECT {ts_col}, {data_col} FROM {METER_TABLE} WHERE {ts_col} > {last_ts_ha} ORDER BY {ts_col} ASC")
        return pd.DataFrame(cursor.fetchall(), columns=ha_state_columns)
    finally:
        conn.close() #Always release the local db, including on the early 'return None'

def read_csv(file_name, last_ts_ha): #Assume first column is utc 'Timestamp', second column is 'State'
                                     #Timestamps are seconds since the epoch - 1/1/1970 00:00:00 UTC
    try:
        df = pd.read_csv(
            file_name,
            usecols=METER_COLUMNS,
            na_values=['N/A', '']              # Treat 'N/A' and blanks as NaN
        )
        df.rename(columns=dict(zip(METER_COLUMNS, ha_state_columns)), inplace=True) #Rename columns to corresponding HA 'states' table names
    except Exception as e:
        raise Exception(f"Can't read CSV file '{file_name}': {e}")

    if not df.apply(pd.to_numeric, errors='coerce').notna().all().all():
        raise ValueError(f"Non-numeric entries found in the CSV file '{file_name}'")

    if (df['last_updated_ts'] <= 0).any():
        raise ValueError(f"Timestamps in '{file_name}' must be greater than zero")

    if last_ts_ha >= df['last_updated_ts'].max(): #No new data
        return None

    df = df[df['last_updated_ts'] > last_ts_ha]
    return df

def database_type(file_path):
    if not os.path.isfile(file_path):
        raise Exception(f"'{file_path}' does not exist")

    # Check if the file is an SQLite database
    try:
        with sqlite3.connect(f'file:{file_path}?mode=ro', uri=True) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            if cursor.fetchall():
                return 'sqlitedb'
    except sqlite3.Error:
        pass  # Silently pass if it's not an SQLite database

    # Check if the file is a CSV
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            try:
                dialect = csv.Sniffer().sniff(f.read(1024))
                f.seek(0)
                reader = csv.reader(f, dialect)
                for row in reader:
                    pass  # If we can read rows, it's likely a CSV file
                return 'csv'
            except csv.Error:
                pass  # Silently pass if it's not a CSV file
    except Exception as e:
        print(f"Error reading as CSV: {e}")

    return None #Neither

#===============================================================================
if __name__ == "__main__":
    main()

# Local Variables:
# mode: Python;
# tab-width: 2
# End:
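
One caveat about `ssh_execute_sql` above: it wraps the SQL in double quotes for the remote shell, so a statement containing `"`, `$` or a backtick would be interpreted by the shell before `sqlite3` ever sees it. A possible hardening, sketched below with a hypothetical helper name, is to let `shlex.quote` from the standard library do the quoting.

```python
import shlex


def remote_sqlite_command(db_path, sql):
    """Build a shell-safe 'sudo sqlite3 <db> <sql>' command line."""
    return f"sudo sqlite3 {shlex.quote(db_path)} {shlex.quote(sql)}"


sql = "SELECT metadata_id FROM states_meta WHERE entity_id = 'sensor.meter_water' LIMIT 1"
cmd = remote_sqlite_command("/homeassistant/home-assistant_v2.db", sql)
# 'cmd' could then be passed to ssh.exec_command(cmd)
```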

Here is an updated version that also adds the corresponding statistics to the statistics and statistics_short_term tables.
Note it also is generalized to work for any sensor, not just power/gas/water (i.e., sensors that are ‘total’ or ‘total increasing’).
There is a lot of flexibility via changing the (global) user variables at the head of the Python code.

Please let me know if you find any bugs in the code. Use at your own risk of course…

Here are just the headers and variables. The full code is too long to post here, so see Full code for the full code.

#!/bin/sh
"exec" "$(dirname $(readlink -f $0))/venv/bin/python3" "$0" "$@"
# ===============================================================================
#  NOTE on python virtual environment setup and invocation
#  Above shebang used to invoke venv relative to directory in which script is stored
#  See: https://stackoverflow.com/questions/20095351/shebang-use-interpreter-relative-to-the-script-path
#
# Alternatively, substitute below shebang line if all python libraries available in
# the system or if invoking script from within an already activated virtual environment:
#   #!/usr/bin/env python3
#
# Note: I use a 'venv' to avoid having to add additional libraries to my system
#
# To set up 'venv'
#   python3 -m venv --system-site-packages venv
#   source venv/bin/activate
#   pip3 install pip --upgrade
#   pip3 install pandas paramiko tzlocal
#   deactivate
#
#===============================================================================
# DESCRIPTION:
# Batch insert sensor entries for ENTITY_ID into the HA 'states' database table with
# 'last_updated_ts' time backdated to the time the entry was initially recorded
#
# The input data can be either a SQLite database or a CSV file.
# The data should be in two columns with one column containing the UTC timestamp
# of when data was recorded and the other column containing the sensor data.
# Extra columns are ignored
#
# Note if using a CSV file, the column names should be in the first row
#
# Note: Assumes you have ssh access with root privileges to the HA server
#       I use the "Advanced SSH & Web Terminal" HA Add-on
#
#===============================================================================
# USAGE:
# add_water.py
#
# Key user variables (in all-caps)
#    SENSOR_DB_PATH     File containing the sensor data -- may be SQLite or CSV
#    SENSOR_TABLE       Name of the data table if input file is SQLite
#    SENSOR_COLUMNS     Names of the 2 columns containing data
#                      First is for UTC timestamp, second for the corresponding data value
#    CONVERSION_FACTOR Multiply 2nd column by this factor to get correct state units
#    SKIP_UNCHANGED    Skip (and don't insert) duplicate sequential data elements (this makes sense if data is a 'total')
#    ENTITY_ID         Name of the sensor for which you want to enter the data
#    STATISTIC_ID      Typically same as ENTITY_ID
#    DEFAULT_ATTRIBUTE_ID Use this attribute id if can't find existing one in the database
#    STOP_START_HA     Stop HA before accessing HA db and restart after if set to True
#    BACKUP_HA_FLAG    Backup the HA db before accessing HA db if set to True
#    DELETE_BACKUP_ON_SUCCESS  Delete the backup if program completes successfully if set to True
#    BACKUP_HA_DIR     Name of the directory to store HA db backups if set to True
#                      Created if doesn't exist
#    REBUILD_FULL_STATISTICS If True, then rebuild all statistics back to the first state value stored
#    SSH_HOST          Name or ip address of the HA server
#    SSH_USER          Username on the HA server
#    SSH_PASSWD        SSH password to access SSH_USER@SSH_HOST
#                      Not needed if using passphrases and ssh-agent
#
#===============================================================================
# VERSION: 0.6.1
#
#===============================================================================
# CHANGELOG
#     0.5.0 (July 2024)
#       - First official release
#     0.6.1 (July 2024)
#       - Added ability to insert statistics (both 'statistics' and 'statistics_short_term')
#       - Added CONVERSION_FACTOR, DEFAULT_ATTRIBUTE_ID
#       - Bug fixes and major code cleanup
#
#===============================================================================
# AUTHOR:
#    Jeff Kosowsky
#    Copyright July 2024
#
#===============================================================================
#### NOTES:
# States are inserted into the HA 'states' table using manual SQL manipulations
# of the 'home-assistant_v2.db' database since the HA UI and REST API
# don't allow for inserting backdated state entries
#
# This is potentially "fragile" since you are manipulating the database directly
# which could lead to lock or race conditions, though it should generally be pretty
# safe since nothing else should be writing to that metadata_id/entity_id
#
# If you are concerned about fragility, do one or more of the following:
# 1. Set the variable 'STOP_START_HA' to 'True' which will stop home assistant
#    before accessing the SQLite database and restart it at the end of the program
#
# 2. Backup just the 'home-assistant_v2.db' database either by setting 'BACKUP_HA_FLAG'
#    to 'True' or copying it manually yourself
#    If using 'BACKUP_HA_FLAG', then setting 'DELETE_BACKUP_ON_SUCCESS' to 'True' will
#    delete the backup if routine completes successfully
#
# 3. Create a full backup of HA using the UI
#
# Note my SQLite SENSOR_TABLE is called 'water' and is of form:
#    CREATE TABLE IF NOT EXISTS TABLE_NAME (
#    timestamp INTEGER PRIMARY KEY,
#    consumption REAL,
#    reading INTEGER
#    )
# Note that only 'timestamp' and 'reading' are used since I want total readings entered
#
#===============================================================================
##IMPORTS
import os
import sys
import warnings

import uuid
import csv
import sqlite3

import time
from datetime import datetime
from tzlocal import get_localzone

import pandas as pd

with warnings.catch_warnings(): #Otherwise get deprecation warnings on Ubuntu 18.04
    warnings.filterwarnings("ignore")
    import paramiko

#===============================================================================
#### GLOBAL VARIABLES (Note user-changeable variables are in all-caps)

##Local sensor data database variables (paths refer to paths on the local machine)
SENSOR_TABLE = 'water' #Only needed for SQLite database inputs
SENSOR_DB_PATH = 'water.db' #Name of and path to local SQLITE db used to store sensor data
#SENSOR_DB_PATH = 'water.csv' #Name of local CSV db used to store sensor data

#NOTE: Assumed to be relative to script directory if not an absolute path
if not os.path.isabs(SENSOR_DB_PATH):
    SENSOR_DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), SENSOR_DB_PATH)
sensor_db = os.path.basename(SENSOR_DB_PATH)

#First column should be the name of the time-stamp column, second column should be the name of the data column
SENSOR_COLUMNS = ['timestamp', 'reading'] #Names of data columns from your SENSOR_TABLE corresponding to time & data
ha_state_columns = ['last_updated_ts', 'state'] #Don't change

CONVERSION_FACTOR = .01 #Multiply the 2nd column of SENSOR_COLUMNS by this factor to get the appropriate state units (e.g. ft^3/100 -> ft^3 for my water meter)
SKIP_UNCHANGED = True #Skip (and don't insert)duplicate sequential data elements (this makes sense if data is a 'total')
                      #Typically, but not always, if corresponding statistic_metadata  has column 'has_mean' = 0

## HomeAssistant variables (paths refer to paths on the remote HA machine)
ha_db_path = '/homeassistant/home-assistant_v2.db' #Path to 'home-assistant_v2.db' database

ENTITY_ID = 'sensor.meter_water' #Change this to name of sensor you use
STATISTIC_ID = ENTITY_ID #Typically have the same name
DEFAULT_ATTRIBUTE_ID = None #Use this as the default attribute_id for the entity_id if no prior states in the database for entity_id
                          #Set to None if don't have or need one, otherwise add your own number

STOP_START_HA = False #Stop and later restart HA if set to True

BACKUP_HA_FLAG = True   #If True then backup 'home-assistant_v2.db' before editing database
DELETE_BACKUP_ON_SUCCESS = True #If True, then remove backup if routine completes successfully
BACKUP_HA_DIR = "/tmp/backup" #Note directory is created if it doesn't exist

REBUILD_FULL_STATISTICS = False #If True, then rebuild all statistics

## SSH credentials and settings (note no password required if using ssh-agent)
SSH_HOST = 'homeassistant'
SSH_USER = '<YOUR_USER_NAME>' #Change to your user name on the HA server
SSH_PASSWD = '<YOUR_PASSWORD>' #Not necessary if using ssh passphrases and ssh user-agent

## Other global variables
localtz = get_localzone()
backup_path = None
short_stat_int = 300 #Interval length of short_term_statistics in seconds
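
As a small illustration of how `short_stat_int` might be used, the sketch below lists the 5-minute period start times covering a range of state timestamps. It assumes periods are aligned to multiples of the interval since the epoch; the function name is hypothetical and not taken from the full script.

```python
def period_starts(first_ts, last_ts, interval=300):
    """Aligned period start times covering the range [first_ts, last_ts]."""
    start = int(first_ts) - int(first_ts) % interval  # round down to a boundary
    return list(range(start, int(last_ts) + 1, interval))


starts = period_starts(1000, 1700)  # [900, 1200, 1500]
```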