Overview

The Producer and Consumer processors work as a pair to route frames between different parts of a pipeline, particularly useful when working with ParallelPipeline. They allow you to selectively capture frames from one pipeline branch and inject them into another.

ProducerProcessor

ProducerProcessor examines frames flowing through the pipeline, applies a filter to decide which frames to share, and optionally transforms these frames before sending them to connected consumers.

Constructor Parameters

filter
Callable[[Frame], Awaitable[bool]]
required

An async function that determines which frames should be sent to consumers. Should return True for frames to be shared.

transformer
Callable[[Frame], Awaitable[Frame]]
default:"identity_transformer"

Optional async function that transforms frames before sending to consumers. By default, passes frames unchanged.

passthrough
bool
default:"True"

When True, passes all frames through the normal pipeline flow. When False, only passes through frames that don’t match the filter.

ConsumerProcessor

ConsumerProcessor receives frames from a ProducerProcessor and injects them into its pipeline branch.

Constructor Parameters

producer
ProducerProcessor
required

The producer processor that will send frames to this consumer.

transformer
Callable[[Frame], Awaitable[Frame]]
default:"identity_transformer"

Optional async function that transforms frames before injecting them into the pipeline.

direction
FrameDirection
default:"FrameDirection.DOWNSTREAM"

The direction in which to push received frames. Usually DOWNSTREAM to send frames forward in the pipeline.

Usage Examples

Basic Usage: Moving TTS Audio Between Branches

# Create a producer that captures TTS audio frames
async def is_tts_audio(frame: Frame) -> bool:
    return isinstance(frame, TTSAudioRawFrame)

# Define an async transformer function
async def tts_to_input_audio_transformer(frame: Frame) -> Frame:
    if isinstance(frame, TTSAudioRawFrame):
        # Convert TTS audio to input audio format
        return InputAudioRawFrame(
            audio=frame.audio,
            sample_rate=frame.sample_rate,
            num_channels=frame.num_channels
        )
    return frame

producer = ProducerProcessor(
    filter=is_tts_audio,
    transformer=tts_to_input_audio_transformer
    passthrough=True  # Keep these frames in original pipeline
)

# Create a consumer to receive the frames
consumer = ConsumerProcessor(
    producer=producer,
    direction=FrameDirection.DOWNSTREAM
)

# Use in a ParallelPipeline
pipeline = Pipeline([
    transport.input(),
    ParallelPipeline(
        # Branch 1: LLM for bot responses
        [
            llm,
            tts,
            producer,  # Capture TTS audio here
        ],
        # Branch 2: Audio processing branch
        [
            consumer,  # Receive TTS audio here
            llm, # Speech-to-Speech LLM (audio in)
        ]
    ),
    transport.output(),
])