RF 433 MHZ PIR motion sensor Raspberry PI configuration

Hi, All

I’m new to home assistant. I have a generic RF 433 MHZ PIR motion sensor, I want to configure when it detect motion it will trigger light switch to turn on.

I’ve gone through the forum and cookbook and couldn’t able get it work.

Do anyone have experience with the same configuration?

Thanks.

It is not clear what you are using to receive the 433MHz signals. Are you using something like this, or something else?

I run my home-assistant on Raspberry Pi and connect RF 433 Mhz transmitter and receiver module to transmitte/receive the signal. I’ve managed configure RF 433 Mhz power outlet switch device. But for PIR motion sensor only emitted single RF code so I don’t know how to configure in home assistant to trigger other device in automation.

I’m still not sure how you get the data from the pin, but I am guessing its by running some terminal command from a prompt, in which case you need to define command line binary sensor in HA, using the same command.

If you show how you are getting the data from the sensor, I can be of more help.

Hi Graham,

I used python rf snippet from below link. I did modified the script to tell me when my PIR motion detect send out the code.
https://raw.githubusercontent.com/milaq/rpi-rf/master/scripts/rpi-rf_receive

Ahh. This code needs to run all the time, so its not suitable for a command line sensor, which runs a command every time it wants to get data.

I searched on the site to see if anyone had used this library before, but I couldn’t see anything simple, so you’ll have to do it yourself, which is still relatively straightforward.

What needs to happen is that you create a binary sensor to receive data in HA, and modify this script to send data to the sensor.

I know of two ways to do this, using MQTT and using http. MQTT is more flexible and generic, but also more complex to set up, so for now I suggest you use a http binary sensor.

I have never actually used one, (I use mqtt mostly), but I think the modifications to the script will be something like


#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
import requests       ### new
import json           ### new

from rpi_rf import RFDevice

rfdevice = None

### new 
url = 'http://localhost:8123/api/states/binary_sensor.radio'
header = "{'x-ha-access': 'YOUR_PASSWORD', 'content-type': 'application/json'}"
# I think you can leave aout the x-ha-access section if you don't have a password

# pylint: disable=unused-argument
def exithandler(signal, frame):
    rfdevice.cleanup()
    sys.exit(0)

logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', )

parser = argparse.ArgumentParser(description='Receives a decimal code via a 433/315MHz GPIO device')
parser.add_argument('-g', dest='gpio', type=int, default=27,
                    help="GPIO pin (Default: 27)")
args = parser.parse_args()

signal.signal(signal.SIGINT, exithandler)
rfdevice = RFDevice(args.gpio)
rfdevice.enable_rx()
timestamp = None
logging.info("Listening for codes on GPIO " + str(args.gpio))
while True:
    if rfdevice.rx_code_timestamp != timestamp:
        timestamp = rfdevice.rx_code_timestamp
        logging.info(str(rfdevice.rx_code) +
                     " [pulselength " + str(rfdevice.rx_pulselength) +
                     ", protocol " + str(rfdevice.rx_proto) + "]")
        ### new
        if rfdevice.rx_code == oncode:   ## you need to determine the code for ON that is returned
            requests.post(url, headers=header, data=json.dumps({'state': 'on', 'attributes': {'friendly_name': 'Motion Detector'}}))
        elif rfdevice.rx_code == offcode:
            requests.post(url, headers=header, data=json.dumps({'state': 'off', 'attributes': {'friendly_name': 'Motion Detector'}}))

    time.sleep(0.01)
rfdevice.cleanup()

But this isn’t tested at all, so probably has some errors, which you will have to find. I suggest you use curl to test the binary sensor as suggested in the docs before you try this code.

And eventually, you will need to make this code run at boot, by putting it in /etc/rc.local, but save that for another post.

Thanks Graham, I will have a crack with it.

Hi Graham,

I’ve managed get it work with MQTT. Once again thanks for your direction.

MQTT is a good way to go. Glad to help.

how did you do that?

Hi Claudio,

I modified the python script suggest by Graham, instead send the message to binary sensor using http I used paho-mqtt python library to publish the message.

i.e publish.single(“hass/sensor/motion_sensor_1”,“ON”,hostname=“localhost”, auth=auth)

If you want in details, I can share my configuration to you.

Viet Luu

1 Like

for future reference i can give you a third option.
let the script write the values to a small file and let HA read the value from the file.
it has the same advantage as MQTT (the last sensorvalues are Always known at restart) but there is no need to set up a MQTT server for it.

yes please. I am a newb still trying to understan it all. I do have mqtt (mosquitto) installed in the same pi3 where I ran HASS.
On the same pi3 I have a superetherodyne txrx but which I am not able to make it work though.

My goal is to use cheaper 433RF PIR sensor to give motion detection signal to HASS and use it for automation.

Help appreciated

Hi Claudio,

Look like your goal is same as mine. I installed mqtt (mosquitto) in the same pi and got superetherodyne transmitter and receiver.

Anyway below is my code:

  1. Define binary sensor => binary_sensors.yaml
    ##########################################
    • platform: mqtt
      state_topic: “hass/sensor/motion_sensor_1”
      name: “Backyard_Light_Motion”
      qos: 0
      payload_on: “ON”
      payload_off: “OFF”
      sensor_class: motion

##########################################

2.Create automation =>

###########################################

alias: "Motion backyard light during night time"
trigger:
  platform: state
  entity_id: binary_sensor.backyard_light_motion
  from: "off"
  to: "on"
condition:
  - condition: time
    after: '19:00:00'
    before: '5:00:00'
action:
  service: script.motion_backyard_light

###########################################

3.Script trigger turn on => motion_backyard_light.yaml

###########################################
sequence:
- service: script.turn_off
data:
entity_id: script.motion_backyard_light_timer
- service: switch.turn_on
data:
entity_id: switch.courtyard_light
- service: script.turn_on
data:
entity_id: script.motion_backyard_light_timer

###########################################

4.Script timer to turn off => motion_backyard_light_timer.yaml

###########################################
sequence:
- delay:
minutes: 1
- service: switch.turn_off
data:
entity_id: switch.courtyard_light
- service: mqtt.publish
data:
topic: “hass/sensor/motion_sensor_1”
payload: “OFF”
retain: “true”

###########################################

5.Install paho-mqtt

pip3 install paho-mqtt

6.rf signal detection (this script need to run all the time): #You need find update your pir motion rf code also, your topic name

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
import paho.mqtt.publish as publish

auth = {
   'username':"admin",
   'password':"m@ytao11223344556677"
}

from rpi_rf import RFDevice

rfdevice = None

# pylint: disable=unused-argument
def exithandler(signal, frame):
    rfdevice.cleanup()
    sys.exit(0)

logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', )

parser = argparse.ArgumentParser(description='Receives a decimal code via a 433/315MHz GPIO device')
parser.add_argument('-g', dest='gpio', type=int, default=27,
                    help="GPIO pin (Default: 27)")
args = parser.parse_args()

signal.signal(signal.SIGINT, exithandler)
rfdevice = RFDevice(args.gpio)
rfdevice.enable_rx()
timestamp = None
logging.info("Listening for codes on GPIO " + str(args.gpio))
i=0
while True:
    if rfdevice.rx_code_timestamp != timestamp:
        timestamp = rfdevice.rx_code_timestamp
        if str(rfdevice.rx_code) == '4654441':
           i = i+1
           logging.info("Detect code " + str(rfdevice.rx_code) + " i=" + str(i))
        if i == 10:
           publish.single("hass/sensor/motion_sensor_1","ON",hostname="localhost", auth=auth)
           i = 0
           time.sleep(60)
    time.sleep(0.01)
rfdevice.cleanup()
2 Likes

Thanks a lot.

Is it all working fine?
My superetherodyne connected directly to the pi is not working (codes are not registered by two 433RF remotes that I used.
As signal detector code I used 2 different from yours, now I am going to try yours.

On some forum are suggesting to step down Data voltage from 5v (of transceiver) to 3,3v of pi3

QUOTE rf signal detection (this script need to run all the time): #You need find update your pir motion rf code also, your topic name

yes but how to I find/update my pir motion code …

as a side note I also have this error (at the moment the code is a fake one since I did not manage to sniff the codes)

switch 5:
  platform: rpi_rf
  gpio: 17
  switches:
    pir_entrance:
      code_on: 1234567
      code_off: 1234568


17-02-11 11:01:51 ERROR (MainThread) [homeassistant.components.switch] Error while setting up platform rpi_rf
Traceback (most recent call last):
  File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/helpers/entity_component.py", line 151, in _async_setup_platform
    entity_platform.add_entities, discovery_info
  File "/usr/lib/python3.4/asyncio/futures.py", line 388, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.4/asyncio/tasks.py", line 286, in _wakeup
    value = future.result()
  File "/usr/lib/python3.4/asyncio/futures.py", line 277, in result
    raise self._exception
  File "/usr/lib/python3.4/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/srv/hass/hass_venv/lib/python3.4/site-packages/homeassistant/components/switch/rpi_rf.py", line 68, in setup_platform
    rfdevice.enable_tx()
  File "/home/hass/.homeassistant/deps/rpi_rf/rpi_rf.py", line 77, in enable_tx
    GPIO.setup(self.gpio, GPIO.OUT)
RuntimeError: No access to /dev/mem.  Try running as root!

You can run the rf code sniffer python script in the link below:

No code is showing

Hi all,
First i would like to thank you all for the py script.
I’ve done some changes.

#!/usr/bin/env python3

import argparse
import signal
import sys
import time
import logging
import paho.mqtt.client as mqtt
import RPi.GPIO

auth = {
   'username':"mosquittouser",
   'password':"pass"
}

mqtt_server_url = "ip"

from rpi_rf import RFDevice

# declare rf PIR
pir_hol = '13981013'
pir_camera = '7689557'

from datetime import datetime, timedelta

# topic dictionary
dict = {pir_hol: 'pir_hol', pir_camera: 'pir_camera'}

dictTimestamp = {pir_hol: datetime.now(), pir_camera: datetime.now()}
dictFiredEventCount = {pir_hol: 0, pir_camera: 0}
rfdevice = None

# pylint: disable=unused-argument
def exithandler(signal, frame):
    rfdevice.cleanup()
    sys.exit(0)

def on_connect(client, userdata, flags, rc):
    m="Connected flags"+str(flags)+"result code "\
    +str(rc)+"client1_id  "+str(client)
    print(m)

def on_message(client1, userdata, message):
    print("message received  "  ,str(message.payload.decode("utf-8")))


publish = mqtt.Client()
publish.username_pw_set("mosquittouser", "homerulz")
publish.on_connect=on_connect        #attach function to callback
publish.on_message=on_message        #attach function to callback
#publish.loop_forever()
#publish.connect(mqtt_server_url,auth)      #connect to broker
#publish.loop_start() 

logging.basicConfig(level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S',
                    format='%(asctime)-15s - [%(levelname)s] %(module)s: %(message)s', )

parser = argparse.ArgumentParser(description='Receives a decimal code via a 433/315MHz GPIO device')
parser.add_argument('-g', dest='gpio', type=int, default=27,
                    help="GPIO pin (Default: 27)")
args = parser.parse_args()

signal.signal(signal.SIGINT, exithandler)
rfdevice = RFDevice(args.gpio)
rfdevice.enable_rx()
timestamp = None
pirCode = "0"
logging.info("Listening for codes on GPIO " + str(args.gpio))
i=0
while True:
    try:
      if rfdevice.rx_code_timestamp != timestamp:
          timestamp = rfdevice.rx_code_timestamp
          pirCode = str(rfdevice.rx_code)
          if pirCode == pir_hol or pirCode == pir_camera:
             dictFiredEventCount[pirCode] = dictFiredEventCount[pirCode] + 1
             logging.info("Detect code " + str(rfdevice.rx_code) + " i=" + str(dictFiredEventCount[pirCode]))
          if dictFiredEventCount[pirCode] == 3 and dictTimestamp[pirCode] < datetime.now():
             publish.connect(mqtt_server_url,1883)		   
             publish.publish("pir/sensor/" + dict[pirCode],"ON")
             dictFiredEventCount[pirCode] = 0
             dictTimestamp[pirCode]+= timedelta(seconds=60)
             publish.publish("pir/sensor/" + dict[pirCode],"OFF")
             publish.disconnect()
             logging.info("Sleep for code " + str(rfdevice.rx_code) +  str(dictTimestamp[pirCode]))
          elif dictFiredEventCount[pirCode] >= 3 :
             dictFiredEventCount[pirCode] = 0
             logging.info("Count reset to 0")
      time.sleep(0.01)
    except KeyError:
      print("Could not convert data to an integer.")
    except SystemExit:
      print("Stopping py")
      sys.exit()
    except:
      print("Unexpected error:", sys.exc_info()[0])
rfdevice.cleanup()