GoCube Support (Smart Rubiks Rube)

I just got my awesome new smart Rubiks cube, the “GoCube” (https://www.getgocube.com/). It connects to your smartphone via Bluetooth, and after playing around with it I realized that this could be a really cool controller for Home Assistant! (e.g. open a lock by solving the cube; dimming lights by turning it; switching between colors etc etc… In the Kickstarter they promissed an open API and according to their support:

"The API currently isn’t available now.

The API is indeed planned for “phase 2” (which we hope to initiate very soon). We will certainly post an update when it’s ready - Stay Tuned!"

Since I’,m not a programmer or smart enough to put something together myself I’m throwing the ball up and hopefully someone will catch it :slight_smile:

I don’t yet have a component for it, but can connect to the cube from a laptop and receive feedback. It should be possible to wrap this up into a HA component and connect to it via an esphome bluetooth proxy:

connection_manager.py

import asyncio
import logging
from typing import Callable

from bleak import BleakClient
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError

from errors import CubeConnectionError

_LOGGER = logging.getLogger(__name__)

PRIMARY_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
RX_CHARACTERISTIC_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
TX_CHARACTERISTIC_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"

CONFIGURATION_COMMANDS = {
    "Reboot": bytearray([0x34]),
    "SetSolvedState": bytearray([0x35]),
    "DisableOrientation": bytearray([0x37]),
    "EnableOrientation": bytearray([0x38]),
    "LedFlash": bytearray([0x41]),
    "LedToggleAnimation": bytearray([0x42]),
    "LedFlashSlow": bytearray([0x43]),
    "LedToggle": bytearray([0x44]),
    "CalibrateOrientation": bytearray([0x57]),
    "GetBattery": bytearray([0x32]),
    "GetState": bytearray([0x33]),
    "GetStats": bytearray([0x39]),
    "GetCubeType": bytearray([0x56]),
}

FACE_ROTATION_MAP = {
    0x00: "Blue Clockwise",
    0x01: "Blue Counterclockwise",
    0x02: "Green Clockwise",
    0x03: "Green Counterclockwise",
    0x04: "White Clockwise",
    0x05: "White Counterclockwise",
    0x06: "Yellow Clockwise",
    0x07: "Yellow Counterclockwise",
    0x08: "Red Clockwise",
    0x09: "Red Counterclockwise",
    0x0A: "Orange Clockwise",
    0x0B: "Orange Counterclockwise",
}

COLOR_HEX_LOOKUP = {
    0x00: "Blue",
    0x01: "Green",
    0x02: "White",
    0x03: "Yellow",
    0x04: "Red",
    0x05: "Orange",
}


class ConnectionManager:
    """Manages the connection state to the GoCube."""

    def __init__(
        self,
        connect_callback: Callable[[BleakClient], None],
        disconnect_callback: Callable[[], None],
        max_retries: int = 3,  # Adjust the number of retry attempts as needed
        retry_interval: float = 1.0,  # Adjust the interval between retries as needed (in seconds)
    ):
        """Initialize the ConnectionManager."""
        self._connect_callback = connect_callback
        self._disconnect_callback = disconnect_callback
        self._cube_client: BleakClient = None
        self._max_retries = max_retries
        self._retry_interval = retry_interval

    async def connect(self, ble_device: BLEDevice) -> None:
        """Connect to the GoCube with retry logic."""
        retry_count = 0
        while retry_count < self._max_retries:
            try:
                self._cube_client = BleakClient(ble_device)
                _LOGGER.info(
                    "Attempting to establish connection with GoCube (attempt %d/%d)...",
                    retry_count + 1,
                    self._max_retries,
                )
                await self._cube_client.connect()
                _LOGGER.info("Connected to GoCube!")
                await self.send_command("DisableOrientation")
                await self.enable_notifications()
                return
            except BleakError as ex:
                _LOGGER.error(f"Failed to connect to GoCube: {ex}")
            except Exception as e:
                _LOGGER.error(f"An error occurred during connection: {e}")
            retry_count += 1
            await asyncio.sleep(self._retry_interval)
        raise CubeConnectionError(
            f"Failed to connect to GoCube after {self._max_retries} attempts"
        )

    async def disconnect(self) -> None:
        """Disconnect from the GoCube."""
        _LOGGER.info("Disconnecting from GoCube...")
        try:
            if self._cube_client is None:
                _LOGGER.warning("Already disconnected")
                return
            await self._cube_client.disconnect()
        finally:
            self._cube_client = None

    async def send_command(self, command_name: str) -> bytearray:
        """Send a configuration command to the cube and return the response."""
        if command_name not in CONFIGURATION_COMMANDS:
            _LOGGER.error("Invalid command name.")
            return bytearray()  # or any default value

        command = CONFIGURATION_COMMANDS[command_name]
        try:
            response = await self._cube_client.write_gatt_char(
                RX_CHARACTERISTIC_UUID, command
            )
            if response is not None:
                _LOGGER.info(f"Response {response}")
                return response
            else:
                return
        except BleakError as ex:
            _LOGGER.error(f"Failed to send command '{command_name}': {ex}")
            return bytearray()  # or any default value

    async def enable_notifications(self) -> None:
        """Enable notifications from the GoCube."""
        try:
            await self._cube_client.start_notify(
                TX_CHARACTERISTIC_UUID, self._notification_handler
            )
            _LOGGER.info("Notifications enabled")
        except BleakError as ex:
            _LOGGER.error(f"Failed to enable notifications: {ex}")

    def _notification_handler(self, sender: int, data: bytearray):
        """Handle notifications."""
        _LOGGER.info(f"Received notification from {sender}: {data.hex()}")
        message_type = data[2] if len(data) > 2 else None
        match message_type:
            case 0x01:
                self._handle_rotation_message(sender, data)
                # Fetch cube state every time a rotation notification is received
                asyncio.create_task(self.send_command("GetState"))
            case 0x02:
                self._handle_state_message(sender, data)
            case 0x05:
                self._handle_battery_message(sender, data)
            case _:
                self._handle_default_message(sender, data)

    def _handle_rotation_message(self, sender: int, data: bytearray):
        """Handle rotation message."""
        face_rotation = data[3]
        # orientation = data[4]
        face_rotation_desc = FACE_ROTATION_MAP.get(face_rotation, "Unknown")
        _LOGGER.info(f"Rotation: Face Rotation: {face_rotation_desc}")

    def _handle_battery_message(self, sender: int, data: bytearray):
        """Handle battery message."""
        battery_level = data[3]
        checksum = sum(data[:4]) % 0x100
        if checksum == data[4]:
            _LOGGER.info(f"The battery is at {battery_level}%")
        else:
            raise ValueError("Invalid battery status checksum")

    def _handle_state_message(self, sender: int, data: bytearray):
        """Handle state message."""
        if len(data) >= 60:
            state_data = data[3:]  # Start parsing from offset 3
            face_data = [state_data[i : i + 9] for i in range(0, 54, 9)]

            all_faces_solved = all(
                self.is_face_solved([COLOR_HEX_LOOKUP.get(byte) for byte in colors[1:]])
                for colors in face_data
            )

            if all_faces_solved:
                _LOGGER.info("Cube Solved!")
            else:
                _LOGGER.info("Cube Not Solved Yet.")
                for i, colors in enumerate(face_data):
                    face_name = list(COLOR_HEX_LOOKUP.values())[i]
                    face_solved = self.is_face_solved(
                        [COLOR_HEX_LOOKUP.get(byte) for byte in colors[1:]]
                    )
                    if face_solved:
                        _LOGGER.info("%s Face Solved: %s", face_name, face_solved)
                    else:
                        _LOGGER.info(
                            "%s Face: %s",
                            face_name,
                            [COLOR_HEX_LOOKUP.get(byte) for byte in colors[1:]],
                        )

        else:
            _LOGGER.warning("Invalid state message format: %s", data)

    def _handle_default_message(self, sender: int, data: bytearray):
        """Handle default message."""
        _LOGGER.info(f"Notification received from {sender}: {data}")

    def is_face_solved(self, face_colors):
        return all(color == face_colors[0] for color in face_colors)

main.py

import asyncio
import logging
from bleak import BleakScanner
from connection_manager import ConnectionManager

# Set up logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)


async def main():
    # Initialize connection manager
    connection_manager = ConnectionManager(
        connect_callback=handle_connect, disconnect_callback=handle_disconnect
    )

    # Discover GoCube devices
    _LOGGER.info("Scanning for GoCube devices...")
    devices = await BleakScanner.discover()
    for device in devices:
        if device.name is not None and device.name.startswith("GoCube"):
            _LOGGER.info("Found GoCube device: %s", device)
            try:
                # Connect to the GoCube device
                await connection_manager.connect(ble_device=device)
                await asyncio.sleep(30)  # Adjust the sleep duration as needed
            except Exception as ex:
                _LOGGER.error("Failed to connect to GoCube device: %s", ex)
            finally:
                await connection_manager.disconnect()


async def handle_connect(client):
    _LOGGER.info("Connected to GoCube!")

    # Subscribe to notifications
    await client.enable_notifications()
    await client.send_command("GetState")


async def handle_disconnect():
    _LOGGER.info("Disconnected from GoCube!")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())