Hey guys,
I updated the Waze travel time sensor to handle device trackers, sensors, and zones. Take a look; feedback is welcome.
"""
Support for Waze travel time sensor.

For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.waze_travel_time/
"""
from datetime import timedelta
import logging

import requests
import voluptuous as vol

from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
    ATTR_ATTRIBUTION, ATTR_LATITUDE, ATTR_LONGITUDE, CONF_NAME, CONF_REGION,
    EVENT_HOMEASSISTANT_START)
import homeassistant.helpers.config_validation as cv
import homeassistant.helpers.location as location
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
# Third-party package this platform needs at runtime.
REQUIREMENTS = ['WazeRouteCalculator==0.5']

_LOGGER = logging.getLogger(__name__)

# Keys of the extra state attributes exposed by the sensor.
ATTR_DURATION = 'duration'
ATTR_DISTANCE = 'distance'
ATTR_ROUTE = 'route'

CONF_ATTRIBUTION = "Data provided by the Waze.com"
CONF_DESTINATION = 'destination'
CONF_ORIGIN = 'origin'

DEFAULT_NAME = 'Waze Travel Time'

ICON = 'mdi:car'

# Region codes accepted for the ``region`` configuration option.
REGIONS = ['US', 'NA', 'EU', 'IL']

SCAN_INTERVAL = timedelta(minutes=5)

# Origin, destination and region are mandatory; the name falls back to
# DEFAULT_NAME. The entries must live inside the extended schema -- as bare
# top-level lines they are a syntax error and validate nothing.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_ORIGIN): cv.string,
    vol.Required(CONF_DESTINATION): cv.string,
    vol.Required(CONF_REGION): vol.In(REGIONS),
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
})

# An origin/destination whose prefix (the text before the first '.') is one
# of these domains is treated as an entity id rather than a literal location.
TRACKABLE_DOMAINS = ['device_tracker', 'sensor', 'zone']
def setup_platform(hass, config, add_devices, discovery_info=None):
    """Set up the Waze travel time sensor platform.

    The sensor is not created immediately: creation is deferred until
    Home Assistant fires its start event (presumably so that the entities
    referenced as origin/destination already exist -- confirm).
    """
    def run_setup(event):
        """Create the sensor from the platform config and register it."""
        destination = config.get(CONF_DESTINATION)
        name = config.get(CONF_NAME)
        origin = config.get(CONF_ORIGIN)
        region = config.get(CONF_REGION)

        sensor = WazeTravelTime(hass, name, origin, destination, region)
        # Without this call the sensor is built and then discarded. The
        # second argument requests an update() before the entity is added,
        # so it starts out with a value.
        add_devices([sensor], True)

    hass.bus.listen_once(EVENT_HOMEASSISTANT_START, run_setup)
class WazeTravelTime(Entity):
    """Representation of a Waze travel time sensor.

    Origin and destination may each be a literal location string, the
    friendly name of a zone, or the entity id of an entity in one of
    TRACKABLE_DOMAINS. Entity ids are resolved again on every update.
    """

    def __init__(self, hass, name, origin, destination, region):
        """Initialize the Waze travel time sensor.

        ``origin`` and ``destination`` are the configured strings; ``region``
        is passed through unchanged to WazeRouteCalculator.
        """
        self._hass = hass
        self._name = name
        self._region = region
        self._state = None

        # A value whose prefix is a trackable domain is an entity id and is
        # looked up on each update; anything else is used as the location.
        origin_tracked = origin.split('.', 1)[0] in TRACKABLE_DOMAINS
        self._origin_entity_id = origin if origin_tracked else None
        self._origin = None if origin_tracked else origin

        destination_tracked = (
            destination.split('.', 1)[0] in TRACKABLE_DOMAINS)
        self._destination_entity_id = (
            destination if destination_tracked else None)
        self._destination = None if destination_tracked else destination

    @property
    def name(self):
        """Return the name of the sensor."""
        return self._name

    @property
    def state(self):
        """Return the rounded travel duration, or None if not yet known."""
        if self._state is None:
            return None

        if 'duration' in self._state:
            return round(self._state['duration'])

        return None

    @property
    def unit_of_measurement(self):
        """Return the unit of measurement."""
        return 'min'

    @property
    def icon(self):
        """Icon to use in the frontend, if any."""
        return ICON

    @property
    def device_state_attributes(self):
        """Return the state attributes of the last update.

        Returns None until a first successful update has stored a result.
        """
        if self._state is None:
            return None

        res = {ATTR_ATTRIBUTION: CONF_ATTRIBUTION}
        if 'duration' in self._state:
            res[ATTR_DURATION] = self._state['duration']
        if 'distance' in self._state:
            res[ATTR_DISTANCE] = self._state['distance']
        if 'route' in self._state:
            res[ATTR_ROUTE] = self._state['route']
        return res

    def _get_location_from_entity(self, entity_id):
        """Get the location from the entity state or attributes.

        Returns a location string, or None when the entity is missing or
        no location can be derived from it.
        """
        entity = self._hass.states.get(entity_id)

        if entity is None:
            _LOGGER.error("Unable to find entity %s", entity_id)
            return None

        # Check if the entity has location attributes (zone)
        if location.has_location(entity):
            return self._get_location_from_attributes(entity)

        # Check if device is in a zone (device_tracker)
        zone_entity = self._hass.states.get("zone.%s" % entity.state)
        if location.has_location(zone_entity):
            _LOGGER.debug(
                "%s is in %s, getting zone location",
                entity_id, zone_entity.entity_id
            )
            return self._get_location_from_attributes(zone_entity)

        # If zone was not found in state then use the state as the location
        if entity_id.startswith("sensor."):
            return entity.state

        # When everything fails just return nothing
        return None

    @staticmethod
    def _get_location_from_attributes(entity):
        """Get the lat/long string from an entities attributes."""
        attr = entity.attributes
        return "%s,%s" % (attr.get(ATTR_LATITUDE), attr.get(ATTR_LONGITUDE))

    def _resolve_zone(self, friendly_name):
        """Translate a zone's friendly name into its lat/long string.

        Returns ``friendly_name`` unchanged when no zone carries that name.
        """
        entities = self._hass.states.all()
        for entity in entities:
            if entity.domain == 'zone' and entity.name == friendly_name:
                return self._get_location_from_attributes(entity)

        return friendly_name

    @Throttle(SCAN_INTERVAL)
    def update(self):
        """Fetch new state data for the sensor.

        Resolves origin and destination, asks WazeRouteCalculator for the
        available routes and stores the first one. On a lookup error the
        previous state is kept and the error is logged.
        """
        import WazeRouteCalculator

        if self._origin_entity_id is not None:
            self._origin = self._get_location_from_entity(
                self._origin_entity_id
            )

        if self._destination_entity_id is not None:
            self._destination = self._get_location_from_entity(
                self._destination_entity_id
            )

        self._destination = self._resolve_zone(self._destination)
        self._origin = self._resolve_zone(self._origin)

        if self._destination is not None and self._origin is not None:
            try:
                params = WazeRouteCalculator.WazeRouteCalculator(
                    self._origin, self._destination, self._region)
                routes = params.calc_all_routes_info()
                route = next(iter(routes))
                duration, distance = routes[route]
                try:
                    # Presumably repairs UTF-8 route names that were
                    # decoded as ISO-8859-1 upstream -- confirm.
                    route = bytes(route, 'ISO-8859-1').decode('UTF-8')
                except UnicodeError:
                    # Deliberate best effort: a name that cannot be
                    # re-decoded is reported as received instead of
                    # aborting the whole update.
                    pass
                self._state = {
                    'duration': duration,
                    'distance': distance,
                    'route': route}
            except WazeRouteCalculator.WRCError as exp:
                _LOGGER.error("Error on retrieving data: %s", exp)
                return
            except KeyError:
                _LOGGER.error("Error retrieving data from server")
                return