The available presence detection mechanism are all quite unreliable as can be seen from various topics in this forum. I found a way to reliably track both Android devices and iPhones using arping
. Unfortunately you cannot simply call arping
and get a definite answer but you have to built a bit more logic around it. It did this in a Python script that publishes presence via MQTT. It starts arping
as a process and connects to its stdout. arping
will run continuosly and try to reach the devices every five seconds. This works extremely reliable but it has the drawback that intervals and devices have to be configured both in HA as well as in the Python script. I briefly thought about wring an Appdaemon script but it doesn’t work because the script is in an endless loop reading the arping
output which doesn’t match Appdaemon’s model at all.
Therefore I was wondering whether the script could be used as a basis for writing a native HA integration. Unfortunately I have zero knowledge about HA integrations and the time right now to get into it. But maybe someone else does.
Here’s the script:
#!/usr/bin/python3
import subprocess
import paho.mqtt.client as mqtt
import datetime
import threading
from signal import pause
MQTT_SERVER = "localhost"
MQTT_TOPIC = "sensors/presence/{0:s}/{1:s}"
NOT_HOME = datetime.timedelta(minutes = 5)
def on_connect(client, userdata, flags, rc):
if rc != 0:
print("Bad connection Returned code=", rc)
def track(dev):
proc = subprocess.Popen(['arping', '-0', '-W5', '-D', dev], stdout = subprocess.PIPE, bufsize = 0, encoding = 'utf-8')
while True:
name = devices[dev]["name"]
c = proc.stdout.read(1)
now = datetime.datetime.now()
lastSeen = devices[dev]["last_seen"]
if not c:
break
elif c == '!':
devices[dev]["last_seen"] = now
if (devices[dev]["state"] != "home") or (now - lastSeen >= NOT_HOME) or (now - lastUpdate >= NOT_HOME):
devices[dev]["state"] = "home"
client.publish(MQTT_TOPIC.format(name, "state"), "home")
client.publish(MQTT_TOPIC.format(name, "last_seen"), now.isoformat())
client.publish(MQTT_TOPIC.format(name, "time"), now.isoformat())
lastUpdate = now
elif (devices[dev]["state"] != "not_home") and ((now - lastSeen >= NOT_HOME) or (now - lastUpdate >= NOT_HOME)):
devices[dev]["state"] = "not_home"
client.publish(MQTT_TOPIC.format(name, "state"), "not_home")
client.publish(MQTT_TOPIC.format(name, "last_seen"), devices[dev]["last_seen"].isoformat())
client.publish(MQTT_TOPIC.format(name, "time"), now.isoformat())
lastUpdate = now
# MQTT
client = mqtt.Client()
client.on_connect = on_connect
client.username_pw_set(username = 'user', password = 'password)
client.connect(MQTT_SERVER, 1883, 60)
client.loop_start()
now = datetime.datetime.now()
lastUpdate = now
devices = {
"192.168.1.2": {"name": "user1", "last_seen": now, "state": "unknown"},
"192.168.1.3": {"name": "user2", "last_seen": now, "state": "unknown"}
}
for dev in devices:
t = threading.Thread(target = track, args = (dev,))
t.start()
pause()