Hey folks!
This one is pretty niche. And I need to offer the disclaimer that I’m not a software developer or an engineer nor am I trained or work in an even remotely related field. So this is a very amateur effort.
What I wanted to accomplish was to have a map on my dashboard showing me everywhere my RV has ever been. I should preface that I’m using Home Assistant in an RV in this instance; but none of this is actually RV specific. This is pure vanity, I just wanted to be able to pull up a map and see everywhere I’ve been.
Home Assistant’s built in maps “breadcrumbs” feature is nice, but won’t necessarily show years of data over a very large area (like, the entire United States, as an example) very efficiently or reliably; and it would still rely on data that may be cluttered and difficult to access.
Instead; I wanted to log GPS coordinates to a MySQL database using MariaDB. And I wanted to solve the problem of tons of errenous data making for an enormous database (such as lots of points around where the RV is parked) by using the Haversine formula to calculate the distance between two points, and log only when the RV is actually moving. Furthermore, I wanted to solve ANOTHER problem; and that was temporary loss of GPS data. I had a hardware failure on one trip so I had two points that were close to 100 miles apart. So I added functionality to use OpenStreetMaps to synthetically plot points between two points along roads; so that the data “looks nice” and makes sense.
You will need:
-
A source of GPS Latitude and Longitude. This mini-guide won’t cover how to obtain that. In my case, I have a Raspberry Pi elsewhere in the rig which happens to have a GPS dongle, and it’s providing latitude and longitude via MQTT. They are established as sensors in Home Assistant. I’m sure this could be done with device trackers, a GPS dongle directly attached to the Home Assistant machine, or even some sort of external logger. You just might need to use template sensors or helpers to create a distinct latitude and longitude source; or you may need to adjust the script.
-
The MariaDB Add-On or another database source. But I’ve only tested this in Home Assistant OS running the MariaDB add-on. I suspect if you’re smart enough to have some fancy external database server running somewhere, you’re smart enough to tweak this as needed to use it.
-
AppDaemon. This script is intended to run inside the AppDaemon Add-On for Home Assistant OS; mainly to gain access to a more capable Python implementation than the default one in Home Assistant. NOTE: You’ll need to add mysql-connector-python to your AppDaemon config.
Here’s the script!
gps_logger.py
import appdaemon.plugins.hass.hassapi as hass
import mysql.connector
from mysql.connector import pooling
from datetime import datetime, timedelta
from math import radians, sin, cos, sqrt, atan2
import aiohttp
import asyncio
import queue
from threading import Thread
import json
import os
class GPSLogger(hass.Hass):
def initialize(self):
self.gps_lat_sensor = self.args["gps_lat_sensor"]
self.gps_lon_sensor = self.args["gps_lon_sensor"]
self.db_host = self.args["db_host"]
self.db_user = self.args["db_user"]
self.db_password = self.args["db_password"]
self.db_name = self.args["db_name"]
self.log_distance = self.args["log_distance"]
self.route_distance = self.args["route_distance"]
self.min_distance_threshold = 10 # Minimum distance threshold to log new points
self.final_distance_threshold = 100 # Distance threshold to avoid perpetual routing
self.temp_log_file = "/homeassistant/temp_gps_log.json" # Temporary file for logging points
# Setup database connection pool
self.db_pool = mysql.connector.pooling.MySQLConnectionPool(
pool_name="gps_pool",
pool_size=5,
host=self.db_host,
user=self.db_user,
password=self.db_password,
database=self.db_name
)
# Fetch last coordinates on startup
self.previous_latitude, self.previous_longitude = self.get_last_coordinates()
if self.previous_latitude is not None and self.previous_longitude is not None:
self.log(f"Initialized previous coordinates to ({self.previous_latitude}, {self.previous_longitude})")
else:
self.log("No previous coordinates found on startup")
# Setup the route queue and background thread
self.route_queue = queue.Queue()
self.background_thread = Thread(target=self.process_queue)
self.background_thread.daemon = True
self.background_thread.start()
# Listen for changes in GPS sensors
self.listen_state(self.check_movement, self.gps_lat_sensor)
self.listen_state(self.check_movement, self.gps_lon_sensor)
# Periodically check for new points added via SQL
self.run_every(self.check_for_new_points, "now", 60) # Check every 60 seconds
# Process any points in the temp log file on startup
self.process_temp_log()
def process_temp_log(self):
if os.path.exists(self.temp_log_file):
with open(self.temp_log_file, "r") as f:
temp_data = json.load(f)
for entry in temp_data:
self.add_coordinates_to_db(entry["coordinates"], entry["timestamp"])
os.remove(self.temp_log_file)
self.log(f"Processed and removed temporary log file: {self.temp_log_file}")
def check_movement(self, entity, attribute, old, new, kwargs):
if old == new:
return
current_latitude = float(self.get_state(self.gps_lat_sensor))
current_longitude = float(self.get_state(self.gps_lon_sensor))
if self.previous_latitude is None or self.previous_longitude is None:
self.previous_latitude, self.previous_longitude = self.get_last_coordinates()
if self.previous_latitude is not None and self.previous_longitude is not None:
distance = self.haversine(current_latitude, current_longitude, self.previous_latitude, self.previous_longitude)
self.log(f"Calculated distance: {distance} meters from ({self.previous_latitude}, {self.previous_longitude}) to ({current_latitude}, {current_longitude})")
if distance > self.route_distance:
self.log(f"Distance greater than {self.route_distance} meters, adding route task to queue.")
timestamp = datetime.utcnow()
self.route_queue.put((self.previous_latitude, self.previous_longitude, current_latitude, current_longitude, timestamp))
# Update previous coordinates immediately after adding the route task
self.previous_latitude, self.previous_longitude = current_latitude, current_longitude
elif distance > self.log_distance and distance > self.min_distance_threshold:
self.log(f"Distance greater than {self.log_distance} meters and above threshold, updating coordinates without routing.")
self.add_coordinates_to_db([(current_latitude, current_longitude)])
# Update previous coordinates immediately after adding the new coordinates
self.previous_latitude, self.previous_longitude = current_latitude, current_longitude
else:
self.log(f"Distance less than {self.log_distance} meters or below threshold, no update.")
else:
self.log("No previous coordinates found, inserting current coordinates.")
self.add_coordinates_to_db([(current_latitude, current_longitude)])
# Update previous coordinates immediately after adding the new coordinates
self.previous_latitude, self.previous_longitude = current_latitude, current_longitude
def check_for_new_points(self, kwargs):
new_latitude, new_longitude, timestamp = self.get_new_coordinates()
if new_latitude is not None and new_longitude is not None:
if self.previous_latitude is not None and self.previous_longitude is not None:
distance = self.haversine(new_latitude, new_longitude, self.previous_latitude, self.previous_longitude)
if distance > self.route_distance:
self.log(f"New far-away point detected at ({new_latitude}, {new_longitude}), queuing route task.")
self.route_queue.put((self.previous_latitude, self.previous_longitude, new_latitude, new_longitude, timestamp))
self.delete_coordinate(new_latitude, new_longitude, timestamp)
def get_new_coordinates(self):
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor()
cursor.execute("SELECT latitude, longitude, timestamp FROM gps_data ORDER BY timestamp DESC LIMIT 1")
row = cursor.fetchone()
cursor.close()
conn.close()
if row and (row[0] != self.previous_latitude or row[1] != self.previous_longitude):
return row[0], row[1], row[2]
else:
return None, None, None
except Exception as e:
self.log(f"Error fetching new coordinates: {e}", level="ERROR")
return None, None, None
def delete_coordinate(self, lat, lon, timestamp):
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM gps_data WHERE latitude = %s AND longitude = %s AND timestamp = %s", (lat, lon, timestamp))
conn.commit()
cursor.close()
conn.close()
self.log(f"Deleted coordinate ({lat}, {lon}) with timestamp {timestamp}")
except Exception as e:
self.log(f"Error deleting coordinate: {e}", level="ERROR")
def haversine(self, lat1, lon1, lat2, lon2):
R = 6371000 # Radius of the Earth in meters
phi1 = radians(lat1)
phi2 = radians(lat2)
delta_phi = radians(lat2 - lat1)
delta_lambda = radians(lon2 - lon1)
a = sin(delta_phi / 2.0) ** 2 + cos(phi1) * cos(phi2) * sin(delta_lambda / 2.0) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return R * c
def get_last_coordinates(self):
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor()
cursor.execute("SELECT latitude, longitude FROM gps_data ORDER BY timestamp DESC LIMIT 1")
row = cursor.fetchone()
cursor.close()
conn.close()
if row:
return row[0], row[1]
else:
return None, None
except Exception as e:
self.log(f"Error fetching last coordinates: {e}", level="ERROR")
return None, None
def add_coordinates_to_db(self, coordinates, timestamp=None):
try:
conn = self.db_pool.get_connection()
cursor = conn.cursor()
for coordinate in coordinates:
if timestamp is None:
timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
lat, lon = coordinate if len(coordinate) == 2 else coordinate[:2]
location = f"POINT({lon} {lat})"
cursor.execute("INSERT INTO gps_data (latitude, longitude, timestamp, location) VALUES (%s, %s, %s, ST_GeomFromText(%s))", (lat, lon, timestamp, location))
conn.commit()
cursor.close()
conn.close()
self.log(f"Inserted {len(coordinates)} coordinates at {timestamp}")
# Update the previous coordinates with the most recent logged point
if coordinates:
self.previous_latitude = coordinates[-1][0]
self.previous_longitude = coordinates[-1][1]
self.log(f"Updated previous coordinates to ({self.previous_latitude}, {self.previous_longitude})")
except Exception as e:
self.log(f"Error inserting coordinates: {e}", level="ERROR")
async def fetch_route(self, start_lat, start_lon, end_lat, end_lon):
try:
url = f"http://router.project-osrm.org/route/v1/driving/{start_lon},{start_lat};{end_lon},{end_lat}?overview=full&geometries=geojson"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
data = await response.json()
route = data['routes'][0]['geometry']['coordinates']
return [(lat, lon) for lon, lat in route]
except Exception as e:
self.log(f"Error fetching route: {e}", level="ERROR")
return None
def process_queue(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
while True:
task = self.route_queue.get()
if task is None:
break
start_lat, start_lon, end_lat, end_lon, original_timestamp = task
distance = self.haversine(start_lat, start_lon, end_lat, end_lon)
if distance < self.final_distance_threshold:
self.log(f"Final distance {distance} meters is below the threshold {self.final_distance_threshold}, logging end point only.")
self.add_coordinates_to_db([(end_lat, end_lon)], original_timestamp.strftime('%Y-%m-%d %H:%M:%S'))
self.previous_latitude = end_lat
self.previous_longitude = end_lon
else:
route_there = loop.run_until_complete(self.fetch_route(start_lat, start_lon, end_lat, end_lon))
route_back = loop.run_until_complete(self.fetch_route(end_lat, end_lon, start_lat, start_lon))
if route_there and route_back:
self.log("Logging route points there and back.")
intermediate_points = []
num_points_there = len(route_there)
num_points_back = len(route_back)
for i, point in enumerate(route_there):
lat, lon = point
# Adjust the timestamp for each intermediate point there
point_timestamp = (original_timestamp + timedelta(seconds=i)).strftime('%Y-%m-%d %H:%M:%S')
intermediate_points.append((lat, lon, point_timestamp))
for i, point in enumerate(route_back):
lat, lon = point
# Adjust the timestamp for each intermediate point back
point_timestamp = (original_timestamp + timedelta(seconds=(num_points_there + i))).strftime('%Y-%m-%d %H:%M:%S')
intermediate_points.append((lat, lon, point_timestamp))
# Add the final point as the current location
final_timestamp = (original_timestamp + timedelta(seconds=(num_points_there + num_points_back))).strftime('%Y-%m-%d %H:%M:%S')
intermediate_points.append((start_lat, start_lon, final_timestamp))
self.add_coordinates_to_db(intermediate_points)
else:
self.log("Failed to get route, logging only end point.")
self.add_coordinates_to_db([(end_lat, end_lon)], original_timestamp.strftime('%Y-%m-%d %H:%M:%S'))
self.route_queue.task_done()
GPSLogger:
module: gps_logger
class: GPSLogger
gps_lat_sensor: sensor.gps_latitude
gps_lon_sensor: sensor.gps_longitude
db_host: core-mariadb
db_user: homeassistant
db_password: DBPASSWORD
db_name: DBNAME
log_distance: 25
route_distance: 50
You’ll need to adjust the configuration as needed. I recommend using a new database just for this data. log_distance
and route_distance
are in meters. log_distance
will not log any new coordinates if the current coordinates are closer than the specified distance (default: 25 meters) to the most recent coordinates in the database. This helps prevent you from having thousands of points in a parking lot, for example; as this was designed with tracking vehicles in mind.
route_distance
tells the script to plot additional points along roads using OpenStreetMap routing if the new plot is more than the set distance from the most recent plot. Again this is useful if you have a loss of data, or if you want to import data from another source that doesn’t end in the same spot your GPS data will begin. This will ‘fudge’ your data a bit by routing it along roads so you don’t have straight lines on your map between two distant points. For example, I wrote a different script to import a KML file into my database which ended some 100km away from where my RV was sitting. Once I ran this script, it added a bunch of new points on the highway between those two points, making for a visualization that ‘makes sense’ and follows roads instead of the shortest distance between two points. There is a fallback in this though that will abandon the attempt to do that routing if it can’t access the internet, and will instead just log the point it receives as normal.
And of course, you’ll need to fill in your database credentials and database name. I used rv_tracking for my database name but that may not make sense for you.
DISCLAIMER: I used ChatGPT to clean up the code, help me with some of the math, and help me re-write it in the end to use variables in the Yaml file instead of having everything hard-coded (as I initially did), so that it would be more user-friendly for other people.
Alright, so now what do I do with this?
Well, whatever you want. All this will have accomplished is a MariaDB database full of latitudes and longitudes. It’ll put them all into a single table called gps_data
.
What I did with it was use Grafana
. I installed Grafana as an add-on, and created a new visualization pointed to my MariaDB database. I created a simple dashboard using the ‘Geomap’ option, and used the following as the Query, after initially selecting my ‘rv_tracking’ database:
SELECT
timestamp,
latitude,
longitude
FROM
gps_data
ORDER BY
timestamp ASC;
I set the base layer as ‘OpenStreetMap’ and Layer 1 as ‘Route(Beta)’, under ‘Map view’ I selected ‘Fit to Data’ and… boom! Now I have a simple map visualization with a blue line tracing everywhere I’ve been with the RV over the last several years. Thousands of miles worth of data visualized efficiently; and it will continue to update in real-time. I then clicked the three little dots, hit ‘share’, and used the link it provided in a webpage card to place that data on my dashboard.
In the off chance that this weirdly niche little thing is somehow useful to someone else; or can be modified in some way to be useful to someone else, enjoy!