Hi everybody!
Be forewarned, this is going to be a long post.
TL;DR: I wrote an AppDaemon app that acts like a service, and the service it provides is WiFi-based presence detection through tshark and an extra WiFi adapter in monitor mode. I’ve found it to be faster and more responsive than nmap. It doesn’t require you to have a static local IP on each device, but it still suffers from “not seeing” phones that shut off their WiFi chips to save power during standby.
##Prerequisites:
- Linux platform running `AppDaemon`
- tshark; installation instructions are in the app.
- Read/write access to `/srv/homeassistant`. The app will read and delete files from this directory, and `tshark` will write to this directory.
- A second wireless interface that supports monitor mode. The dongle I purchased can be found here. You can purchase whatever you want, but it must be able to be put into monitor mode. That is the important part.
##Current Version: 1.1.0
##Features:
- Detect presence and set `binary_sensor.<user>_presence` to `Home` or `Out`
- Optionally, if you want to work your own detection into the mix, leave off the `ttl` value, and `Presence Listener` will only update the `last_seen`, giving you complete control over your own automations!
- `users.json` file to keep track of users, persists through `AppDaemon` restarts
- Add new user without needing to run `stop()`
  - Event Type: `listener_add_user`
  - Event Data: `{"friendly_name": "<name>", "mac": "<mac address>"}`
#presence_listener.py
import appdaemon.appapi as appapi
import os
import subprocess
import glob
import time
import json
import re
from datetime import datetime, timedelta
#
# Presence Listener -- tshark presence detection
#
# Prerequisites:
#   - READ & WRITE access to /srv/homeassistant
#   - an **extra** wireless adapter that can be put in monitor mode
#   - tshark; -> sudo apt-get update
#             -> sudo apt-get install tshark
#             ---> should non-super users be able to capture packets: YES
#             -> sudo adduser homeassistant wireshark
#             -> reboot
#
# args:
#   interface     = wireless interface (device) to listen on; don't know? type iwconfig; usually wlan1
#   refresh_time  = scanning time in seconds
#   users_fp      = filepath to users dictionary structure -or- "worker_node" if a slave instance
#   instance_name = a unique name for the node, use the location of the pi eg. living room pi
#   ttl           = optional, number of minutes without seeing mac address to determine 'Out'
#
# tshark further reading:
# man pages: https://www.wireshark.org/docs/man-pages/tshark.html
# filter reference: https://www.wireshark.org/docs/dfref/w/wlan.html
#
# Changelog
#
# Version 1.1.0:
# Added self.expose_last_seen(mac, last_seen)
# This allows the user to control their own automations, and will only take over if the
# ttl value is not set.
#
# Added a persistent_notification to self.add_user()
# This will notify the front end when successful addition of new user, also alerting the
# front end of the new entity_id to add to any groups/views.
#
# Version 1.0.2:
# Fixed strptime conversion of a self.datetime.now()
#
# Version 1.0.1:
# Replaced datetime.now() with self.datetime() for better compatibility in AppDaemon
#
# Version 1.0.0:
# Initial Version
#
class PresenceListener(appapi.AppDaemon):

    def initialize(self):
        # a way to keep track of individual users' timer-callbacks
        self.user_timer_library = {}

        # real quick'n'dirty string sanitization juuuust in case
        rx = re.compile(r'\W+')  # Matches one or more of anything other than a letter, digit or underscore.
        self.args['instance_name'] = rx.sub(' ', self.args['instance_name']).strip().lower().replace(' ', '_')

        # instantiate our "database"
        try:
            # load last known user data from users_fp
            with open(self.args['users_fp'], 'r') as j:
                self.users = json.load(j)

            # push last known user data into HASS
            for mac in self.users:
                self.expose_to_hass(mac)
        except FileNotFoundError:
            # if this is a worker, pull down last users from HASS
            if self.args['users_fp'] == 'worker_node':
                self.users = self.retrieve_users_from_hass()
            else:
                # no users file on the main node yet: start with an empty "database"
                self.users = {}
        except Exception as e:
            self.error(e)
            self.users = {}

        # typically for the sake of testing ...
        self.listen_event(self.start, event='internal_start')
        self.listen_event(self.stop, event='internal_stop')

        # define unique event listeners so multiple instances don't trigger each other
        self.is_home_trigger = '{}_trigger_ishome'.format(self.args['instance_name'])
        self.is_out_trigger = '{}_trigger_isout'.format(self.args['instance_name'])
        self.log('Triggers set for node: ishome={0.is_home_trigger}, isout={0.is_out_trigger}'.format(self),
                 level='DEBUG')

        # create event listeners
        self.listen_event(self.add_user, event='listener_add_user')
        self.listen_event(self.person_ishome, event=self.is_home_trigger)
        self.listen_event(self.person_isout, event=self.is_out_trigger)

        # can be a placeholder for startup - I like entry/exit points
        self.start(event_name=None, data=None, kwargs=None)

    def terminate(self):
        # can be a placeholder for shutdown - I like entry/exit points
        self.stop(event_name=None, data=None, kwargs=None)

    def check_scan(self, kwargs):
        """
        Ensure tshark is running, process scan and if users are home, fire event is_home_trigger
        """
        if not self.tshark_is_running():
            self.start_scan(self.args['interface'])

        for mac in self.process_scan(self.args['refresh_time']):
            if mac in self.users:
                now = self.datetime().strftime('%Y-%m-%d %H:%M:%S')
                if 'ttl' in self.args:
                    # ttl is set: let the app manage Home/Out
                    self.fire_event(self.is_home_trigger, mac=mac, last_seen=now)
                else:
                    # no ttl: only refresh last_seen and leave the state alone
                    self.expose_last_seen(mac=mac, last_seen=now)

    def expose_last_seen(self, mac, last_seen):
        """
        Expose only the last_seen time to HomeAssistant, leaving the state and other attributes
        untouched
        """
        attributes = {}
        # show the front end where the last call came from - can be useful in some cases
        attributes['seen_from_instance'] = self.args['instance_name']
        attributes['last_seen'] = last_seen
        self.set_state('binary_sensor.{}_presence'.format(self.users[mac]['friendly_name'].lower()),
                       attributes=attributes)

    def add_user(self, event_name, data, kwargs):
        """
        Add user with required arguments `mac` and `friendly_name`
        """
        try:
            friendly_name = data['friendly_name']

            # data sanitization: strip separators and lowercase, since tshark
            # reports addresses in lowercase and we look users up by that value
            mac = data['mac'].replace(':', '').replace('.', '').replace('-', '').lower()
            mac = ':'.join(a+b for a,b in zip(mac[::2], mac[1::2]))

            # if the resulting mac address doesn't match what we figure it should look like,
            # raise an error and throw a persistent notification up so the users knows they
            # dun goofed (fullmatch so that extra trailing digits are rejected too)
            if not re.fullmatch(r'([0-9a-f]{2}:){5}[0-9a-f]{2}', mac):
                raise ValueError(mac)
        # if friendly_name or mac weren't given as keys
        except KeyError as k:
            if self.args['users_fp'].endswith('.json'):
                self.call_service('persistent_notification/create',
                                  title='[AppDaemon] Something went wrong! :(',
                                  message='APP: Presence Listener\n"{}" was not set when trying to add a user.'.format(k))
            return
        # they dun goofed
        except ValueError as v:
            if self.args['users_fp'].endswith('.json'):
                self.call_service('persistent_notification/create',
                                  title='[AppDaemon] Something went wrong! :(',
                                  message=('APP: Presence Listener\nMAC ADDRESS "{}" is not in the proper format! '
                                           'Try again with the mac in XX:XX:XX:XX:XX:XX format.'.format(v)))
            return

        # set mandatory values
        self.users[mac] = {
            'friendly_name': friendly_name,
            'mac': mac
        }

        # set possible other values
        for key in ['location', 'icon']:
            try:
                self.users[mac][key] = data[key]
            except KeyError:
                pass

        if self.args['users_fp'].endswith('.json'):
            self.expose_to_hass(mac=mac)
            self.call_service('persistent_notification/create',
                              title='[AppDaemon] Added New User: {}'.format(friendly_name),
                              message=('Please add "binary_sensor.{}_presence" to your front end group/view '
                                       'in order to see them!'.format(friendly_name.lower())))

    def trigger_person_isout(self, kwargs):
        """
        Check the binary_sensor associated with a mac, and if we haven't seen this person since
        longer than the TTL value, fire event == is_out_trigger
        """
        # check against HomeAssistant, in case the state was updated remotely
        # via another instance? via your phone? how ever you would like to supplement this app
        name = self.users[kwargs['mac']]['friendly_name'].lower()
        location = self.get_state('binary_sensor.{}_presence'.format(name.lower()))
        last_seen = self.get_state('binary_sensor.{}_presence'.format(name.lower()), attribute='last_seen')

        # create analogous objects
        now_dt = datetime.strptime(str(self.datetime()), '%Y-%m-%d %H:%M:%S')
        last_seen_dt = datetime.strptime(last_seen, '%Y-%m-%d %H:%M:%S')

        if location != 'Out':
            if now_dt - last_seen_dt >= timedelta(minutes=int(self.args['ttl'])):
                self.fire_event(self.is_out_trigger, mac=kwargs['mac'])

    def person_ishome(self, event_name, data, kwargs):
        """
        Mark the mac address as "Home", send information to HomeAssistant, start the isout timer
        """
        mac = data['mac']

        # set the data in the backend "database"
        #self.log('{} is marked: Home'.format(self.users[mac]['friendly_name']))
        self.users[mac]['last_seen'] = data['last_seen']
        self.users[mac]['location'] = 'Home'
        self.users[mac]['icon'] = 'mdi:home'
        self.expose_to_hass(mac=mac)

        try:
            # cancel any previous timers, effectively resetting the TTL clock
            self.cancel_timer(self.user_timer_library[mac])
        except:
            pass  # I know, I'm literally the devil

        # set a timer on the user, running the event trigger for person_isout if it expires
        # add it to the users' timer callback library
        self.user_timer_library[mac] = self.run_in(self.trigger_person_isout,
                                                   seconds=int(self.args['ttl'])*60,
                                                   mac=mac)

    def person_isout(self, event_name, data, kwargs):
        """
        Mark the mac address as "Out" and send information to HomeAssistant
        """
        mac = data['mac']

        # set the data in the backend "database"
        #self.log('{} is marked: Out'.format(self.users[mac]['friendly_name']))
        self.users[mac]['location'] = 'Out'
        self.users[mac]['icon'] = 'mdi:home-outline'
        self.expose_to_hass(mac=mac)

    def expose_to_hass(self, mac):
        """
        Build and Send a binary_sensor payload to send to HomeAssistant
        """
        attributes = {}
        # show the front end where the last call came from - can be useful in some cases
        attributes['seen_from_instance'] = self.args['instance_name']

        for key in ['mac', 'friendly_name', 'last_seen', 'location', 'icon']:
            try:
                attributes[key] = self.users[mac][key]  # right side will cause KeyError if it doesn't exist
            except KeyError:
                if key == 'friendly_name':
                    self.error('{} is not specified for user "{}"! Please check your users.json'.format(key, mac))
                    return
                # default values if not specified in the users.json
                elif key == 'mac':
                    attributes[key] = self.users[mac][key] = mac
                elif key == 'last_seen':
                    attributes[key] = self.users[mac][key] = 'Never Seen'
                elif key == 'location':
                    attributes[key] = self.users[mac][key] = 'Out'
                elif key == 'icon':
                    attributes[key] = self.users[mac][key] = 'mdi:home-outline'

        self.set_state('binary_sensor.{}_presence'.format(self.users[mac]['friendly_name'].lower()),
                       state=self.users[mac]['location'],
                       attributes=attributes)

    def retrieve_users_from_hass(self):
        r"""
        Get all binary_sensors from HomeAssistant matching binary_sensor.(\w*)_presence
        """
        current_users = {}

        # regex to match all current dude_presence entities
        # (raw string, with the dot escaped so it only matches a literal ".")
        binary_sensor_repr = re.compile(r'binary_sensor\.(\w*)_presence')

        # get all binary_sensors
        for user in self.get_state('binary_sensor'):
            # if binary_sensor entity id matches "binary_sensor.dude_presence"
            # then add it to the current_users dictionary
            if re.match(binary_sensor_repr, user):
                user_object = self.get_state(user, attribute='all')
                current_users[user_object['attributes']['mac']] = {
                    "friendly_name": user_object['attributes']['friendly_name'],
                    "icon": user_object['attributes']['icon'],
                    "last_seen": user_object['attributes']['last_seen'],
                    "location": user_object['attributes']['location'],
                    "mac": user_object['attributes']['mac']
                }

        return current_users

    def start(self, event_name, data, kwargs):
        self.log('Starting Presence Scanner ..')
        self.clean_up_files(kwargs=None, full_cleanup=True)

        if not self.tshark_is_running():
            self.start_scan(self.args['interface'])

        soon = self.datetime() + timedelta(seconds=15)
        later = self.datetime() + timedelta(minutes=5)
        self.check_scan_handle = self.run_every(self.check_scan, start=soon, interval=int(self.args['refresh_time']))
        self.cleanup_handle = self.run_every(self.clean_up_files, start=later, interval=300)

    def stop(self, event_name, data, kwargs):
        self.log('Stopping Presence Scanner ..')
        try:
            self.cancel_timer(self.check_scan_handle)
            self.cancel_timer(self.cleanup_handle)
        except:
            pass  # I know, I'm literally the devil

        # kill tshark
        os.system('pkill -9 tshark')

        # on reload or shutdown
        # if this is a worker, re-retrieve the users from HASS
        if self.args['users_fp'] == 'worker_node':
            self.users = self.retrieve_users_from_hass()
        else:
            # or save file config
            users = ', '.join(self.users[mac]['friendly_name'] for mac in self.users)
            self.log('Saving users ({}) to file...'.format(users))
            with open(self.args['users_fp'], 'w') as j:
                json.dump(self.users, j, indent=4, sort_keys=True)

    ###############################################
    ############ WORKING SECTION BELOW ###########
    #############################################

    def tshark_is_running(self) -> bool:
        """
        Return True if tshark is found via ps aux; False if not
        """
        ps_output = subprocess.Popen('ps aux'.split(), stdout=subprocess.PIPE)
        ps_stdout = ps_output.stdout.read().decode('utf-8')
        return 'tshark -i' in ps_stdout  # and '[tshark]' not in ps_stdout

    def clean_up_files(self, kwargs, full_cleanup=False) -> None:
        """
        Delete tshark files in /srv/homeassistant
        """
        #self.log('Cleaning up tshark files ...')
        # glob returns files in arbitrary order; sort so the newest file is last
        filenames = sorted(glob.glob('{}*'.format('/srv/homeassistant/tshark')))

        if full_cleanup:
            index = 0
        else:
            index = 1

        for idx, filename in enumerate(filenames, start=index):
            if not idx == len(filenames):  # ie if 0th indexed, all files; otherwise all except the last file
                os.remove(filename)

    def start_scan(self, wlan) -> None:
        """
        Run tshark in listening mode on wlan with the below parameters
            -i wlan                      :: specify the interface to use
            -I                           :: Put the interface in "monitor mode"
            -b filesize:1000             :: maximally write 1MB to file, then start new file, et al.
            -w /srv/homeassistant/tshark :: write raw packet data to outfile
        """
        self.log('Starting tshark in write mode ...')
        # use the wlan argument (callers pass self.args['interface'])
        tshark_args = 'tshark -i {} -I -b filesize:1000 -w /srv/homeassistant/tshark'.format(wlan)
        subprocess.Popen(tshark_args.split())

    def process_scan(self, time_window) -> list:
        """
        Read tshark output files, returning a list of seen mac addresses
        """
        # a way to keep track of mac addresses we've already read from the file
        seen_mac_addresses = []

        try:
            # get the latest file to read
            filename_to_read = max(filename for filename in glob.iglob('{}*'.format('/srv/homeassistant/tshark')))
        except ValueError:
            self.log('Started processing scan ... but there are no files yet to process!', level='DEBUG')
            return []

        # run tshark w/ parameters below
        #   -r + filename_to_read :: read this file
        #   -T fields             :: create a table with fieldnames
        #   -e frame.time_epoch   :: fieldname: time since epoch
        #   -e wlan.sa            :: fieldname: "source address"
        #   -e wlan.bssid         :: fieldname: "BSS Id"
        tshark_args = 'tshark -r {} -T fields -e frame.time_epoch -e wlan.sa -e wlan.bssid'.format(filename_to_read)
        cmd = subprocess.Popen(tshark_args.split(), stdout=subprocess.PIPE)
        output = cmd.stdout.read().decode('utf-8')

        timestamp_threshold = float(time.time()) - float(time_window)

        for line in output.splitlines():
            try:
                timestamp, mac, mac2 = line.split("\t")

                # skip frames sent by the access point itself, frames older than
                # the scan window, and frames without a source address
                if mac == mac2 \
                  or float(timestamp) < timestamp_threshold \
                  or not mac:
                    continue

                if mac not in seen_mac_addresses:
                    seen_mac_addresses.append(mac)
            except ValueError as v:
                self.log('Nothing to see here: {}\n{}'.format(v, line), level='DEBUG')
            except Exception as e:
                self.error('Something went terribly wrong: {}\n{}'.format(e, line))

        return seen_mac_addresses
#config.json
{
    "a0:00:0a:a0:00:0a": {
        "friendly_name": "SupahNoob"
    }
}
The only required starting values are a top-level mac address of the device (usually a phone) that you wish to track, as well as a friendly name for said device.
appdaemon.cfg entry
[presence_listener]
module = presence_listener
class = PresenceListener
interface = wlan1
refresh_time = 10
users_fp = /home/homeassistant/.homeassistant/users.json
instance_name = rpi3
ttl = 15
#result
*This needs to be set up in your respective YAML file to appear in a Group.
#The app does not do any automation based on status. The app only registers states and adds users through an event listener. You will need to write another app to do automations based on users’ states.
#How It Works
An article
… and [Snowden] would also begin to appreciate the enormous scope of the NSA’s surveillance capabilities, an ability to map the movement of everyone in a city by monitoring their MAC address, a unique identifier emitted by every cell phone, computer, and other electronic device.
So yeah, basically that. We listen to MAC addresses sent out by a device’s WiFi controller. Let’s get to it!
In `initialize`, we read in any users already saved to the `json` file, or, if the node we’re running this app on isn’t the main one (that is, the one with the `json` file on it), we pull down all the current users from HomeAssistant itself. The rest is app setup: various event listeners! While testing, I included entry and exit points hooked up to the `internal_start` and `internal_stop` event types. We then take a sequential step through the core functions of the app below…
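To make the loading step concrete, here is a minimal sketch of the same decision outside of AppDaemon. `load_users` and the `fetch_from_hass` callback are hypothetical names for illustration only; in the app the fallback is `self.retrieve_users_from_hass()`. Malformed JSON is not handled here.

```python
import json


def load_users(users_fp, fetch_from_hass):
    """Return the users dict for this node.

    users_fp        -- path to users.json, or the string 'worker_node'
    fetch_from_hass -- hypothetical callable standing in for the app's
                       retrieve_users_from_hass()
    """
    try:
        with open(users_fp, 'r') as handle:
            return json.load(handle)
    except FileNotFoundError:
        # worker nodes keep no file on disk, so ask Home Assistant instead
        if users_fp == 'worker_node':
            return fetch_from_hass()
        # the main node just starts with an empty "database"
        return {}
```

The main node owns the file; every other node passes `worker_node` and mirrors whatever Home Assistant already knows.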
##Core
`def clean_up_files(full_cleanup=True)`
Delete tshark files in `/srv/homeassistant`.
…tshark creates a rotating log file, similar to Python’s logging module. We periodically clean this up in `/srv/homeassistant`. If `full_cleanup` is `True` (on startup/shutdown of the app) we clean up all the files, as none are needed.
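A standalone sketch of the cleanup idea, if you want to try it without AppDaemon. `clean_up_capture_files` and its `prefix` parameter are hypothetical names, and it assumes the rotated file names sort chronologically (tshark appends a sequence number and timestamp to the name you give `-w`).

```python
import glob
import os


def clean_up_capture_files(prefix, full_cleanup=False):
    """Delete rotated capture files whose path starts with `prefix`.

    With full_cleanup=False the last file (by sorted name) is kept, since
    tshark may still be writing to it. Returns the list of removed paths.
    """
    filenames = sorted(glob.glob('{}*'.format(prefix)))
    if not full_cleanup:
        filenames = filenames[:-1]  # keep the newest file
    for filename in filenames:
        os.remove(filename)
    return filenames
```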
`def tshark_is_running()`
Return True if tshark is found via ps aux; False if not
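The check is just a substring search over the process list. A small sketch that separates the "read `ps aux`" part from the "look for tshark" part, which makes the latter easy to test; both function names are made up for this example.

```python
import subprocess


def capture_is_running(ps_output):
    """Return True if a `tshark -i ...` capture shows up in `ps aux` text."""
    return any('tshark -i' in line for line in ps_output.splitlines())


def read_ps_output():
    """Return the text printed by `ps aux` (Linux or similar only)."""
    result = subprocess.run(['ps', 'aux'], stdout=subprocess.PIPE, check=True)
    return result.stdout.decode('utf-8')
```

Matching on `tshark -i` rather than plain `tshark` means the short-lived `tshark -r` reader used by `process_scan` does not count as a running capture.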
`def start_scan(wlan=self.args['interface'])`
Run tshark in listening mode on wlan with the below parameters
`tshark`
- `-i self.args['interface']` :: specify the interface to use
- `-I` :: Put the interface in "monitor mode"
- `-b filesize:1000` :: maximally write 1MB to file, then start new file, et al.
- `-w /srv/homeassistant/tshark` :: write raw packet data to outfile
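If you want to tweak the capture, it can help to build the command as a list instead of splitting a string. This is only a sketch; `build_capture_command` and its default values are illustrative and simply mirror the flags listed above.

```python
def build_capture_command(interface, out_prefix='/srv/homeassistant/tshark',
                          filesize_kb=1000):
    """Return the tshark argv for a monitor-mode ring-buffer capture."""
    return [
        'tshark',
        '-i', interface,                          # interface to listen on
        '-I',                                     # monitor mode
        '-b', 'filesize:{}'.format(filesize_kb),  # rotate files at this size
        '-w', out_prefix,                         # where to write raw packets
    ]
```

The resulting list can be handed straight to `subprocess.Popen`.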
`run_every(interval=self.args['refresh_time'])`
`def check_scan()`
Ensure tshark is running, process scan and if users are home, fire event == is_home_trigger
`run_every(interval=300)`
`def clean_up_files(full_cleanup=False)`
`def process_scan()`
Read tshark output files, returning a list of seen mac addresses
`tshark`
- `-r + filename_to_read` :: read this file
- `-T fields` :: create a table with fieldnames
- `-e frame.time_epoch` :: fieldname: time since epoch
- `-e wlan.sa` :: fieldname: "source address"
- `-e wlan.bssid` :: fieldname: "BSS Id"

The steps are:
- Get the latest tshark file
- Run tshark with the above parameters
- Append valid MAC addresses to a list
- Return all valid MAC addresses
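The filtering in the last two steps can be sketched as a pure function over tshark's tab-separated output. `parse_seen_macs` is a hypothetical helper, and `now` is passed in explicitly (the app uses `time.time()`) so the function is easy to test.

```python
def parse_seen_macs(tshark_output, time_window, now):
    """Return unique source MACs seen within `time_window` seconds of `now`.

    Each line is expected to be: <epoch time> TAB <wlan.sa> TAB <wlan.bssid>
    """
    threshold = now - float(time_window)
    seen = []
    for line in tshark_output.splitlines():
        parts = line.split('\t')
        if len(parts) != 3:
            continue  # malformed line
        timestamp, source, bssid = parts
        if not source or source == bssid:
            continue  # no source address, or the access point itself
        try:
            too_old = float(timestamp) < threshold
        except ValueError:
            continue  # timestamp was not a number
        if not too_old and source not in seen:
            seen.append(source)
    return seen
```

Frames where the source address equals the BSSID come from the access point rather than from a client device, which is why they are skipped.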
IMPORTANT NOTE
If you want to handle the indication of `Home` or `Out` on your own, leave off the `ttl` value and only the `last_seen` attribute will change with each scan! The next section covers the handling logic for how the presence is set within Home Assistant.
#Presence Handling
`def person_ishome()`
Mark the mac address as “Home”, send information to HomeAssistant, start the isout timer
Exactly like in Tutorial#1, we keep a library of users here. We plan to run `trigger_person_isout` after `self.args['ttl']` minutes; however, if our user’s MAC address shows up again on another round of `process_scan`, we’ll cancel the first timer and add a new timer.
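The cancel-and-restart pattern is easy to play with outside of AppDaemon. The sketch below swaps in the standard library's `threading.Timer` for AppDaemon's `run_in`/`cancel_timer`; the `TtlTimers` class is a made-up name for illustration, not part of the app.

```python
import threading


class TtlTimers:
    """Keep one countdown per MAC address; seeing the MAC again resets it."""

    def __init__(self, ttl_seconds, on_expire):
        self.ttl_seconds = ttl_seconds
        self.on_expire = on_expire  # called with the mac when a timer runs out
        self.timers = {}            # mac -> threading.Timer

    def seen(self, mac):
        # cancel any previous timer, effectively resetting the TTL clock
        previous = self.timers.pop(mac, None)
        if previous is not None:
            previous.cancel()
        timer = threading.Timer(self.ttl_seconds, self.on_expire, args=[mac])
        timer.daemon = True  # don't keep the process alive just for this
        timer.start()
        self.timers[mac] = timer
```

Using `dict.pop` with a default avoids the bare `except` around the cancel call.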
`def expose_to_hass()`
Build and Send a binary_sensor payload to send to HomeAssistant
This will set default values if they are not specified for the following…
- `mac` = supplied param
- `last_seen` = 'Never Seen'
- `location` = 'Out'
- `icon` = 'mdi:home-outline'

… if `friendly_name` was not supplied, an error is logged. It would probably be more sensible to shoot out a persistent notification here, but this can only be reached if you’ve modified the `json` file, and if you’re doing this, I assume you’re the developer and regularly check your error logs.
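As a rough illustration of the defaulting rules, here is a sketch that fills in a user record and derives the entity id. `with_defaults` and `entity_id_for` are hypothetical helpers; unlike the app, which logs an error, this version returns `None` when `friendly_name` is missing.

```python
DEFAULTS = {
    'last_seen': 'Never Seen',
    'location': 'Out',
    'icon': 'mdi:home-outline',
}


def with_defaults(mac, user):
    """Return a copy of `user` with the defaults filled in, or None."""
    if 'friendly_name' not in user:
        return None  # nothing sensible to name the sensor after
    filled = {'mac': mac}
    filled.update(DEFAULTS)
    filled.update(user)  # values from users.json win over the defaults
    return filled


def entity_id_for(friendly_name):
    """Return the binary_sensor entity id used for this user."""
    return 'binary_sensor.{}_presence'.format(friendly_name.lower())
```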
`def trigger_person_isout()`
Check the binary_sensor associated with a mac, and if we haven’t seen this person since longer than the TTL value, fire event == is_out_trigger
Read in `binary_sensor` data from HomeAssistant, in case the state was updated remotely. If location isn’t already `Out`, and if now minus the last time we’ve seen the device is greater than or equal to the `self.args['ttl']` value (in minutes), fire the event `is_out_trigger`.
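The TTL comparison itself boils down to a little datetime arithmetic. A minimal sketch, with `should_mark_out` as a hypothetical helper; treating an unparseable `last_seen` (such as the `'Never Seen'` default) as "don't fire" is my assumption here, not something the app does.

```python
from datetime import datetime, timedelta

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def should_mark_out(location, last_seen, now, ttl_minutes):
    """Return True if the person should be flipped to 'Out'."""
    if location == 'Out':
        return False  # already out, nothing to do
    try:
        last_seen_dt = datetime.strptime(last_seen, TIME_FORMAT)
    except (TypeError, ValueError):
        return False  # assumption: no usable timestamp means no change
    return now - last_seen_dt >= timedelta(minutes=int(ttl_minutes))
```

Because `last_seen` is read back from Home Assistant rather than from local memory, a sighting reported by any other node pushes the deadline out as well.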
This function is important as it is what gives the app its modularity. You can run this app on multiple different machines, and as long as they have different instance names, you will have effectively extended the “range” at which you can see devices. This is very handy for large homes or homes that have poor WiFi coverage. If one RPi can’t see the other end of the house? No problem, just buy another/run `AppDaemon` on another device that meets the prerequisites.
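The reason instance names have to differ is that each node derives its event names from them. A small sketch of that derivation, using the same sanitization as `initialize`; the two function names are made up for the example.

```python
import re


def sanitize_instance_name(name):
    """Lowercase the name and turn runs of non-word characters into '_'."""
    return re.sub(r'\W+', ' ', name).strip().lower().replace(' ', '_')


def trigger_names(instance_name):
    """Return the (ishome, isout) event names for one node."""
    clean = sanitize_instance_name(instance_name)
    return ('{}_trigger_ishome'.format(clean),
            '{}_trigger_isout'.format(clean))
```

Two nodes called `rpi3` and `living room pi` end up listening for different events, so one node's sighting never starts the other node's timers.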
def person_isout()
Mark the mac address as “Out” and send information to HomeAssistant
##Bonus functions
`def add_user()`
Adds users with required arguments `mac` and `friendly_name`
If the necessary arguments are not fulfilled properly, a persistent notification is raised.
MAC Addresses must be in one of the standardized 12-digit hexadecimal formats below to be accepted.
- a0:a0:a0:a0:a0:a0
- a0-a0-a0-a0-a0-a0
- a0a.a0a.a0a.a0a
- a0a0a0a0a0a0

Event Type: `listener_add_user`
Event Data: `{"friendly_name": "<name>", "mac": "<mac address>"}`
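For reference, the normalization boils down to "strip separators, regroup in pairs, validate". Here is a standalone sketch; `normalize_mac` is a hypothetical helper, and it lowercases the result on the assumption that tshark prints addresses in lowercase.

```python
import re

MAC_PATTERN = re.compile(r'^([0-9a-f]{2}:){5}[0-9a-f]{2}$')


def normalize_mac(raw):
    """Return `raw` as a lowercase colon-separated MAC, or raise ValueError."""
    digits = re.sub(r'[:.\-]', '', raw).lower()
    # regroup the hex digits two at a time
    mac = ':'.join(digits[i:i + 2] for i in range(0, len(digits), 2))
    if not MAC_PATTERN.match(mac):
        raise ValueError(raw)
    return mac
```

Anchoring the pattern at both ends rejects inputs with too many digits, not just too few.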
`def expose_last_seen()`
Expose only the last_seen time to HomeAssistant, leaving the state and other attributes untouched
By leaving off the `ttl` value, you can take control of your own automations! Only the last time the mac address was seen will be updated in this function.
If you have active phone users in your domain, I’ve found this is a FANTASTIC solution for presence detection. I personally run the app on 2 different Raspberry Pis to give it extra range and combine it with door sensors in order to give instant home/not-home based automations upon arriving home. I find it unlikely that the problem of presence detection can be solved with one trick, so if you integrate this into your setup, I encourage you to do so as a supplement - not the only means of detecting whether or not a user is home.
Happy automating!
- SN