Sharing a Node-RED flow to automatically detect bad energy data (Z-Wave transmission errors) and remove it from a DB

Hello everyone.

This discussion made me think that manually removing bad data from a DB by looking at charts and pressing buttons (or executing requests) isn’t really the “smart home” or “home automation” way of life. :wink:

So I tried to figure out a way to clean up my database automatically with a Node-Red flow.

I’m not using the internal HA energy dashboards, but InfluxDB and Grafana.
Maybe it still helps other people to solve this problem, or to port it over to the internal SQLite DB (and post that solution here for others too).

In my case all the faulty values come from Z-Wave devices, which are prone to transmission errors and therefore often report wrong values.

The idea is to compare each value of a device with the next one and look for a large difference (more than 1 kWh in my case).
If there is such a jump, the flow walks through the following values one by one until the next change occurs.

If the value then jumps back to the original value (or slightly above it), the rows in between are collected as an error.
If it stays on the higher or lower level, it was most likely a real increase (or a meter reset if the change was negative).
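
A hypothetical series of kWh readings for one entity illustrates the difference (the numbers are made up; the thresholds of 1 kWh for the jump and 0.5 kWh for the “slightly higher” check are the ones from the code below):

12.40, 12.40,  0.00,  0.00, 12.41   ->  spike: the two 0.00 rows are collected and deleted
12.40, 13.90, 13.95, 14.00, 14.05   ->  real increase: nothing is deleted
12.40,  0.00,  0.05,  0.10,  0.15   ->  meter reset: nothing is deleted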

The flow is executed once an hour and looks at the data of the last 24 hours (range -1d in the Flux query).

Node-RED flow export (replace INFLUX_HOST, YOUR_ORG, and YOUR_TOKEN_HERE in the two HTTP request nodes with your own InfluxDB host, organization, and API token):

[{"id":"70fb862d319fe539","type":"group","z":"8b5e3beccae4b7e8","name":"Auto Spike Entfernung","style":{"label":false,"stroke":"none","fill":"#d1d1d1","fill-opacity":"0.5"},"nodes":["43bdee375fb16448","b19d02541fb69468","577050f701491bad","53f9967e529a9384","af5ac3ee95f68aff","b3ae1ed5d19e0564","09a85a5c338d03a0","9f32f8d05c9ba708","99bc919db4897270","a742140e072c3bde","a1c69c1aa8b4e77a","dbd1524875d3f007","d2cadc18980a4499","89e1f98921aaa71d","6f4dbcc6e9594e58"],"x":74,"y":119,"w":1132,"h":362},{"id":"43bdee375fb16448","type":"http request","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Send Influx API Request","method":"POST","ret":"txt","paytoqs":"ignore","url":"http://INFLUX_HOST:8086/api/v2/query?org=YOUR_ORG&bucket=homeassistant","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[{"keyType":"other","keyValue":"Authorization","valueType":"other","valueValue":"Token YOUR_TOKEN_HERE"},{"keyType":"other","keyValue":"Content-Type","valueType":"other","valueValue":"application/vnd.flux"},{"keyType":"Accept","keyValue":"","valueType":"other","valueValue":"application/csv"},{"keyType":"other","keyValue":"transfer-encoding","valueType":"other","valueValue":""}],"credentials":{"user":"","password":""},"x":250,"y":320,"wires":[["af5ac3ee95f68aff"]]},{"id":"b19d02541fb69468","type":"inject","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Inject","props":[],"repeat":"3600","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":170,"y":260,"wires":[["99bc919db4897270"]]},{"id":"577050f701491bad","type":"comment","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Automatic Energy Spike (due to Zwave transmition errors) removal","info":"","x":340,"y":160,"wires":[]},{"id":"53f9967e529a9384","type":"http request","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Sens Delete Request","method":"POST","ret":"txt","paytoqs":"ignore","url":"http://INFLUX_HOST:8086/api/v2/delete?org=YOUR_ORG&bucket=homeassistant","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[{"keyType":"other","keyValue":"Authorization","valueType":"other","valueValue":"Token YOUR_TOKEN_HERE"},{"keyType":"other","keyValue":"Content-Type","valueType":"other","valueValue":"application/json"}],"credentials":{"user":"","password":""},"x":880,"y":380,"wires":[[]]},{"id":"af5ac3ee95f68aff","type":"csv","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Parse CSV","sep":",","hdrin":true,"hdrout":"none","multi":"mult","ret":"\\n","temp":"","skip":"0","strings":true,"include_empty_strings":"","include_null_values":"","x":470,"y":320,"wires":[["b3ae1ed5d19e0564","a1c69c1aa8b4e77a"]]},{"id":"b3ae1ed5d19e0564","type":"function","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Group Rows by Entity","func":"function groupByKey(array, key) {\n    return array\n        .reduce((hash, obj) => {\n            if (obj[key] === undefined) return hash;\n            return Object.assign(hash, { [obj[key]]: (hash[obj[key]] || []).concat(obj) })\n        }, {})\n}\n\nconst grouped = (groupByKey(msg.payload, 'entity_id'));\n\nmsg = {};\nmsg.payload = grouped;\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":680,"y":320,"wires":[["a742140e072c3bde","6f4dbcc6e9594e58"]]},{"id":"09a85a5c338d03a0","type":"comment","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"All Zwave energy measuring entities should be collected with 
*._electric_consumption_kwh","info":"","x":450,"y":220,"wires":[]},{"id":"9f32f8d05c9ba708","type":"debug","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Spikes","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1110,"y":320,"wires":[]},{"id":"99bc919db4897270","type":"function","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Create Payload / Query","func":"var start = '-1d';\nvar stop = '-0d';\n\nmsg.payload = `\n  from(bucket: \"homeassistant\")\n  |> range(start: ${start}, stop: ${stop})\n  |> filter(fn: (r) => r[\"entity_id\"] =~ /_electric_consumption_kwh$/)\n  |> filter(fn: (r) => r[\"_field\"] == \"value\")\n  |> filter(fn: (r) => r[\"_measurement\"] == \"kWh\")\n  |> sort(columns: [\"_time\"])\n`;\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":350,"y":260,"wires":[["43bdee375fb16448"]]},{"id":"a742140e072c3bde","type":"debug","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Grouped by entity","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":930,"y":260,"wires":[]},{"id":"a1c69c1aa8b4e77a","type":"debug","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"CSV","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":650,"y":260,"wires":[]},{"id":"dbd1524875d3f007","type":"function","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Create Payload for Deletion","func":"var entity_id = msg.payload.entityId;\nvar start = msg.payload.rows[0]._time;\nvar stop = msg.payload.rows[msg.payload.rows.length - 1]._time;\nvar measurement = msg.payload.rows[0]._measurement;\n\nmsg.payload = {\n    start,\n    stop,\n    \"predicate\": `_measurement=\\\"${measurement}\\\" AND entity_id=\\\"${entity_id}\\\"`\n};\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":620,"y":380,"wires":[["53f9967e529a9384","89e1f98921aaa71d"]]},{"id":"d2cadc18980a4499","type":"split","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Split Array to single Spike Messages","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":310,"y":380,"wires":[["dbd1524875d3f007"]]},{"id":"89e1f98921aaa71d","type":"debug","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Delete Payload","active":true,"tosidebar":true,"console":true,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":890,"y":440,"wires":[]},{"id":"6f4dbcc6e9594e58","type":"function","z":"8b5e3beccae4b7e8","g":"70fb862d319fe539","name":"Search for Spikes","func":"const spikes = [];\n\nfor (const entityId of Object.keys(msg.payload)) {\n    const rows = msg.payload[entityId];\n    const rowCount = rows.length;\n    \n    // Sort entries of device by time\n    rows.sort((a, b) => a._time.localeCompare(b._time));\n\n    // Loop over data rows of device\n    var i = 0;\n    while (i < rowCount-1) {\n        const startRow = rows[i];\n        const secondRow = rows[i+1];\n        const diff = Math.abs(startRow._value - secondRow._value);\n\n        // Large difference to previous row?\n        if (diff < 1.0) {\n            // Nope, repeat with next row\n            i++; \n        } else {\n            // Yes, search through (up to 20) next rows if it's a erroneous spike or a real increase/meter-reset\n            var j = 
1;\n            while (j < 20 && i+j < rowCount) {\n                const yetAnotherRow = rows[i+j];\n                const floatCompareTolerance = 0.0001;\n\n                const sameAsSecondValue = Math.abs(secondRow._value - yetAnotherRow._value) < floatCompareTolerance;\n                const hasSlightlyIncreased = yetAnotherRow._value - startRow._value + floatCompareTolerance >= 0.0 && yetAnotherRow._value - startRow._value < 0.5;\n\n                if (sameAsSecondValue) {\n                    // Looks like the same (wrong?) value again. Look at one more row\n                    j++;\n                    continue;\n                } else if (hasSlightlyIncreased) {\n                    // End of Spike detected -> Add it to array\n                    spikes.push({\n                        entityId,\n                        rows: rows.slice(i+1, i+j),\n                        startRow,\n                        yetAnotherRow\n                    });\n                    // Repeat next search after current spike\n                    i += j;\n                    break;\n                } else {\n                    // Wasn't a spike, end search for current row\n                    i++;\n                    break;\n                }\n            }\n        }\n    }\n\n\n}\n\nmsg.payload = spikes;\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":910,"y":320,"wires":[["9f32f8d05c9ba708","d2cadc18980a4499"]]}]

In case someone doesn’t have a Node-RED instance running and wants to implement this somewhere else:
Here’s the JavaScript code of the main function node, which collects the spikes/errors:

const spikes = [];

for (const entityId of Object.keys(msg.payload)) {
    const rows = msg.payload[entityId];
    const rowCount = rows.length;
    
    // Sort entries of device by time
    rows.sort((a, b) => a._time.localeCompare(b._time));

    // Loop over data rows of device
    var i = 0;
    while (i < rowCount-1) {
        const startRow = rows[i];
        const secondRow = rows[i+1];
        const diff = Math.abs(startRow._value - secondRow._value);

        // Large difference between this value and the next one?
        if (diff < 1.0) {
            // Nope, repeat with next row
            i++; 
        } else {
            // Yes, search through (up to 20) following rows to decide whether it's an erroneous spike or a real increase / meter reset
            var j = 1;
            var handled = false;
            while (j < 20 && i+j < rowCount) {
                const yetAnotherRow = rows[i+j];
                const floatCompareTolerance = 0.0001;

                const sameAsSecondValue = Math.abs(secondRow._value - yetAnotherRow._value) < floatCompareTolerance;
                const hasSlightlyIncreased = yetAnotherRow._value - startRow._value + floatCompareTolerance >= 0.0 && yetAnotherRow._value - startRow._value < 0.5;

                if (sameAsSecondValue) {
                    // Looks like the same (wrong?) value again. Look at one more row
                    j++;
                    continue;
                } else if (hasSlightlyIncreased) {
                    // End of spike detected -> add it to the array
                    spikes.push({
                        entityId,
                        rows: rows.slice(i+1, i+j),
                        startRow,
                        yetAnotherRow
                    });
                    // Continue the search after the current spike
                    i += j;
                    handled = true;
                    break;
                } else {
                    // Wasn't a spike, end the search for the current row
                    i++;
                    handled = true;
                    break;
                }
            }
            if (!handled) {
                // No decision within the look-ahead window: move on so the loop can't get stuck on the same row
                i++;
            }
        }
    }


}

msg.payload = spikes;
return msg;
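
The two other function nodes of the flow might also be useful in plain text. The “Create Payload / Query” node builds the Flux query that the “Send Influx API Request” node POSTs to InfluxDB’s /api/v2/query endpoint (the CSV response is parsed and grouped by entity_id before the spike search runs):

var start = '-1d';
var stop = '-0d';

msg.payload = `
  from(bucket: "homeassistant")
  |> range(start: ${start}, stop: ${stop})
  |> filter(fn: (r) => r["entity_id"] =~ /_electric_consumption_kwh$/)
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> sort(columns: ["_time"])
`;

return msg;

And the “Create Payload for Deletion” node builds the body for the /api/v2/delete request, using the first and last row of each detected spike as the time range:

var entity_id = msg.payload.entityId;
var start = msg.payload.rows[0]._time;
var stop = msg.payload.rows[msg.payload.rows.length - 1]._time;
var measurement = msg.payload.rows[0]._measurement;

msg.payload = {
    start,
    stop,
    "predicate": `_measurement=\"${measurement}\" AND entity_id=\"${entity_id}\"`
};

return msg;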