Overview

The AudioBufferProcessor captures and buffers audio from both input (user) and output (bot) sources during conversations. It synchronizes these audio streams, supports both mono and stereo recording with configurable sample rates, and provides flexible event handlers for various audio processing needs. The processor can produce either combined or separate audio tracks and supports both continuous buffering and chunked processing for long-form recordings.

Usage

To record audio, create an instance of AudioBufferProcessor and add it to your pipeline:

from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor

# Create an audio buffer processor
audiobuffer = AudioBufferProcessor(
    sample_rate=44100,  # Optional: desired output sample rate
    num_channels=2,     # 1 for mono, 2 for stereo
    buffer_size=44100 * 2 * 30  # 30 seconds of audio (sample_rate * 2 bytes per sample * seconds)
)

# Add to pipeline
pipeline = Pipeline([
    transport.input(),  # microphone
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    audiobuffer,  # used to buffer the audio in the pipeline
    context_aggregator.assistant(),
])

# Example: Save recorded audio to a WAV file
import datetime
import io
import wave

import aiofiles

async def save_audio(audio: bytes, sample_rate: int, num_channels: int):
    if len(audio) > 0:
        filename = f"conversation_recording{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
        with io.BytesIO() as buffer:
            with wave.open(buffer, "wb") as wf:
                wf.setsampwidth(2)
                wf.setnchannels(num_channels)
                wf.setframerate(sample_rate)
                wf.writeframes(audio)
            async with aiofiles.open(filename, "wb") as file:
                await file.write(buffer.getvalue())
        print(f"Merged audio saved to {filename}")

# Handle the recorded audio chunks
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
    await save_audio(audio, sample_rate, num_channels)

STT Audio Passthrough

If you have an STT service in your pipeline, you will need to pass through the audio so that it’s available to the AudioBufferProcessor. You can do this by adding audio_passthrough=True to the STT service:

stt = DeepgramSTTService(
    api_key=os.getenv("DEEPGRAM_API_KEY"),
    audio_passthrough=True,
)

Configuration Options

sample_rate
Optional[int]
default: None

The desired output sample rate in Hz. If not specified, uses the transport’s sample rate.

num_channels
int
default: 1

Number of audio channels:

  • 1: Mono (mixed user and bot audio)
  • 2: Stereo (user audio in the left channel, bot audio in the right channel)

buffer_size
int
default: 0

Size in bytes that triggers the on_audio_data event:

  • 0: Only triggers when recording stops
  • >0: Triggers whenever the buffer reaches this size (recommended for longer recordings)

enable_turn_audio
bool
default: False

Whether to enable separate event handling for user and bot turns:

  • True: Triggers per-turn audio events
  • False: Only triggers combined audio events

Recording Controls

Start Recording

Begin recording audio from the conversation:

await audiobuffer.start_recording()

Stop Recording

Stop the current recording session:

await audiobuffer.stop_recording()
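
A common pattern is to tie recording to the client session, so start_recording() runs once media is flowing and stop_recording() flushes any remaining audio at the end. A minimal sketch, assuming a transport that exposes on_client_connected and on_client_disconnected events (names vary by transport):

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    # Begin buffering as soon as a client joins
    await audiobuffer.start_recording()

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    # Stop and flush; with buffer_size=0 this triggers the final on_audio_data
    await audiobuffer.stop_recording()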

Event Handlers

The processor supports multiple event handlers for different audio processing needs:

on_audio_data

Triggered when buffer_size is reached or recording stops, providing merged audio:

@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Handle the merged audio
    # audio: Raw audio bytes (mixed according to num_channels setting)
    # sample_rate: Sample rate in Hz
    # num_channels: Number of audio channels (1 or 2)

on_track_audio_data

Triggered alongside on_audio_data, providing separate user and bot audio tracks:

@audiobuffer.event_handler("on_track_audio_data")
async def on_track_audio_data(buffer, user_audio: bytes, bot_audio: bytes,
                             sample_rate: int, num_channels: int):
    # Handle separate audio tracks
    # user_audio: Raw user audio bytes (always mono)
    # bot_audio: Raw bot audio bytes (always mono)
    # sample_rate: Sample rate in Hz
    # num_channels: Always 1 for individual tracks
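
If you need a single stereo stream but are working from the separate tracks, you can interleave the two mono streams yourself. A small sketch using numpy (an extra dependency; if you simply want a stereo recording, constructing the processor with num_channels=2 gives you interleaved stereo directly via on_audio_data):

import numpy as np

def interleave_stereo(user_audio: bytes, bot_audio: bytes) -> bytes:
    # Interleave two mono 16-bit PCM tracks into one stereo stream
    # (user on the left, bot on the right, matching num_channels=2)
    user = np.frombuffer(user_audio, dtype=np.int16)
    bot = np.frombuffer(bot_audio, dtype=np.int16)
    n = min(len(user), len(bot))
    stereo = np.empty(2 * n, dtype=np.int16)
    stereo[0::2] = user[:n]  # left channel
    stereo[1::2] = bot[:n]   # right channel
    return stereo.tobytes()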

on_user_turn_audio_data

Triggered when a user speaking turn ends (requires enable_turn_audio=True):

@audiobuffer.event_handler("on_user_turn_audio_data")
async def on_user_turn_audio_data(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Handle audio from a single user turn

on_bot_turn_audio_data

Triggered when a bot speaking turn ends (requires enable_turn_audio=True):

@audiobuffer.event_handler("on_bot_turn_audio_data")
async def on_bot_turn_audio_data(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Handle audio from a single bot turn
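
Turn events only fire when the processor is constructed with enable_turn_audio=True. A minimal sketch that collects each completed user turn (the user_turns list and handler body are illustrative):

audiobuffer = AudioBufferProcessor(enable_turn_audio=True)

user_turns: list[bytes] = []

@audiobuffer.event_handler("on_user_turn_audio_data")
async def collect_user_turn(buffer, audio: bytes, sample_rate: int, num_channels: int):
    # Keep each completed user turn, e.g. for per-turn transcription or analysis
    user_turns.append(audio)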

Long-Form Recording Strategies

For longer conversations (>5-10 minutes), we recommend using a chunked approach to avoid memory issues and enable real-time processing:

Chunked Recording

Set a reasonable buffer_size to trigger periodic uploads:

# 30-second chunks (recommended for most use cases)
SAMPLE_RATE = 24000
CHUNK_DURATION = 30  # seconds
audiobuffer = AudioBufferProcessor(
    sample_rate=SAMPLE_RATE,
    buffer_size=SAMPLE_RATE * 2 * CHUNK_DURATION  # 2 bytes per sample (16-bit)
)

chunk_counter = 0

@audiobuffer.event_handler("on_track_audio_data")
async def on_chunk_ready(buffer, user_audio, bot_audio, sample_rate, num_channels):
    global chunk_counter

    # Upload or save individual chunks
    await upload_audio_chunk(f"user_chunk_{chunk_counter:03d}.wav", user_audio, sample_rate, 1)
    await upload_audio_chunk(f"bot_chunk_{chunk_counter:03d}.wav", bot_audio, sample_rate, 1)

    chunk_counter += 1
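
upload_audio_chunk above is a placeholder for your own storage logic. A minimal local-disk version, reusing the WAV-wrapping approach from save_audio:

import io
import wave

import aiofiles

async def upload_audio_chunk(filename: str, audio: bytes, sample_rate: int, num_channels: int):
    # Wrap the raw 16-bit PCM chunk in a WAV container and write it to disk;
    # swap the file write for your storage client's upload call
    with io.BytesIO() as buffer:
        with wave.open(buffer, "wb") as wf:
            wf.setsampwidth(2)
            wf.setnchannels(num_channels)
            wf.setframerate(sample_rate)
            wf.writeframes(audio)
        async with aiofiles.open(filename, "wb") as file:
            await file.write(buffer.getvalue())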

Multipart Upload Strategy

For cloud storage, consider using multipart uploads to stream audio chunks as they are produced (a sketch follows the lists below):

Conceptual Approach:

  1. Initialize multipart upload when recording starts
  2. Upload chunks as parts when buffers fill (every 30 seconds)
  3. Complete multipart upload when recording ends
  4. Post-process to create final WAV file(s)

Benefits:

  • Memory efficient for long sessions
  • Fault tolerant (no data loss if connection drops)
  • Enables real-time processing and analysis
  • Parallel upload of multiple tracks
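
A minimal sketch of steps 1-3 against S3 using boto3 (the MultipartRecorder class and bucket/key names are illustrative; boto3 calls are synchronous, so offload them to an executor in an async pipeline). Note that S3 requires every part except the last to be at least 5 MiB, so several 30-second chunks are accumulated per part:

import boto3

class MultipartRecorder:
    """Sketch: stream recorded audio chunks to S3 as multipart upload parts."""

    def __init__(self, bucket: str, key: str):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.key = key
        self.parts = []
        self.pending = bytearray()  # accumulate until the 5 MiB part minimum
        self.upload_id = self.s3.create_multipart_upload(
            Bucket=bucket, Key=key)["UploadId"]

    def add_chunk(self, audio: bytes):
        self.pending.extend(audio)
        if len(self.pending) >= 5 * 1024 * 1024:  # S3 minimum part size
            self._upload_part(bytes(self.pending))
            self.pending.clear()

    def _upload_part(self, data: bytes):
        part_number = len(self.parts) + 1
        response = self.s3.upload_part(
            Bucket=self.bucket, Key=self.key, PartNumber=part_number,
            UploadId=self.upload_id, Body=data)
        self.parts.append({"ETag": response["ETag"], "PartNumber": part_number})

    def complete(self):
        if self.pending:  # the final part may be smaller than 5 MiB
            self._upload_part(bytes(self.pending))
        self.s3.complete_multipart_upload(
            Bucket=self.bucket, Key=self.key, UploadId=self.upload_id,
            MultipartUpload={"Parts": self.parts})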

Post-Processing Pipeline

After uploading chunks, create final audio files using tools like FFmpeg:

Concatenating Audio Files:

# Create filelist.txt listing the chunks in order:
# file 'chunk_001.wav'
# file 'chunk_002.wav'
# ...

# Concatenate with the concat demuxer. (Avoid ffmpeg's "concat:" protocol
# for WAV files: it joins files byte-for-byte, so every chunk's header
# after the first ends up embedded in the audio data.)
ffmpeg -f concat -safe 0 -i filelist.txt -c copy final_recording.wav
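
Building filelist.txt by hand gets tedious for many chunks; a small sketch, assuming the chunks were downloaded into a local chunks/ directory:

from pathlib import Path

# Zero-padded sequence numbers make lexicographic order match playback order
chunks = sorted(Path("chunks").glob("user_chunk_*.wav"))
Path("filelist.txt").write_text("".join(f"file '{chunk}'\n" for chunk in chunks))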

Automation Considerations:

  • Use sequence numbers in chunk filenames for proper ordering
  • Include metadata (sample rate, channels, duration) with each chunk
  • Implement retry logic for failed uploads
  • Consider using cloud functions/lambdas for automatic post-processing

Audio Processing Features

  • Automatic resampling of audio to specified sample rate
  • Buffer synchronization between user and bot audio streams
  • Silence insertion for non-continuous audio streams to maintain timing
  • Separate tracking of user and bot speaking turns
  • Stereo channel separation for user and bot audio (when num_channels=2)
  • Memory-efficient chunking for long-form recordings