# Pipeline & Frame Processing

> Learn how Pipecat's pipeline architecture orchestrates frame processing for voice AI applications

The **Pipeline** is the core orchestration component in Pipecat that connects frame processors together, creating a structured path for data to flow through your voice AI application.

## Basic Pipeline Structure

A Pipeline takes a list of frame processors and connects them in sequence. Here's a simple voice AI pipeline that matches the voice AI agent architecture we discussed earlier:

```python theme={null}
pipeline = Pipeline([
    transport.input(),              # Receives user audio
    stt,                            # Speech-to-text conversion
    context_aggregator.user(),      # Collect user responses
    llm,                            # Language model processing
    tts,                            # Text-to-speech conversion
    transport.output(),             # Sends audio to user
    context_aggregator.assistant(), # Collect assistant responses
])
```

## Understanding Frames and Frame Processing

Before diving deeper into pipelines, it's important to understand how data moves through them using **frames** and **frame processors**.

### What Are Frames?

**Frames** are data containers that carry information through your pipeline. Think of them as packages on a conveyor belt. Each frame contains a specific type of data that processors can examine and act upon.

Frames automatically receive unique identifiers and names (like `TranscriptionFrame#1`) that help with debugging and tracking data flow through your pipeline.
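
As a rough illustration of the idea, the sketch below shows one way a frame base class could assign itself an id and a readable name. It is a toy model in plain Python, not Pipecat's implementation: `ToyFrame`, `ToyTranscriptionFrame`, and the counters are hypothetical names, and the numbering scheme is an assumption.

```python
import itertools
from collections import defaultdict
from dataclasses import dataclass, field

# Hypothetical counters for this toy model (not Pipecat's code).
_instance_counters = defaultdict(itertools.count)
_frame_ids = itertools.count()


@dataclass
class ToyFrame:
    """Base frame that assigns itself a unique id and a readable name."""

    id: int = field(init=False)
    name: str = field(init=False)

    def __post_init__(self):
        self.id = next(_frame_ids)
        # Number instances per class so each name identifies one frame.
        count = next(_instance_counters[type(self).__name__])
        self.name = f"{type(self).__name__}#{count}"


@dataclass
class ToyTranscriptionFrame(ToyFrame):
    text: str = ""


first = ToyTranscriptionFrame(text="hello")
second = ToyTranscriptionFrame(text="world")
print(first.name, second.name)
```

Because the name carries both the frame type and a per-type counter, a log line that mentions a frame name points to exactly one frame instance.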

Common frame types include:

* **Audio frames**: Raw audio data from users or generated by TTS
* **Text frames**: Transcriptions, LLM responses, or other text content
* **Image frames**: Visual data for multimodal applications
* **System frames**: Control signals for pipeline management
* **Context frames**: Conversation history and state information

Frames flow through your pipeline from processor to processor, carrying the data your voice AI application needs to operate.

### What Are Frame Processors?

**Frame Processors** are the workers in your pipeline. Each processor has a specific job - like converting speech to text, generating responses, or playing audio. They:

* **Receive frames** from the previous processor in the pipeline
* **Process the data** (transcribe audio, generate text, etc.)
* **Create new frames** with their output
* **Pass frames along** to the next processor

Frame processors are modular and reusable. You can swap out different STT services or LLM providers without changing the rest of your pipeline.

### Frame Types

Frames in Pipecat have different base classes that determine how they're processed:

```python theme={null}
@dataclass
class SystemFrame(Frame):
    """System frames are queued with high priority."""
    pass

@dataclass
class DataFrame(Frame):
    """Data frames are queued and processed in order."""
    pass

@dataclass
class ControlFrame(Frame):
    """Control frames are queued and processed in order."""
    pass
```

**Key differences:**

* **SystemFrames**: High priority, processed in order relative to other SystemFrames, and not discarded by interruptions (for example: interruptions, pipeline control, user input)
* **DataFrames & ControlFrames**: Queued and processed in order relative to each other (for example: audio output, text, images)

**Examples by type:**

```python theme={null}
# SystemFrames (high-priority and ordered with other SystemFrames)
InputAudioRawFrame         # User audio input
UserStartedSpeakingFrame   # Speech detection events
InterruptionFrame          # Interruption control
ErrorFrame                 # Error notifications

# DataFrames (queued and ordered)
OutputAudioRawFrame        # Audio for playback
TextFrame                  # Text content
TranscriptionFrame         # Speech-to-text results
LLMTextFrame               # LLM responses
AggregatedTextFrame        # Text aggregated into a describable unit
TTSTextFrame               # Text-to-speech text output

# ControlFrames (queued and ordered)
EndFrame                   # Pipeline shutdown
TTSStartedFrame            # TTS response boundaries
LLMFullResponseStartFrame  # LLM response boundaries
```

### Frame Processing Order

**Frames are processed in guaranteed order within their processing lane**, even across ParallelPipelines. SystemFrames are queued in a high-priority lane and ordered relative to other SystemFrames. DataFrames and ControlFrames share the non-system lane and are ordered relative to each other. This makes sequencing reliable: if you push two non-system frames in order, they are processed in that order, and processing of the first finishes before the second begins. The following example pushes a `TTSSpeakFrame` followed by an `EndFrame` to say goodbye and then end the pipeline:

```python theme={null}
from pipecat.frames.frames import EndFrame, TTSSpeakFrame

# These will execute in order: speak first, then end pipeline
await task.queue_frames([
    TTSSpeakFrame("Goodbye!"),
    EndFrame()
])
```
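
The ordering guarantee can be pictured as a single worker draining a FIFO queue. The following is a minimal sketch using only `asyncio`, with the strings `"speak"` and `"end"` standing in for `TTSSpeakFrame` and `EndFrame`. The `handle` and `worker` functions are hypothetical and do not reflect how Pipecat is implemented internally.

```python
import asyncio

log = []


async def handle(frame: str) -> None:
    """Pretend to process a frame; speaking takes longer than ending."""
    log.append(f"start {frame}")
    await asyncio.sleep(0.01 if frame == "speak" else 0)
    log.append(f"done {frame}")


async def worker(queue: asyncio.Queue) -> None:
    # One frame at a time: the next frame is not taken from the queue
    # until the current one has been fully handled.
    while True:
        frame = await queue.get()
        await handle(frame)
        if frame == "end":
            break


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for frame in ["speak", "end"]:  # stand-ins for TTSSpeakFrame and EndFrame
        queue.put_nowait(frame)
    await worker(queue)


asyncio.run(main())
print(log)
```

Even though handling `"speak"` takes longer, `"end"` is not started until `"speak"` is done, which is the property that lets the bot finish saying goodbye before the pipeline ends.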

### How Frame Processors Work

Every frame processor follows the same pattern with two key methods:

```python theme={null}
from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class TranscriptionLogger(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Always call parent first
        await super().process_frame(frame, direction)

        # Handle specific frame types
        if isinstance(frame, TranscriptionFrame):
            print(f"Transcription: {frame.text}")

        # Push frame to next processor
        await self.push_frame(frame, direction)
```

**Key methods:**

* **`process_frame()`**: Inspect and handle incoming frames
* **`push_frame()`**: Send frames upstream or downstream
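
To make the upstream and downstream directions concrete, here is a small synchronous sketch in plain Python. `ToyProcessor`, `Direction`, and `link` are hypothetical stand-ins written for this example. They mimic the shape of `process_frame()` and `push_frame()` but are not Pipecat classes.

```python
from enum import Enum, auto
from typing import List, Optional


class Direction(Enum):
    DOWNSTREAM = auto()
    UPSTREAM = auto()


class ToyProcessor:
    """Hypothetical stand-in for a frame processor with two neighbours."""

    def __init__(self, name: str, trace: List[str]):
        self.name = name
        self.trace = trace
        self.prev: Optional["ToyProcessor"] = None
        self.next: Optional["ToyProcessor"] = None

    def process_frame(self, frame: str, direction: Direction) -> None:
        # Record the visit, then forward the frame unchanged.
        self.trace.append(f"{self.name}:{frame}")
        self.push_frame(frame, direction)

    def push_frame(self, frame: str, direction: Direction) -> None:
        target = self.next if direction is Direction.DOWNSTREAM else self.prev
        if target is not None:
            target.process_frame(frame, direction)


def link(processors: List[ToyProcessor]) -> None:
    """Connect processors in sequence, as a pipeline would."""
    for left, right in zip(processors, processors[1:]):
        left.next, right.prev = right, left


trace: List[str] = []
a, b, c = (ToyProcessor(n, trace) for n in "abc")
link([a, b, c])

a.process_frame("text", Direction.DOWNSTREAM)  # visits a, then b, then c
c.push_frame("error", Direction.UPSTREAM)      # visits b, then a
print(trace)
```

The downstream frame travels toward the end of the chain, while the frame pushed upstream from the last processor travels back toward the start.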

<Card title="Custom Frame Processors" icon="code" href="/pipecat/fundamentals/custom-frame-processor">
  Learn how to build your own frame processors
</Card>

## How Data Flows Through Pipelines

Understanding data flow is crucial for building effective pipelines:

### Frame Processing Order

**Order matters**: Processors must be arranged so that each receives the frame types it needs:

1. `transport.input()` creates `InputAudioRawFrame`s from user audio
2. `stt` receives audio frames and outputs `TranscriptionFrame`s
3. `llm` processes text and generates `LLMTextFrame`s
4. `tts` converts text frames to `TTSAudioRawFrame`s, `AggregatedTextFrame`s, and `TTSTextFrame`s
5. `transport.output()` creates `OutputAudioRawFrame`s and sends audio back to the user

* Note: An `LLMTextProcessor` can sit between the `llm` and `tts` to pre-aggregate `LLMTextFrame`s into `AggregatedTextFrame`s. This simply moves the aggregation step
  out of the TTS.

### Frame Propagation

**Processors always push frames**: Processors don't consume frames; they pass them along so that later processors can use them too:

```python theme={null}
# This pipeline allows multiple processors to use the same audio
pipeline = Pipeline([
    transport.input(),          # Creates InputAudioRawFrame
    stt,                        # Uses audio → creates TranscriptionFrame
    # ...                       # Other processors can use the same audio
    tts,                        # Uses various text frames → creates TTSAudioRawFrame
    transport.output(),         # Uses TTSAudioRawFrame → sends audio to user
    audio_buffer_processor,     # Also uses the same user and assistant audio for recording
])
```

This design allows multiple processors to operate on the same data stream without interfering with each other.
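
The effect of forwarding can be sketched with plain functions. In this toy model, every name is hypothetical and a stage that returns `None` stands for a processor that fails to push the frame. A recorder placed later in the chain only sees the audio when the earlier stage forwards it.

```python
from typing import Callable, List, Optional

Stage = Callable[[str], Optional[str]]


def run(stages: List[Stage], frame: str) -> None:
    """Feed one frame through the stages, stopping if a stage returns None."""
    current: Optional[str] = frame
    for stage in stages:
        if current is None:
            return
        current = stage(current)


def make_recorder(seen: List[str]) -> Stage:
    def recorder(frame: str) -> Optional[str]:
        seen.append(frame)
        return frame  # forward the frame after recording it
    return recorder


def forwarding_stt(frame: str) -> Optional[str]:
    return frame  # uses the audio, then passes it along


def swallowing_stt(frame: str) -> Optional[str]:
    return None  # uses the audio but never forwards it


seen_ok: List[str] = []
run([forwarding_stt, make_recorder(seen_ok)], "audio-chunk")

seen_broken: List[str] = []
run([swallowing_stt, make_recorder(seen_broken)], "audio-chunk")

print(seen_ok, seen_broken)
```

This is why a custom processor that forgets to push a frame can silently starve everything placed after it.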

### Parallel Processing Patterns

Use `ParallelPipeline` to create branches where each branch receives all upstream frames. Frames are collected and pushed individually from each branch:

```python theme={null}
pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    llm,
    ParallelPipeline([
        # English branch
        [FunctionFilter(english_filter), english_tts],
        # Spanish branch
        [FunctionFilter(spanish_filter), spanish_tts],
    ]),
    transport.output(),
    context_aggregator.assistant(),
])
```

In this example:

* Both TTS branches receive all LLM output
* Each branch can filter and process frames independently
* Results from both branches flow to `transport.output()`

ParallelPipelines are typically paired with filters or gates to control which frames go where, allowing for complex conditional logic.
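
The fan-out and filter pattern can be modelled in a few lines of plain Python. This sketch is illustrative only: `language_filter`, `fake_tts`, `run_parallel`, and the `(language, text)` frame shape are assumptions made for the example, and the branches run one after another here rather than concurrently.

```python
from typing import Callable, List, Tuple

ToyFrame = Tuple[str, str]  # (language, text); a hypothetical frame shape
Step = Callable[[List[ToyFrame]], List[ToyFrame]]


def language_filter(language: str) -> Step:
    """Keep only frames tagged with the given language."""
    return lambda frames: [f for f in frames if f[0] == language]


def fake_tts(voice: str) -> Step:
    """Stand-in for a TTS service: tags the text with the voice used."""
    return lambda frames: [(lang, f"{voice}({text})") for lang, text in frames]


def run_parallel(branches: List[List[Step]], frames: List[ToyFrame]) -> List[ToyFrame]:
    output: List[ToyFrame] = []
    for branch in branches:
        result = list(frames)  # every branch starts from the same input
        for step in branch:
            result = step(result)
        output.extend(result)  # results from all branches are merged
    return output


llm_output = [("en", "Hello"), ("es", "Hola")]
merged = run_parallel(
    [
        [language_filter("en"), fake_tts("english_voice")],
        [language_filter("es"), fake_tts("spanish_voice")],
    ],
    llm_output,
)
print(merged)
```

Without the filters, both branches would synthesize every frame and the merged output would contain each utterance twice, once per voice.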

### Frame Queuing and Processing

Frame processors have internal queues that ensure ordered processing:

* **SystemFrames use a high-priority input queue** and are processed in order with other SystemFrames (interruptions, errors, input audio)
* **DataFrames and ControlFrames use the non-system process queue** and are processed in order with each other
* **Queuing is managed automatically** by the pipeline infrastructure
* **Order is guaranteed within each lane** even across complex pipeline structures
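
A simplified way to picture the two lanes is a queue that always serves pending system frames first while keeping each lane in first-in, first-out order. The `TwoLaneQueue` class below is a hypothetical sketch of that behaviour, not the queue Pipecat uses internally.

```python
from collections import deque
from typing import Deque, List, Tuple

ToyFrame = Tuple[str, str]  # (kind, label), where kind is "system" or "data"


class TwoLaneQueue:
    """Toy queue: system frames jump ahead, each lane stays FIFO."""

    def __init__(self) -> None:
        self.system: Deque[ToyFrame] = deque()
        self.other: Deque[ToyFrame] = deque()

    def put(self, frame: ToyFrame) -> None:
        lane = self.system if frame[0] == "system" else self.other
        lane.append(frame)

    def get(self) -> ToyFrame:
        # Prefer the system lane; fall back to the non-system lane.
        lane = self.system if self.system else self.other
        return lane.popleft()

    def __len__(self) -> int:
        return len(self.system) + len(self.other)


queue = TwoLaneQueue()
for frame in [
    ("data", "text-1"),
    ("data", "text-2"),
    ("system", "interruption"),
    ("data", "end"),
    ("system", "error"),
]:
    queue.put(frame)

order: List[str] = []
while len(queue):
    order.append(queue.get()[1])
print(order)
```

The system frames overtake the data frames that were queued before them, but within each lane the original order is preserved.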

<Note>
  Learn more about frame flow patterns in the [Custom Frame Processor
  Guide](/pipecat/fundamentals/custom-frame-processor).
</Note>

## Pipeline Execution

### Creating a Pipeline Task

Wrap your pipeline in a `PipelineTask` to configure execution parameters:

```python theme={null}
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
        audio_in_sample_rate=8000,
        audio_out_sample_rate=8000,
    ),
    observers=[RTVIObserver(rtvi)],
)
```

### Running the Pipeline

Use `PipelineRunner` to execute your pipeline task:

```python theme={null}
# Create the runner
runner = PipelineRunner(handle_sigint=False)

# Run the pipeline
await runner.run(task)
```

**PipelineRunner features:**

* **Lifecycle Management**: Handles startup, execution, and cleanup
* **Signal Handling**: Responds to SIGINT/SIGTERM for graceful shutdown (when `handle_sigint=True`)
* **Resource Cleanup**: Ensures proper disposal of resources when tasks complete

**Common PipelineRunner options:**

```python theme={null}
# Handle system signals automatically
# Useful when running a file directly
runner = PipelineRunner(handle_sigint=True)

# Custom signal handling
# Useful when the PipelineRunner is managed by
# another system like the development runner
runner = PipelineRunner(handle_sigint=False)
```

### Pipeline Lifecycle

The pipeline runs continuously, processing frames as they arrive, until one of these conditions occurs:

1. **Normal Completion**: Bot code returns (session ends)
2. **Signal Termination**: SIGINT or SIGTERM received (if `handle_sigint=True`)
3. **Task Cancellation**: Explicit call to `task.cancel()`
4. **Error Condition**: Unhandled exception in pipeline

**Common lifecycle patterns:**

```python theme={null}
# Stop the pipeline when the client disconnects
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    logger.info("Client disconnected - stopping pipeline")
    await task.cancel()

# Await the pipeline task
await runner.run(task)
```

**Pipeline states during execution:**

* **Starting**: Initializing processors and connections
* **Running**: Actively processing frames and handling events
* **Stopping**: Gracefully shutting down processors
* **Stopped**: All resources cleaned up
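
The difference between normal completion and cancellation can be sketched with plain `asyncio`. In this toy model the state names come from the list above, while `run_pipeline` and the `"end"` marker are hypothetical. Pipecat's own state handling is not shown here.

```python
import asyncio
from typing import List


async def run_pipeline(queue: asyncio.Queue, states: List[str]) -> None:
    states.append("starting")
    try:
        states.append("running")
        while True:
            frame = await queue.get()
            if frame == "end":  # stand-in for an end-of-pipeline frame
                break
    finally:
        # Cleanup runs for both normal completion and cancellation.
        states.append("stopping")
        states.append("stopped")


async def main():
    # Normal completion: the end marker is processed after earlier frames.
    normal: List[str] = []
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("audio")
    queue.put_nowait("end")
    await run_pipeline(queue, normal)

    # Cancellation: the task is interrupted while waiting for frames.
    cancelled: List[str] = []
    task = asyncio.create_task(run_pipeline(asyncio.Queue(), cancelled))
    await asyncio.sleep(0)  # let the task start and block on queue.get()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return normal, cancelled, task.cancelled()


normal_states, cancelled_states, was_cancelled = asyncio.run(main())
print(normal_states, cancelled_states, was_cancelled)
```

Both paths reach the stopped state. The difference is that the normal path first drains the frames queued ahead of the end marker, while cancellation interrupts the loop wherever it happens to be waiting.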

## Configuring Pipeline Behavior

Beyond the basic pipeline structure, you can fine-tune behavior through `PipelineParams` and add monitoring capabilities.

### Pipeline Parameters

`PipelineParams` lets you configure how your entire pipeline operates:

```python theme={null}
params = PipelineParams(
    # Audio configuration (affects all audio processors)
    audio_in_sample_rate=16000,
    audio_out_sample_rate=24000,

    # Performance monitoring
    enable_metrics=True,
    enable_usage_metrics=True,

    # Debugging and development
    send_initial_empty_metrics=False,
    report_only_initial_ttfb=True,
)
```

### Monitoring and Observability

Add observers to monitor pipeline events and track custom application metrics:

```python theme={null}
task = PipelineTask(
    pipeline,
    params=params,
    observers=[
        RTVIObserver(rtvi),           # RTVI protocol events
        CustomMetricsObserver(),      # Your custom metrics
    ],
)
```
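
Conceptually, an observer watches frames move between processors without sitting in the pipeline itself. The sketch below models that idea in plain Python. `ToyObserver`, `CountingObserver`, `ToyTask`, and the `on_frame` callback are hypothetical names chosen for this example and are not Pipecat's observer API.

```python
from typing import Dict, List, Protocol


class ToyObserver(Protocol):
    def on_frame(self, source: str, frame: str) -> None: ...


class CountingObserver:
    """Counts frames per processor without touching the frames."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}

    def on_frame(self, source: str, frame: str) -> None:
        self.counts[source] = self.counts.get(source, 0) + 1


class ToyTask:
    """Hypothetical runner that notifies observers on every hand-off."""

    def __init__(self, processors: List[str], observers: List[ToyObserver]):
        self.processors = processors
        self.observers = observers

    def run(self, frames: List[str]) -> List[str]:
        for frame in frames:
            for name in self.processors:
                for observer in self.observers:
                    observer.on_frame(name, frame)
        return frames  # frames pass through unchanged


metrics = CountingObserver()
task = ToyTask(["stt", "llm", "tts"], observers=[metrics])
result = task.run(["frame-a", "frame-b"])
print(metrics.counts)
```

Because the observer only receives notifications, adding or removing it does not change which frames the processors see.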

These configuration options enable you to optimize performance, debug issues, and monitor your pipeline in production:

<CardGroup cols={3}>
  <Card title="Pipeline Parameters" icon="screwdriver-wrench" href="/api-reference/server/pipeline/pipeline-params">
    Configure execution parameters, audio settings, and debugging options
  </Card>

  <Card title="Metrics Guide" icon="chart-line" href="/pipecat/fundamentals/metrics">
    Monitor pipeline performance, latency, and API usage in real-time
  </Card>

  <Card title="Observer Pattern" icon="eye" href="/api-reference/server/utilities/observers/observer-pattern">
    Monitor pipeline events, frame processing, and custom application events
  </Card>
</CardGroup>

## Key Takeaways

* **Order matters** - arrange processors so each gets the frames it needs
* **Processors push frames** - processors pass frames along rather than consuming them
* **Frame types determine processing** - SystemFrames use the high-priority input queue, while DataFrames and ControlFrames use the non-system process queue
* **Queuing ensures reliability** - frames are processed in guaranteed order within their processing lane
* **Parallel processing** enables conditional logic and multi-modal handling
* **PipelineTask** configures execution parameters and monitoring
* **PipelineRunner** manages the complete pipeline lifecycle
* **PipelineParams** control pipeline-wide behavior like audio settings and metrics

## What's Next

Now that you understand how pipelines orchestrate processing, let's explore the different transport options that connect your pipeline to users.

<Card title="Transports" icon="arrow-right" href="/pipecat/learn/transports">
  Learn about the different ways users can connect to your voice AI pipeline
</Card>
