Hello everybody i am currently writing a home assistant integration, where i can monitor my pv devices. i have implemented a option flow (configure button). i can add sensores but i cant remove them. And I really don’t know how to solve this. PLease help! Thank you in advance
Link to github where i test: GitHub - EldarKarahasanovic/HA_TESTING
import logging
import voluptuous as vol
import requests
from requests.exceptions import HTTPError, ConnectTimeout, RequestException
from homeassistant import config_entries
import homeassistant.helpers.config_validation as cv
from homeassistant.const import (
CONF_HOST,
CONF_MONITORED_CONDITIONS,
)
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN, SENSOR_TYPES
_LOGGER = logging.getLogger(__name__)
SUPPORTED_SENSOR_TYPES = list(SENSOR_TYPES.keys())
DEFAULT_MONITORED_CONDITIONS = [
"power1_solar",
"temp1"
]
@callback
def mypv_entries(hass: HomeAssistant):
"""Return the hosts for the domain."""
return set(
(entry.data[CONF_HOST]) for entry in hass.config_entries.async_entries(DOMAIN)
)
class MypvConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Mypv config flow."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
def __init__(self) -> None:
"""Initialize the config flow."""
self._errors = {}
self._info = {}
self._host = None
self._filtered_sensor_types = {}
def _host_in_configuration_exists(self, host) -> bool:
"""Return True if host exists in configuration."""
return host in mypv_entries(self.hass)
def _check_host(self, host) -> bool:
"""Check if we can connect to the mypv."""
try:
response = requests.get(f"http://{host}/mypv_dev.jsn", timeout=10)
response.raise_for_status()
self._info = response.json()
except (ConnectTimeout, HTTPError) as e:
self._errors[CONF_HOST] = "could_not_connect"
_LOGGER.error(f"Connection error: {e}")
return False
except RequestException as e:
self._errors[CONF_HOST] = "unexpected_error"
_LOGGER.error(f"Unexpected error: {e}")
return False
return True
def _get_sensors(self, host):
"""Fetch sensor data and update _filtered_sensor_types."""
try:
response = requests.get(f"http://{host}/data.jsn", timeout=10)
response.raise_for_status()
data = response.json()
json_keys = set(data.keys())
self._filtered_sensor_types = {}
for key, value in SENSOR_TYPES.items():
if key in json_keys:
self._filtered_sensor_types[key] = value[0]
if not self._filtered_sensor_types:
_LOGGER.warning("No matching sensors found on the device.")
except RequestException as e:
_LOGGER.error(f"Error fetching sensor data: {e}")
self._filtered_sensor_types = {}
async def async_step_user(self, user_input=None):
"""Handle the initial step."""
if user_input is not None:
self._host = user_input[CONF_HOST]
if self._host_in_configuration_exists(self._host):
self._errors[CONF_HOST] = "host_exists"
else:
can_connect = await self.hass.async_add_executor_job(
self._check_host, self._host
)
if can_connect:
await self.hass.async_add_executor_job(self._get_sensors, self._host)
return await self.async_step_sensors()
user_input = user_input or {CONF_HOST: "192.168.0.0"}
setup_schema = vol.Schema(
{vol.Required(CONF_HOST, default=user_input[CONF_HOST]): str}
)
return self.async_show_form(
step_id="user", data_schema=setup_schema, errors=self._errors
)
async def async_step_sensors(self, user_input=None):
"""Handle the sensor selection step."""
if user_input is not None:
self._info['device'] = user_input.get('device', self._info.get('device'))
self._info['number'] = user_input.get('number', self._info.get('number'))
return self.async_create_entry(
title=f"{self._info['device']} - {self._info['number']}",
data={
CONF_HOST: self._host,
CONF_MONITORED_CONDITIONS: user_input[CONF_MONITORED_CONDITIONS],
'_filtered_sensor_types': self._filtered_sensor_types,
},
)
default_monitored_conditions = (
[] if self._async_current_entries() else DEFAULT_MONITORED_CONDITIONS
)
setup_schema = vol.Schema(
{
vol.Required(
CONF_MONITORED_CONDITIONS, default=default_monitored_conditions
): cv.multi_select(self._filtered_sensor_types),
}
)
return self.async_show_form(
step_id="sensors", data_schema=setup_schema, errors=self._errors
)
async def async_step_import(self, user_input=None):
"""Import a config entry."""
if self._host_in_configuration_exists(user_input[CONF_HOST]):
return self.async_abort(reason="host_exists")
self._host = user_input[CONF_HOST]
await self.hass.async_add_executor_job(self._check_host, self._host)
await self.hass.async_add_executor_job(self._get_sensors, self._host)
return await self.async_step_sensors(user_input)
@staticmethod
@callback
def async_get_options_flow(config_entry: config_entries.ConfigEntry) -> config_entries.OptionsFlow:
return MypvOptionsFlowHandler(config_entry)
class MypvOptionsFlowHandler(config_entries.OptionsFlow):
"""Handles options flow"""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize options flow."""
self.config_entry = config_entry
self.filtered_sensor_types = config_entry.data.get('_filtered_sensor_types', {})
async def async_step_init(self, user_input=None):
"""Manage the options."""
if user_input is not None:
return self.async_create_entry(
title="",
data={
CONF_MONITORED_CONDITIONS: user_input[CONF_MONITORED_CONDITIONS],
},
)
options_schema = vol.Schema(
{
vol.Required(
CONF_MONITORED_CONDITIONS,
default=self.config_entry.options.get(
CONF_MONITORED_CONDITIONS, DEFAULT_MONITORED_CONDITIONS
),
): cv.multi_select(self.filtered_sensor_types),
}
)
return self.async_show_form(step_id="init", data_schema=options_schema)
sensor.py
"""The my-PV integration."""
import logging
from homeassistant.const import CONF_MONITORED_CONDITIONS
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.const import (
UnitOfElectricCurrent,
UnitOfFrequency,
UnitOfTemperature,
)
from .const import SENSOR_TYPES, DOMAIN, DATA_COORDINATOR
from .coordinator import MYPVDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, entry, async_add_entities):
"""Add an my-PV entry."""
coordinator: MYPVDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id][
DATA_COORDINATOR
]
entities = []
if CONF_MONITORED_CONDITIONS in entry.options:
for sensor in entry.options[CONF_MONITORED_CONDITIONS]:
entities.append(MypvDevice(coordinator, sensor, entry.title))
else:
for sensor in entry.data[CONF_MONITORED_CONDITIONS]:
entities.append(MypvDevice(coordinator, sensor, entry.title))
async_add_entities(entities)
#liste löscen
class MypvDevice(CoordinatorEntity):
“”“Representation of a my-PV device.”“”
def __init__(self, coordinator, sensor_type, name):
"""Initialize the sensor."""
super().__init__(coordinator)
self._sensor = SENSOR_TYPES[sensor_type][0]
self._name = name
self.type = sensor_type
self._data_source = SENSOR_TYPES[sensor_type][3]
self.coordinator = coordinator
self._last_value = None
self._unit_of_measurement = SENSOR_TYPES[self.type][1]
self._icon = SENSOR_TYPES[self.type][2]
self.serial_number = self.coordinator.data["info"]["sn"]
self.model = self.coordinator.data["info"]["device"]
_LOGGER.debug(self.coordinator)
@property
def name(self):
"""Return the name of the sensor."""
return f"{self._name} {self._sensor}"
@property
def state(self):
"""Return the state of the device."""
try:
state = self.coordinator.data[self._data_source][self.type]
if self.type == "power_act":
if relOut is not None and loadNom is not None:
relOut = int(self.coordinator.data[self._data_source]["rel1_out"])
loadNom = int(self.coordinator.data[self._data_source]["load_nom"])
state = (relOut * loadNom) + int(state)
self._last_value = state
except Exception as ex:
_LOGGER.error(ex)
state = self._last_value
if state is None:
return state
if self._unit_of_measurement == UnitOfFrequency.HERTZ:
return state / 1000
if self._unit_of_measurement == UnitOfTemperature.CELSIUS:
return state / 10
if self._unit_of_measurement == UnitOfElectricCurrent.AMPERE:
return state / 10
return state
@property
def unit_of_measurement(self):
"""Return the unit of measurement this sensor expresses itself in."""
return self._unit_of_measurement
@property
def icon(self):
"""Return icon."""
return self._icon
@property
def unique_id(self):
"""Return unique id based on device serial and variable."""
return "{} {}".format(self.serial_number, self._sensor)
@property
def device_info(self):
"""Return information about the device."""
return {
"identifiers": {(DOMAIN, self.serial_number)},
"name": self._name,
"manufacturer": "my-PV",
"model": self.model,
}
__init__.py
""" Integration for MYPV AC-Thor"""
import voluptuous as vol
import logging
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_MONITORED_CONDITIONS,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.core import HomeAssistant
from .const import DOMAIN, SENSOR_TYPES, DATA_COORDINATOR, PLATFORMS
from .coordinator import MYPVDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_MONITORED_CONDITIONS): vol.All(
cv.ensure_list, [vol.In(list(SENSOR_TYPES))]
),
}
)
},
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass, config):
"""Platform setup, do nothing."""
hass.data.setdefault(DOMAIN, {})
if DOMAIN not in config:
return True
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=dict(config[DOMAIN])
)
)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Load the saved entities."""
coordinator = MYPVDataUpdateCoordinator(
hass,
config=entry.data,
options=entry.options,
)
await coordinator.async_refresh()
# Reload entry when its updated.
entry.async_on_unload(entry.add_update_listener(_async_update_listener))
if not coordinator.last_update_success:
raise ConfigEntryNotReady
hass.data[DOMAIN][entry.entry_id] = {
DATA_COORDINATOR: coordinator,
}
await hass.config_entries.async_forward_entry_setups(entry, ["sensor", "switch", "button"])
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
await hass.config_entries.async_reload(entry.entry_id)