Migrating from InfluxDB to TimescaleDB on a Raspberry Pi

I have been using InfluxDB for the past five months to keep a history of my HA entities (mostly the heating system and solar energy production/consumption). However, two weeks ago I ran into InfluxDB's inability to scale beyond a 2 GB database on a Raspberry Pi, so I decided to switch to TimescaleDB. I have had very good experiences with PostgreSQL in the past, at work and at home, so it was worth a try.

Installing TimescaleDB is described in depth on their home page. The only hurdle was the timescaledb-tune command, which is not available for Raspbian. I installed Go 1.14 (available in buster-backports) and compiled it from source (I'm not sure why it isn't in the repositories, since building it was no problem at all and Go produces binaries without any dependencies). Excluding this sidetrack, TimescaleDB was up and running in less than 30 minutes.
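For anyone taking the same route, the build essentially boils down to the following; treat it as a sketch, since the Debian package name and the go get path may differ depending on your setup:

# install Go 1.14 from buster-backports and build timescaledb-tune from source
sudo apt-get install -t buster-backports golang-go
go get github.com/timescale/timescaledb-tune/cmd/timescaledb-tune
# run the tuner (it adjusts postgresql.conf, hence sudo)
sudo ~/go/bin/timescaledb-tune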
Then I installed Expaso's custom integration LTSS, configured it in HA, and data arrived immediately without any problems. Since Grafana - which I use for analysis and visualization - supports PostgreSQL natively, changing the dashboards was straightforward. Using SQL I was also able to fix a long-standing issue with Grafana/InfluxDB: when there are no measurements within the selected time interval, Grafana doesn't show anything. With some SQL magic I fixed this by simply carrying the latest value from outside the interval forward to the interval boundary. To make the queries in Grafana simpler I created some database functions which I want to share here:

CREATE OR REPLACE FUNCTION timeseries(entity VARCHAR, time_from TIMESTAMP WITH TIME ZONE, time_to TIMESTAMP WITH TIME ZONE)
RETURNS TABLE("time" TIMESTAMP WITH TIME ZONE, state FLOAT) AS $$
	SELECT * FROM 
		(
			(SELECT time, state::float
			 FROM ltss
			 WHERE entity_id = entity AND time > time_from AND time < time_to AND NOT state = 'unavailable')
		UNION
			(SELECT time_from, state::float
			 FROM ltss
			 WHERE entity_id = entity AND time <= time_from AND NOT state = 'unavailable'
			 ORDER BY time DESC LIMIT 1)
		UNION
			(SELECT time_to, state::float
			 FROM ltss
			 WHERE entity_id = entity AND time <= time_to AND NOT state = 'unavailable'
			 ORDER BY time DESC LIMIT 1)
		) a ORDER BY time;
$$ LANGUAGE SQL;

You can use this in Grafana like SELECT * FROM timeseries('sensor.xyz', $__timeFrom(), $__timeTo());
The first query in the UNION returns the data within the interval, the second carries the latest value from before the interval to the “left” boundary, and the third carries the last measurement at or before the end of the interval to the “right” boundary. With that you always get complete graphs, no matter where or whether there are measurements inside the interval (except if there are no measurements at all).
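To illustrate the boundary handling with made-up numbers: assume sensor.xyz last reported 21.5 at 09:40 and nothing afterwards. Querying the interval 10:00 to 11:00 then still produces a flat line across the whole panel:

SELECT * FROM timeseries('sensor.xyz', '2020-10-06 10:00+00', '2020-10-06 11:00+00');
--          time           | state
-- ------------------------+-------
--  2020-10-06 10:00:00+00 |  21.5   <- latest value carried to the left boundary
--  2020-10-06 11:00:00+00 |  21.5   <- and to the right boundary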

An extension of this function is:

CREATE OR REPLACE FUNCTION timeseries(entity VARCHAR, time_from TIMESTAMP WITH TIME ZONE, time_to TIMESTAMP WITH TIME ZONE, bucket_size INTERVAL)
RETURNS TABLE("time" TIMESTAMP WITH TIME ZONE, state FLOAT) AS $$
	SELECT tb AS time, state FROM 
		(
			(SELECT time_bucket(bucket_size, time) AS tb, AVG(state::float) AS state
			 FROM ltss
			 WHERE entity_id = entity AND time > time_from AND time < time_to AND NOT state = 'unavailable'
			 GROUP BY tb)
		UNION
			(SELECT time_from AS tb, state::float
			 FROM ltss
			 WHERE entity_id = entity AND time <= time_from AND NOT state = 'unavailable'
			 ORDER BY time DESC LIMIT 1)
		UNION
			(SELECT time_to AS tb, state::float
			 FROM ltss
			 WHERE entity_id = entity AND time <= time_to AND NOT state = 'unavailable'
			 ORDER BY time DESC LIMIT 1)
		) a ORDER BY time;
$$ LANGUAGE SQL;

It takes a fourth argument that lets you aggregate measurements into buckets. This is useful if you have many measurements inside the interval, because a) the graph can look bad if the values vary a lot, and b) rendering can slow down due to the large number of data points. Use the function as SELECT * FROM timeseries('sensor.xyz', $__timeFrom(), $__timeTo(), INTERVAL '1m');.

A third function is used to compute the integral below a series of measurements (daily energy consumption and production in my case):

CREATE OR REPLACE FUNCTION integral(entity VARCHAR, time_from TIMESTAMP WITH TIME ZONE, time_to TIMESTAMP WITH TIME ZONE) RETURNS FLOAT AS $$
DECLARE
integral FLOAT;
BEGIN
	SELECT SUM(xy) INTO STRICT integral FROM
		(SELECT time, state, EXTRACT(EPOCH from (time - LAG(time) OVER w)) / 3600 * LAG(state) OVER w AS xy FROM
			(SELECT time, state::float AS state FROM ltss WHERE entity_id = entity AND state::float > 0 AND time BETWEEN time_from AND time_to) a WINDOW w AS (ORDER BY time)) b;
	RETURN COALESCE(integral, 0);
END;
$$ LANGUAGE plpgsql;

Use it like SELECT integral('sensor.consumption', $__timeFrom(), $__timeTo()).
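A quick sanity check with made-up numbers: readings of 1000 W at 10:00 and 1200 W at 10:30 contribute 0.5 h * 1000 W = 500 Wh for that half hour, because each gap between two measurements is multiplied by the value at its start (LAG(state) OVER w) rather than by an average of the two endpoints.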

The last challenge was importing the existing data from InfluxDB into Postgres. I used Outflux for this:

./outflux migrate home_assistant '°C' W A V '%' --input-user user --input-pass pass  --output-conn "postgresql://user:pass/ha_history" --input-server http://influx:8086 --to '2020-10-06T19:05:00Z' --max-parallel 1

It migrates the data directly, though not in the same format the LTSS integration uses. But you can easily get it into the right shape with a few SQL commands:

INSERT INTO ltss (time, entity_id, state) SELECT time, 'sensor.' || entity_id, value FROM "°C";
INSERT INTO ltss (time, entity_id, state) SELECT time, 'sensor.' || entity_id, value FROM "W";
...

If you don’t want to import all series you can simply append a WHERE NOT (entity_id IN ('a', 'b', 'c')) or similar. Lastly, drop the tables created by the import and that’s it.
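For example (the entity names in the NOT IN list are just placeholders), a filtered import plus the cleanup could look like this:

INSERT INTO ltss (time, entity_id, state)
	SELECT time, 'sensor.' || entity_id, value FROM "W"
	WHERE NOT (entity_id IN ('a', 'b', 'c'));
DROP TABLE "W";
DROP TABLE "°C";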

In order to reduce the amount of data stored, I added a regular cronjob that downsamples older data (I have some sensors that create measurements every few seconds):

#!/bin/bash

if [[ "$(id -u)" -eq 0 ]]; then
    exec sudo -H -u postgres $0 "$@"
fi

TABLE=ltss
INTERVAL=14d
DB=ha_history

rangeEnd=$(echo "SELECT NOW() - INTERVAL '$INTERVAL';" | psql -tA $DB)
echo "SELECT entity_id FROM (SELECT COUNT(*) AS c, entity_id FROM $TABLE WHERE time >= now() - INTERVAL '1d' GROUP BY entity_id) t WHERE c > 1440 ORDER BY c;" | psql -tA $DB | while read entity; do
        echo "Downsampling $entity"
        echo "DELETE FROM $TABLE WHERE entity_id = '$entity' AND time <= '$rangeEnd' AND NOT (state ~ '^[0-9]+(\\.[0-9]+)?$');" | psql $DB
        rangeStart=$(echo "SELECT MIN(time) FROM $TABLE WHERE entity_id = '$entity' AND NOT downsampled" | psql -tA $DB)

        echo "INSERT INTO $TABLE (time, entity_id, state, downsampled) 
                SELECT time, entity_id, CASE WHEN avg IS NULL THEN last ELSE avg END, true FROM
                        (SELECT time_bucket_gapfill('1m', time) AS time, entity_id, AVG(state::float) AS avg, LOCF(LAST(state::float, time)) AS last FROM
                                $TABLE WHERE entity_id = '$entity' AND time BETWEEN '$rangeStart' AND '$rangeEnd' AND NOT downsampled GROUP BY time_bucket_gapfill('1m', time), entity_id) a;" | psql $DB

        # echo "INSERT into $TABLE (time, entity_id, state, downsampled) SELECT time_bucket('1m', time), entity_id, AVG(state::float), true FROM $TABLE WHERE entity_id = '$entity' AND time < '$cutoff' AND NOT downsampled GROUP BY time_bucket('1m', time), entity_id;" | psql $DB
        read
        echo "DELETE FROM $TABLE WHERE entity_id = '$entity' AND time <= '$rangeEnd' AND NOT downsampled;" | psql $DB
done

It takes all rows older than 14 days (INTERVAL), aggregates them into one-minute buckets and deletes the original values. It only does this for entities that have more than one value per minute on average over the last day.
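One prerequisite that is easy to miss: the script relies on a boolean column downsampled in the ltss table to mark rows that have already been aggregated. It is not part of the stock LTSS schema, so something along these lines is needed first (new rows then default to not downsampled):

ALTER TABLE ltss ADD COLUMN downsampled BOOLEAN NOT NULL DEFAULT false;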

Finally, you can enable TimescaleDB compression, which reduced the database size quite considerably for me:

ALTER TABLE ltss SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'entity_id',
  timescaledb.compress_orderby = 'time DESC, id'
);
SELECT add_compress_chunks_policy('ltss', INTERVAL '28 days');
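A note in case you are running a newer version: add_compress_chunks_policy is the TimescaleDB 1.x name of this function; on TimescaleDB 2.x the equivalent call should be

SELECT add_compression_policy('ltss', INTERVAL '28 days');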

I hope this rather lengthy personal journey is useful to some of you.


Very nice!

One correction though, the LTSS component was created by @freol and not by me, so he deserves the credits for that. I created the TimescaleDb (Postgres) and pgAdmin Hassio add-ons.

I really think this will help people with their migration to Timescale, so thanks again!

Very nice!

Below is an improved Postgres function that allows selecting multiple metrics (LIKE syntax for the entity):

CREATE OR REPLACE FUNCTION timeseries(entity VARCHAR, time_from TIMESTAMP WITH TIME ZONE, time_to TIMESTAMP WITH TIME ZONE, bucket_size INTERVAL)
RETURNS TABLE("time" TIMESTAMP WITH TIME ZONE, metric VARCHAR, state FLOAT) LANGUAGE SQL AS $$
  SELECT time_bucket(bucket_size, time) AS time, entity_id AS metric, AVG(state::float) AS state
  FROM ltss
  WHERE entity_id like entity AND time between time_from AND time_to
  GROUP BY 1, entity_id  -- group by the bucket (ordinal), not the raw time column
  UNION ALL
  SELECT time_from, entity_id, state::float
  FROM (
    SELECT ltss.*, row_number() over (partition by entity_id order by time DESC) AS nb
    FROM ltss
    WHERE entity_id like entity AND time <= time_from
  ) x
  WHERE nb = 1
  UNION ALL
  SELECT time_to, entity_id, state::float
  FROM (
    SELECT ltss.*, row_number() over (partition by entity_id order by time DESC) AS nb
    FROM ltss
    WHERE entity_id like entity AND time <= time_to
  ) x
  WHERE nb = 1
  ORDER BY 1;
$$;

And bucket_size can be adjusted dynamically based on the dashboard time range and resolution as follows:

SELECT * FROM timeseries('sensor.temperature%', $__timeFrom(), $__timeTo(), ($__interval_ms / 1000 || 'second')::interval )

Thanks for the write-up! After reading the compression docs I have a question: is the orderby really necessary? It defaults to time if it is not set, according to the docs, and IDs aren't reused afaik, so they can't be out of order. Am I missing something?

I recently found two small errors, one in the integral function and one in the downsample script. They lead to wrong values when there are large gaps between measurements and the values at the two ends of the gap differ hugely. For example, energy import was 0 and then jumps to 2 kW after several hours (when cooking starts). This results in two data points:

09:00:00 0
14:00:00 2000

The former version of the integral computed (14 - 9) * (2000 - 0) / 2 = 5000 Wh, which is clearly wrong because consumption was 0 for the whole five hours. The fixed version multiplies each gap by the value at its start (LAG(state) OVER w), so this gap correctly contributes 5 h * 0 W = 0.

A similar problem existed in the downsample script for the same scenario. Both are already fixed in the first post.

@sithmein Why do you use an external script for downsampling instead of a continuous aggregate and a retention policy?

I had a look at continuous aggregates but I find them rather clumsy to use later on. I visualize data in Grafana, and ideally you use a single query throughout the whole lifetime of your data. If some parts (the most recent ones) are in the main table and others (the older ones) are in the materialized view, you have to query both sources. And then you get some data back twice if the aggregate and the data still present in the main table overlap.
Therefore I decided to downsample manually and keep everything in one table.

I think you are using them wrong. Continuous aggregates already combine materialized and live data by themselves by default, resulting in real-time aggregates from TimescaleDB 1.7 onward.
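For reference, a minimal sketch of such a continuous aggregate over the ltss table (TimescaleDB 2.x syntax; the view name, bucket size and numeric-state filter are just examples):

CREATE MATERIALIZED VIEW ltss_1m
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 minute', time) AS bucket,
       entity_id,
       AVG(state::float) AS state
FROM ltss
WHERE state ~ '^[0-9]+(\.[0-9]+)?$'  -- skip non-numeric states such as 'unavailable'
GROUP BY bucket, entity_id;

With real-time aggregation enabled, queries against ltss_1m also include the most recent rows that have not been materialized yet, so a single Grafana query can cover both old and new data.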
