Calling synch function with I/O in a new component

Im building a component where I use a Synch call to write data to a cloud DB. And Im getting an error when calling the self.client.ingest_from_stream. I cant figure out what the “Target” parameter is for in await hass.async_add_executor_job, and what I need to do to provide it (I imagine “none” is not enough) or what I can do to get this to work

Error:
function’ object has no attribute ‘loop

Code:

async def test_connection(self) -> None:
        """Making synch call asynch"""
        await self.hass.async_add_executor_job(self.test_connection_synch, Target)
        return None

def test_connection_synch(self) -> None:
        """Test connection, will throw Exception when it cannot connect."""

        ingestion_properties = IngestionProperties(
            database=self.database,
            table=self.table,
            data_format=DataFormat.CSV,
        )

        csvStr = "2019-05-02 15:23:50.0369439,1,2,3,4"

        try:
            bytes_stream = io.StringIO(csvStr)
            stream_descriptor = StreamDescriptor(bytes_stream)

            self.client.ingest_from_stream(
                stream_descriptor, ingestion_properties=ingestion_properties
            )

        except Exception as e:
            return Exception
        return None

self.hass.async_add_executor_job takes the name of the function and a *kwargs of the parameters that will be passed to this function.

In your case, you just have to pass self

await self.hass.async_add_executor_job(self.test_connection_synch, self)

Thank you for helping, however it did not solve my issue:


Traceback (most recent call last):
  File "/workspaces/core/homeassistant/components/azure_data_explorer/client.py", line 77, in test_connection
    await self.hass.async_add_executor_job(self.test_connection_synch, self)
  File "/workspaces/core/homeassistant/core.py", line 430, in async_add_executor_job
    task = self.loop.run_in_executor(None, target, *args)
AttributeError: 'function' object has no attribute 'loop'

For refrence, here is the entire file

"""Setting up the ingest client"""
from __future__ import annotations

from dataclasses import dataclass
import logging

import io
import pandas
import asyncio
import traceback

from homeassistant.core import HomeAssistant

from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.data_format import DataFormat

_LOGGER = logging.getLogger(__name__)


from azure.kusto.ingest import (
    QueuedIngestClient,
    IngestionProperties,
    # FileDescriptor,
    # BlobDescriptor,
    StreamDescriptor,
    KustoStreamingIngestClient,
    # ManagedStreamingIngestClient,
    # IngestionStatus,
)

from .const import (
    CONF_ADX_CLUSTER_INGEST_URI,
    CONF_ADX_DATABASE_NAME,
    CONF_ADX_TABLE_NAME,
    CONF_APP_REG_ID,
    CONF_APP_REG_SECRET,
    CONF_AUTHORITY_ID,
    CONF_MAX_DELAY,
    CONF_SEND_INTERVAL,
    DEFAULT_OPTIONS,
    DOMAIN,
)


@dataclass
class MyAzureDataExplorerClient:
    """Class for Azure Data Explorer Client."""

    def __init__(self, **kwargs) -> None:  # <---Name arguments
        """Create the right class."""
        self.hass = HomeAssistant

        # URIs
        clusterIngestURI = "https://xxx.westeurope.kusto.windows.net"

        self.database = "HomeAssistant"
        self.table = "Events"

        # Auth
        # In case you want to authenticate with AAD application.
        client_id = "b5253dxxxxxxxxxxxx8491ba2a1f"
        client_secret = "AZP7Q~zqKxxxxxxxxxxxQEa9BgVYK"

        # read more at https://docs.microsoft.com/en-us/onedrive/find-your-office-365-tenant-id
        authority_id = "72f988bxxxxxxxxxxxx11db47"

        kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
            clusterIngestURI, client_id, client_secret, authority_id
        )
        # self.client = QueuedIngestClient(kcsb)
        self.client = KustoStreamingIngestClient(kcsb)

    async def test_connection(self) -> None:
        """Making synch call asynch"""
        try:
            _LOGGER.error("Trying to call synch")
            await self.hass.async_add_executor_job(self.test_connection_synch, self)
        except Exception as e:
            _LOGGER.error("Failed to call synch")
            _LOGGER.error(e)
            _LOGGER.error(traceback.print_exc())
            return Exception
        _LOGGER.error("OK to call synch")
        return None

    def test_connection_synch(self) -> None:
        """Test connection, will throw Exception when it cannot connect."""

        _LOGGER.error("*************Testing client******************")

        # Database

        ingestion_properties = IngestionProperties(
            database=self.database,
            table=self.table,
            data_format=DataFormat.CSV,
        )

        csvStr = "2019-05-02 15:23:50.0369439,1,2,3,4"

        try:
            bytes_stream = io.StringIO(csvStr)
            stream_descriptor = StreamDescriptor(bytes_stream)

            self.client.ingest_from_stream(
                stream_descriptor, ingestion_properties=ingestion_properties
            )

        except Exception as e:
            _LOGGER.error("*************Error connection to ADX******************")
            _LOGGER.error(e)
            return Exception

        _LOGGER.info("*************OK conected to ADX******************")
        return None

You have to use the homeassistant instance, that you get from, e.g., async_setup_entry

Thank you very much, this worked