Overview

The AudioBufferProcessor captures and buffers audio from both input (user) and output (bot) sources during conversations. It synchronizes these audio streams, supports both mono and stereo recording with configurable sample rates, and provides event handlers for merged, per-track, and per-turn audio. The processor can operate in continuous or speech-only recording modes and produce either combined or separate audio tracks.

Usage

To record audio, create an instance of AudioBufferProcessor and add it to your pipeline:

from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor

# Create an audio buffer processor
audiobuffer = AudioBufferProcessor(
    sample_rate=44100,  # Optional: desired output sample rate
    num_channels=2,     # 1 for mono, 2 for stereo
    buffer_size=0       # Size in bytes that triggers callbacks (0 = only when recording stops)
)

# Add to pipeline
pipeline = Pipeline([
    transport.input(),  # microphone
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    audiobuffer,  # placed after transport.output() so it sees both user and bot audio
    context_aggregator.assistant(),
])

# Example: Save recorded audio to WAV file
import datetime
import io
import wave

import aiofiles

async def save_audio(audio: bytes, sample_rate: int, num_channels: int):
    if len(audio) > 0:
        filename = f"conversation_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
        with io.BytesIO() as buffer:
            with wave.open(buffer, "wb") as wf:
                wf.setsampwidth(2)  # 16-bit PCM
                wf.setnchannels(num_channels)
                wf.setframerate(sample_rate)
                wf.writeframes(audio)
            async with aiofiles.open(filename, "wb") as file:
                await file.write(buffer.getvalue())
        print(f"Merged audio saved to {filename}")

# Handle the recorded audio chunks
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
    await save_audio(audio, sample_rate, num_channels)

STT Audio Passthrough

If you have an STT service in your pipeline, you will need to pass through the audio so that it’s available to the AudioBufferProcessor. You can do this by adding audio_passthrough=True to the STT service:

stt = DeepgramSTTService(
    api_key=os.getenv("DEEPGRAM_API_KEY"),
    audio_passthrough=True,
)

Configuration Options

sample_rate
Optional[int], default: None

The desired output sample rate in Hz. If not specified, uses the transport’s sample rate.

num_channels
int, default: 1

Number of audio channels:

  • 1: Mono (mixed user and bot audio)
  • 2: Stereo (user audio in left channel, bot audio in right channel)

buffer_size
int, default: 0

Size in bytes that triggers the on_audio_data event:

  • 0: Only triggers when recording stops
  • >0: Triggers whenever the buffer reaches this size

user_continuous_stream
bool, default: True

Whether user audio is continuous or speech-only:

  • True: Expects a continuous audio stream
  • False: Expects only speech segments with silence between them

enable_turn_audio
bool, default: False

Whether to enable separate event handling for user and bot turns:

  • True: Triggers per-turn audio events
  • False: Only triggers combined audio events
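
Putting these options together, one possible configuration records stereo audio and emits per-turn events (the parameter values here are illustrative):

audiobuffer = AudioBufferProcessor(
    sample_rate=24000,       # resample all audio to 24 kHz
    num_channels=2,          # user in the left channel, bot in the right
    buffer_size=0,           # deliver audio only when recording stops
    enable_turn_audio=True,  # also emit per-turn audio events
)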

Recording Controls

Start Recording

Begin recording audio from the conversation:

await audiobuffer.start_recording()

Stop Recording

Stop the current recording session:

await audiobuffer.stop_recording()
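
A common pattern is to tie recording to the transport lifecycle. A minimal sketch, assuming a transport that emits on_client_connected and on_client_disconnected events:

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    # Begin buffering audio as soon as the user connects
    await audiobuffer.start_recording()

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    # Flush the buffer; with buffer_size=0 this fires the final on_audio_data event
    await audiobuffer.stop_recording()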

Event Handlers

The processor supports multiple event handlers for different audio processing needs:

on_audio_data

Triggered when buffer_size is reached or recording stops, providing merged audio:

@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Handle the merged audio
    # audio: Raw audio bytes (mixed according to num_channels setting)
    # sample_rate: Sample rate in Hz
    # num_channels: Number of audio channels (1 or 2)

on_track_audio_data

Triggered alongside on_audio_data, providing separate user and bot audio tracks:

@audiobuffer.event_handler("on_track_audio_data")
async def on_track_audio_data(buffer, user_audio: bytes, bot_audio: bytes,
                             sample_rate: int, num_channels: int):
    # Handle separate audio tracks
    # user_audio: Raw user audio bytes
    # bot_audio: Raw bot audio bytes
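
For example, each track can be written to its own WAV file. A sketch reusing the imports from save_audio above and assuming each track is mono 16-bit PCM (the filenames are illustrative):

@audiobuffer.event_handler("on_track_audio_data")
async def save_tracks(buffer, user_audio: bytes, bot_audio: bytes,
                      sample_rate: int, num_channels: int):
    for name, track in (("user", user_audio), ("bot", bot_audio)):
        with io.BytesIO() as buf:
            with wave.open(buf, "wb") as wf:
                wf.setsampwidth(2)  # 16-bit PCM, as in the save_audio example
                wf.setnchannels(1)  # one participant per track
                wf.setframerate(sample_rate)
                wf.writeframes(track)
            async with aiofiles.open(f"{name}_track.wav", "wb") as f:
                await f.write(buf.getvalue())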

on_user_turn_audio_data

Triggered when a user speaking turn ends, providing that turn’s audio:

@audiobuffer.event_handler("on_user_turn_audio_data")
async def on_user_turn_audio_data(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Handle audio from a single user turn

on_bot_turn_audio_data

Triggered when a bot speaking turn ends, providing that turn’s audio:

@audiobuffer.event_handler("on_bot_turn_audio_data")
async def on_bot_turn_audio_data(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Handle audio from a single bot turn
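
Both per-turn events require the processor to be created with enable_turn_audio=True. As a small illustration, the turn payloads can be used to measure how long each participant spoke, assuming 16-bit PCM (2 bytes per sample):

turn_durations: list[tuple[str, float]] = []

@audiobuffer.event_handler("on_user_turn_audio_data")
async def record_user_turn(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Duration = bytes / (bytes per sample * channels * samples per second)
    turn_durations.append(("user", len(audio) / (2 * num_channels * sample_rate)))

@audiobuffer.event_handler("on_bot_turn_audio_data")
async def record_bot_turn(buffer, audio: bytes, sample_rate: int, num_channels: int):
    turn_durations.append(("bot", len(audio) / (2 * num_channels * sample_rate)))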

Audio Processing Features

  • Automatic resampling of audio to specified sample rate
  • Buffer synchronization between user and bot audio streams
  • Silence insertion for non-continuous audio streams
  • Support for both continuous and speech-only recording modes
  • Separate tracking of user and bot speaking turns
  • Stereo channel separation for user and bot audio
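
When recording in stereo, the samples in the merged buffer are interleaved (left/user, then right/bot). A minimal sketch for de-interleaving the two channels, assuming 16-bit PCM:

import array

def split_stereo(stereo_pcm: bytes) -> tuple[bytes, bytes]:
    # Interpret the buffer as interleaved signed 16-bit samples: L R L R ...
    samples = array.array("h")
    samples.frombytes(stereo_pcm)
    user = samples[0::2]  # left channel: user audio
    bot = samples[1::2]   # right channel: bot audio
    return user.tobytes(), bot.tobytes()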