"""
Component that will perform object detection and identification via deepstack.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/image_processing.deepstack_object
"""
# Standard library
import datetime
import io
import logging
import os
import re
from collections import namedtuple
from datetime import timedelta
from pathlib import Path
from typing import Tuple

# Third-party
import deepstack.core as ds
import voluptuous as vol
from PIL import Image, ImageDraw
from PIL import UnidentifiedImageError

# Home Assistant
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
from homeassistant.components.image_processing import (
    ATTR_CONFIDENCE,
    CONF_ENTITY_ID,
    CONF_NAME,
    CONF_SOURCE,
    DOMAIN,
    PLATFORM_SCHEMA,
    ImageProcessingEntity,
)
from homeassistant.const import (
    ATTR_ENTITY_ID,
    ATTR_LAST_TRIP_TIME,
    ATTR_NAME,
    CONF_IP_ADDRESS,
    CONF_PORT,
    CONF_COUNT,
    HTTP_BAD_REQUEST,
    HTTP_OK,
    HTTP_UNAUTHORIZED,
)
from homeassistant.core import split_entity_id
from homeassistant.util.pil import draw_box
_LOGGER = logging.getLogger(__name__)
CONF_API_KEY = "api_key"
CONF_TARGETS = "targets"
CONF_TIMEOUT = "timeout"
CONF_SAVE_FILE_FOLDER = "save_file_folder"
CONF_SAVE_TIMESTAMPTED_FILE = "save_timestamped_file"
CONF_SAVE_CROPPED_FILE = "save_cropped_file"
CONF_SAVE_LABEL_DATA = "save_label_data"
CONF_SAVE_BLANKS = "save_blanks"
CONF_FACE_DETECT = "face_detect"
CONF_FACE_RECOGNIZE = "face_recognize"
CONF_SHOW_BOXES = "show_boxes"
CONF_ROI_Y_MIN = "roi_y_min"
CONF_ROI_X_MIN = "roi_x_min"
CONF_ROI_Y_MAX = "roi_y_max"
CONF_ROI_X_MAX = "roi_x_max"
DATETIME_FORMAT = "%Y-%m-%d_%H-%M-%S"
DEFAULT_API_KEY = ""
DEFAULT_TARGETS = ["person"]
DEFAULT_TIMEOUT = 10
DEFAULT_ROI_Y_MIN = 0.0
DEFAULT_ROI_Y_MAX = 1.0
DEFAULT_ROI_X_MIN = 0.0
DEFAULT_ROI_X_MAX = 1.0
DEFAULT_ROI = (
DEFAULT_ROI_Y_MIN,
DEFAULT_ROI_X_MIN,
DEFAULT_ROI_Y_MAX,
DEFAULT_ROI_X_MAX,
)
BOX = "box"
FILE = "file"
OBJECT = "object"
CLASSIFIER = "deepstack_face"
DATA_DEEPSTACK = "deepstack_classifiers"
FILE_PATH = "file_path"
SERVICE_TEACH_FACE = "deepstack_teach_face"
# rgb(red, green, blue)
RED = (255, 0, 0) # For objects within the ROI
GREEN = (0, 255, 0) # For ROI box
YELLOW = (255, 255, 0) # For objects outside the ROI
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_IP_ADDRESS): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Optional(CONF_API_KEY, default=DEFAULT_API_KEY): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_TARGETS, default=DEFAULT_TARGETS): vol.All(
cv.ensure_list, [cv.string]
),
vol.Optional(CONF_ROI_Y_MIN, default=DEFAULT_ROI_Y_MIN): cv.small_float,
vol.Optional(CONF_ROI_X_MIN, default=DEFAULT_ROI_X_MIN): cv.small_float,
vol.Optional(CONF_ROI_Y_MAX, default=DEFAULT_ROI_Y_MAX): cv.small_float,
vol.Optional(CONF_ROI_X_MAX, default=DEFAULT_ROI_X_MAX): cv.small_float,
vol.Optional(CONF_SAVE_FILE_FOLDER): cv.isdir,
vol.Optional(CONF_SAVE_TIMESTAMPTED_FILE, default=False): cv.boolean,
vol.Optional(CONF_SAVE_CROPPED_FILE, default=False): cv.boolean,
vol.Optional(CONF_SAVE_LABEL_DATA, default=False): cv.boolean,
vol.Optional(CONF_SAVE_BLANKS, default=False): cv.boolean,
vol.Optional(CONF_FACE_DETECT, default=False): cv.boolean,
vol.Optional(CONF_FACE_RECOGNIZE, default=False): cv.boolean,
vol.Optional(CONF_SHOW_BOXES, default=True): cv.boolean,
}
)
SERVICE_TEACH_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_ENTITY_ID): cv.entity_ids,
vol.Required(ATTR_NAME): cv.string,
vol.Required(FILE_PATH): cv.string,
}
)
Box = namedtuple("Box", "y_min x_min y_max x_max")
Point = namedtuple("Point", "y x")
def point_in_box(box: Box, point: Point) -> bool:
"""Return true if point lies in box"""
if (box.x_min <= point.x <= box.x_max) and (box.y_min <= point.y <= box.y_max):
return True
return False
def object_in_roi(roi: dict, centroid: dict) -> bool:
"""Convenience to convert dicts to the Point and Box."""
target_center_point = Point(centroid["y"], centroid["x"])
roi_box = Box(roi["y_min"], roi["x_min"], roi["y_max"], roi["x_max"])
return point_in_box(roi_box, target_center_point)
def get_valid_filename(name: str) -> str:
return re.sub(r"(?u)[^-\w.]", "", str(name).strip().replace(" ", "_"))
def get_objects(predictions: list, prediction_type: str, img_width: int, img_height: int):
"""Return objects with formatting and extra info."""
objects = []
decimal_places = 3
for pred in predictions:
box_width = pred["x_max"] - pred["x_min"]
box_height = pred["y_max"] - pred["y_min"]
box = {
"height": round(box_height / img_height, decimal_places),
"width": round(box_width / img_width, decimal_places),
"y_min": round(pred["y_min"] / img_height, decimal_places),
"x_min": round(pred["x_min"] / img_width, decimal_places),
"y_max": round(pred["y_max"] / img_height, decimal_places),
"x_max": round(pred["x_max"] / img_width, decimal_places),
}
box_area = round(box["height"] * box["width"], decimal_places)
centroid = {
"x": round(box["x_min"] + (box["width"] / 2), decimal_places),
"y": round(box["y_min"] + (box["height"] / 2), decimal_places),
}
if not "label" in pred.keys():
if not "userid" in pred.keys():
name = "face_unknown"
else:
name = "face_{}".format(pred["userid"])
else:
name = pred["label"]
confidence = round(pred["confidence"] * 100, decimal_places)
objects.append(
{
"bounding_box": box,
"box_area": box_area,
"centroid": centroid,
"name": name,
"prediction_type": prediction_type,
"confidence": confidence,
}
)
return objects
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the classifier."""
save_file_folder = config.get(CONF_SAVE_FILE_FOLDER)
if save_file_folder:
save_file_folder = Path(save_file_folder)
if DATA_DEEPSTACK not in hass.data:
hass.data[DATA_DEEPSTACK] = []
targets = [t.lower() for t in config[CONF_TARGETS]] # ensure lower case
entities = []
for camera in config[CONF_SOURCE]:
object_entity = ObjectClassifyEntity(
config.get(CONF_IP_ADDRESS),
config.get(CONF_PORT),
config.get(CONF_API_KEY),
config.get(CONF_TIMEOUT),
targets,
config.get(ATTR_CONFIDENCE),
config[CONF_ROI_Y_MIN],
config[CONF_ROI_X_MIN],
config[CONF_ROI_Y_MAX],
config[CONF_ROI_X_MAX],
config[CONF_SHOW_BOXES],
save_file_folder,
config.get(CONF_SAVE_TIMESTAMPTED_FILE),
config.get(CONF_SAVE_CROPPED_FILE),
config.get(CONF_SAVE_LABEL_DATA),
config.get(CONF_SAVE_BLANKS),
config.get(CONF_FACE_DETECT),
config.get(CONF_FACE_RECOGNIZE),
camera.get(CONF_ENTITY_ID),
camera.get(CONF_NAME),
)
entities.append(object_entity)
hass.data[DATA_DEEPSTACK].append(object_entity)
add_devices(entities)
def service_handle(service):
"""Handle for services."""
_LOGGER.debug("Deepstack handling teaching")
entity_ids = service.data.get("entity_id")
classifiers = hass.data[DATA_DEEPSTACK]
if entity_ids:
classifiers = [c for c in classifiers if c.entity_id in entity_ids]
for classifier in classifiers:
name = service.data.get(ATTR_NAME)
file_path = service.data.get(FILE_PATH)
classifier.teach(name, file_path)
break
hass.services.register(
DOMAIN, SERVICE_TEACH_FACE, service_handle, schema=SERVICE_TEACH_SCHEMA
)
class ObjectClassifyEntity(ImageProcessingEntity):
"""Perform a face classification."""
def __init__(
self,
ip_address,
port,
api_key,
timeout,
targets,
confidence,
roi_y_min,
roi_x_min,
roi_y_max,
roi_x_max,
show_boxes,
save_file_folder,
save_timestamped_file,
save_cropped_file,
save_label_data,
save_blanks,
face_detect,
face_recognize,
camera_entity,
name=None,
):
"""Init with the API key and model id."""
super().__init__()
self._dsobject = ds.DeepstackObject(ip_address, port, api_key, timeout)
self._dsface = ds.DeepstackFace(ip_address, port, api_key, timeout)
self._targets = targets
self._confidence = confidence
self._camera = camera_entity
if name:
self._name = name
else:
camera_name = split_entity_id(camera_entity)[1]
self._name = "deepstack_object_{}".format(camera_name)
_LOGGER.debug("Deepstack object camera initializing as {}".format(camera_name))
self._state = None
self._objects = [] # The parsed raw data
self._targets_found = []
self._summary = {}
self._matched = {}
self._faces = []
self._total_faces = None
self._roi_dict = {
"y_min": roi_y_min,
"x_min": roi_x_min,
"y_max": roi_y_max,
"x_max": roi_x_max,
}
self._show_boxes = show_boxes
self._last_detection = None
self._image_width = None
self._image_height = None
self._save_file_folder = save_file_folder
self._save_timestamped_file = save_timestamped_file
self._save_cropped_file = save_cropped_file
self._save_label_data = save_label_data
self._save_blanks = save_blanks
self._face_detect = face_detect
self._face_recognize = face_recognize
def process_image(self, image):
"""Process an image."""
self._image_width, self._image_height = Image.open(
io.BytesIO(bytearray(image))
).size
self._state = None
self._objects = [] # The parsed raw data
self._targets_found = []
self._summary = {}
_LOGGER.debug("Deepstack detection triggering on ")
try:
self._dsobject.detect(image)
except ds.DeepstackException as exc:
_LOGGER.critical("Deepstack error : %s", exc)
return
predictions = self._dsobject.predictions.copy()
if predictions:
_LOGGER.info("Deepstack object detection complete on {} with results {}".format(self._name, predictions))
else:
_LOGGER.debug("Deepstack object detection complete on {} with no results".format(self._name))
self._summary = ds.get_objects_summary(predictions)
self._objects = get_objects(predictions, "object", self._image_width, self._image_height)
self._targets_found = [
obj
for obj in self._objects
if (obj["name"] in self._targets)
and (obj["confidence"] > self._confidence)
and (object_in_roi(self._roi_dict, obj["centroid"]))
]
self._state = len(self._targets_found)
detection_time = dt_util.now().strftime(DATETIME_FORMAT)
if (self._state > 0):
self._last_detection = detection_time
if (self._face_recognize or self._face_detect):
try:
if self._face_recognize:
self._dsface.recognise(image)
else:
self._dsface.detect(image)
except ds.DeepstackException as exc:
_LOGGER.error("Depstack error : %s", exc)
return
face_predictions = self._dsface.predictions.copy()
if len(face_predictions) > 0:
face_objects = get_objects(face_predictions, "face", self._image_width, self._image_height)
# filter out 0% probability
for obj in face_objects:
if (obj["confidence"] > 0):
self._faces.append(obj)
if len(self._faces) > 0:
_LOGGER.info("Deepstack face detection complete on {} with results {}".format(self._name, self._faces))
self._last_detection = detection_time
else:
_LOGGER.debug("Deepstack face detection complete on {} with no results".format(self._name))
self._total_faces = len(self._faces)
self._matched = ds.get_recognised_faces(face_predictions)
if len(self._matched) > 0:
_LOGGER.info("Deepstack recognized faces {}".format(self._matched))
else:
self._total_faces = None
self._matched = {}
if self._save_file_folder:
_LOGGER.debug("Saving images")
self.save_image(
image, self._objects + self._faces, self._targets, self._save_file_folder, detection_time,
)
# Fire events
objnum=0
#for target in self._targets_found:
for obj in self._objects + self._faces:
objnum=objnum+1
if (len(self._targets) and (obj["name"] not in self._targets) and not obj["name"].startswith("face_")) or (obj["confidence"] < self._confidence) or (not object_in_roi(self._roi_dict, obj["centroid"])):
continue
target_event_data = obj.copy()
target_event_data[ATTR_ENTITY_ID] = self.entity_id
target_event_data[ATTR_LAST_TRIP_TIME] = detection_time
target_event_data[CONF_COUNT] = objnum
if ("face" in target_event_data["prediction_type"]):
target_event_data["recognized"] = self._matched
target_event_data["recognized_count"] = len(self._matched)
target_event_type = "deepstack.{}_detected".format(target_event_data["prediction_type"])
self.hass.bus.fire(target_event_type, target_event_data)
_LOGGER.debug("Deepstack event fired {}".format(target_event_data))
_LOGGER.debug("Deepstack events fired")
def teach(self, name: str, file_path: str):
"""Teach classifier a face name."""
_LOGGER.debug("Deepstack teaching")
if not self.hass.config.is_allowed_path(file_path):
return
with open(file_path, "rb") as image:
self._dsface.register_face(name, image)
_LOGGER.info("Depstack face taught name : %s", name)
@property
def camera_entity(self):
"""Return camera entity id from process pictures."""
return self._camera
@property
def state(self):
"""Return the state of the entity."""
return self._state
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
return "targets"
@property
def should_poll(self):
"""Return the polling state."""
return False
@property
def device_state_attributes(self):
"""Return device specific state attributes."""
attr = {}
for target in self._targets:
attr[f"ROI {target} count"] = len(
[t for t in self._targets_found if t["name"] == target]
)
attr[f"ALL {target} count"] = len(
[t for t in self._objects if t["name"] == target]
)
if self._last_detection:
attr["last_target_detection"] = self._last_detection
attr["summary"] = self._summary
attr["objects"] = self._objects
if self._face_detect:
attr[CONF_FACE_DETECT] = self._face_detect
if self._face_recognize:
attr[CONF_FACE_RECOGNIZE] = self._face_recognize
if self._face_recognize:
attr["total_matched_faces"] = len(self._matched)
attr["matched_faces"] = self._matched
return attr
def save_image(self, image, crops, targets, directory, stamp):
"""Draws the actual bounding box of the detected objects."""
try:
img = Image.open(io.BytesIO(bytearray(image))).convert("RGB")
imgo = Image.open(io.BytesIO(bytearray(image))).convert("RGB")
imgc = Image.open(io.BytesIO(bytearray(image))).convert("RGB")
except UnidentifiedImageError:
_LOGGER.warning("Deepstack unable to process image, bad data")
return
draw = ImageDraw.Draw(img)
roi_tuple = tuple(self._roi_dict.values())
if roi_tuple != DEFAULT_ROI and self._show_boxes:
draw_box(
draw, roi_tuple, img.width, img.height, text="ROI", color=GREEN,
)
objnum=1
for obj in crops:
name = obj["name"]
confidence = obj["confidence"]
box = obj["bounding_box"]
prediction_type = obj["prediction_type"]
centroid = obj["centroid"]
box_label = f"{name}: {confidence:.1f}%"
if self._save_cropped_file:
_LOGGER.debug("Box {}".format(box))
imc = imgc.crop((box["x_min"] * img.width, box["y_min"] * img.height, box["x_max"] * img.width, box["y_max"] * img.height))
if self._save_timestamped_file:
crop_save_path = directory / f"{self._name}_{stamp}_{prediction_type}_{name}_{objnum}.jpg"
imc.save(crop_save_path)
crop_save_path = directory / f"{self._name}_latest_{prediction_type}_{name}.jpg"
imc.save(crop_save_path)
objnum=objnum+1
if self._save_label_data:
label_path = directory / f"labels.csv"
with open(label_path,"a+") as f:
timestamp_save_path = directory / f"{self._name}_{stamp}_nobox.jpg"
f.write("{},{},{},{},{},{},{}\n".format(timestamp_save_path,int(box["x_min"] * img.width), int(box["y_min"] * img.height), int(box["x_max"] * img.width), int (box["y_max"] * img.height),name, confidence))
if not self._show_boxes:
continue
if object_in_roi(self._roi_dict, centroid):
box_colour = RED
else:
box_colour = YELLOW
draw_box(
draw,
(box["y_min"], box["x_min"], box["y_max"], box["x_max"]),
img.width,
img.height,
text=box_label,
color=box_colour,
)
# draw bullseye
draw.text(
(centroid["x"] * img.width, centroid["y"] * img.height),
text="X",
fill=box_colour,
)
if (len(crops) > 0 or self._save_blanks):
suffix = ""
if (self._save_blanks and not len(crops) > 0):
suffix="_blank"
if self._show_boxes:
latest_save_path = (
directory / f"{get_valid_filename(self._name).lower()}_latest_box{suffix}.jpg"
)
img.save(latest_save_path)
latest_save_path = directory / f"{get_valid_filename(self._name).lower()}_latest_nobox{suffix}.jpg"
imgo.save(latest_save_path)
if self._save_timestamped_file:
if self._show_boxes:
timestamp_save_path = directory / f"{self._name}_{stamp}_box{suffix}.jpg"
img.save(timestamp_save_path)
timestamp_save_path = directory / f"{self._name}_{stamp}_nobox{suffix}.jpg"
imgo.save(timestamp_save_path)