Im building a component where I use a Synch call to write data to a cloud DB. And Im getting an error when calling the self.client.ingest_from_stream. I cant figure out what the “Target” parameter is for in await hass.async_add_executor_job, and what I need to do to provide it (I imagine “none” is not enough) or what I can do to get this to work
Error:
function’ object has no attribute ‘loop
Code:
async def test_connection(self) -> None:
"""Making synch call asynch"""
await self.hass.async_add_executor_job(self.test_connection_synch, Target)
return None
def test_connection_synch(self) -> None:
"""Test connection, will throw Exception when it cannot connect."""
ingestion_properties = IngestionProperties(
database=self.database,
table=self.table,
data_format=DataFormat.CSV,
)
csvStr = "2019-05-02 15:23:50.0369439,1,2,3,4"
try:
bytes_stream = io.StringIO(csvStr)
stream_descriptor = StreamDescriptor(bytes_stream)
self.client.ingest_from_stream(
stream_descriptor, ingestion_properties=ingestion_properties
)
except Exception as e:
return Exception
return None
koying
(Chris B)
January 13, 2022, 11:53am
2
self.hass.async_add_executor_job
takes the name of the function and a *kwargs of the parameters that will be passed to this function.
In your case, you just have to pass self
await self.hass.async_add_executor_job(self.test_connection_synch, self)
Thank you for helping, however it did not solve my issue:
Traceback (most recent call last):
File "/workspaces/core/homeassistant/components/azure_data_explorer/client.py", line 77, in test_connection
await self.hass.async_add_executor_job(self.test_connection_synch, self)
File "/workspaces/core/homeassistant/core.py", line 430, in async_add_executor_job
task = self.loop.run_in_executor(None, target, *args)
AttributeError: 'function' object has no attribute 'loop'
For refrence, here is the entire file
"""Setting up the ingest client"""
from __future__ import annotations
from dataclasses import dataclass
import logging
import io
import pandas
import asyncio
import traceback
from homeassistant.core import HomeAssistant
from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.data_format import DataFormat
_LOGGER = logging.getLogger(__name__)
from azure.kusto.ingest import (
QueuedIngestClient,
IngestionProperties,
# FileDescriptor,
# BlobDescriptor,
StreamDescriptor,
KustoStreamingIngestClient,
# ManagedStreamingIngestClient,
# IngestionStatus,
)
from .const import (
CONF_ADX_CLUSTER_INGEST_URI,
CONF_ADX_DATABASE_NAME,
CONF_ADX_TABLE_NAME,
CONF_APP_REG_ID,
CONF_APP_REG_SECRET,
CONF_AUTHORITY_ID,
CONF_MAX_DELAY,
CONF_SEND_INTERVAL,
DEFAULT_OPTIONS,
DOMAIN,
)
@dataclass
class MyAzureDataExplorerClient:
"""Class for Azure Data Explorer Client."""
def __init__(self, **kwargs) -> None: # <---Name arguments
"""Create the right class."""
self.hass = HomeAssistant
# URIs
clusterIngestURI = "https://xxx.westeurope.kusto.windows.net"
self.database = "HomeAssistant"
self.table = "Events"
# Auth
# In case you want to authenticate with AAD application.
client_id = "b5253dxxxxxxxxxxxx8491ba2a1f"
client_secret = "AZP7Q~zqKxxxxxxxxxxxQEa9BgVYK"
# read more at https://docs.microsoft.com/en-us/onedrive/find-your-office-365-tenant-id
authority_id = "72f988bxxxxxxxxxxxx11db47"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterIngestURI, client_id, client_secret, authority_id
)
# self.client = QueuedIngestClient(kcsb)
self.client = KustoStreamingIngestClient(kcsb)
async def test_connection(self) -> None:
"""Making synch call asynch"""
try:
_LOGGER.error("Trying to call synch")
await self.hass.async_add_executor_job(self.test_connection_synch, self)
except Exception as e:
_LOGGER.error("Failed to call synch")
_LOGGER.error(e)
_LOGGER.error(traceback.print_exc())
return Exception
_LOGGER.error("OK to call synch")
return None
def test_connection_synch(self) -> None:
"""Test connection, will throw Exception when it cannot connect."""
_LOGGER.error("*************Testing client******************")
# Database
ingestion_properties = IngestionProperties(
database=self.database,
table=self.table,
data_format=DataFormat.CSV,
)
csvStr = "2019-05-02 15:23:50.0369439,1,2,3,4"
try:
bytes_stream = io.StringIO(csvStr)
stream_descriptor = StreamDescriptor(bytes_stream)
self.client.ingest_from_stream(
stream_descriptor, ingestion_properties=ingestion_properties
)
except Exception as e:
_LOGGER.error("*************Error connection to ADX******************")
_LOGGER.error(e)
return Exception
_LOGGER.info("*************OK conected to ADX******************")
return None
koying
(Chris B)
January 13, 2022, 4:08pm
5
You have to use the homeassistant instance , that you get from, e.g., async_setup_entry
Thank you very much, this worked