I don’t yet have a component for it, but can connect to the cube from a laptop and receive feedback. It should be possible to wrap this up into a HA component and connect to it via an esphome bluetooth proxy:
connection_manager.py
import asyncio
import logging
from typing import Callable
from bleak import BleakClient
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from errors import CubeConnectionError
_LOGGER = logging.getLogger(__name__)
PRIMARY_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
RX_CHARACTERISTIC_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
TX_CHARACTERISTIC_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
CONFIGURATION_COMMANDS = {
"Reboot": bytearray([0x34]),
"SetSolvedState": bytearray([0x35]),
"DisableOrientation": bytearray([0x37]),
"EnableOrientation": bytearray([0x38]),
"LedFlash": bytearray([0x41]),
"LedToggleAnimation": bytearray([0x42]),
"LedFlashSlow": bytearray([0x43]),
"LedToggle": bytearray([0x44]),
"CalibrateOrientation": bytearray([0x57]),
"GetBattery": bytearray([0x32]),
"GetState": bytearray([0x33]),
"GetStats": bytearray([0x39]),
"GetCubeType": bytearray([0x56]),
}
FACE_ROTATION_MAP = {
0x00: "Blue Clockwise",
0x01: "Blue Counterclockwise",
0x02: "Green Clockwise",
0x03: "Green Counterclockwise",
0x04: "White Clockwise",
0x05: "White Counterclockwise",
0x06: "Yellow Clockwise",
0x07: "Yellow Counterclockwise",
0x08: "Red Clockwise",
0x09: "Red Counterclockwise",
0x0A: "Orange Clockwise",
0x0B: "Orange Counterclockwise",
}
COLOR_HEX_LOOKUP = {
0x00: "Blue",
0x01: "Green",
0x02: "White",
0x03: "Yellow",
0x04: "Red",
0x05: "Orange",
}
class ConnectionManager:
"""Manages the connection state to the GoCube."""
def __init__(
self,
connect_callback: Callable[[BleakClient], None],
disconnect_callback: Callable[[], None],
max_retries: int = 3, # Adjust the number of retry attempts as needed
retry_interval: float = 1.0, # Adjust the interval between retries as needed (in seconds)
):
"""Initialize the ConnectionManager."""
self._connect_callback = connect_callback
self._disconnect_callback = disconnect_callback
self._cube_client: BleakClient = None
self._max_retries = max_retries
self._retry_interval = retry_interval
async def connect(self, ble_device: BLEDevice) -> None:
"""Connect to the GoCube with retry logic."""
retry_count = 0
while retry_count < self._max_retries:
try:
self._cube_client = BleakClient(ble_device)
_LOGGER.info(
"Attempting to establish connection with GoCube (attempt %d/%d)...",
retry_count + 1,
self._max_retries,
)
await self._cube_client.connect()
_LOGGER.info("Connected to GoCube!")
await self.send_command("DisableOrientation")
await self.enable_notifications()
return
except BleakError as ex:
_LOGGER.error(f"Failed to connect to GoCube: {ex}")
except Exception as e:
_LOGGER.error(f"An error occurred during connection: {e}")
retry_count += 1
await asyncio.sleep(self._retry_interval)
raise CubeConnectionError(
f"Failed to connect to GoCube after {self._max_retries} attempts"
)
async def disconnect(self) -> None:
"""Disconnect from the GoCube."""
_LOGGER.info("Disconnecting from GoCube...")
try:
if self._cube_client is None:
_LOGGER.warning("Already disconnected")
return
await self._cube_client.disconnect()
finally:
self._cube_client = None
async def send_command(self, command_name: str) -> bytearray:
"""Send a configuration command to the cube and return the response."""
if command_name not in CONFIGURATION_COMMANDS:
_LOGGER.error("Invalid command name.")
return bytearray() # or any default value
command = CONFIGURATION_COMMANDS[command_name]
try:
response = await self._cube_client.write_gatt_char(
RX_CHARACTERISTIC_UUID, command
)
if response is not None:
_LOGGER.info(f"Response {response}")
return response
else:
return
except BleakError as ex:
_LOGGER.error(f"Failed to send command '{command_name}': {ex}")
return bytearray() # or any default value
async def enable_notifications(self) -> None:
"""Enable notifications from the GoCube."""
try:
await self._cube_client.start_notify(
TX_CHARACTERISTIC_UUID, self._notification_handler
)
_LOGGER.info("Notifications enabled")
except BleakError as ex:
_LOGGER.error(f"Failed to enable notifications: {ex}")
def _notification_handler(self, sender: int, data: bytearray):
"""Handle notifications."""
_LOGGER.info(f"Received notification from {sender}: {data.hex()}")
message_type = data[2] if len(data) > 2 else None
match message_type:
case 0x01:
self._handle_rotation_message(sender, data)
# Fetch cube state every time a rotation notification is received
asyncio.create_task(self.send_command("GetState"))
case 0x02:
self._handle_state_message(sender, data)
case 0x05:
self._handle_battery_message(sender, data)
case _:
self._handle_default_message(sender, data)
def _handle_rotation_message(self, sender: int, data: bytearray):
"""Handle rotation message."""
face_rotation = data[3]
# orientation = data[4]
face_rotation_desc = FACE_ROTATION_MAP.get(face_rotation, "Unknown")
_LOGGER.info(f"Rotation: Face Rotation: {face_rotation_desc}")
def _handle_battery_message(self, sender: int, data: bytearray):
"""Handle battery message."""
battery_level = data[3]
checksum = sum(data[:4]) % 0x100
if checksum == data[4]:
_LOGGER.info(f"The battery is at {battery_level}%")
else:
raise ValueError("Invalid battery status checksum")
def _handle_state_message(self, sender: int, data: bytearray):
"""Handle state message."""
if len(data) >= 60:
state_data = data[3:] # Start parsing from offset 3
face_data = [state_data[i : i + 9] for i in range(0, 54, 9)]
all_faces_solved = all(
self.is_face_solved([COLOR_HEX_LOOKUP.get(byte) for byte in colors[1:]])
for colors in face_data
)
if all_faces_solved:
_LOGGER.info("Cube Solved!")
else:
_LOGGER.info("Cube Not Solved Yet.")
for i, colors in enumerate(face_data):
face_name = list(COLOR_HEX_LOOKUP.values())[i]
face_solved = self.is_face_solved(
[COLOR_HEX_LOOKUP.get(byte) for byte in colors[1:]]
)
if face_solved:
_LOGGER.info("%s Face Solved: %s", face_name, face_solved)
else:
_LOGGER.info(
"%s Face: %s",
face_name,
[COLOR_HEX_LOOKUP.get(byte) for byte in colors[1:]],
)
else:
_LOGGER.warning("Invalid state message format: %s", data)
def _handle_default_message(self, sender: int, data: bytearray):
"""Handle default message."""
_LOGGER.info(f"Notification received from {sender}: {data}")
def is_face_solved(self, face_colors):
return all(color == face_colors[0] for color in face_colors)
main.py
import asyncio
import logging
from bleak import BleakScanner
from connection_manager import ConnectionManager
# Set up logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
async def main():
# Initialize connection manager
connection_manager = ConnectionManager(
connect_callback=handle_connect, disconnect_callback=handle_disconnect
)
# Discover GoCube devices
_LOGGER.info("Scanning for GoCube devices...")
devices = await BleakScanner.discover()
for device in devices:
if device.name is not None and device.name.startswith("GoCube"):
_LOGGER.info("Found GoCube device: %s", device)
try:
# Connect to the GoCube device
await connection_manager.connect(ble_device=device)
await asyncio.sleep(30) # Adjust the sleep duration as needed
except Exception as ex:
_LOGGER.error("Failed to connect to GoCube device: %s", ex)
finally:
await connection_manager.disconnect()
async def handle_connect(client):
_LOGGER.info("Connected to GoCube!")
# Subscribe to notifications
await client.enable_notifications()
await client.send_command("GetState")
async def handle_disconnect():
_LOGGER.info("Disconnected from GoCube!")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())