# Producer & Consumer Processors

> Route frames between different parts of a pipeline, allowing selective frame sharing across parallel branches or within complex pipelines

## Overview

The Producer and Consumer processors work as a pair to route frames between different parts of a pipeline. They are particularly useful with [`ParallelPipeline`](/api-reference/server/pipeline/parallel-pipeline), where they let you selectively capture frames from one branch and inject them into another.

## ProducerProcessor

`ProducerProcessor` examines frames flowing through the pipeline, applies a filter to decide which frames to share, and optionally transforms these frames before sending them to connected consumers.

### Constructor Parameters

<ParamField path="filter" type="Callable[[Frame], Awaitable[bool]]" required>
  An async function that decides which frames are sent to consumers. It should
  return `True` for each frame to be shared.
</ParamField>

<ParamField path="transformer" type="Callable[[Frame], Awaitable[Frame]]" default="identity_transformer">
  Optional async function that transforms frames before sending to consumers. By
  default, passes frames unchanged.
</ParamField>

<ParamField path="passthrough" type="bool" default="True">
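  When `True`, all frames continue through the producer's own pipeline,
  including the frames that are shared with consumers. When `False`, frames
  that match the filter go to consumers only, and only non-matching frames
  continue through the pipeline.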
</ParamField>
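
The interaction between `filter`, `transformer`, and `passthrough` can be modeled in a few lines of plain Python. The sketch below is a simplified stand-in, not Pipecat's implementation: `ToyProducer`, `Frame`, `AudioFrame`, and `TextFrame` are hypothetical names used only for illustration, and plain lists stand in for the consumer connection and the downstream pipeline.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Frame:
    """Hypothetical stand-in for a pipeline frame."""
    name: str


@dataclass
class AudioFrame(Frame):
    pass


@dataclass
class TextFrame(Frame):
    pass


async def identity(frame: Frame) -> Frame:
    return frame


class ToyProducer:
    """Toy model of the parameters described above (assumption, not the real class)."""

    def __init__(self, filter, transformer=identity, passthrough=True):
        self._filter = filter
        self._transformer = transformer
        self._passthrough = passthrough
        self.shared = []      # frames handed to consumers
        self.downstream = []  # frames that continue in the producer's branch

    async def process(self, frame: Frame) -> None:
        if await self._filter(frame):
            # Matching frames are transformed, then shared with consumers
            self.shared.append(await self._transformer(frame))
            if self._passthrough:
                self.downstream.append(frame)
        else:
            # Non-matching frames always continue through the pipeline
            self.downstream.append(frame)


async def is_audio(frame: Frame) -> bool:
    return isinstance(frame, AudioFrame)


async def demo(passthrough: bool):
    producer = ToyProducer(filter=is_audio, passthrough=passthrough)
    for frame in (TextFrame("hello"), AudioFrame("chunk-1")):
        await producer.process(frame)
    shared = [f.name for f in producer.shared]
    downstream = [f.name for f in producer.downstream]
    return shared, downstream


shared_on, downstream_on = asyncio.run(demo(passthrough=True))
shared_off, downstream_off = asyncio.run(demo(passthrough=False))
print(shared_on, downstream_on)
print(shared_off, downstream_off)
```

In this model the audio frame reaches consumers in both runs. Only its presence in the producer's own branch changes with `passthrough`.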

## ConsumerProcessor

`ConsumerProcessor` receives frames from a `ProducerProcessor` and injects them into its pipeline branch.

### Constructor Parameters

<ParamField path="producer" type="ProducerProcessor" required>
  The producer processor that will send frames to this consumer.
</ParamField>

<ParamField path="transformer" type="Callable[[Frame], Awaitable[Frame]]" default="identity_transformer">
  Optional async function that transforms frames before injecting them into the
  pipeline.
</ParamField>

<ParamField path="direction" type="FrameDirection" default="FrameDirection.DOWNSTREAM">
  The direction in which to push received frames. Usually `DOWNSTREAM` to send
  frames forward in the pipeline.
</ParamField>
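
The consumer side can be pictured as a loop that waits for frames handed over by the producer, applies its own `transformer`, and pushes the result in the configured `direction`. The following is a minimal sketch under stated assumptions: `ToyConsumer` and `Direction` are hypothetical names, strings stand in for frames, and an `asyncio.Queue` with a `None` sentinel stands in for whatever handoff mechanism the real processors use.

```python
import asyncio
from enum import Enum


class Direction(Enum):
    """Hypothetical stand-in for a frame direction enum."""
    DOWNSTREAM = 1
    UPSTREAM = 2


async def identity(frame: str) -> str:
    return frame


class ToyConsumer:
    """Toy model of a consumer (assumption, not the real class)."""

    def __init__(self, queue, transformer=identity, direction=Direction.DOWNSTREAM):
        self._queue = queue
        self._transformer = transformer
        self._direction = direction
        self.pushed = []  # (frame, direction) pairs injected into this branch

    async def run(self) -> None:
        while True:
            frame = await self._queue.get()
            if frame is None:  # sentinel used by this sketch to stop the loop
                break
            new_frame = await self._transformer(frame)
            self.pushed.append((new_frame, self._direction))


async def as_input(frame: str) -> str:
    # Illustrative transformer: relabel shared TTS frames as input frames
    return frame.replace("tts", "input")


async def demo():
    queue = asyncio.Queue()
    consumer = ToyConsumer(queue, transformer=as_input)
    task = asyncio.create_task(consumer.run())
    # The producer side of this sketch simply puts frames on the queue
    for item in ("tts-audio-1", "tts-audio-2", None):
        await queue.put(item)
    await task
    return consumer.pushed


pushed = asyncio.run(demo())
print(pushed)
```

Frames arrive in the order the producer shared them, and each one is tagged with the consumer's direction rather than the direction it had in the producer's branch.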

## Usage Examples

### Basic Usage: Moving TTS Audio Between Branches

```python
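# Imports assume a recent Pipecat release; module paths may differ between versions
from pipecat.frames.frames import Frame, InputAudioRawFrame, TTSAudioRawFrame
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.consumer_processor import ConsumerProcessor
from pipecat.processors.frame_processor import FrameDirection
from pipecat.processors.producer_processor import ProducerProcessor

# Filter: select only TTS audio frames for sharing
async def is_tts_audio(frame: Frame) -> bool:
    return isinstance(frame, TTSAudioRawFrame)

# Transformer: convert TTS audio into input audio before it reaches consumers
async def tts_to_input_audio_transformer(frame: Frame) -> Frame:
    if isinstance(frame, TTSAudioRawFrame):
        # Convert TTS audio to input audio format
        return InputAudioRawFrame(
            audio=frame.audio,
            sample_rate=frame.sample_rate,
            num_channels=frame.num_channels,
        )
    return frame

# Create a producer that captures TTS audio frames
producer = ProducerProcessor(
    filter=is_tts_audio,
    transformer=tts_to_input_audio_transformer,
    passthrough=True,  # Keep these frames in the original pipeline
)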

# Create a consumer to receive the frames
consumer = ConsumerProcessor(
    producer=producer,
    direction=FrameDirection.DOWNSTREAM
)

# Use in a ParallelPipeline
pipeline = Pipeline([
    transport.input(),
    ParallelPipeline(
        # Branch 1: LLM for bot responses
        [
            llm,
            tts,
            producer,  # Capture TTS audio here
        ],
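        # Branch 2: Audio processing branch
        [
            consumer,  # Receive TTS audio here
            # Speech-to-speech LLM (audio in). `s2s_llm` is a placeholder name for
            # a separate service instance, distinct from `llm` in Branch 1.
            s2s_llm,
        ]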
    ),
    transport.output(),
])
```
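
When a producer needs to share more than one kind of frame, the filter can be built by a small factory instead of being written by hand each time. This is a sketch of that idea, not part of the Pipecat API: `frame_type_filter` is a hypothetical helper, and `BotAudio`, `BotText`, and `UserAudio` are stand-in classes so the example runs without Pipecat installed.

```python
import asyncio
from typing import Awaitable, Callable, Type


class Frame:
    """Hypothetical stand-in for a pipeline frame base class."""


class BotAudio(Frame):
    pass


class BotText(Frame):
    pass


class UserAudio(Frame):
    pass


def frame_type_filter(*frame_types: Type[Frame]) -> Callable[[Frame], Awaitable[bool]]:
    """Build an async filter that matches any of the given frame types."""

    async def _filter(frame: Frame) -> bool:
        return isinstance(frame, frame_types)

    return _filter


# A filter that shares both audio and text produced by the bot
is_bot_output = frame_type_filter(BotAudio, BotText)


async def check():
    frames = (BotAudio(), BotText(), UserAudio())
    return [await is_bot_output(frame) for frame in frames]


results = asyncio.run(check())
print(results)
```

The returned coroutine function has the same shape as `is_tts_audio` above, so under that assumption it could be passed as the `filter` argument of a producer.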
