Implementing support for OTA upgrades would practically eliminate the need for unnecessary hubs. Below is the event listener I used to successfully upgrade six IKEA Trådfri 1000lm bulbs (copied from an [unrelated issue](https://github.com/zigpy/zigpy/pull/150#issuecomment-489353657)):
```Python
import asyncio
import logging
import contextlib
from collections import defaultdict
import aiohttp
from zigpy.zcl.foundation import Status
from zigpy.zcl.clusters.general import Ota
# Module-wide logger shared by the listener classes below.
logger = logging.getLogger(__name__)
# Configure the root logger at DEBUG so the per-block progress messages are visible.
logging.basicConfig(level=logging.DEBUG)
class HerdLock:
    '''
    A keyed lock that tells each caller whether they are the first to take it.

    Callers that arrive while the lock for `key` is already in use wait until
    it is released and are then told they were *not* first. Used like this:

        lock = HerdLock()
        cache = {}

        async def worker():
            async with lock('LOCK_NAME') as is_first:
                if is_first:
                    print('Actually performing task')
                    await asyncio.sleep(5)
                    cache['expensive'] = 'got it'

            # Only one worker actually fetched the value. If that worker
            # failed, the key is simply missing, so followers should check.
            print(cache.get('expensive'))

        await asyncio.gather(worker(), worker(), worker(), worker())
    '''

    def __init__(self):
        # One binary semaphore per key, created lazily on first use.
        self.semaphores = defaultdict(asyncio.Semaphore)
        # Number of callers currently holding *or waiting for* each key.
        # `Semaphore.locked()` is not used for bookkeeping because on some
        # Python versions it only reflects the counter and ignores queued
        # waiters, which would let the entry be dropped while still in use.
        self._users = defaultdict(int)

    @contextlib.asynccontextmanager
    async def __call__(self, key):
        '''
        Acquire the lock for `key`, yielding `True` only to the first caller.

        The per-key state is discarded once the last holder or waiter leaves,
        so a later, non-overlapping caller is "first" again.
        '''
        semaphore = self.semaphores[key]

        # There is no `await` between this check and the increment, so no
        # other task can slip in between and also see itself as the first.
        is_first = self._users[key] == 0
        self._users[key] += 1

        try:
            async with semaphore:
                yield is_first
        finally:
            self._users[key] -= 1

            # Delete the lock only when nobody holds or waits for it anymore
            if self._users[key] == 0:
                del self._users[key]
                self.semaphores.pop(key, None)
class TrådfriOTAMainListener:
    '''
    Answers Zigbee OTA cluster commands with firmware published by IKEA.

    The firmware list is fetched from `UPDATE_URL` the first time a command
    arrives and cached by `(manufacturer_id, image_type)`. Each firmware
    binary is downloaded on demand when a device requests its first block.
    '''

    UPDATE_URL = 'https://fw.ota.homesmart.ikea.net/feed/version_info.json'
    # The IKEA images wrap the Zigbee OTA file in a container; the OTA file
    # itself starts at this marker.
    OTA_HEADER = 0x0BEEF11E.to_bytes(4, 'little')
    # Upper bound on the number of bytes sent in one image block response.
    MAXIMUM_DATA_SIZE = 40

    def __init__(self):
        self._firmware_cache = {}
        self._lock = HerdLock()
        # Strong references to in-flight handler tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._tasks = set()

    def cluster_command(self, tsn, command_id, args, ota_cluster):
        '''Schedule asynchronous handling of one OTA cluster command.'''
        task = asyncio.create_task(self._cluster_command(tsn, command_id, args, ota_cluster))
        self._tasks.add(task)
        task.add_done_callback(self._task_done)

    def _task_done(self, task):
        '''Forget a finished handler task and log the exception it raised, if any.'''
        self._tasks.discard(task)

        if task.cancelled():
            return

        exc = task.exception()

        if exc is not None:
            logger.error('OTA command handler failed', exc_info=exc)

    async def _cluster_command(self, tsn, command_id, args, ota_cluster):
        '''
        Handle a single OTA command and send the matching response.

        Only query_next_image (0x0001), image_block (0x0003) and upgrade_end
        (0x0006) are handled; any other command id is ignored.
        '''
        await self._ensure_firmware_list()

        if command_id == 0x0001:  # query_next_image
            await self._handle_query_next_image(args, ota_cluster)
        elif command_id == 0x0003:  # image_block
            await self._handle_image_block(args, ota_cluster)
        elif command_id == 0x0006:  # upgrade_end
            await self._handle_upgrade_end(args, ota_cluster)

    async def _ensure_firmware_list(self):
        '''Download and index the IKEA firmware list unless it is already cached.'''
        if self._firmware_cache:
            return

        async with self._lock('DOWNLOAD_VERSION_INFO'):
            # Re-check under the lock: another task may have filled the cache
            # while we were waiting. If that task failed instead, we retry.
            if self._firmware_cache:
                return

            logger.info('Downloading firmware update list from IKEA')

            async with aiohttp.ClientSession() as client:
                async with client.get(self.UPDATE_URL) as response:
                    response.raise_for_status()
                    firmwares = await response.json(content_type='application/octet-stream')

            for fw in firmwares:
                # Entries without version fields cannot be offered to devices
                if 'fw_file_version_MSB' not in fw:
                    continue

                fw['fw_file_version'] = (fw['fw_file_version_MSB'] << 16) | fw['fw_file_version_LSB']
                self._firmware_cache[(fw['fw_manufacturer_id'], fw['fw_image_type'])] = fw

    async def _handle_query_next_image(self, args, ota_cluster):
        '''Tell the device whether a firmware image is known for it.'''
        _field_control, manufacturer_code, image_type, _current_file_version, _hardware_version = args

        logger.info('Received an OTA image query from %s', ota_cluster.endpoint.device)

        fw = self._firmware_cache.get((manufacturer_code, image_type))

        # We don't know what this device is
        if fw is None:
            await ota_cluster.query_next_image_response(Status.NO_IMAGE_AVAILABLE, 0x0000, 0x0000, 0x00000000, 0x00000000)
            return

        # Tell the device we're ready to continue
        await ota_cluster.query_next_image_response(Status.SUCCESS, manufacturer_code, image_type, fw['fw_file_version'], fw['fw_filesize'])

    async def _handle_image_block(self, args, ota_cluster):
        '''Send the requested slice of the firmware image to the device.'''
        (_field_control, manufacturer_code, image_type, file_version, file_offset,
         maximum_data_size, _request_node_address, _block_request_delay) = args

        fw = self._firmware_cache.get((manufacturer_code, image_type))

        # A block request for an image we never offered cannot be served
        if fw is None:
            logger.warning('Ignoring an OTA block request for an unknown image from %s', ota_cluster.endpoint.device)
            return

        image = await self._get_firmware_image(fw)

        logger.debug('Firmware upgrade progress: %0.2f%%', 100.0 * file_offset / fw['fw_filesize'])

        block_size = min(self.MAXIMUM_DATA_SIZE, maximum_data_size)
        data = image[file_offset:file_offset + block_size]

        await ota_cluster.image_block_response(Status.SUCCESS, manufacturer_code, image_type, file_version, file_offset, data)

    async def _get_firmware_image(self, fw):
        '''
        Return the Zigbee OTA image for `fw`, downloading it on first use.

        Raises `ValueError` if the downloaded container does not hold a
        complete OTA image; nothing is cached in that case.
        '''
        if 'fw_data' in fw:
            return fw['fw_data']

        async with self._lock(f'DOWNLOAD_FW:{fw["fw_binary_url"]}'):
            # Re-check under the lock so a failed first download is retried
            # by whoever comes next instead of leaving them without data.
            if 'fw_data' not in fw:
                async with aiohttp.ClientSession() as client:
                    async with client.get(fw['fw_binary_url']) as response:
                        response.raise_for_status()
                        container = await response.read()

                # The IKEA images wrap the Zigbee OTA file in a container
                offset = container.find(self.OTA_HEADER)

                if offset < 0:
                    raise ValueError('OTA header not found in downloaded firmware')

                image = container[offset:offset + fw['fw_filesize']]

                # Validate before caching so a truncated image is never served
                if len(image) != fw['fw_filesize']:
                    raise ValueError('Downloaded firmware is shorter than the advertised size')

                fw['fw_data'] = image

        return fw['fw_data']

    async def _handle_upgrade_end(self, args, ota_cluster):
        '''Acknowledge the end of the transfer and ask the device to upgrade now.'''
        _status, manufacturer_code, image_type, file_version = args

        # Upgrade right now
        await ota_cluster.upgrade_end_response(manufacturer_code, image_type, file_version, 0x00000000, 0x00000000)
class TrådfriOTAListener:
    '''
    Per-cluster adapter that forwards OTA commands to a shared main listener.

    It remembers which OTA cluster it is attached to and passes that cluster
    along with every command, so one main listener can serve many devices.
    '''

    def __init__(self, ota_cluster, main_listener):
        self.main_listener = main_listener
        self.ota_cluster = ota_cluster

    def cluster_command(self, tsn, command_id, args):
        '''Log the incoming command, then hand it to the main listener.'''
        cluster = self.ota_cluster
        logger.info('Received an OTA cluster command from %s', cluster.endpoint.device)
        self.main_listener.cluster_command(tsn, command_id, args, cluster)
```
The Trådfri remotes [seem to work too](https://github.com/zigpy/zigpy/pull/150#issuecomment-491389343). Unfortunately, I was not able to get my RGB bulbs to upgrade despite a newer firmware version being available, so I believe further device quirks will need to be handled by https://github.com/dmulcahey/zha-device-handlers. The core OTA functionality, however, should belong in zigpy, since it doesn't depend on any specific adapter interface and can basically be a transcription of the process described in the [Zigbee specification](http://www.zigbee.org/wp-content/uploads/2014/11/docs-09-5264-23-00zi-zigbee-ota-upgrade-cluster-specification.pdf) (see pages 27+).
What do you think?