Tapo C310 person detection sensor

Hi all. For what it’s worth, here’s my apporach to getting HA alerts when a person is detected (I have a TP-Link Tapo C320W camerag). My camera only tells me about motion detections, not any other detections. This approach is a bit bare, but it works:

High level overview:
I use a python script to poll my camera for all events that occured, filter for the “person detected” event and send a message to HA using MQTT.

The approach:

  • Make sure you have a MQTT broker running
  • Here’s a nifty method (from the pytapo library, bless their souls) that allows you to read all events from your Tapo Camera that occured between two timeframes.
  • Use a python script to poll the camera events and match on the “person detection” alarm type
  • Use AppDaemon to run the python script (or another way if you have one, but just make sure that the script can access your network)
  • Send a MQTT message whenever the “person detected” event happened.

The code

Be sure to to install the pytapo and paho-mqtt libraries.

here’s the code:

import appdaemon.plugins.hass.hassapi as hass
from paho.mqtt import client as mqtt
from pytapo import Tapo
from datetime import datetime
import time
import json

TRANSPORT = "tcp"
BROKER_ADDRESS = "your-broker-ip"
BROKER_PORT = 1883
BROKER_USERNAME = "broker-username"
BROKER_PASSWORD = "broker-password"
PERSON_DETECTED_ALARM_TYPE = 6
# See https://github.com/JurajNyiri/pytapo/blob/2a28236dfd885947d93f6a98ac1b10cdb0b731d4/pytapo/__init__.py#L677
# To check that `6` is also your camera's `person detected` alarm type, open a python shell, run the code to
# instantiate a tapo client (see code below), let your camera detect a person, call the
# tapo_client.getEvents(startTime=now - 60*10) to see the events for the past 10 minutes and check the last
# event's alarm type ID

ENTITY_ID = "tap_camera" # Give your entity a name
TOPIC= "tapo/person_detected" # change this to your liking
DISCOVERY_TOPIC = f"homeassistant/binary_sensor/{ENTITY_ID}/config"

CAMERA_IP = "camera ip" # make sure this is static
CAMERA_USERNAME = "camera-username"
CAMERA_PASSWORD = "the-camera-password"
INTERVAL_SEC = 5

BIRTH_TOPIC = f"homeassistant/binary_sensor/{ENTITY_ID}/status"
BIRTH_PAYLOAD = "online"
WILL_TOPIC = f"homeassistant/binary_sensor/{ENTITY_ID}/status"
WILL_PAYLOAD = "offline"
UNIQUE_ID = "3e73ef97-470b-5555-6666-f0b59d0552ce" # You can use another unique id here as well, just make sure is
# doesn't change

class TapoEventListener(hass.Hass):

    def initialize(self):
        self.mqtt_client = None
        self.tapo_client = None
        self._setup_clients()
        self._enter_loop()

    def _send_message(self, client, message):
        client.publish(TOPIC, message)

    def _publish_discovery(self, client):
        discovery_payload = {
            "name": "Person Detected",
            "state_topic": TOPIC,
            "payload_on": "detected",
            "payload_off": "clear",
            "device_class": "motion",
            "unique_id": UNIQUE_ID,
            "object_id": UNIQUE_ID,
        }
        # Publish the discovery message to the discovery topic
        client.publish(DISCOVERY_TOPIC, json.dumps(discovery_payload), retain=True)

    def _setup_clients(self):
        # MQTT
        self.log("Creating MQTT client...")
        self.mqtt_client = mqtt.Client()
        self.mqtt_client.username_pw_set(BROKER_USERNAME, BROKER_PASSWORD)
        self.log("Setting will message...")
        self.mqtt_client.will_set(WILL_TOPIC, WILL_PAYLOAD, qos=1, retain=True)
        self.log("Connecting...")
        self.mqtt_client.connect(BROKER_ADDRESS, BROKER_PORT)
        self.log("Publishing discovery payload...")
        self._publish_discovery(self.mqtt_client)
        self.log("Sending birth message...")
        self.mqtt_client.publish(BIRTH_TOPIC, BIRTH_PAYLOAD, qos=1, retain=True)

        # Camera
        self.log("Creating Tapo Camera client...")
        self.tapo_client = Tapo(CAMERA_IP, user=CAMERA_USERNAME, password=CAMERA_PASSWORD)


    def _enter_loop(self):
        self.log("Entering loop...")
        now = int(datetime.timestamp(datetime.now()))
        self.tapo_client.getEvents(startTime=now - INTERVAL_SEC)
        try:
            while True:
                person_detected = False
                events = self.tapo_client.getEvents(startTime=now - INTERVAL_SEC)
                self.mqtt_client.reconnect()
                for event in events:
                    alarm_type = event["alarm_type"]
                    if alarm_type == PERSON_DETECTED_ALARM_TYPE:
                        self.log(f"Person detected!")
                        person_detected = True
                        self._send_message(self.mqtt_client, message="detected")
                        break
                if not person_detected:
                    self._send_message(self.mqtt_client, message="clear")

                time.sleep(INTERVAL_SEC)
                now = int(datetime.timestamp(datetime.now()))
        except Exception as e:
            self.log(f"An error occured:\nError:{e}.\nExiting")
            exit(1)

A few notes:

  • I do not know if the Tapo Camera’s API has a rate limit, but I guess there is one. Have no idea what it is, so be careful with the INTERVAL_SEC variable.
  • This script was not tested rigorously. Any suggested improvements are super welcome.
  • Notifications through HA is obviously not as snappy as those from the Tapo app, but there’s max INTERVAL_SEC ± 1 sec delay from detection to notification.

P.S
If you want to make this robust against camera reboots, you’ll have to update the loop by swapping the while and try-except blocks. Something like:

while True:
    try:
        person_detected = False
        events = self.tapo_client.getEvents(startTime=now - INTERVAL_SEC)
        self.log(f"Events: {events}")
        self.mqtt_client.reconnect()
        for event in events:
            alarm_type = event["alarm_type"]
            if alarm_type == PERSON_DETECTED_ALARM_TYPE:
                self.log(f"Person detected!")
                person_detected = True
                self._send_message(self.mqtt_client, message="detected")
                break
        if not person_detected:
            self._send_message(self.mqtt_client, message="clear")
    except Exception as e:
        self.log(f"An error occured:\nError:{e}")
2 Likes