External python scripts for polling Roborock data

I made two external scripts for polling Roborock data.
I have made these scripts because my HA version does not support python-miio 0.5.4, only 0.5.3, and I had problems installing 0.5.3 because of dependencies problems.
Also my HA is installed in a Raspbian operating system on my Raspberry Pi.
So this is only used if the Vaccum Miio intergration does not work.

Previously, I used only one script, which connected to the device, and polled for data. But HA does not like to wait for the script to finish (it may take several seconds or more to read the data), so therefore I made two scripts, one that is polling the data and storing it, and one script that reads that stored data with zero waiting time.

Note that these scripts are not used in the HA:s integrated python-services, probably it possible, but I have not tested it.

This are the two scripts:

roborock_daemon.py
This script is run as a daemon, and needs to be started up, either manually by the command:
$ python3 roborock_daemon.py &
or, I guess the more recommended way, as a service via systemctl (how not tested it though). But that is another topic.
This script polls the Roborock device at a predefined interval (can be editied), default every 60 seconds.
Your ip-adress to Roborock, and its token, needs to edited in the file. Homeassistant documentation gives a guide how to retrieve the token and ip-adress of the device.

roborock_poll.py
This script can be called at any time. It communicates with the running roborock_daemon, and reads the data retrieved from the daemon without any waiting times. It also is able to send some basic commands to the device such as when the user wants to start cleaning, stop cleaning, or let the Roborock return to the docking station.

Virtual entitys must be created to poll the data via the roborock_poll.y
Here is an example from the configuration.yaml file:

sensor:
  - platform: command_line
    name: Roborock Charge
    command: "/srv/homeassistant/bin/python3 ~/roborock_poll.py battery"
    unit_of_measurement: "%"
    scan_interval: 60

  - platform: command_line
    name: Roborock Status
    command: "/srv/homeassistant/bin/python3 ~/roborock_poll.py status"
    scan_interval: 60

  - platform: command_line
    name: Roborock Is Cleaning
    command: "/srv/homeassistant/bin/python3 ~/roborock_poll.py is_cleaning"
    scan_interval: 60

  - platform: command_line
    name: Roborock Is Paused
    command: "/srv/homeassistant/bin/python3 ~/roborock_poll.py is_paused"
    scan_interval: 60

  - platform: command_line
    name: Roborock Is Connected
    command: "/srv/homeassistant/bin/python3 ~/roborock_poll.py is_connected"
    scan_interval: 60

switch:
  - platform: command_line
    switches:
      roborock_resume_start_cleaning:
        command_state: 1
        command_on: "/srv/homeassistant/bin/python3 ~/roborock_poll.py clean"

      roborock_pause_cleaning:
        command_state: 1
        command_on: "/srv/homeassistant/bin/python3 ~/roborock_poll.py pause"

      roborock_return_home:
        command_state: 1
        command_on: "/srv/homeassistant/bin/python3 ~/roborock_poll.py return_home"

Here is another example of automations.yaml, where I stop charing the Roborock if battery-percent exceeds 62%:

  mode: single
- id: '1607185045358'
  alias: Roborock_Charging_Stop
  description: ''
  trigger:
  - platform: numeric_state
    entity_id: sensor.roborock_charge
    above: '62'
    below: '100'
  condition:
  - condition: state
    entity_id: sensor.roborock_is_paused
    state: 'False'
  - condition: and
    conditions:
    - condition: state
      entity_id: sensor.roborock_is_cleaning
      state: 'False'
  - condition: and
    conditions:
    - condition: state
      entity_id: sensor.roborock_is_connected
      state: 'True'
  action:
  - type: turn_off
    device_id: 3dedb7f1e80118e1e08c5f59fb03cbb8
    entity_id: switch.greenwave_reality_inc_powernode_6_port_switch_4
    domain: switch
  mode: single
- id: '1607190717535'
  alias: Roborock_Charging_Start
  description: ''
  trigger:
  - platform: state
    entity_id: sensor.roborock_is_cleaning
    from: 'False'
    to: 'True'
  condition: []
  action:
  - type: turn_on
    device_id: 3dedb7f1e80118e1e08c5f59fb03cbb8
    entity_id: switch.greenwave_reality_inc_powernode_6_port_switch_4
    domain: switch
  mode: single

I am unable to upload the scripts on the forum, but I will add this scripts in following posts.

roborock_daemon.py

import logging
import threading
import time
from threading import Lock
from multiprocessing.managers import BaseManager
import sys
from miio import Vacuum

pollInterval = 30
vac = Vacuum("ip-adress", "token")
mutex = Lock()
vacuum_status="No Connection"
vacuum_status_code = -1
vacuum_battery=0
vacuum_is_cleaning = False
vacuum_is_paused = False
vacuum_is_connected = False

class RemoteOperations:
    def getBattery(self):
        mutex.acquire()
        rData = vacuum_battery
        mutex.release()
        return rData
		
    def getStatus(self):
        mutex.acquire()
        rData = vacuum_status
        mutex.release()
        return rData

    def getStatusCode(self):
        mutex.acquire()
        rData = vacuum_status_code
        mutex.release()
        return rData

    def isCleaning(self):
        mutex.acquire()
        rData = vacuum_is_cleaning
        mutex.release()
        return rData

    def isPaused(self):
        mutex.acquire()
        rData = vacuum_is_paused
        mutex.release()
        return rData
    
    def startCleaning(self):
        try:
            vac.start()
            return 1
        except:
            return 0

    def resumeStartCleaning(self):
        try:
            vac.resume_or_start()
            return 1
        except:
            return 0

    def stopCleaning(self):
        try:
            vac.stop()
            return 1
        except:
            return 0

    def pauseCleaning(self):
        try:
            vac.pause()
            return 1
        except:
            return 0

    def returnHome(self):
        try:
            vac.home()
            return 1
        except:
            return 0

    def isConnected(self):
        mutex.acquire()
        rData = vacuum_is_connected
        mutex.release()
        return rData


class RemoteManager(BaseManager):
	pass

def thread_function(name):
    global vacuum_status
    global vacuum_battery
    global vacuum_status_code
    global vacuum_is_cleaning
    global vacuum_is_paused
    global vacuum_is_connected

    while True:
        try:
            vacStatus = vac.status()
        except:
            mutex.acquire()
            vacuum_status="No Connection"
            vacuum_is_connected = False
            mutex.release()
            time.sleep(pollInterval)
            continue
	
        mutex.acquire()
        vacuum_status = vacStatus.state
        vacuum_status_code = vacStatus.state_code
        vacuum_battery = vacStatus.battery
        vacuum_is_cleaning = vacStatus.is_on
        vacuum_is_paused = vacStatus.is_paused
        vacuum_is_connected = True
        mutex.release()
        time.sleep(pollInterval)

if __name__ == "__main__":
	x = threading.Thread(target=thread_function, args=(1,))
	x.start()

	RemoteManager.register('RemoteOperations', RemoteOperations)
	manager = RemoteManager(address=('', 12345), authkey=b'secret')
	manager.get_server().serve_forever()

	while True:
		pass

	x.join()

roborock_poll.py

import sys
from multiprocessing.managers import BaseManager

class RemoteManager(BaseManager):
	pass

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Err cmd args")
        quit()

    try:
        RemoteManager.register('RemoteOperations')
        manager = RemoteManager(address=('localhost', 12345), authkey=b'secret')
        manager.connect()
        remoteops = manager.RemoteOperations()
        if sys.argv[1] == "battery":
            print(remoteops.getBattery())
        elif sys.argv[1] == "status":
            print(remoteops.getStatus())
        elif sys.argv[1] == "status_code":
            print(remoteops.getStatusCode())
        elif sys.argv[1] == "clean":
            print(remoteops.resumeStartCleaning())
        elif sys.argv[1] == "pause":
            print(remoteops.pauseCleaning())
        elif sys.argv[1] == "return_home":
            print(remoteops.returnHome())
        elif sys.argv[1] == "is_cleaning":
            print(remoteops.isCleaning())
        elif sys.argv[1] == "is_paused":
            print(remoteops.isPaused())
        elif sys.argv[1] == "is_connected":
            print(remoteops.isConnected())
        else:
            print("Err cmd")
    except:
        print("Err remotemanager")
1 Like

Thank you for sharing… this example helped me accomplish something similar.