The webcam-test script below captures frames from the default webcam, has Gemini Pro Vision analyze them, and publishes the resulting vision/state data to MQTT:
import cv2
import paho.mqtt.client as mqtt
import google.generativeai as genai
import json
import os
import time
from dotenv import load_dotenv
import re
# Load environment variables
load_dotenv()
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
# Ensure the API key is present
if not GEMINI_API_KEY:
print("Error: GEMINI_API_KEY is missing. Please check your environment variables.")
exit(1)
# Configure the Gemini Pro API key
genai.configure(api_key=GEMINI_API_KEY)
# MQTT Broker settings
MQTT_BROKER = '192.168.0.xxx'
MQTT_PORT = 1883
MQTT_USERNAME = 'bla'
MQTT_PASSWORD = 'bma'
# MQTT Topics
GEMINI_PRESENCE_STATE_TOPIC = 'homeassistant/sensor/geminiPresence/state'
GEMINI_AGE_STATE_TOPIC = 'homeassistant/sensor/geminiAge/state'
GEMINI_MOOD_STATE_TOPIC = 'homeassistant/sensor/geminiMood/state'
# Initialize and connect to the MQTT broker
client = mqtt.Client()
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()  # Run the network loop in a background thread so keep-alives and publishes are handled

def analyze_frame(frame):
    """Analyze the given frame using the Gemini Pro API."""
    model = genai.GenerativeModel('gemini-pro-vision')
    ret, buffer = cv2.imencode('.jpg', frame)
    if not ret:
        print("Failed to encode image")
        return None, None, None
    image_data = {
        'mime_type': 'image/jpeg',
        'data': buffer.tobytes()
    }
    contents = [
        "Your task is to analyse the amount of people you see, calculate the average age, "
        "and describe the mood in the scene. You can only reply with: count_of_people = number, "
        "average_age = number, and mood = Happy,Sad,Neutral,Drunk,Psycho,Crying,High",
        image_data
    ]
    try:
        response = model.generate_content(contents=contents)
        response.resolve()
        if not response.parts:
            feedback = response.prompt_feedback or "No candidates returned."
            print(f"Error or prompt blocked: {feedback}")
            return None, None, None
        # Extract count of people, average age, and mood from the response text
        count_of_people = extract_count_from_response(response.text)
        average_age = extract_age_from_response(response.text)
        mood = extract_mood_from_response(response.text)
        return count_of_people, average_age, mood
    except Exception as e:
        print(f"Error during API call: {e}")
        return None, None, None

def extract_count_from_response(text):
    # Use regular expression to find the count of people
    match = re.search(r'count_of_people = (\d+)', text)
    if match:
        return int(match.group(1))
    else:
        print("Could not extract count of people")
        return 0

def extract_age_from_response(text):
    # Use regular expression to find the average age
    match = re.search(r'average_age = (\d+)', text)
    if match:
        return int(match.group(1))
    else:
        print("Could not extract average age")
        return 0

def extract_mood_from_response(text):
    # Use regular expression to extract the mood from the response
    mood_match = re.search(r'mood = (\w+)', text)
    if mood_match:
        return mood_match.group(1)
    else:
        print("Could not extract mood")
        return "Neutral"  # Default mood if not detected (matches the prompt's capitalization)

def publish_sensor_states(count_of_people, average_age, mood):
    """Publish the sensor states to the MQTT broker."""
    if count_of_people is None or average_age is None or mood is None:
        print("No valid data to publish")
        return
    presence_state_payload = {"count": count_of_people}
    age_state_payload = {"average_age": average_age}
    mood_state_payload = {"mood": mood}
    print(f"Publishing to MQTT: {presence_state_payload}, {age_state_payload}, {mood_state_payload}")
    client.publish(GEMINI_PRESENCE_STATE_TOPIC, json.dumps(presence_state_payload), qos=0, retain=True)
    client.publish(GEMINI_AGE_STATE_TOPIC, json.dumps(age_state_payload), qos=0, retain=True)
    client.publish(GEMINI_MOOD_STATE_TOPIC, json.dumps(mood_state_payload), qos=0, retain=True)

def capture_and_analyze():
    """Capture video feed from the default camera, analyze it, and update sensor states."""
    cap = cv2.VideoCapture(0)  # 0 for the default webcam
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to read frame from camera")
                break
            print("Analyzing frame...")
            count_of_people, average_age, mood = analyze_frame(frame)
            if count_of_people is not None and average_age is not None and mood is not None:
                print(f"Detected {count_of_people} people with an average age of {average_age} and mood: {mood}")
                publish_sensor_states(count_of_people, average_age, mood)
            else:
                print("No valid data detected in frame")
            time.sleep(1)  # Sleep time between captures, can be adjusted
    finally:
        cap.release()
        print("Camera released")

if __name__ == "__main__":
    capture_and_analyze()
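
To check that the states are actually reaching the broker, you can run a small subscriber next to the script. This is a minimal sketch, assuming the same broker address and credentials as above and the paho-mqtt v1 callback style used in the main script:

import json
import paho.mqtt.client as mqtt

# State topics published by the webcam-test script above
TOPICS = [
    'homeassistant/sensor/geminiPresence/state',
    'homeassistant/sensor/geminiAge/state',
    'homeassistant/sensor/geminiMood/state',
]

def on_connect(client, userdata, flags, rc):
    # Subscribe to all three state topics once connected
    for topic in TOPICS:
        client.subscribe(topic)

def on_message(client, userdata, msg):
    # Print each state payload as it arrives
    print(f"{msg.topic}: {json.loads(msg.payload)}")

client = mqtt.Client()
client.username_pw_set('bla', 'bma')        # same credentials as the main script
client.on_connect = on_connect
client.on_message = on_message
client.connect('192.168.0.xxx', 1883, 60)   # same broker as the main script
client.loop_forever()

Because the main script publishes with retain=True, the last known state of each sensor is delivered immediately after subscribing, so you should see output even before the next frame is analyzed.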