I am tryng this approach for grab and send audio from microphone section in esphome:
microphone:
- platform: i2s_audio
i2s_audio_id: i2s_in
id: mic
adc_type: external
i2s_din_pin: GPIO23
pdm: false
on_data:
- lambda: |-
for (uint8_t byte : x) {
id(audio_buffer).push_back(byte);
}
if (id(audio_buffer).size() >= 512) {
int sock = ::socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in destination;
destination.sin_family = AF_INET;
destination.sin_port = htons(12345); // UDP receiver port
destination.sin_addr.s_addr = inet_addr("192.168.1.10"); // UDP receiver IP
::sendto(sock, id(audio_buffer).data(), id(audio_buffer).size(), 0, reinterpret_cast<sockaddr*>(&destination), sizeof(destination));
::close(sock);
id(audio_buffer).clear();
}
globals:
- id: is_capturing
type: bool
restore_value: no
initial_value: "false"
- id: audio_buffer
type: std::vector<int16_t>
restore_value: no
initial_value: 'std::vector<int16_t>()'
- id: sequence_number
type: uint32_t
restore_value: no
initial_value: '0'
binary_sensor:
- platform: esp32_touch
pin: GPIO4
threshold: 1000
name: Action
on_press:
then:
if:
condition:
lambda: "return !id(is_capturing);"
then:
- globals.set:
id: is_capturing
value: "true"
- microphone.capture: mic
- delay: 5s
- globals.set:
id: is_capturing
value: "false"
- microphone.stop_capture: mic
button:
- platform: template
name: "Cattura"
on_press:
- microphone.capture: mic
- delay: 5s
- microphone.stop_capture: mic
And receiver section:
/homeassistant/custom_components/audio_receiver/init.py
import asyncio
import logging
import wave
import os
from collections import deque
from datetime import datetime
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
DOMAIN = "audio_receiver"
_logger = logging.getLogger(__name__)
class UDPAudioReceiver:
def __init__(self, hass, host, port, save_path):
self.hass = hass
self.host = host
self.port = port
self.save_path = save_path
self.buffer = deque()
self.timeout_handle = None
async def start(self):
loop = asyncio.get_running_loop()
self.transport, _ = await loop.create_datagram_endpoint(
lambda: UDPProtocol(self),
local_addr=(self.host, self.port)
)
_logger.info(f"UDP audio receiver started on {self.host}:{self.port}")
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.stop)
async def stop(self, event):
self.transport.close()
_logger.info("UDP audio receiver stopped")
self.save_as_wav()
def save_as_wav(self):
if self.buffer:
timestamp = datetime.now().strftime("%H.%M")
file_path = os.path.join(self.save_path, f"audio-{timestamp}.wav")
_logger.info("Timeout reached, saving data...")
with wave.open(file_path, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(44100)
while self.buffer:
wav_file.writeframes(self.buffer.popleft())
_logger.info(f"Audio data saved to {file_path}")
else:
_logger.info("Timeout reached, but no data to save.")
class UDPProtocol(asyncio.DatagramProtocol):
def __init__(self, receiver):
self.receiver = receiver
def datagram_received(self, data, addr):
_logger.info(f"Data received from {addr}")
self.receiver.buffer.append(data)
if self.receiver.timeout_handle:
self.receiver.timeout_handle.cancel()
self.receiver.timeout_handle = asyncio.get_event_loop().call_later(10, self.receiver.save_as_wav)
async def async_setup(hass, config):
host = config[DOMAIN].get('host', '0.0.0.0')
port = config[DOMAIN].get('port', 12345)
save_path = config[DOMAIN].get('save_path', '/media/audio')
receiver = UDPAudioReceiver(hass, host, port, save_path)
hass.loop.create_task(receiver.start())
return True
And config_flow.py
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.core import callback
from . import DOMAIN
class AudioReceiverFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH
@staticmethod
@callback
def async_get_options_flow(config_entry):
return OptionsFlowHandler(config_entry)
async def async_step_user(self, user_input=None):
if user_input is not None:
return self.async_create_entry(title="Audio Receiver", data=user_input)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required("host", default="0.0.0.0"): str,
vol.Required("port", default=12345): int,
vol.Required("save_path", default="/media/audio"): str,
}
),
)
class OptionsFlowHandler(config_entries.OptionsFlow):
def __init__(self, config_entry):
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
{
vol.Required("host", default=self.config_entry.options.get("host", "0.0.0.0")): str,
vol.Required("port", default=self.config_entry.options.get("port", 12345)): int,
vol.Required("save_path", default=self.config_entry.options.get("save_path", "/media/audio")): str,
}
),
)
configuration.yaml
audio_receiver:
host: "0.0.0.0"
port: 12345
save_path: "/media/audio" #folder audio has to be created.
I am start the capture or via button or via touch pin, i can see the udp packet sended and received, the receiver after some time of inactivity save the buffer to wav the file is created but i can’t hear nothing of relevants inside wave file the capture time is 5 seconds but i receive only a 1 second file with some noise.