Pipecat Flows provides a framework for building structured conversations in your AI applications. It enables you to create both predefined conversation paths and dynamically generated flows while handling the complexities of state management and LLM interactions.

The framework consists of:

  • A Python module for building conversation flows with Pipecat
  • A visual editor for designing and exporting flow configurations

Key Concepts

  • Nodes: Represent conversation states with specific messages and available functions
  • Messages: Set the role and tasks for each node
  • Functions: Define actions and transitions (Node functions for operations, Edge functions for transitions)
  • Actions: Execute operations during state transitions (pre/post actions)
  • State Management: Handle conversation state and data persistence

Example Flows

These examples are fully functional and can be run locally. Make sure you have the required dependencies installed and API keys configured.

When to Use Static vs Dynamic Flows

Static Flows are ideal when:

  • Conversation structure is known upfront
  • Paths follow predefined patterns
  • Flow can be fully configured in advance
  • Example: Customer service scripts, intake forms

Dynamic Flows are better when:

  • Paths depend on external data
  • Flow structure needs runtime modification
  • Complex decision trees are involved
  • Example: Personalized recommendations, adaptive workflows

Installation

If you’re already using Pipecat:

pip install pipecat-ai-flows

If you’re starting fresh:

# Basic installation
pip install pipecat-ai-flows

# Install Pipecat with specific LLM provider options:
pip install "pipecat-ai[daily,openai,deepgram]"     # For OpenAI
pip install "pipecat-ai[daily,anthropic,deepgram]"  # For Anthropic
pip install "pipecat-ai[daily,google,deepgram]"     # For Google

💡 Want to design your flows visually? Try the online Flow Editor

Core Concepts

Designing Conversation Flows

Functions in Pipecat Flows serve two key purposes:

  • interfacing with external systems and APIs through LLM function calls
  • advancing the conversation to the next node

Function Handlers

When you need to collect data, validate input, or retrieve information, add a handler to your function. These handlers are async functions that execute when the LLM calls the function, allowing you to interact with databases, APIs, or other external services:

# Example function handler (reservation_system is your own client object)
from pipecat_flows import FlowArgs, FlowResult

async def check_availability(args: FlowArgs) -> FlowResult:
    """Check restaurant availability for the requested time."""
    date = args["date"]
    time = args["time"]
    party_size = args["party_size"]

    # Interface with reservation system
    available = await reservation_system.check_availability(date, time, party_size)
    return {"status": "success", "available": available}
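To exercise a handler like this without a live backend, one option is to stub the reservation system. The sketch below is only an illustration: `FakeReservationSystem`, its capacity rule, and the plain-dict aliases standing in for `FlowArgs` and `FlowResult` are hypothetical and not part of Pipecat Flows.

```python
import asyncio
from typing import Any, Dict

# Stand-ins for pipecat_flows.FlowArgs / FlowResult so the sketch runs alone.
FlowArgs = Dict[str, Any]
FlowResult = Dict[str, Any]


class FakeReservationSystem:
    """Hypothetical in-memory backend: tables exist only for parties of up to 6."""

    async def check_availability(self, date: str, time: str, party_size: int) -> bool:
        await asyncio.sleep(0)  # Simulate an awaitable I/O call
        return party_size <= 6


reservation_system = FakeReservationSystem()


async def check_availability(args: FlowArgs) -> FlowResult:
    """Same shape as the handler above, wired to the fake backend."""
    available = await reservation_system.check_availability(
        args["date"], args["time"], args["party_size"]
    )
    return {"status": "success", "available": available}


if __name__ == "__main__":
    args = {"date": "2025-01-31", "time": "19:00", "party_size": 4}
    print(asyncio.run(check_availability(args)))
```

Keeping the backend behind a single module-level object makes it straightforward to swap the stub for a real client later.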

Transitioning Between Nodes

To advance the conversation, Pipecat Flows offers two approaches based on your flow type:

For static flows, use the transition_to property to specify the next node:

{
    "type": "function",
    "function": {
        "name": "confirm_reservation",
        "handler": save_reservation,  # Process the reservation
        "parameters": {...},
        "transition_to": "send_confirmation"  # Move to confirmation node
    }
}

For dynamic flows, use a transition callback to determine the next node at runtime:

{
    "type": "function",
    "function": {
        "name": "confirm_reservation",
        "handler": save_reservation,  # Process the reservation
        "parameters": {...},
        "transition_callback": handle_confirmation  # Callback that calls set_node() at runtime
    }
}

Transition Callbacks (Dynamic Flows)

Transition callbacks receive both the original function arguments and the typed result:

async def handle_confirmation(
    args: Dict,           # Original function arguments
    result: FlowResult,   # Typed result from handler
    flow_manager: FlowManager
):
    """Handle transition based on function result."""
    # FlowResult is dict-like, so read fields by key
    if result.get("status") == "success":
        await flow_manager.set_node("send_confirmation", create_confirmation_node())
    else:
        await flow_manager.set_node(
            "retry_reservation",
            create_retry_node(result.get("error"))
        )

For backwards compatibility, callbacks can also use the two-argument signature:

async def handle_confirmation(args: Dict, flow_manager: FlowManager):
    """Legacy transition handler."""
    await flow_manager.set_node("next", create_next_node())
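One way a caller could support both callback signatures is to count the callback's parameters before invoking it. The following is a minimal sketch of that idea, not the library's actual dispatch logic; `call_transition_callback` and `RecordingFlowManager` are hypothetical names introduced for illustration.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict


async def call_transition_callback(
    callback: Callable[..., Awaitable[None]],
    args: Dict[str, Any],
    result: Dict[str, Any],
    flow_manager: Any,
) -> None:
    """Invoke a callback as (args, result, flow_manager) or (args, flow_manager)."""
    param_count = len(inspect.signature(callback).parameters)
    if param_count == 3:
        await callback(args, result, flow_manager)
    elif param_count == 2:
        await callback(args, flow_manager)
    else:
        raise TypeError(f"Unsupported callback signature with {param_count} parameters")


class RecordingFlowManager:
    """Hypothetical stand-in that only records which node was requested."""

    def __init__(self) -> None:
        self.current_node = None

    async def set_node(self, name: str, config: Dict[str, Any]) -> None:
        self.current_node = name


async def modern_callback(args, result, flow_manager):
    # Three-argument form: route on the handler's result
    target = "send_confirmation" if result.get("status") == "success" else "retry_reservation"
    await flow_manager.set_node(target, {})


async def legacy_callback(args, flow_manager):
    # Two-argument form: no access to the handler's result
    await flow_manager.set_node("next", {})
```

The three-argument form is the only one that can branch on the handler's result, which is the main reason to prefer it in new code.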

You can combine both approaches: use handlers to process data and transitions to advance the conversation, creating flows that are both functional and conversational.

Node Structure

Each node in your flow represents a conversation state and consists of three main components:

Messages

Nodes use two types of messages to control the conversation:

  1. Role Messages: Define the bot’s personality or role (optional)

"role_messages": [
    {
        "role": "system",
        "content": "You are a friendly pizza ordering assistant. Keep responses casual and upbeat."
    }
]

  2. Task Messages: Define what the bot should do in the current node

"task_messages": [
    {
        "role": "system",
        "content": "Ask the customer which pizza size they'd like: small, medium, or large."
    }
]

Role messages are typically defined in your initial node and inherited by subsequent nodes, while task messages are specific to each node’s purpose.

Functions

Functions in Pipecat Flows can:

  1. Process data (using handler)
  2. Create transitions (using transition_to for static or transition_callback for dynamic flows)
  3. Do both simultaneously

This leads to two conceptual types of functions:

  • Node functions, which execute a handler within a state
  • Edge functions, which create transitions between nodes and optionally execute a handler

The function type is defined implicitly by the presence (or absence) of the transition properties.

Node Functions

Functions that process data within a state. They typically:

  • Have a handler to interface with external systems or APIs
  • Trigger an immediate LLM completion with the result

from pipecat_flows import FlowArgs, FlowResult

async def select_size(args: FlowArgs) -> FlowResult:
    """Process pizza size selection."""
    size = args["size"]
    return {
        "status": "success",
        "size": size
    }

# Node function configuration
{
    "type": "function",
    "function": {
        "name": "select_size",
        "handler": select_size,           # Required: Processes the selection
        "description": "Select pizza size",
        "parameters": {
            "type": "object",
            "properties": {
                "size": {"type": "string", "enum": ["small", "medium", "large"]}
            }
        },
    }
}

Edge Functions

Functions that create transitions between nodes. When called, they:

  • Execute their handler (if one is provided)
  • Add function results to context
  • Must have transition_to (static flow) or transition_callback (dynamic flow) to specify the next state
  • Trigger LLM completion after both the function result and new node’s messages are in context

# Edge function configuration
{
    "type": "function",
    "function": {
        "name": "next_step",
        "description": "Move to next state",
        "parameters": {"type": "object", "properties": {}},
        "handler": select_size,        # Optional: Process data during transition
        "transition_to": "target_node" # Required: Specify next state
    }
}

A function’s behavior is determined by its properties:

  • handler only: Process data, stay in current state
  • transition_to (static flow) or transition_callback (dynamic flow) only: Pure transition to next state
  • Both: Process data, then transition
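The property rules above can be expressed as a small classifier. This is a sketch for reasoning about configurations, not something Pipecat Flows exposes; `classify_function` is a hypothetical helper, and raising on a function with neither property is a choice made for this example only.

```python
from typing import Any, Dict

TRANSITION_KEYS = ("transition_to", "transition_callback")


def classify_function(config: Dict[str, Any]) -> str:
    """Return 'node' or 'edge' for a function config, based on its properties."""
    spec = config.get("function", config)  # Accept wrapped or bare definitions
    has_handler = "handler" in spec
    has_transition = any(key in spec for key in TRANSITION_KEYS)
    if has_transition:
        return "edge"  # With or without a handler, a transition makes it an edge
    if has_handler:
        return "node"  # Handler only: process data, stay in the current state
    raise ValueError(f"Function {spec.get('name')!r} has neither a handler nor a transition")
```

For example, passing the edge function configuration shown earlier would yield `"edge"`, because `transition_to` is present even though a handler is also set.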

Actions

Actions are operations that execute during state transitions, with two distinct timing options:

  • Pre-actions: execute before the LLM completion
  • Post-actions: execute after the LLM completion

Pre-Actions

Execute before LLM inference. Useful for:

  • Providing immediate feedback while waiting for LLM responses
  • Bridging gaps during longer function calls
  • Setting up state or context

"pre_actions": [
    {
        "type": "tts_say",
        "text": "Hold on a moment..."  # Immediate feedback during processing
    }
],

Avoid mixing tts_say actions with chat completions, as this may result in a conversation flow that feels unnatural. tts_say actions are best used as filler words when the LLM will take time to generate a completion.

Post-Actions

Execute after LLM inference completes. Useful for:

  • Cleanup operations
  • State finalization
  • Ensuring proper sequence of operations

"post_actions": [
    {
        "type": "end_conversation"  # Ensures TTS completes before ending
    }
]

Timing Considerations

  • Pre-actions: Execute immediately, before any LLM processing begins
  • LLM Inference: Processes the node’s messages and functions
  • Post-actions: Execute only after LLM processing and TTS completion

For example, when using end_conversation as a post-action, the sequence is:

  1. LLM generates response
  2. TTS speaks the response
  3. End conversation action executes

This ordering ensures proper completion of all operations.
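The ordering can be pictured with a small simulation. This is only a model of the sequence described above, assuming a single awaitable that stands in for LLM inference plus TTS playback; `run_node` and `simulate` are hypothetical names, and the real pipeline is not implemented this way.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Action = Dict[str, Any]


async def run_node(
    pre_actions: List[Action],
    post_actions: List[Action],
    run_llm_and_tts: Callable[[], Awaitable[None]],
    log: List[str],
) -> None:
    """Run pre-actions, then the LLM/TTS turn, then post-actions, in that order."""
    for action in pre_actions:
        log.append(f"pre:{action['type']}")
    await run_llm_and_tts()  # Post-actions wait until this completes
    for action in post_actions:
        log.append(f"post:{action['type']}")


def simulate() -> List[str]:
    """Record the order of events for one node with one pre- and one post-action."""
    log: List[str] = []

    async def fake_llm_and_tts() -> None:
        log.append("llm:response")
        await asyncio.sleep(0)  # Yield control, as real inference would
        log.append("tts:spoken")

    asyncio.run(
        run_node(
            pre_actions=[{"type": "tts_say", "text": "Hold on a moment..."}],
            post_actions=[{"type": "end_conversation"}],
            run_llm_and_tts=fake_llm_and_tts,
            log=log,
        )
    )
    return log
```

Calling `simulate()` returns the events in the order pre-action, LLM response, TTS, post-action, which mirrors the `end_conversation` sequence listed above.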

Action Types

Flows ships with built-in actions, and you can also define your own custom actions. See the reference docs for more information.

Context Management

Pipecat Flows provides three strategies for managing conversation context during node transitions:

Context Strategies

  • APPEND (default): Adds new messages to the existing context, maintaining the full conversation history
  • RESET: Clears the context and starts fresh with the new node’s messages
  • RESET_WITH_SUMMARY: Resets the context but includes an AI-generated summary of the previous conversation

Configuration

Context strategies can be configured globally or per-node:

from pipecat_flows import ContextStrategy, ContextStrategyConfig, FlowManager

# Global strategy configuration
flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    context_strategy=ContextStrategyConfig(
        strategy=ContextStrategy.RESET_WITH_SUMMARY,
        summary_prompt="Summarize the key points discussed so far, focusing on decisions made and important information collected."
    )
)

# Per-node strategy configuration
node_config = {
    "task_messages": [...],
    "functions": [...],
    "context_strategy": ContextStrategyConfig(
        strategy=ContextStrategy.RESET_WITH_SUMMARY,
        summary_prompt="Provide a concise summary of the customer's order details and preferences."
    )
}

Strategy Selection

Choose your strategy based on your conversation needs:

  • Use APPEND when full conversation history is important
  • Use RESET when previous context might confuse the current node’s purpose
  • Use RESET_WITH_SUMMARY for long conversations where key points need to be preserved

When using RESET_WITH_SUMMARY, if summary generation fails or times out, the system automatically falls back to RESET strategy for resilience.
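To make the three strategies and the fallback concrete, here is a toy model of what each one does to a message list. It is a sketch under stated assumptions: `Strategy` and `apply_strategy` are hypothetical, the summarizer is passed in as a plain function, and placing the summary as a leading system message is a guess rather than documented behavior.

```python
from enum import Enum
from typing import Callable, Dict, List

Message = Dict[str, str]


class Strategy(Enum):  # Hypothetical mirror of the three strategy names
    APPEND = "append"
    RESET = "reset"
    RESET_WITH_SUMMARY = "reset_with_summary"


def apply_strategy(
    strategy: Strategy,
    history: List[Message],
    new_messages: List[Message],
    summarize: Callable[[List[Message]], str],
) -> List[Message]:
    """Return the context the next node would start with under each strategy."""
    if strategy is Strategy.APPEND:
        return history + new_messages
    if strategy is Strategy.RESET_WITH_SUMMARY:
        try:
            summary = summarize(history)
        except Exception:
            return list(new_messages)  # Fall back to RESET if summarizing fails
        return [{"role": "system", "content": f"Summary: {summary}"}] + new_messages
    return list(new_messages)  # RESET: drop the history entirely
```

The fallback branch is what keeps a slow or failing summarizer from blocking the transition: the node still starts, just without the recap.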

State Management

The state variable in FlowManager is a shared dictionary that persists throughout the conversation. Think of it as a conversation memory that lets you:

  • Store user information
  • Track conversation progress
  • Share data between nodes
  • Inform decision-making

Here’s a practical example of a pizza ordering flow:

# Store user choices as they're made.
# These handlers assume flow_manager is a FlowManager instance defined at module level.
async def select_size(args: FlowArgs) -> FlowResult:
    """Handle pizza size selection."""
    size = args["size"]

    # Initialize order in state if it doesn't exist
    if "order" not in flow_manager.state:
        flow_manager.state["order"] = {}

    # Store the selection
    flow_manager.state["order"]["size"] = size

    return {"status": "success", "size": size}

async def select_toppings(args: FlowArgs) -> FlowResult:
    """Handle topping selection."""
    topping = args["topping"]

    # Get existing order and toppings
    order = flow_manager.state.get("order", {})
    toppings = order.get("toppings", [])

    # Add new topping
    toppings.append(topping)
    order["toppings"] = toppings
    flow_manager.state["order"] = order

    return {"status": "success", "toppings": toppings}

async def finalize_order(args: FlowArgs) -> FlowResult:
    """Process the complete order."""
    order = flow_manager.state.get("order", {})

    # Validate order has required information
    if "size" not in order:
        return {"status": "error", "error": "No size selected"}

    # Calculate price based on stored selections
    size = order["size"]
    toppings = order.get("toppings", [])
    price = calculate_price(size, len(toppings))

    return {
        "status": "success",
        "summary": f"Ordered: {size} pizza with {', '.join(toppings)}",
        "price": price
    }

In this example:

  1. select_size initializes the order and stores the size
  2. select_toppings builds a list of toppings
  3. finalize_order uses the stored information to process the complete order

The state variable makes it easy to:

  • Build up information across multiple interactions
  • Access previous choices when needed
  • Validate the complete order
  • Calculate final results

This is particularly useful when information needs to be collected across multiple conversation turns or when later decisions depend on earlier choices.
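The accumulation pattern can be tried in isolation with a stand-in object. In this sketch, `flow_manager` is a hypothetical `SimpleNamespace` that models only the `state` dictionary, and `run_order` simulates three turns; none of it touches the real `FlowManager`.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict

# Hypothetical stand-in: only the `state` attribute of FlowManager is modeled.
flow_manager = SimpleNamespace(state={})


async def select_size(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create the order on first use, then store the size."""
    order = flow_manager.state.setdefault("order", {})
    order["size"] = args["size"]
    return {"status": "success", "size": args["size"]}


async def select_toppings(args: Dict[str, Any]) -> Dict[str, Any]:
    """Append one topping to the order's topping list."""
    order = flow_manager.state.setdefault("order", {})
    toppings = order.setdefault("toppings", [])
    toppings.append(args["topping"])
    return {"status": "success", "toppings": list(toppings)}


async def run_order() -> Dict[str, Any]:
    """Simulate three conversation turns writing to the shared state."""
    await select_size({"size": "large"})
    await select_toppings({"topping": "mushrooms"})
    await select_toppings({"topping": "olives"})
    return flow_manager.state["order"]
```

Using `setdefault` collapses the "initialize if missing" check into one line and means the handlers work regardless of which one the LLM calls first.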

LLM Provider Support

Pipecat Flows automatically handles format differences between LLM providers:

OpenAI Format

"functions": [{
    "type": "function",
    "function": {
        "name": "function_name",
        "handler": select_size, # Optional handler
        "description": "description",
        "parameters": {...},
        "transition_to": "target_node" # Optional transition (or transition_callback for dynamic flows)
    }
}]

Anthropic Format

"functions": [{
    "name": "function_name",
    "handler": select_size, # Optional handler
    "description": "description",
    "input_schema": {...},
    "transition_to": "target_node" # Optional transition (or transition_callback for dynamic flows)
}]

Google (Gemini) Format

"functions": [{
    "function_declarations": [{
        "name": "function_name",
        "handler": select_size, # Optional handler
        "description": "description",
        "parameters": {...},
        "transition_to": "target_node" # Optional transition (or transition_callback for dynamic flows)
    }]
}]

You don’t need to handle these differences manually - Pipecat Flows adapts your configuration to the correct format based on your LLM provider.
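The three shapes differ mainly in how a definition is wrapped and in the name of the schema key. The sketch below builds each shape from one neutral definition, purely to highlight those differences; `to_provider_format` is a hypothetical helper and not the adapter Pipecat Flows uses internally.

```python
from typing import Any, Dict


def to_provider_format(
    provider: str, name: str, description: str, schema: Dict[str, Any]
) -> Dict[str, Any]:
    """Wrap one function definition in the shape shown for each provider."""
    if provider == "openai":
        return {
            "type": "function",
            "function": {"name": name, "description": description, "parameters": schema},
        }
    if provider == "anthropic":
        # Flat definition; the schema lives under "input_schema"
        return {"name": name, "description": description, "input_schema": schema}
    if provider == "google":
        # Definitions are grouped under "function_declarations"
        return {
            "function_declarations": [
                {"name": name, "description": description, "parameters": schema}
            ]
        }
    raise ValueError(f"Unknown provider: {provider}")
```

Flow-specific keys such as `handler` and `transition_to` would sit next to `name` in each shape, as the examples above show.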

Implementation Approaches

Static Flows

Static flows use a configuration-driven approach where the entire conversation structure is defined upfront.

Basic Setup

from pipecat_flows import FlowManager

# Define flow configuration
flow_config = {
    "initial_node": "greeting",
    "nodes": {
        "greeting": {
            "role_messages": [...],
            "task_messages": [...],
            "functions": [...]
        }
    }
}

# Initialize flow manager with static configuration
flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    flow_config=flow_config
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    await transport.capture_participant_transcription(participant["id"])
    await flow_manager.initialize()

Example FlowConfig

flow_config = {
    "initial_node": "start",
    "nodes": {
        "start": {
            "role_messages": [
                {
                    "role": "system",
                    "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.",
                }
            ],
            "task_messages": [
                {
                    "role": "system",
                    "content": "You are an order-taking assistant. Ask if they want pizza or sushi."
                }
            ],
            "functions": [
                {
                    "type": "function",
                    "function": {
                        "name": "choose_pizza",
                        "description": "User wants pizza",
                        "parameters": {"type": "object", "properties": {}},
                        "transition_to": "pizza_order"  # Specify transition
                    }
                },
                {
                    "type": "function",
                    "function": {
                        "name": "select_size",
                        "handler": select_size,
                        "description": "Select pizza size",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "size": {"type": "string", "enum": ["small", "medium", "large"]}
                            }
                        },
                        "transition_to": "toppings"  # Optional transition after processing
                    }
                }
            ]
        }
    }
}

Transition Best Practices

  • Use transition_to to make state changes explicit
  • Combine handlers with transitions when appropriate
  • Keep transitions focused on single responsibilities
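Because static transitions are plain strings, a typo in a `transition_to` target is easy to miss. A minimal sketch of a pre-flight check follows; `find_dangling_transitions` is a hypothetical helper, and it only understands OpenAI-style and flat (Anthropic-style) function entries, not the nested Gemini shape.

```python
from typing import Any, Dict, List


def find_dangling_transitions(flow_config: Dict[str, Any]) -> List[str]:
    """List 'node.function -> target' entries whose target node is not defined."""
    nodes = flow_config.get("nodes", {})
    problems: List[str] = []
    if flow_config.get("initial_node") not in nodes:
        problems.append(f"initial_node -> {flow_config.get('initial_node')}")
    for node_name, node in nodes.items():
        for entry in node.get("functions", []):
            spec = entry.get("function", entry)  # OpenAI-style or flat definition
            target = spec.get("transition_to")
            if target is not None and target not in nodes:
                problems.append(f"{node_name}.{spec.get('name')} -> {target}")
    return problems
```

Run against the example FlowConfig above, a check like this would flag `pizza_order` and `toppings`, since that excerpt defines only the `start` node.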

Dynamic Flows

Dynamic flows create and modify conversation paths at runtime based on data or business logic.

Example Implementation

Here’s a complete example of a dynamic insurance quote flow:

from typing import Any, Dict

from pipecat_flows import FlowArgs, FlowManager, FlowResult, NodeConfig

# Assumption: AgeResult is not defined in the original snippet; here it is
# a FlowResult subclass (a TypedDict) that adds the collected age.
class AgeResult(FlowResult):
    age: int

# Define handlers and transitions
async def collect_age(args: FlowArgs) -> AgeResult:
    """Process age collection."""
    age = args["age"]
    return AgeResult(status="success", age=age)

async def handle_age_transition(
    args: Dict,
    result: AgeResult,  # Typed result from handler
    flow_manager: FlowManager
):
    """Transition handler that determines next node based on age."""
    # AgeResult is dict-like, so read the age by key
    if result["age"] < 25:
        await flow_manager.set_node("young_adult", create_young_adult_node())
    else:
        await flow_manager.set_node("standard", create_standard_node())

# Node creation functions
def create_initial_node() -> NodeConfig:
    """Create initial age collection node."""
    return {
        "role_messages": [
            {
                "role": "system",
                "content": "You are an insurance quote assistant."
            }
        ],
        "task_messages": [
            {
                "role": "system",
                "content": "Ask for the customer's age."
            }
        ],
        "functions": [
            {
                "type": "function",
                "function": {
                    "name": "collect_age",
                    "handler": collect_age,
                    "description": "Collect customer age",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "age": {"type": "integer"}
                        }
                    },
                    "transition_callback": handle_age_transition
                }
            }
        ]
    }

def create_young_adult_node() -> Dict[str, Any]:
    """Create node for young adult quotes."""
    return {
        "task_messages": [
            {
                "role": "system",
                "content": "Explain our special young adult coverage options."
            }
        ],
        "functions": [...]  # Additional quote-specific functions
    }

def create_standard_node() -> Dict[str, Any]:
    """Create node for standard quotes."""
    return {
        "task_messages": [
            {
                "role": "system",
                "content": "Present our standard coverage options."
            }
        ],
        "functions": [...]  # Additional quote-specific functions
    }

# Initialize flow manager
flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    await transport.capture_participant_transcription(participant["id"])
    await flow_manager.initialize()
    await flow_manager.set_node("initial", create_initial_node())

Best Practices

  • Keep data processing in handlers and flow logic in transition callbacks
  • Store shared data in flow_manager.state
  • Create separate functions for node creation
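One way to apply these practices is to keep the routing decision in a pure function that a transition callback merely forwards to `set_node`. The sketch below assumes that split; `create_quote_node` and `choose_quote_node` are hypothetical names, and `NodeConfig` is a plain-dict stand-in for the type exported by `pipecat_flows`.

```python
from typing import Any, Dict, Tuple

NodeConfig = Dict[str, Any]  # Stand-in for pipecat_flows.NodeConfig


def create_quote_node(task: str) -> NodeConfig:
    """Hypothetical node factory: one task message and no functions."""
    return {"task_messages": [{"role": "system", "content": task}], "functions": []}


def choose_quote_node(age: int) -> Tuple[str, NodeConfig]:
    """Pure routing decision: return (node_name, node_config) for a given age."""
    if age < 25:
        return "young_adult", create_quote_node(
            "Explain our special young adult coverage options."
        )
    return "standard", create_quote_node("Present our standard coverage options.")
```

A transition callback could then reduce to `await flow_manager.set_node(*choose_quote_node(result["age"]))`, and the branching logic can be unit tested without a running pipeline.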

Flow Editor

The Pipecat Flow Editor provides a visual interface for creating and managing conversation flows. It offers a node-based interface that makes it easier to design, visualize, and modify your flows.

Visual Design

Node Types

  • Start Node (Green): Entry point of your flow

    "greeting": {
        "role_messages": [...],
        "task_messages": [...],
        "functions": [...]
    }
    
  • Flow Nodes (Blue): Intermediate states

    "collect_info": {
        "task_messages": [...],
        "functions": [...],
        "pre_actions": [...]
    }
    
  • End Node (Red): Final state

    "end": {
        "task_messages": [...],
        "functions": [],
        "post_actions": [{"type": "end_conversation"}]
    }
    
  • Function Nodes:

    • Edge Functions (Purple): Create transitions
      {
          "name": "next_node",
          "description": "Transition to next state"
      }
      
    • Node Functions (Orange): Perform operations
      {
          "name": "process_data",
          "handler": process_data_handler,
          "description": "Process user data"
      }
      

Naming Conventions

  • Start Node: Use descriptive names (e.g., “greeting”, “welcome”)
  • Flow Nodes: Name based on purpose (e.g., “collect_info”, “verify_data”)
  • End Node: Conventionally named “end”
  • Functions: Use clear, action-oriented names

Function Configuration

# Edge Function (Transition)
{
    "type": "function",
    "function": {
        "name": "next_state",
        "description": "Clear transition description",
        "parameters": {...},
        "transition_to": "target_node_name" # Transition target
    }
}

# Node Function (Operation)
{
    "type": "function",
    "function": {
        "name": "process_data",
        "handler": process_handler,  # Include handler
        "description": "Clear operation description",
        "parameters": {...}
    }
}

When using the Flow Editor, function handlers can be specified using the __function__: token:

{
    "type": "function",
    "function": {
        "name": "process_data",
        "handler": "__function__:process_data",  # References function in main script
        "description": "Process user data",
        "parameters": {...}
    }
}

The handler will be looked up in your main script when the flow is executed.

When function handlers are specified in the flow editor, they will be exported with the __function__: token.
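Conceptually, resolving the token means swapping each `__function__:name` string for the callable of that name. The sketch below shows one way such a lookup could work against an explicit namespace; `resolve_handlers` is a hypothetical helper and does not describe how Pipecat Flows performs the lookup.

```python
from typing import Any, Callable, Dict

FUNCTION_TOKEN = "__function__:"


def resolve_handlers(config: Any, namespace: Dict[str, Callable]) -> Any:
    """Return a copy of config with '__function__:name' strings replaced by callables."""
    if isinstance(config, dict):
        return {key: resolve_handlers(value, namespace) for key, value in config.items()}
    if isinstance(config, list):
        return [resolve_handlers(item, namespace) for item in config]
    if isinstance(config, str) and config.startswith(FUNCTION_TOKEN):
        name = config[len(FUNCTION_TOKEN):]
        if name not in namespace:
            raise KeyError(f"No handler named {name!r} in the provided namespace")
        return namespace[name]
    return config  # Numbers, plain strings, None, and so on pass through
```

Passing an explicit dictionary such as `{"process_data": process_data}` keeps the lookup predictable; the original input is left unmodified because dictionaries and lists are rebuilt rather than mutated.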

Using the Editor

Creating a New Flow

  1. Start with a descriptively named Start Node
  2. Add Flow Nodes for each conversation state
  3. Connect nodes using Edge Functions
  4. Add Node Functions for operations
  5. Include an End Node

Import/Export

# Export format
{
    "initial_node": "greeting",
    "nodes": {
        "greeting": {
            "role_messages": [...],
            "task_messages": [...],
            "functions": [...],
            "pre_actions": [...]
        },
        "process": {
            "task_messages": [...],
            "functions": [...],
        },
        "end": {
            "task_messages": [...],
            "functions": [],
            "post_actions": [...]
        }
    }
}

Tips

  • Use the visual preview to verify flow logic
  • Test exported configurations
  • Document node purposes and transitions
  • Keep flows modular and maintainable
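As a complement to the visual preview, an exported configuration can be checked for nodes that no transition ever reaches. This is a minimal sketch assuming static `transition_to` links in OpenAI-style or flat function entries; `reachable_nodes` and `unreachable_nodes` are hypothetical helpers.

```python
from typing import Any, Dict, Set


def reachable_nodes(flow_config: Dict[str, Any]) -> Set[str]:
    """Follow transition_to links from initial_node and collect visited node names."""
    nodes = flow_config["nodes"]
    seen: Set[str] = set()
    stack = [flow_config["initial_node"]]
    while stack:
        name = stack.pop()
        if name in seen or name not in nodes:
            continue  # Skip repeats and targets that are not defined
        seen.add(name)
        for entry in nodes[name].get("functions", []):
            spec = entry.get("function", entry)
            target = spec.get("transition_to")
            if target:
                stack.append(target)
    return seen


def unreachable_nodes(flow_config: Dict[str, Any]) -> Set[str]:
    """Return defined nodes that cannot be reached from initial_node."""
    return set(flow_config["nodes"]) - reachable_nodes(flow_config)
```

An empty result means every defined node can be reached from `initial_node`; anything else is usually a leftover node or a missing edge.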

Try the editor at flows.pipecat.ai