Custom Integration - Firestore - Aquarite

Hello everyone,

I’m working on a new integration, I’m strungling to find a good logic to re-auth to the API having only an API key. Client is towards Google and Firestore.

Here is my current way of doing.

I’ve a routine that way for token expiration and then re-trigger a login. Problem is

self.handlers.append((pool_id, handler))

where at each re-login the handlers will grow. I can solve that, but is there not a better way with home assistant helpers or others to manage a re-login following a token expiration ? I went over the doc, and only came up with the below. thanks for the guidance !

for now, I can just limit my handlers with

    max_size = 10
    self.handlers = self.handlers[-max_size:]

here is the current code

    @classmethod
    async def create(cls, aiohttp_session, username, password):
        instance = cls(aiohttp_session, username, password)
        await instance.signin()
        asyncio.create_task(instance.start_token_refresh_routine())
        return instance

    async def start_token_refresh_routine(self):
        while True:
            try:
                await self.ensure_active_token()
                await asyncio.sleep(self.calculate_sleep_duration())
            except Exception as e:
                _LOGGER.error(f"Error maintaining token: {str(e)}")
                break

    def calculate_sleep_duration(self):
        time_to_expiry = (self.expiry - datetime.datetime.now()).total_seconds()
        return max(time_to_expiry - 300, 10)

    async def get_token_and_expiry(self):
        """Fetch token and expiry using Google API."""
        url = f"{GOOGLE_IDENTITY_REST_API}:signInWithPassword?key={API_KEY}"
        headers = {"Content-Type": "application/json; charset=UTF-8"}
        data = json.dumps({
            "email": self.username,
            "password": self.password,
            "returnSecureToken": True
        })
        resp = await self.aiohttp_session.post(url, headers=headers, data=data)
        if resp.status == 400:
            raise UnauthorizedException("Failed to authenticate.")
        self.tokens = await resp.json()
        self.expiry = datetime.datetime.now() + datetime.timedelta(seconds=int(self.tokens["expiresIn"]))
        self.credentials = Credentials(token=self.tokens['idToken'])
        self.client = Client(project="hayward-europe", credentials=self.credentials)
        if hasattr(self, 'handlers') and self.handlers:
            for pool_id, handler in self.handlers:
                await self.subscribe(pool_id, handler)

    async def ensure_active_token(self):
        """Ensure that the token is still valid, and refresh it if necessary."""
        _LOGGER.debug(f"Token check, {datetime.datetime.now()} {self.expiry}...")
        if datetime.datetime.now() >= (self.expiry - datetime.timedelta(minutes=5)): 
            _LOGGER.info("Token expired, refreshing...")
            await self.get_token_and_expiry()

    async def signin(self):
        """Sign in and set the tokens and expiry."""
        await self.get_token_and_expiry()

    async def subscribe(self, pool_id, handler) -> None:
        doc_ref = self.client.collection("pools").document(pool_id)
        def on_snapshot(doc_snapshot, changes, read_time):
            """Handles document snapshots."""
            try:
                for change in changes:
                    _LOGGER.debug(f"Received change {change.type} in firestore")
                for doc in doc_snapshot:
                    try:
                        handler(doc)
                    except Exception as handler_error:
                        _LOGGER.error(f"Error executing handler: {handler_error}")
            except Exception as e:
                _LOGGER.error(f"Error in on_snapshot: {e}")
        doc_ref.on_snapshot(on_snapshot)
        self.handlers.append((pool_id, handler))