Error when using Apache Kafka integration

Hi all, I’m trying out the Kafka integration and am getting the following error:

```
Traceback (most recent call last):
  File "/usr/src/homeassistant/homeassistant/components/apache_kafka/__init__.py", line 142, in write
    await self._producer.send_and_wait(self._topic, payload)
  File "/usr/local/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 450, in send_and_wait
    future = await self.send(
  File "/usr/local/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 433, in send
    key_bytes, value_bytes = self._serialize(topic, key, value)
  File "/usr/local/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 341, in _serialize
    self._producer_magic)
AttributeError: 'AIOKafkaProducer' object has no attribute '_producer_magic'
```

I believe I have it configured correctly, and I know the topic I’m trying to publish to exists (I can write to it manually, etc.). I’ve tried filtering to specific entities as well as not filtering at all, but it makes no difference: I get the same error every time. I’m not using SSL either. Thanks for any help!
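(For reference, this is roughly the kind of standalone script I used to confirm I could write to the topic manually; the broker address and topic name are placeholders for my setup.)

```python
import asyncio
import json

from aiokafka import AIOKafkaProducer


async def main():
    # Placeholder broker address; substitute your own.
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()  # the producer must be started before sending
    try:
        payload = json.dumps({"test": "hello"}).encode("utf-8")
        # Blocks until the broker acknowledges the write.
        await producer.send_and_wait("home_assistant", payload)
    finally:
        await producer.stop()


asyncio.run(main())
```

If a script like this succeeds against the same broker and topic, the problem is presumably in the integration rather than the Kafka setup.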

OK, I may have solved this. The error in the stack trace is a little misleading, but I think the core problem is that the Kafka integration in HA publishes messages with null keys, and the topic I was using is compacted, which requires keys. I created a new, non-compacted topic and now it’s publishing messages. It would be cool if we could specify a key, maybe the entity_id or a timestamp.
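For illustration, aiokafka already accepts a per-message key, so the integration could in principle do something like the sketch below. This is not the integration’s actual code; the helper and its use of entity_id are hypothetical.

```python
from aiokafka import AIOKafkaProducer


async def publish_with_key(
    producer: AIOKafkaProducer, topic: str, payload: bytes, entity_id: str
) -> None:
    # Hypothetical helper: keyed writes satisfy compacted topics and
    # also keep each entity's events in the same partition.
    await producer.send_and_wait(topic, payload, key=entity_id.encode("utf-8"))
```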

I am, however, now seeing other serialization errors like this:

"TypeError: Object of type set is not JSON serializable"

I’m not sure yet which entity (or entities) is triggering this, but something must have an attribute containing a set, which the standard JSON encoder can’t serialize. This is probably something the integration should handle, but maybe there aren’t enough people using it to make it a priority (or maybe it’s something specific to my setup, though I don’t think so).
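In the meantime, a default= hook on json.dumps works around it by converting sets before encoding. A minimal sketch (the entity and attribute names here are made up):

```python
import json


def safe_default(obj):
    # Called by json.dumps for any object the standard encoder rejects.
    if isinstance(obj, set):
        return sorted(obj)  # sets become (sorted) lists
    return str(obj)  # last resort: stringify everything else


# Hypothetical state payload with a set-valued attribute.
state = {"entity_id": "light.kitchen", "attributes": {"modes": {"color", "white"}}}
print(json.dumps(state, default=safe_default))
```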

This should be fixed in the 2024.8 release.