Hi,
I am trying to communicate with an external microcontroller via i2c. I am developing my custom component that does it. If I check the status of a specific binary_sensor using pooling then everything is ok. But I want to know about the change right away so I want to use the interrupt to generate the event. I use one of Raspberry Pi 3 port for this. And this part of my component is working properly. Unfortunately, in Hassio you cannot see the entity that I added and I don’t know how to update the state of a specific binary_sensor in the method caused by interrupt.
To be more precise: in the method it reads through I2C which pin has changed state, to know which one to update.
My code below. Can any of the more experienced colleagues help me?
"""Platform for sensor integration."""
import logging
import voluptuous as vol
from RPi import GPIO
from homeassistant.components.binary_sensor import ( PLATFORM_SCHEMA, BinarySensorDevice)
from homeassistant.components import rpi_gpio
from homeassistant.helpers.entity import Entity
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
from homeassistant.helpers.dispatcher import (dispatcher_send, async_dispatcher_connect)
import homeassistant.helpers.config_validation as cv
from homeassistant.core import callback
import board
import busio
_LOGGER = logging.getLogger(__name__)
CONF_PIN = "pin"
CONF_ADDRESS = "address"
DEFAULT_ADDRESS = 0x10
DEFAULT_INTERRUPT_PIN = 17
_SENSORS_SCHEMA = vol.Schema({cv.positive_int: cv.string})
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_PIN): _SENSORS_SCHEMA,
vol.Optional(CONF_ADDRESS, default=DEFAULT_ADDRESS): vol.Coerce(int),
}
)
@callback
def read_input(port):
#"""Read a value from a GPIO."""
_LOGGER.info("INTERRUPT!!!")
buf = [0xff]
rec_buf = []
state_pin = False
i2c = busio.I2C(board.SCL, board.SDA)
i2c.writeto_then_readfrom(0x10, bytes(buf), bytes(rec_buf))
#state_pin = ???????
return tmp
GPIO.setup(DEFAULT_INTERRUPT_PIN, GPIO.IN, GPIO.PUD_UP)
GPIO.add_event_detect(DEFAULT_INTERRUPT_PIN, GPIO.FALLING, callback=read_input, bouncetime=50)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the sensor platform."""
pins = config.get(CONF_PIN)
address = config.get(CONF_ADDRESS)
binary_sensors = []
for pin_num, pin_name in pins.items():
binary_sensors.append(SHS_bs(pin_name, pin_num, state))
async_add_entities(binary_sensors, True)
class SHS_bs(BinarySensorDevice):
"""Representation of a Sensor."""
def __init__(self, name, pin, state):
"""Initialize the sensor."""
self._name = name
self._pin = pin
self._state = state
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def should_poll(self):
"""No polling needed."""
return False
@property
def is_on(self):
"""Return true if the binary sensor is on."""
return self._state
def update(self):
"""Fetch new state data for the sensor."""
self._state = read_input(self._pin)
Thank you,
Sebastian