Eufy Robovac 35c Working with Home_Assistant - Updated 11/2020 - How To Guide - Now with Edge Cleaning!

Is there a different way of doing this for the Eufy RoboVac? I’ve noticed that HACS has a plug-in, but I can’t get it to work. I’ve tried several ways to integrate the code.

I wanted to share with everyone how I was able to get my X8 Hybrid to play nice with Home Assistant. Took some time but I ended up editing the vacuum.py code located in the /config/custom_components/robovac integration folder. My edited code of vacuum.py is below.

# Copyright 2022 Brendan McCluskey
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Eufy Robovac sensor platform."""
from __future__ import annotations
from collections.abc import Mapping

from datetime import timedelta
import logging
import asyncio
import base64
import json
import time
import ast

from typing import Any
from enum import IntEnum
from homeassistant.loader import bind_hass
from homeassistant.components.vacuum import VacuumEntity, VacuumEntityFeature
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import (
    CONNECTION_NETWORK_MAC,
)
from homeassistant.helpers.entity import DeviceInfo

from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.const import (
    CONF_ACCESS_TOKEN,
    CONF_MODEL,
    CONF_NAME,
    CONF_ID,
    CONF_IP_ADDRESS,
    CONF_DESCRIPTION,
    CONF_MAC,
    STATE_ON,
)

from .const import CONF_VACS, DOMAIN

from .tuyalocalapi import TuyaDevice

from homeassistant.const import ATTR_BATTERY_LEVEL


class RoboVacEntityFeature(IntEnum):
    """Supported features of the RoboVac entity."""

    EDGE = 1
    SMALL_ROOM = 2
    CLEANING_TIME = 4
    CLEANING_AREA = 8
    DO_NOT_DISTURB = 16
    AUTO_RETURN = 32
    CONSUMABLES = 64
    ROOM = 128
    ZONE = 256
    MAP = 512
    BOOST_IQ = 1024


ATTR_BATTERY_ICON = "battery_icon"
ATTR_FAN_SPEED = "fan_speed"
ATTR_FAN_SPEED_LIST = "fan_speed_list"
ATTR_STATUS = "status"
ATTR_ERROR_CODE = "error_code"
ATTR_MODEL_CODE = "model_code"
ATTR_CLEANING_AREA = "cleaning_area"
ATTR_CLEANING_TIME = "cleaning_time"
ATTR_AUTO_RETURN = "auto_return"
ATTR_DO_NOT_DISTURB = "do_not_disturb"
ATTR_BOOST_IQ = "boost_iq"
ATTR_CONSUMABLES = "consumables"
ATTR_MODE = "mode"

_LOGGER = logging.getLogger(__name__)
# Seconds between polls of the vacuum for updated state
REFRESH_RATE = 20
SCAN_INTERVAL = timedelta(seconds=REFRESH_RATE)


class robovac(TuyaDevice):
    """"""


@bind_hass
def is_on(hass: HomeAssistant, entity_id: str) -> bool:
    """Return if the vacuum is on based on the statemachine."""
    return hass.states.is_state(entity_id, STATE_ON)


async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Initialize my test integration 2 config entry."""
    # print("vacuum:async_setup_entry")
    vacuums = config_entry.data[CONF_VACS]
    # print("Vac:", vacuums)
    for item in vacuums:
        item = vacuums[item]
        # print("item")
        async_add_entities([RoboVacEntity(item)])


class RoboVacEntity(VacuumEntity):
    """Eufy Robovac version of a Vacuum entity"""

    _attr_should_poll = True

    _attr_access_token: str | None = None
    _attr_ip_address: str | None = None
    _attr_model_code: str | None = None
    _attr_cleaning_area: str | None = None
    _attr_cleaning_time: str | None = None
    _attr_auto_return: str | None = None
    _attr_do_not_disturb: str | None = None
    _attr_boost_iq: str | None = None
    _attr_consumables: str | None = None
    _attr_mode: str | None = None
    _attr_robovac_supported: str | None = None

    @property
    def robovac_supported(self) -> str | None:
        """Return the bitmask of RoboVac-specific features supported by this model."""
        return self._attr_robovac_supported

    @property
    def mode(self) -> str | None:
        """Return the cleaning mode of the vacuum cleaner."""
        return self._attr_mode

    @property
    def consumables(self) -> str | None:
        """Return the consumables status of the vacuum cleaner."""
        return self._attr_consumables

    @property
    def cleaning_area(self) -> str | None:
        """Return the cleaning area of the vacuum cleaner."""
        return self._attr_cleaning_area

    @property
    def cleaning_time(self) -> str | None:
        """Return the cleaning time of the vacuum cleaner."""
        return self._attr_cleaning_time

    @property
    def auto_return(self) -> str | None:
        """Return the auto_return mode of the vacuum cleaner."""
        return self._attr_auto_return

    @property
    def do_not_disturb(self) -> str | None:
        """Return the do not disturb mode of the vacuum cleaner."""
        return self._attr_do_not_disturb

    @property
    def boost_iq(self) -> str | None:
        """Return the boost iq mode of the vacuum cleaner."""
        return self._attr_boost_iq

    @property
    def model_code(self) -> str | None:
        """Return the model code of the vacuum cleaner."""
        return self._attr_model_code

    @property
    def access_token(self) -> str | None:
        """Return the access token (used as the local key) of the vacuum cleaner."""
        return self._attr_access_token

    @property
    def ip_address(self) -> str | None:
        """Return the ip address of the vacuum cleaner."""
        return self._attr_ip_address

    @property
    def state_attributes(self) -> dict[str, Any]:
        """Return the state attributes of the vacuum cleaner."""
        data = super().state_attributes
        # data: dict[str, Any] = {}
        if self.supported_features & VacuumEntityFeature.BATTERY:
            data[ATTR_BATTERY_LEVEL] = self.battery_level
            data[ATTR_BATTERY_ICON] = self.battery_icon
        if self.supported_features & VacuumEntityFeature.FAN_SPEED:
            data[ATTR_FAN_SPEED] = self.fan_speed
        if self.supported_features & VacuumEntityFeature.STATUS:
            data[ATTR_STATUS] = self.status
        if self.robovac_supported & RoboVacEntityFeature.CLEANING_AREA:
            data[ATTR_CLEANING_AREA] = self.cleaning_area
        if self.robovac_supported & RoboVacEntityFeature.CLEANING_TIME:
            data[ATTR_CLEANING_TIME] = self.cleaning_time
        if self.robovac_supported & RoboVacEntityFeature.AUTO_RETURN:
            data[ATTR_AUTO_RETURN] = self.auto_return
        if self.robovac_supported & RoboVacEntityFeature.DO_NOT_DISTURB:
            data[ATTR_DO_NOT_DISTURB] = self.do_not_disturb
        if self.robovac_supported & RoboVacEntityFeature.BOOST_IQ:
            data[ATTR_BOOST_IQ] = self.boost_iq
        if self.robovac_supported & RoboVacEntityFeature.CONSUMABLES:
            data[ATTR_CONSUMABLES] = self.consumables
        data[ATTR_MODE] = self.mode
        return data

    @property
    def capability_attributes(self) -> Mapping[str, Any] | None:
        """Return capability attributes."""
        if self.supported_features & VacuumEntityFeature.FAN_SPEED:
            return {
                ATTR_FAN_SPEED_LIST: self.fan_speed_list,
                # CONF_ACCESS_TOKEN: self.access_token,
                CONF_IP_ADDRESS: self.ip_address,
                ATTR_MODEL_CODE: self.model_code,
            }
        else:
            return {
                # CONF_ACCESS_TOKEN: self.access_token,
                CONF_IP_ADDRESS: self.ip_address,
                ATTR_MODEL_CODE: self.model_code,
            }

    def __init__(self, item) -> None:
        # print("vacuum:RoboVacEntity")
        # print("init_item", item)
        """Initialize mytest2 Sensor."""
        super().__init__()
        self._extra_state_attributes = {}
        self._attr_battery_level = 0
        self._attr_is_on = False
        self._attr_name = item[CONF_NAME]
        self._attr_unique_id = item[CONF_ID]
        self._attr_supported_features = 4084
        self._attr_model_code = item[CONF_MODEL]
        self._attr_ip_address = item[CONF_IP_ADDRESS]
        self._attr_access_token = item[CONF_ACCESS_TOKEN]
        self._attr_robovac_supported = 0
        if self.model_code[0:5] in [
            "T2103",
            "T2117",
            "T2118",
            "T2119",
            "T2120",
            "T2123",
            "T2128",
            "T2130",
        ]:  # C
            self._attr_fan_speed_list = ["No Suction", "Standard", "Boost IQ", "Max"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.EDGE | RoboVacEntityFeature.SMALL_ROOM
            )
        elif self.model_code[0:5] in ["T1250", "T2250", "T2251", "T2252", "T2253"]:  # G
            self._attr_fan_speed_list = ["Standard", "Turbo", "Max", "Boost IQ"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.CLEANING_TIME
                | RoboVacEntityFeature.CLEANING_AREA
                | RoboVacEntityFeature.DO_NOT_DISTURB
                | RoboVacEntityFeature.AUTO_RETURN
                | RoboVacEntityFeature.CONSUMABLES
            )
        elif self.model_code[0:5] in ["T2262", "T2261", "T2261A"]:  # X
            self._attr_fan_speed_list = ["Pure", "Standard", "Turbo", "Max"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.CLEANING_TIME
                | RoboVacEntityFeature.CLEANING_AREA
                | RoboVacEntityFeature.DO_NOT_DISTURB
                | RoboVacEntityFeature.AUTO_RETURN
                | RoboVacEntityFeature.CONSUMABLES
                | RoboVacEntityFeature.ROOM
                | RoboVacEntityFeature.ZONE
                | RoboVacEntityFeature.MAP
                | RoboVacEntityFeature.BOOST_IQ
            )
        else:
            self._attr_fan_speed_list = ["Pure", "Standard", "Turbo", "Max"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.CLEANING_TIME
                | RoboVacEntityFeature.CLEANING_AREA
                | RoboVacEntityFeature.DO_NOT_DISTURB
                | RoboVacEntityFeature.AUTO_RETURN
                | RoboVacEntityFeature.CONSUMABLES
                | RoboVacEntityFeature.ROOM
                | RoboVacEntityFeature.ZONE
                | RoboVacEntityFeature.MAP
                | RoboVacEntityFeature.BOOST_IQ
            )
        self.vacuum = robovac(
            device_id=self.unique_id,
            host=self.ip_address,
            local_key=self.access_token,
            timeout=2,
            ping_interval=10
            # ping_interval=REFRESH_RATE / 2,
        )
        self.error_code = None
        self.tuya_state = None
        self.tuyastatus = None
        print("vac:", self.vacuum)

    async def async_update(self):
        """Synchronise state from the vacuum."""
        print("update:", self.name)
        self.async_write_ha_state()
        if self.ip_address == "":
            return
        await self.vacuum.async_get()
        self.tuyastatus = self.vacuum._dps
        print("Tuya local API Result:", self.tuyastatus)
        # for 15C
        self._attr_battery_level = self.tuyastatus.get("104")
        self.tuya_state = self.tuyastatus.get("15")
        self.error_code = self.tuyastatus.get("106")
        self._attr_mode = self.tuyastatus.get("5")
        self._attr_fan_speed = self.tuyastatus.get("102")
        if self.fan_speed == "No_suction":
            self._attr_fan_speed = "No Suction"
        elif self.fan_speed == "Boost_IQ":
            self._attr_fan_speed = "Boost IQ"
        elif self.fan_speed == "Quiet":
            self._attr_fan_speed = "Pure"
        # for G30
        self._attr_cleaning_area = self.tuyastatus.get("110")
        self._attr_cleaning_time = self.tuyastatus.get("109")
        self._attr_auto_return = self.tuyastatus.get("135")
        self._attr_do_not_disturb = self.tuyastatus.get("107")
        if self.tuyastatus.get("142") is not None:
            self._attr_consumables = ast.literal_eval(
                base64.b64decode(self.tuyastatus.get("142")).decode("ascii")
            )["consumable"]["duration"]
            print(self.consumables)
        # For X8
        self._attr_boost_iq = self.tuyastatus.get("118")
        # self.map_data = self.tuyastatus.get("121")
        # self.erro_msg? = self.tuyastatus.get("124")
        if self.tuyastatus.get("116") is not None:
            self._attr_consumables = ast.literal_eval(
                base64.b64decode(self.tuyastatus.get("116")).decode("ascii")
            )["consumable"]["duration"]
            print(self.consumables)

    @property
    def status(self):
        """Return the status of the vacuum cleaner."""
        print("status:", self.error_code, self.tuya_state)
        if self.ip_address == "":
            return "Error: Set the IP Address"
        if self.error_code is not None and self.error_code not in [0, "no_error"]:
            self._attr_is_on = False
            if self.error_code == 1:
                return "Error: Front bumper stuck"
            elif self.error_code == 2:
                return "Error: Wheel stuck"
            elif self.error_code == 3:
                return "Error: Side brush"
            elif self.error_code == 4:
                return "Error: Rolling brush bar stuck"
            elif self.error_code == 5:
                return "Error: Device trapped"
            elif self.error_code == 6:
                return "Error: Device trapped"
            elif self.error_code == 7:
                return "Error: Wheel suspended"
            elif self.error_code == 8:
                return "Error: Low battery"
            elif self.error_code == 9:
                return "Error: Magnetic boundary"
            elif self.error_code == 12:
                return "Error: Right wall sensor"
            elif self.error_code == 13:
                return "Error: Device tilted"
            elif self.error_code == 14:
                return "Error: Insert dust collector"
            elif self.error_code == 17:
                return "Error: Restricted area detected"
            elif self.error_code == 18:
                return "Error: Laser cover stuck"
            elif self.error_code == 19:
                return "Error: Laser sensor stuck"
            elif self.error_code == 20:
                return "Error: Laser sensor blocked"
            elif self.error_code == 21:
                return "Error: Base blocked"
            elif self.error_code == "S1":
                return "Error: Battery"
            elif self.error_code == "S2":
                return "Error: Wheel Module"
            elif self.error_code == "S3":
                return "Error: Side Brush"
            elif self.error_code == "S4":
                return "Error: Suction Fan"
            elif self.error_code == "S5":
                return "Error: Rolling Brush"
            elif self.error_code == "S8":
                return "Error: Path Tracking Sensor"
            elif self.error_code == "Wheel_stuck":
                return "Error: Wheel stuck"
            elif self.error_code == "R_brush_stuck":
                return "Error: Rolling brush stuck"
            elif self.error_code == "Crash_bar_stuck":
                return "Error: Front bumper stuck"
            elif self.error_code == "sensor_dirty":
                return "Error: Sensor dirty"
            elif self.error_code == "N_enough_pow":
                return "Error: Low battery"
            elif self.error_code == "Stuck_5_min":
                return "Error: Device trapped"
            elif self.error_code == "Fan_stuck":
                return "Error: Fan stuck"
            elif self.error_code == "S_brush_stuck":
                return "Error: Side brush stuck"
            else:
                return "Error: " + str(self.error_code)
        elif self.tuya_state == "Running":
            self._attr_is_on = True
            return "cleaning"
        elif self.tuya_state == "Locating":
            self._attr_is_on = True
            return "cleaning"
        elif self.tuya_state == "remote":
            self._attr_is_on = True
            return "cleaning"
        elif self.tuya_state == "Charging":
            self._attr_is_on = False
            return "charging"
        elif self.tuya_state == "completed":
            self._attr_is_on = False
            return "docked"
        elif self.tuya_state == "Recharge":
            self._attr_is_on = True
            return "returning"
        elif self.tuya_state == "Sleeping":
            self._attr_is_on = False
            return "paused"
        elif self.tuya_state == "standby":
            self._attr_is_on = False
            return "paused"
        else:
            return "cleaning"

    async def async_locate(self, **kwargs):
        """Locate the vacuum cleaner."""
        print("Locate Pressed")
        _LOGGER.info("Locate Pressed")
        if self.tuyastatus.get("103"):
            await self.vacuum.async_set({"103": False}, None)
        else:
            await self.vacuum.async_set({"103": True}, None)

    async def async_return_to_base(self, **kwargs):
        """Set the vacuum cleaner to return to the dock."""
        print("Return home Pressed")
        _LOGGER.info("Return home Pressed")
        await self.vacuum.async_set({"101": True}, None)
        await asyncio.sleep(1)
        await self.async_update()

    async def async_start_pause(self, **kwargs):
        """Pause the cleaning task or resume it."""
        print("Start/Pause Pressed")
        _LOGGER.info("Start/Pause Pressed")
        if self.tuya_state in ["Recharge", "Running", "Locating", "remote"]:
            await self.vacuum.async_set({"2": False}, None)
        else:
            if self.mode == "Nosweep":
                self._attr_mode = "auto"
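            # Compare against the raw Tuya state; self.status returns lowercase
            # values such as "charging" and "docked", so it never matched here.
            elif self.mode == "room" and (
                self.tuya_state == "Charging" or self.tuya_state == "completed"
            ):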
                self._attr_mode = "auto"
            await self.vacuum.async_set({"5": self.mode}, None)
        await asyncio.sleep(1)
        await self.async_update()

    async def async_clean_spot(self, **kwargs):
        """Perform a spot clean-up."""
        print("Spot Clean Pressed")
        _LOGGER.info("Spot Clean Pressed")
        await self.vacuum.async_set({"5": "Spot"}, None)
        await asyncio.sleep(1)
        await self.async_update()

    async def async_set_fan_speed(self, fan_speed, **kwargs):
        """Set fan speed."""
        print("Fan Speed Selected", fan_speed)
        _LOGGER.info("Fan Speed Selected")
        if fan_speed == "No Suction":
            fan_speed = "No_suction"
        elif fan_speed == "Boost IQ":
            fan_speed = "Boost_IQ"
        elif fan_speed == "Pure":
            fan_speed = "Quiet"
        await self.vacuum.async_set({"102": fan_speed}, None)
        await asyncio.sleep(1)
        await self.async_update()

    async def async_send_command(
        self, command: str, params: dict | list | None = None, **kwargs
    ) -> None:
        """Send a command to a vacuum cleaner."""
        _LOGGER.info("Send Command %s Pressed", command)
        if command == "edgeClean":
            await self.vacuum.async_set({"5": "Edge"}, None)
        elif command == "smallRoomClean":
            await self.vacuum.async_set({"5": "SmallRoom"}, None)
        elif command == "autoClean":
            await self.vacuum.async_set({"5": "auto"}, None)
        elif command == "autoReturn":
            if self.auto_return:
                await self.vacuum.async_set({"135": False}, None)
            else:
                await self.vacuum.async_set({"135": True}, None)
        elif command == "doNotDisturb":
            if self.do_not_disturb:
                await self.vacuum.async_set({"139": "MEQ4MDAwMDAw"}, None)
                await self.vacuum.async_set({"107": False}, None)
            else:
                await self.vacuum.async_set({"139": "MTAwMDAwMDAw"}, None)
                await self.vacuum.async_set({"107": True}, None)
        elif command == "boostIQ":
            if self.boost_iq:
                await self.vacuum.async_set({"118": False}, None)
            else:
                await self.vacuum.async_set({"118": True}, None)
        elif command == "roomClean":
            roomIds = params.get("roomIds", [1])
            count = params.get("count", 1)
            clean_request = {"roomIds": roomIds, "cleanTimes": count}
            method_call = {
                "method": "selectRoomsClean",
                "data": clean_request,
                "timestamp": round(time.time() * 1000),
            }
            json_str = json.dumps(method_call, separators=(",", ":"))
            base64_str = base64.b64encode(json_str.encode("utf8")).decode("utf8")
            _LOGGER.info("roomClean call %s", json_str)
            await self.vacuum.async_set({"124": base64_str}, None)
        await asyncio.sleep(1)
        await self.async_update()


I wanted to point out what part of the code I edited to make the magic happen, Thank you to everyone that helped me figure this out.

    def __init__(self, item) -> None:
        # print("vacuum:RoboVacEntity")
        # print("init_item", item)
        """Initialize mytest2 Sensor."""
        super().__init__()
        self._extra_state_attributes = {}
        self._attr_battery_level = 0
        self._attr_is_on = False
        self._attr_name = item[CONF_NAME]
        self._attr_unique_id = item[CONF_ID]
        self._attr_supported_features = 4084
        self._attr_model_code = item[CONF_MODEL]
        self._attr_ip_address = item[CONF_IP_ADDRESS]
        self._attr_access_token = item[CONF_ACCESS_TOKEN]
        self._attr_robovac_supported = 0
        if self.model_code[0:5] in [
            "T2103",
            "T2117",
            "T2118",
            "T2119",
            "T2120",
            "T2123",
            "T2128",
            "T2130",
        ]:  # C
            self._attr_fan_speed_list = ["No Suction", "Standard", "Boost IQ", "Max"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.EDGE | RoboVacEntityFeature.SMALL_ROOM
            )
        elif self.model_code[0:5] in ["T1250", "T2250", "T2251", "T2252", "T2253"]:  # G
            self._attr_fan_speed_list = ["Standard", "Turbo", "Max", "Boost IQ"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.CLEANING_TIME
                | RoboVacEntityFeature.CLEANING_AREA
                | RoboVacEntityFeature.DO_NOT_DISTURB
                | RoboVacEntityFeature.AUTO_RETURN
                | RoboVacEntityFeature.CONSUMABLES
            )
        elif self.model_code[0:5] in ["T2262", "T2261", "T2261A"]:  # X
            self._attr_fan_speed_list = ["Pure", "Standard", "Turbo", "Max"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.CLEANING_TIME
                | RoboVacEntityFeature.CLEANING_AREA
                | RoboVacEntityFeature.DO_NOT_DISTURB
                | RoboVacEntityFeature.AUTO_RETURN
                | RoboVacEntityFeature.CONSUMABLES
                | RoboVacEntityFeature.ROOM
                | RoboVacEntityFeature.ZONE
                | RoboVacEntityFeature.MAP
                | RoboVacEntityFeature.BOOST_IQ
            )
        else:
            self._attr_fan_speed_list = ["Pure", "Standard", "Turbo", "Max"]
            self._attr_robovac_supported = (
                RoboVacEntityFeature.CLEANING_TIME
                | RoboVacEntityFeature.CLEANING_AREA
                | RoboVacEntityFeature.DO_NOT_DISTURB
                | RoboVacEntityFeature.AUTO_RETURN
                | RoboVacEntityFeature.CONSUMABLES
                | RoboVacEntityFeature.ROOM
                | RoboVacEntityFeature.ZONE
                | RoboVacEntityFeature.MAP
                | RoboVacEntityFeature.BOOST_IQ
            )
        self.vacuum = robovac(
            device_id=self.unique_id,
            host=self.ip_address,
            local_key=self.access_token,
            timeout=2,
            ping_interval=10
            # ping_interval=REFRESH_RATE / 2,
        )
        self.error_code = None
        self.tuya_state = None
        self.tuyastatus = None
        print("vac:", self.vacuum)
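The `_attr_robovac_supported` value set in `__init__` is a bitmask: each model family ORs together the features it has, and `state_attributes` later tests individual bits with `&`. A minimal standalone sketch of that pattern follows, using only a subset of the flags from vacuum.py; the `supports` helper is hypothetical and is not part of the integration.

```python
from enum import IntEnum


class RoboVacEntityFeature(IntEnum):
    """Subset of the feature bits defined in vacuum.py."""

    EDGE = 1
    SMALL_ROOM = 2
    CLEANING_TIME = 4
    ROOM = 128


# C-series models get EDGE | SMALL_ROOM, as in __init__.
c_series = RoboVacEntityFeature.EDGE | RoboVacEntityFeature.SMALL_ROOM


def supports(mask: int, feature: RoboVacEntityFeature) -> bool:
    """Hypothetical helper: True when the feature bit is set in mask."""
    return bool(mask & feature)


print(supports(c_series, RoboVacEntityFeature.EDGE))  # bit 1 is set
print(supports(c_series, RoboVacEntityFeature.ROOM))  # bit 128 is not set
```

Because each flag is a distinct power of two, adding a model to a different branch only changes which extra attributes show up on the entity.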


Just want to thank you all for your work on this. I am definitely new to the arena here, but was able to make it work. For some reason, I was unable to get it working with HACS (the download would not fully complete) and had to manually install it with “git clone”. I also probably did that wrong, because I ran it from the “custom_components” directory and it recreated that directory and included the README, setup.py, and other files on that page. So I had to move all the files around after the manual download.

I’m just glad it is working now.

Thanks again.

Similar to @SmartFleshlight, I’m not able to send any commands. However, all attributes work and update. I’m getting the following error:

Logger: homeassistant.components.websocket_api.http.connection
Source: components/vacuum/__init__.py:351
Integration: Home Assistant WebSocket API (documentation, issues)
First occurred: 10:30:36 AM (1 occurrences)
Last logged: 10:30:36 AM

[547712564912]
Traceback (most recent call last):
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/components/websocket_api/commands.py", line 199, in handle_call_service
    await hass.services.async_call(
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/core.py", line 1808, in async_call
    task.result()
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/core.py", line 1845, in _execute_service
    await cast(Callable[[ServiceCall], Awaitable[None]], handler.job.target)(
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/helpers/entity_component.py", line 213, in handle_service
    await service.entity_service_call(
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/helpers/service.py", line 686, in entity_service_call
    future.result()  # pop exception if have
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/helpers/entity.py", line 961, in async_request_call
    await coro
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/helpers/service.py", line 726, in _handle_entity_call
    await result
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/components/vacuum/__init__.py", line 358, in async_turn_on
    await self.hass.async_add_executor_job(partial(self.turn_on, **kwargs))
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/srv/homeassistant/lib/python3.10/site-packages/homeassistant/components/vacuum/__init__.py", line 351, in turn_on
    raise NotImplementedError()
NotImplementedError

I’ve restarted Home Assistant a few times and I’m running the latest version.

Does someone know how to solve this?

Edit: I no longer have this problem. I had taken the code from a pull request, since the conversation said it was updated specifically to make the RoboVac 30C work, which is the one I have. However, wiping my robovac folder and putting the original content back made everything work. I’m using this GitHub repository.


I have also successfully added the vacuum, but I only have an on/off button that does not work. If I control the vacuum via the app the state gets updated, so there is communication.

Did you add any other configuration ?

The on/off button does not work for me either. On the entity card, if you click on the device name instead of the switch, you will get many more options. Those work for me.
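The traceback earlier in the thread hints at why the switch fails: the toggle calls `turn_on`, and the posted vacuum.py defines `async_start_pause`, `async_return_to_base` and similar methods but no `turn_on`, so the call lands on the base class, which raises `NotImplementedError`. A minimal sketch of that behaviour follows; both classes are stand-ins written for illustration and are not the real Home Assistant classes.

```python
class BaseVacuum:
    """Stand-in for a base entity class; not the real Home Assistant class."""

    def turn_on(self) -> None:
        raise NotImplementedError()

    def start_pause(self) -> None:
        raise NotImplementedError()


class RoboVacLike(BaseVacuum):
    """Stand-in entity that overrides start_pause but not turn_on."""

    def __init__(self) -> None:
        self.running = False

    def start_pause(self) -> None:
        self.running = not self.running


vac = RoboVacLike()
vac.start_pause()  # works, because the subclass overrides it
try:
    vac.turn_on()  # not overridden, so the base class raises
except NotImplementedError:
    print("turn_on is not implemented")
```

That would explain why the controls behind the device name (start/pause, return to base, fan speed) respond while the plain switch does not.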


OK, I get more settings there too, but none of them control the vacuum.

Edit: after another restart of HA it works, thanks!

Zone Cleaning Question:
Just upgraded my G30 Edge to an X8 (Thanks, pet hair).
Wondering if there is any way to grab the cleaning zones from the app/account and call those with the integration? It’d be perfect to have a dashboard button that does say, the living room and kitchen. I know This Smart House pulled it off with a Roborock device, wasn’t sure if that was also possible here.

Using GitHub - bmccluskey/robovac: Add a Eufy RoboVac easily to Home Assistant - thank you, pbulteel, bmccluskey, and others for all your hard work on this!

this was EXACTLY what I was looking for!!! got it installed and it works GREAT!!! Thank you!!!


Except for line 272, what else did you change? Simply adding my model number to the list there seemed to fix the X8 Hybrid connection issue.

Hi, I don’t know if I’m asking this in the right place. I have a 15C.

But I have one big problem:

From time to time, about one or two months apart, the Eufy app clears everything and my vacuum disappears, so I need to reconnect and re-add it. Is this a problem everyone has?

Yes, you can write a script like this to clean a single room.
NB: “roomIds” just have to be worked out by trial and error; start with 0 and work your way up.
“count” is how many times it will do the clean (1, 2 or 3).

alias: "X8 Livingroom Clean"
sequence:
  - service: vacuum.send_command
    data:
      command: roomClean
      params:
        roomIds:
          - 1
        count: 2
    target:
      device_id: <REMOVED>
mode: single
icon: mdi:television-classic
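For anyone curious what the `roomClean` command does with those params, here is a standalone sketch of the payload that the `async_send_command` code posted earlier in the thread builds before writing it to data point "124". The function name `build_room_clean_payload` and the `timestamp_ms` argument are my own additions for illustration; the JSON shape is copied from vacuum.py.

```python
import base64
import json
import time


def build_room_clean_payload(room_ids, count=1, timestamp_ms=None):
    """Return the base64 string that vacuum.py sends for a roomClean command."""
    if timestamp_ms is None:
        timestamp_ms = round(time.time() * 1000)
    method_call = {
        "method": "selectRoomsClean",
        "data": {"roomIds": room_ids, "cleanTimes": count},
        "timestamp": timestamp_ms,
    }
    # Compact separators, matching the json.dumps call in vacuum.py.
    json_str = json.dumps(method_call, separators=(",", ":"))
    return base64.b64encode(json_str.encode("utf8")).decode("utf8")


# Same values as the script: room 1, cleaned twice (fixed timestamp for the demo).
payload = build_room_clean_payload([1], count=2, timestamp_ms=0)
decoded = json.loads(base64.b64decode(payload))
print(decoded)
```

Decoding the payload like this can help when working out room IDs by trial and error, since you can confirm exactly what was sent.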

Hi,
eufy-device-id-and-local-key-grabber is giving me errors, and the page returns a 404 error now. Any solution?

I cloned the code into my repo.

Hope this helps.

Thanks for the code update to support the X8 Hybrid. FYI, the only thing I did is go into my existing vacuum.py and add my models to this line:

elif self.model_code[0:5] in ["T2262", "T2262A", "T2261", "T2261A"]:  # X
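One thing worth knowing about that line: the check compares only the first five characters of the model code, so it is the five-character entries that actually match. A small sketch of how the comparison evaluates follows; the `is_x_series` wrapper is hypothetical, and the list is the one from the line above.

```python
X_SERIES = ["T2262", "T2262A", "T2261", "T2261A"]


def is_x_series(model_code: str) -> bool:
    """Mirror the check in vacuum.py: compare the first five characters."""
    return model_code[0:5] in X_SERIES


print("T2262A"[0:5])          # the slice drops the trailing letter
print(is_x_series("T2262A"))  # matches via the "T2262" entry
print(is_x_series("T2120"))   # a C-series code, not in this list
```

So a model code with a trailing letter is matched through its five-character prefix, and the six-character entries in the list are harmless but never the ones doing the work.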

So I got myself a Eufy 15C for stupid cheap to see if the Mrs liked the idea and surprisingly she does! She even asked how much a new one that could mop as well costs!?

So I’ve added the 15C to HA with @bmccluskey’s HACS addon and it all works. After reading this whole thread I’m still not sure if it is possible to operate without cloud connection, can somebody put me straight?

I’m making the assumption that I’ll need the DEVICE_ID and LOCAL_KEY for local operation, not sure if that’s right though?
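Going by the vacuum.py posted earlier in the thread, that looks right: `__init__` reads the name, ID, model, IP address and access token for each vacuum and passes the ID, IP and token to the local Tuya client as `device_id`, `host` and `local_key`. Whether it keeps working with internet access blocked is not confirmed anywhere in this thread. Below is a rough sketch of a check for those fields; the key strings are my assumption of what the `CONF_*` constants resolve to, and `missing_fields` plus the example values are hypothetical.

```python
# Assumed string values of CONF_NAME, CONF_ID, CONF_MODEL,
# CONF_IP_ADDRESS and CONF_ACCESS_TOKEN.
REQUIRED_KEYS = ("name", "id", "model", "ip_address", "access_token")


def missing_fields(item: dict) -> list[str]:
    """Hypothetical helper: list the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not item.get(key)]


# Placeholder entry; the empty IP address mirrors the case where
# vacuum.py reports "Error: Set the IP Address".
example = {
    "name": "Downstairs",
    "id": "<DEVICE_ID>",
    "model": "T2120",
    "ip_address": "",
    "access_token": "<LOCAL_KEY>",
}
print(missing_fields(example))
```

A fixed IP (or DHCP reservation) for the vacuum is probably worth setting up too, given the report further down that an address change broke the integration until the device was re-added.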

I know the older methods using various old APKs no longer work, but I tried them anyway. I’ve also tried the Python key grabber from @markbajaj, but I just get Python errors, and as I’m only using Python to get the keys I don’t want to put too much effort into getting the grabber working unless it’s worth my while and I’ll be able to operate locally.

Just checked and the python grabber still works on my mac on 3.7.1

I did a few months ago spend half a day trying to dockerise it so I could share it, but gave up as was getting so many errors!

EDIT: To answer the local-only question, I am not sure as I have not tried it. However, when I changed the WLAN it was on, it totally screwed it in HA (and Alexa) until I removed and re-added it, due to the IP address change.

I can’t seem to make HACS see this repo. I can go to the github page all day. Am I missing something?

Thank you @markbajaj, I’m going to leave things alone for the time being as I’ve got a few more higher priority (so I’ve been told) tasks and will return at some point. I’ll see if I can get python working to get the local data and see what happens if I disable internet access. Job for another day!
