The Pipeline is the core orchestration component in Pipecat that connects frame processors together, creating a structured path for data to flow through your voice AI application.

Basic Pipeline Structure

A Pipeline takes a list of frame processors and connects them in sequence. Here’s a simple voice AI pipeline that matches the voice AI agent architecture we discussed earlier:
pipeline = Pipeline([
    transport.input(),              # Receives user audio
    stt,                            # Speech-to-text conversion
    context_aggregator.user(),      # Collect user responses
    llm,                            # Language model processing
    tts,                            # Text-to-speech conversion
    transport.output(),             # Sends audio to user
    context_aggregator.assistant(), # Collect assistant responses
])

Understanding Frames and Frame Processing

Before diving deeper into pipelines, it’s important to understand how data moves through them using frames and frame processors.

What Are Frames?

Frames are data containers that carry information through your pipeline. Think of them as packages on a conveyor belt. Each frame contains a specific type of data that processors can examine and act upon. Frames automatically receive unique identifiers and names (like TranscriptionFrame#1) that help with debugging and tracking data flow through your pipeline. Common frame types include:
  • Audio frames: Raw audio data from users or generated by TTS
  • Text frames: Transcriptions, LLM responses, or other text content
  • Image frames: Visual data for multimodal applications
  • System frames: Control signals for pipeline management
  • Context frames: Conversation history and state information
Frames flow through your pipeline from processor to processor, carrying the data your voice AI application needs to operate.
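To make the auto-generated identifiers concrete, here is a simplified sketch of how a frame class might assign names like TranscriptionFrame#1. This is purely an illustration of the idea, not Pipecat's actual Frame implementation:

```python
# Simplified sketch: frames auto-generate names like "TranscriptionFrame#1".
# Illustrative only — not Pipecat's actual Frame implementation.
import itertools
from dataclasses import dataclass, field

_counters: dict = {}

def _next_name(cls_name: str) -> str:
    # Each frame class gets its own counter, so names read ClassName#N
    counter = _counters.setdefault(cls_name, itertools.count(1))
    return f"{cls_name}#{next(counter)}"

@dataclass
class Frame:
    name: str = field(init=False)

    def __post_init__(self):
        self.name = _next_name(type(self).__name__)

@dataclass
class TranscriptionFrame(Frame):
    text: str = ""

f1 = TranscriptionFrame(text="hello")
f2 = TranscriptionFrame(text="world")
print(f1.name)  # TranscriptionFrame#1
print(f2.name)  # TranscriptionFrame#2
```

Names like these make log output easy to correlate with a specific frame as it moves through the pipeline.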

What Are Frame Processors?

Frame Processors are the workers in your pipeline. Each processor has a specific job, such as converting speech to text, generating responses, or playing audio. They:
  • Receive frames from the previous processor in the pipeline
  • Process the data (transcribe audio, generate text, etc.)
  • Create new frames with their output
  • Pass frames along to the next processor
Frame processors are modular and reusable. You can swap out different STT services or LLM providers without changing the rest of your pipeline.

Frame Types

Frames in Pipecat have different base classes that determine how they’re processed:
@dataclass
class SystemFrame(Frame):
    """System frames are processed immediately, bypassing queues."""
    pass

@dataclass
class DataFrame(Frame):
    """Data frames are queued and processed in order."""
    pass

@dataclass
class ControlFrame(Frame):
    """Control frames are queued and processed in order."""
    pass
Key differences:
  • SystemFrames: Processed immediately (interruptions, pipeline control, user input)
  • DataFrames & ControlFrames: Queued and processed in order (audio output, text, images)
Examples by type:
# SystemFrames (processed immediately)
InputAudioRawFrame         # User audio input
UserStartedSpeakingFrame   # Speech detection events
StartInterruptionFrame     # Interruption control
ErrorFrame                 # Error notifications

# DataFrames (queued and ordered)
OutputAudioRawFrame        # Audio for playback
TextFrame                  # Text content
TranscriptionFrame         # Speech-to-text results
LLMTextFrame               # LLM responses

# ControlFrames (queued and ordered)
EndFrame                   # Pipeline shutdown
TTSStartedFrame            # TTS response boundaries
LLMFullResponseStartFrame  # LLM response boundaries
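The base class determines routing: a processor can check a frame's base class to decide whether it is handled immediately or queued. The sketch below illustrates that dispatch with stand-in frame classes; it is not Pipecat's actual internals:

```python
# Simplified sketch of routing by frame base class: SystemFrames are
# handled immediately, DataFrames and ControlFrames wait in a queue.
# Illustrative only — not Pipecat's actual internals.
from dataclasses import dataclass

@dataclass
class Frame: ...

@dataclass
class SystemFrame(Frame): ...

@dataclass
class DataFrame(Frame): ...

@dataclass
class ControlFrame(Frame): ...

@dataclass
class ErrorFrame(SystemFrame):
    message: str = ""

@dataclass
class TextFrame(DataFrame):
    text: str = ""

queued = []

def route(frame: Frame) -> str:
    if isinstance(frame, SystemFrame):
        return "immediate"   # bypasses the queue
    queued.append(frame)     # DataFrames and ControlFrames wait their turn
    return "queued"

print(route(ErrorFrame(message="oops")))  # immediate
print(route(TextFrame(text="hi")))        # queued
```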

Frame Processing Order

Frames are processed in guaranteed order, even across ParallelPipelines, which enables reliable sequencing: if you push two frames in order, the first is fully processed before the second begins. For example, we can push a TTSSpeakFrame followed by an EndFrame to say goodbye and then end the pipeline:
from pipecat.frames.frames import EndFrame, TTSSpeakFrame

# These will execute in order: speak first, then end pipeline
await task.queue_frames([
    TTSSpeakFrame("Goodbye!"),
    EndFrame()
])

How Frame Processors Work

Every frame processor follows the same pattern with two key methods:
class TranscriptionLogger(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        # Always call parent first
        await super().process_frame(frame, direction)

        # Handle specific frame types
        if isinstance(frame, TranscriptionFrame):
            print(f"Transcription: {frame.text}")

        # Push frame to next processor
        await self.push_frame(frame, direction)
Key methods:
  • process_frame(): Inspect and handle incoming frames
  • push_frame(): Send frames upstream or downstream

Custom Frame Processors

Learn how to build your own frame processors

How Data Flows Through Pipelines

Understanding data flow is crucial for building effective pipelines:

Frame Processing Order

Order matters: Processors must be arranged so that each receives the frame types it needs:
  1. transport.input() creates InputAudioRawFrames from user audio
  2. stt receives audio frames and outputs TranscriptionFrames
  3. llm processes text and generates LLMTextFrames
  4. tts converts text frames to TTSAudioRawFrames and TTSTextFrames
  5. transport.output() creates OutputAudioRawFrames and sends audio back to user
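The numbered steps above can be sketched as a toy transformation chain. The functions below are stand-ins that pass strings instead of real frames, just to show how each stage's output becomes the next stage's input; real Pipecat processors are async services:

```python
# Toy sketch of the frame transformations described above, with stand-in
# functions passing strings instead of real frames. Illustrative only.

def fake_stt(audio: str) -> str:
    # InputAudioRawFrame -> TranscriptionFrame
    return f"transcript({audio})"

def fake_llm(text: str) -> str:
    # TranscriptionFrame -> LLMTextFrame
    return f"reply({text})"

def fake_tts(text: str) -> str:
    # LLMTextFrame -> TTSAudioRawFrame
    return f"audio({text})"

frame = "user-audio"
for stage in (fake_stt, fake_llm, fake_tts):
    frame = stage(frame)

print(frame)  # audio(reply(transcript(user-audio)))
```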

Frame Propagation

Processors always push frames: they don't consume frames; they pass them along:
# This pipeline allows multiple processors to use the same audio
pipeline = Pipeline([
    transport.input(),          # Creates InputAudioRawFrame
    stt,                        # Uses audio → creates TranscriptionFrame
    # ...                       # Other processors can use the same audio
    tts,                        # Uses LLMTextFrame → creates TTSAudioRawFrame
    transport.output(),         # Uses TTSAudioRawFrame → sends audio to user
    audio_buffer_processor,     # Also uses the same user and assistant audio for recording
])
This design allows multiple processors to operate on the same data stream without interfering with each other.
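This observe-and-pass-along pattern can be sketched with minimal stand-ins for Pipecat's FrameProcessor API. Here a hypothetical AudioRecorder acts on audio frames but still pushes every frame downstream, so a later processor receives the same frames:

```python
# Sketch of the "processors push frames" pattern: an observer acts on the
# frames it cares about but always passes every frame downstream.
# Minimal stand-ins for Pipecat's FrameProcessor API — illustrative only.
import asyncio
from dataclasses import dataclass

@dataclass
class Frame: ...

@dataclass
class AudioFrame(Frame):
    samples: bytes = b""

class Processor:
    def __init__(self):
        self.next = None

    async def push_frame(self, frame):
        if self.next:
            await self.next.process_frame(frame)

    async def process_frame(self, frame):
        await self.push_frame(frame)

class AudioRecorder(Processor):
    def __init__(self):
        super().__init__()
        self.recorded = b""

    async def process_frame(self, frame):
        if isinstance(frame, AudioFrame):
            self.recorded += frame.samples  # observe the audio...
        await self.push_frame(frame)        # ...but still pass it along

class Sink(Processor):
    def __init__(self):
        super().__init__()
        self.received = []

    async def process_frame(self, frame):
        self.received.append(frame)

recorder, sink = AudioRecorder(), Sink()
recorder.next = sink
for samples in (b"abc", b"def"):
    asyncio.run(recorder.process_frame(AudioFrame(samples=samples)))

print(recorder.recorded)   # b'abcdef' — observed by the recorder
print(len(sink.received))  # 2        — and still delivered downstream
```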

Parallel Processing Patterns

Use ParallelPipeline to create branches where each branch receives all upstream frames. Frames are collected and pushed individually from each branch:
pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    llm,
    ParallelPipeline([
        # English branch
        [FunctionFilter(english_filter), english_tts],
        # Spanish branch
        [FunctionFilter(spanish_filter), spanish_tts],
    ]),
    transport.output(),
    context_aggregator.assistant(),
])
In this example:
  • Both TTS branches receive all LLM output
  • Each branch can filter and process frames independently
  • Results from both branches flow to transport.output()
ParallelPipelines are typically paired with filters or gates that control which frames go where, enabling complex conditional logic.
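For illustration, here is one way the english_filter and spanish_filter predicates from the example above might look. FunctionFilter expects an async predicate that returns True for frames a branch should keep; the language attribute and routing rule below are hypothetical, shown with a stand-in Frame class so the sketch runs on its own:

```python
# Hypothetical filter predicates for the ParallelPipeline example above.
# The Frame class and its language attribute are stand-ins so this runs
# on its own — adapt the predicate to however your app tracks language.
import asyncio
from dataclasses import dataclass

@dataclass
class Frame:
    language: str = "en"

async def english_filter(frame: Frame) -> bool:
    # Keep this frame in the English branch only
    return getattr(frame, "language", "en") == "en"

async def spanish_filter(frame: Frame) -> bool:
    # Keep this frame in the Spanish branch only
    return getattr(frame, "language", "en") == "es"

print(asyncio.run(english_filter(Frame(language="en"))))  # True
print(asyncio.run(spanish_filter(Frame(language="en"))))  # False
```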

Frame Queuing and Processing

Frame processors have internal queues that ensure ordered processing:
  • SystemFrames bypass queues for immediate processing (interruptions, errors)
  • DataFrames and ControlFrames are queued and processed in order
  • Queuing is managed automatically by the pipeline infrastructure
  • Order is guaranteed even across complex pipeline structures
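The ordered-queue guarantee can be sketched with a plain asyncio.Queue and a worker task that handles frames strictly in arrival order. This mirrors the behavior described above without reproducing Pipecat's internals:

```python
# Minimal sketch of ordered frame processing: frames go into an
# asyncio.Queue and a worker handles them strictly in arrival order.
# Illustrates the guarantee described above, not Pipecat's internals.
import asyncio

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    processed = []

    async def worker():
        while True:
            frame = await queue.get()
            if frame is None:          # sentinel: shut down the worker
                break
            processed.append(frame)    # handle frames one at a time, in order

    task = asyncio.create_task(worker())
    for frame in ["TTSSpeakFrame('Goodbye!')", "EndFrame()"]:
        await queue.put(frame)
    await queue.put(None)
    await task
    return processed

order = asyncio.run(main())
print(order)  # speak first, then end — arrival order preserved
```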
Learn more about frame flow patterns in the Custom Frame Processor Guide.

Pipeline Execution

Creating a Pipeline Task

Wrap your pipeline in a PipelineTask to configure execution parameters:
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
        audio_in_sample_rate=8000,
        audio_out_sample_rate=8000,
    ),
    observers=[RTVIObserver(rtvi)],
)

Running the Pipeline

Use PipelineRunner to execute your pipeline task:
# Create the runner
runner = PipelineRunner(handle_sigint=False)

# Run the pipeline
await runner.run(task)
PipelineRunner features:
  • Lifecycle Management: Handles startup, execution, and cleanup
  • Signal Handling: Responds to SIGINT/SIGTERM for graceful shutdown (when handle_sigint=True)
  • Resource Cleanup: Ensures proper disposal of resources when tasks complete
Common PipelineRunner options:
# Handle system signals automatically
# Useful when running a file directly
runner = PipelineRunner(handle_sigint=True)

# Custom signal handling
# Useful when the PipelineRunner is managed by
# another system like the development runner
runner = PipelineRunner(handle_sigint=False)

Pipeline Lifecycle

The pipeline runs continuously, processing frames as they arrive, until one of these conditions occurs:
  1. Normal Completion: Bot code returns (session ends)
  2. Signal Termination: SIGINT or SIGTERM received (if handle_sigint=True)
  3. Task Cancellation: Explicit call to task.cancel()
  4. Error Condition: Unhandled exception in pipeline
Common lifecycle patterns:
# Graceful shutdown on client disconnect
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    logger.info("Client disconnected - stopping pipeline")
    await task.cancel()

# Await the pipeline task
await runner.run(task)
Pipeline states during execution:
  • Starting: Initializing processors and connections
  • Running: Actively processing frames and handling events
  • Stopping: Gracefully shutting down processors
  • Stopped: All resources cleaned up

Configuring Pipeline Behavior

Beyond the basic pipeline structure, you can fine-tune behavior through PipelineParams and add monitoring capabilities:

Pipeline Parameters

PipelineParams lets you configure how your entire pipeline operates:
params = PipelineParams(
    # Audio configuration (affects all audio processors)
    audio_in_sample_rate=16000,
    audio_out_sample_rate=24000,

    # Performance monitoring
    enable_metrics=True,
    enable_usage_metrics=True,

    # Debugging and development
    send_initial_empty_metrics=False,
    report_only_initial_ttfb=True,
)

Monitoring and Observability

Add observers to monitor pipeline events and track custom application metrics:
task = PipelineTask(
    pipeline,
    params=params,
    observers=[
        RTVIObserver(rtvi),           # RTVI protocol events
        CustomMetricsObserver(),      # Your custom metrics
    ],
)
These configuration options enable you to optimize performance, debug issues, and monitor your pipeline in production.

Key Takeaways

  • Order matters - arrange processors so each gets the frames it needs
  • Processors push frames - processors pass frames downstream, not consume them
  • Frame types determine processing - SystemFrames bypass queues, others are ordered
  • Queuing ensures reliability - frames are processed in guaranteed order
  • Parallel processing enables conditional logic and multi-modal handling
  • PipelineTask configures execution parameters and monitoring
  • PipelineRunner manages the complete pipeline lifecycle
  • PipelineParams control pipeline-wide behavior like audio settings and metrics

What’s Next

Now that you understand how pipelines orchestrate processing, let’s explore the different transport options that connect your pipeline to users.

Transports

Learn about the different ways users can connect to your voice AI pipeline