New to building conversational flows? Check out our Pipecat Flows guide first.

Installation

Core Types

FlowArgs

FlowArgs
Dict[str, Any]

Type alias for function handler arguments.

FlowResult

FlowResult
TypedDict

Base type for function handler results. Additional fields can be included as needed.
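As a rough sketch of how FlowArgs and FlowResult fit together, the snippet below defines local stand-ins so it runs without the library, then extends the base result with one extra field. The `status` and `error` keys, the `AgeResult` type, and the `collect_age` handler are illustrative assumptions, not confirmed library definitions.

```python
import asyncio
from typing import Any, Dict, TypedDict

# Local stand-ins so the sketch runs on its own; in real code these
# names would be imported from pipecat_flows.
FlowArgs = Dict[str, Any]


class FlowResult(TypedDict, total=False):
    status: str  # assumed field
    error: str   # assumed field


class AgeResult(FlowResult, total=False):
    """Hypothetical result type that adds one field to the base result."""
    age: int


async def collect_age(args: FlowArgs) -> AgeResult:
    """Hypothetical handler: validate the argument and return a typed result."""
    age = args.get("age")
    if not isinstance(age, int) or age < 0:
        return {"status": "error", "error": "age must be a non-negative integer"}
    return {"status": "success", "age": age}


ok = asyncio.run(collect_age({"age": 30}))
bad = asyncio.run(collect_age({"age": "thirty"}))
```

Because a TypedDict is a plain dict at runtime, fields are read with key access (`ok["age"]`), not attribute access.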

FlowConfig

FlowConfig
TypedDict

Configuration for the entire conversation flow.

NodeConfig

NodeConfig
TypedDict

Configuration for a single node in the flow.
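For orientation, here is a minimal sketch of a node configuration limited to the keys that appear elsewhere in this reference (`functions`, `pre_actions`, `post_actions`). `NodeConfigSketch` and `make_goodbye_node` are hypothetical names; the real NodeConfig may define further keys that are not shown here.

```python
from typing import Any, Dict, List, TypedDict


class NodeConfigSketch(TypedDict, total=False):
    """Stand-in listing only the keys used elsewhere in this reference."""
    functions: List[Dict[str, Any]]
    pre_actions: List[Dict[str, Any]]
    post_actions: List[Dict[str, Any]]


def make_goodbye_node() -> NodeConfigSketch:
    """Hypothetical factory for a node that speaks, then ends the call."""
    return {
        "functions": [],
        "pre_actions": [{"type": "tts_say", "text": "Thanks for calling."}],
        "post_actions": [{"type": "end_conversation"}],
    }


node = make_goodbye_node()
```

Building nodes through small factory functions like this keeps each configuration in one place, which is convenient when a dynamic flow creates nodes at runtime.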

ContextStrategy

ContextStrategy
Enum

Strategy for managing conversation context during node transitions.

ContextStrategyConfig

ContextStrategyConfig
dataclass

Configuration for context management strategy.

# Example usage
config = ContextStrategyConfig(
    strategy=ContextStrategy.RESET_WITH_SUMMARY,
    summary_prompt="Summarize the key points discussed so far."
)
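To make the idea of a context strategy more concrete, the sketch below models it with stand-in types. Only `RESET_WITH_SUMMARY` and `summary_prompt` are named in this reference; the `APPEND` and `RESET` members, the validation rule, and the `next_context` function are assumptions for illustration. The summary text is passed in by hand here, whereas a real flow would presumably generate it from `summary_prompt`.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional

Message = Dict[str, str]


class Strategy(Enum):
    """Stand-in enum; only RESET_WITH_SUMMARY is named in this reference."""
    APPEND = "append"  # assumed member
    RESET = "reset"    # assumed member
    RESET_WITH_SUMMARY = "reset_with_summary"


@dataclass
class StrategyConfig:
    strategy: Strategy
    summary_prompt: Optional[str] = None

    def __post_init__(self) -> None:
        # A summary-based reset makes no sense without a prompt (assumed rule).
        if self.strategy is Strategy.RESET_WITH_SUMMARY and not self.summary_prompt:
            raise ValueError("summary_prompt is required for RESET_WITH_SUMMARY")


def next_context(old: List[Message], new: List[Message],
                 config: StrategyConfig, summary: str = "") -> List[Message]:
    """Build the context for the next node (illustrative semantics only)."""
    if config.strategy is Strategy.APPEND:
        return old + new
    if config.strategy is Strategy.RESET:
        return list(new)
    # RESET_WITH_SUMMARY: drop the history but keep a summary of it.
    return [{"role": "system", "content": summary}] + new


history = [{"role": "user", "content": "Book a table for two at 7pm."}]
next_node = [{"role": "system", "content": "Confirm the booking details."}]
config = StrategyConfig(
    Strategy.RESET_WITH_SUMMARY,
    summary_prompt="Summarize the key points discussed so far.",
)
context = next_context(history, next_node, config,
                       summary="User wants a table for two at 7pm.")
```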

FlowManager

FlowManager
class

Main class for managing conversation flows, supporting both static (configuration-driven) and dynamic (runtime-determined) flows.

Methods

initialize
method

Initialize the flow with starting messages.

set_node
method

Set up a new conversation node.

  • For dynamic flows, the application must advance the conversation using set_node to set up the next node.
  • For static flows, set_node is triggered by a function call that contains a transition_to field.

register_action
method

Register a handler for a custom action type.
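The registration pattern can be sketched with a small stand-in registry. `ActionRegistry` and its `execute` method are hypothetical, and the `(action_type, handler)` argument order is an assumption based on the custom action fields described in this reference. The sketch accepts both sync and async handlers.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class ActionRegistry:
    """Hypothetical stand-in for the bookkeeping a flow manager might do."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Dict[str, Any]], Any]] = {}

    def register_action(self, action_type: str,
                        handler: Callable[[Dict[str, Any]], Any]) -> None:
        if not callable(handler):
            raise ValueError(f"handler for {action_type!r} must be callable")
        self._handlers[action_type] = handler

    async def execute(self, action: Dict[str, Any]) -> None:
        handler = self._handlers.get(action["type"])
        if handler is None:
            raise KeyError(f"no handler registered for {action['type']!r}")
        outcome = handler(action)
        # Await only when the handler returned an awaitable (async handler).
        if inspect.isawaitable(outcome):
            await outcome


log = []


def log_sync(action: Dict[str, Any]) -> None:
    log.append(("sync", action["message"]))


async def log_async(action: Dict[str, Any]) -> None:
    log.append(("async", action["message"]))


registry = ActionRegistry()
registry.register_action("log_sync", log_sync)
registry.register_action("log_async", log_async)


async def main() -> None:
    await registry.execute({"type": "log_sync", "message": "a"})
    await registry.execute({"type": "log_async", "message": "b"})


asyncio.run(main())
```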

get_current_context
method

Get the current conversation context.

Returns a list of messages in the current context, including system messages, user messages, and assistant responses.

Example usage:

# Access current conversation context
context = flow_manager.get_current_context()

# Use in handlers
async def process_response(args: FlowArgs) -> FlowResult:
    context = flow_manager.get_current_context()
    # Process conversation history
    return {"status": "success"}
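One common use of the returned list is to pull out the latest user turn. The helper below is a sketch that assumes each message is a dict with `role` and `content` keys; `last_user_message` and the sample data are hypothetical.

```python
from typing import Dict, List

Message = Dict[str, str]


def last_user_message(context: List[Message]) -> str:
    """Return the most recent user message, or "" if there is none."""
    for message in reversed(context):
        if message.get("role") == "user":
            return message.get("content", "")
    return ""


# Hand-written sample in the assumed role/content message shape.
sample_context = [
    {"role": "system", "content": "You are a booking assistant."},
    {"role": "user", "content": "I need a table for two."},
    {"role": "assistant", "content": "What time works for you?"},
    {"role": "user", "content": "7pm, please."},
]

latest = last_user_message(sample_context)
```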

State Management

The FlowManager provides a state dictionary for storing conversation data.
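A minimal sketch of that pattern, assuming the dictionary is exposed as a `state` attribute on the manager. The `flow_manager` object here is a plain stand-in, and both handlers are hypothetical.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict

# Stand-in object; assumes the real FlowManager exposes a `state` dict.
flow_manager = SimpleNamespace(state={})


async def record_party_size(args: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical handler that stores a value for later nodes."""
    flow_manager.state["party_size"] = args["size"]
    return {"status": "success"}


async def confirm_booking(args: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical handler that reads the stored value back."""
    size = flow_manager.state.get("party_size")
    if size is None:
        return {"status": "error", "error": "party size not collected yet"}
    return {"status": "success", "summary": f"Table for {size}"}


async def main() -> Dict[str, Any]:
    await record_party_size({"size": 4})
    return await confirm_booking({})


confirmation = asyncio.run(main())
```

Reading with `.get()` lets a handler fail gracefully when an earlier node has not yet stored the value it depends on.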

Usage Examples

Node Functions
concept

Functions that execute operations within a state.

from pipecat_flows import FlowArgs, FlowResult

async def process_data(args: FlowArgs) -> FlowResult:
    """Handle data processing within a node."""
    data = args["data"]
    result = await process(data)
    return {
        "status": "success",
        "processed_data": result
    }

# Function configuration for a node function (no transition)
{
    "type": "function",
    "function": {
        "name": "process_data",
        "handler": process_data,
        "description": "Process user data",
        "parameters": {
            "type": "object",
            "properties": {
                "data": {"type": "string"}
            }
        },
    }
}

Edge Functions
concept

Functions that create transitions between nodes. Use transition_to (static flow) or transition_callback (dynamic flow) to specify the target node.

# Edge function configuration
{
    "type": "function",
    "function": {
        "name": "next_step",
        "description": "Transition to next node",
        "parameters": {"type": "object", "properties": {}},
        "transition_to": "target_node"  # Required: Specify target node (or use transition_callback for dynamic flows)
    }
}

Function Properties

handler
Optional[Callable]

Async function that processes data within a node. Can be specified as:

  • A direct function reference (a Callable)
  • A string with the __function__: prefix (e.g., "__function__:process_data") that references a function in the main script
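The two forms listed above could be resolved to a callable along these lines. This is a sketch of the idea, not the library's actual lookup; `resolve_handler` and the explicit `namespace` dict (standing in for the main script's globals) are assumptions.

```python
from typing import Any, Callable, Dict, Union

PREFIX = "__function__:"


def resolve_handler(handler: Union[str, Callable[..., Any]],
                    namespace: Dict[str, Any]) -> Callable[..., Any]:
    """Return a callable for a direct reference or a prefixed function name."""
    if callable(handler):
        return handler
    if isinstance(handler, str) and handler.startswith(PREFIX):
        name = handler[len(PREFIX):]
        target = namespace.get(name)
        if not callable(target):
            raise ValueError(f"function {name!r} not found")
        return target
    raise ValueError(f"unsupported handler: {handler!r}")


async def process_data(args: Dict[str, Any]) -> Dict[str, Any]:
    return {"status": "success"}


# Stand-in for the main script's namespace.
namespace = {"process_data": process_data}

by_reference = resolve_handler(process_data, namespace)
by_name = resolve_handler("__function__:process_data", namespace)
```

The string form is useful when the flow configuration is stored as data (for example JSON), where a function object cannot be embedded directly.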

transition_callback
Optional[Callable]

Handler for dynamic flow transitions. Must be an async function with one of these signatures:

# New style (recommended)
async def handle_transition(
    args: Dict[str, Any],
    result: FlowResult,
    flow_manager: FlowManager
) -> None:
    """Handle transition to next node."""
    if result["available"]:  # FlowResult is a TypedDict, so use key access
        await flow_manager.set_node("confirm", create_confirmation_node())
    else:
        await flow_manager.set_node(
            "no_availability",
            create_no_availability_node(result["alternative_times"])
        )

# Legacy style (supported for backwards compatibility)
async def handle_transition(
    args: Dict[str, Any],
    flow_manager: FlowManager
) -> None:
    """Handle transition to next node."""
    await flow_manager.set_node("next", create_next_node())

The callback receives:

  • args: Arguments from the function call
  • result: Typed result from the function handler (new style only)
  • flow_manager: Reference to the FlowManager instance
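Supporting both signatures implies that the caller has to tell them apart. One way to do that is to count the callback's parameters, sketched below. `invoke_transition_callback` is a hypothetical helper and not necessarily how the library dispatches.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


async def invoke_transition_callback(callback: Callable[..., Any],
                                     args: Dict[str, Any],
                                     result: Any,
                                     flow_manager: Any) -> None:
    """Call `callback` with two or three arguments, based on its signature."""
    param_count = len(inspect.signature(callback).parameters)
    if param_count == 2:
        await callback(args, flow_manager)          # legacy style
    elif param_count == 3:
        await callback(args, result, flow_manager)  # new style
    else:
        raise TypeError(f"expected 2 or 3 parameters, got {param_count}")


calls = []


async def new_style(args, result, flow_manager):
    calls.append(("new", result["status"]))


async def legacy_style(args, flow_manager):
    calls.append(("legacy", None))


async def main() -> None:
    manager = object()  # placeholder for a FlowManager instance
    await invoke_transition_callback(new_style, {}, {"status": "success"}, manager)
    await invoke_transition_callback(legacy_style, {}, {"status": "success"}, manager)


asyncio.run(main())
```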

Example usage:

async def handle_availability_check(
    args: Dict,
    result: TimeResult,  # Typed result (assumed to be a TypedDict like FlowResult)
    flow_manager: FlowManager
) -> None:
    """Handle availability check and transition based on result."""
    if result["available"]:
        await flow_manager.set_node("confirm", create_confirmation_node())
    else:
        await flow_manager.set_node(
            "no_availability",
            create_no_availability_node(result["alternative_times"])
        )

# Use in function configuration
{
    "type": "function",
    "function": {
        "name": "check_availability",
        "handler": check_availability,
        "parameters": {...},
        "transition_callback": handle_availability_check
    }
}

Note: A function cannot have both transition_to and transition_callback.
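If you build function configurations programmatically, a small pre-flight check can catch this conflict early. The helper below is a hypothetical sketch; whether and how the library reports the conflict itself is not specified here.

```python
from typing import Any, Dict


def check_transition_fields(function_config: Dict[str, Any]) -> None:
    """Raise ValueError if both transition fields are set on one function."""
    spec = function_config.get("function", {})
    if "transition_to" in spec and "transition_callback" in spec:
        name = spec.get("name", "<unnamed>")
        raise ValueError(
            f"function {name!r} sets both transition_to and transition_callback"
        )


async def placeholder_callback(args, result, flow_manager):
    """Placeholder transition callback used only for this check."""


valid = {
    "type": "function",
    "function": {
        "name": "next_step",
        "parameters": {"type": "object", "properties": {}},
        "transition_to": "target_node",
    },
}
invalid = {
    "type": "function",
    "function": {
        "name": "next_step",
        "parameters": {"type": "object", "properties": {}},
        "transition_to": "target_node",
        "transition_callback": placeholder_callback,
    },
}

check_transition_fields(valid)  # passes silently

try:
    check_transition_fields(invalid)
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```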

Handler Signatures

Return Types

Provider-Specific Formats

You don’t need to handle these format differences manually; use the standard format and the FlowManager will adapt it for your chosen provider.
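To give a feel for the kind of adaptation involved, the sketch below converts a function written in the `{"type": "function", "function": {...}}` shape used in this reference's examples into the `name` / `description` / `input_schema` shape used by Anthropic-style tool definitions. `to_anthropic_style` is a hypothetical helper, and treating the example shape as the standard format is an assumption. The FlowManager's own adapters are not shown here and may differ.

```python
from typing import Any, Dict


def to_anthropic_style(function_config: Dict[str, Any]) -> Dict[str, Any]:
    """Map the example function shape onto an Anthropic-style tool definition.

    Flow-specific keys such as handler and transition_to are left out,
    since they are not part of the provider's schema.
    """
    spec = function_config["function"]
    return {
        "name": spec["name"],
        "description": spec.get("description", ""),
        "input_schema": spec.get(
            "parameters", {"type": "object", "properties": {}}
        ),
    }


standard = {
    "type": "function",
    "function": {
        "name": "process_data",
        "description": "Process user data",
        "parameters": {
            "type": "object",
            "properties": {"data": {"type": "string"}},
        },
        "transition_to": "target_node",
    },
}

tool = to_anthropic_style(standard)
```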

Actions

pre_actions and post_actions are used to manage conversation flow. They are included in the NodeConfig and executed before and after the LLM function call, respectively.

Two categories of actions are available:

  • Built-in actions: These actions ship with Flows by default.
  • Custom actions: These are developer-defined actions.
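The execution order described above (pre_actions, then the LLM step, then post_actions) can be sketched with a toy runner. `run_node`, `run_action`, and `run_llm_step` are hypothetical stand-ins that only record the order of events.

```python
import asyncio
from typing import Any, Dict, List

events: List[str] = []


async def run_action(action: Dict[str, Any]) -> None:
    """Stand-in for executing one action; records its type."""
    events.append(f"action:{action['type']}")


async def run_llm_step() -> None:
    """Stand-in for the LLM step that happens between the two action lists."""
    events.append("llm")


async def run_node(node: Dict[str, Any]) -> None:
    for action in node.get("pre_actions", []):
        await run_action(action)
    await run_llm_step()
    for action in node.get("post_actions", []):
        await run_action(action)


node = {
    "pre_actions": [{"type": "tts_say", "text": "One moment..."}],
    "post_actions": [{"type": "end_conversation"}],
}

asyncio.run(run_node(node))
```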

Built-in Actions

Common actions shipped with Flows for managing conversation flow. To use them, just add them to your NodeConfig.

tts_say
action

Speaks text immediately using the TTS service.

"pre_actions": [
    {
        "type": "tts_say",
        "text": "Processing your request..."  # Required
    }
]

end_conversation
action

Ends the conversation and closes the connection.

"post_actions": [
    {
        "type": "end_conversation",
        "text": "Goodbye!"  # Optional farewell message
    }
]

Custom Actions

You can define custom pre_actions and post_actions. They are registered automatically when included in the NodeConfig.

Custom actions are composed of:

action_type
str
required

String identifier for the action

handler
Callable
required

Async or sync function that handles the action

Example:

# Define custom action handler
async def custom_notification(action: dict):
    """Custom action handler."""
    message = action.get("message", "")
    await notify_user(message)

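# Use in node configuration
"pre_actions": [
    {
        "type": "notify",
        "handler": custom_notification,  # the handler defined above
        "message": "Attention!"  # read by the handler via action.get("message")
    }
]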

Exceptions

FlowError
exception

Base exception for all flow-related errors.

from pipecat_flows import FlowError

try:
    await flow_manager.set_node("invalid", config)
except FlowError as e:
    print(f"Flow error: {e}")

FlowInitializationError
exception

Raised when flow initialization fails.

from pipecat_flows import FlowInitializationError

try:
    await flow_manager.initialize()
except FlowInitializationError as e:
    print(f"Initialization failed: {e}")

FlowTransitionError
exception

Raised when a state transition fails.

from pipecat_flows import FlowTransitionError

try:
    await flow_manager.set_node("next", node_config)
except FlowTransitionError as e:
    print(f"Transition failed: {e}")

InvalidFunctionError
exception

Raised when an invalid or unavailable function is called.

from pipecat_flows import InvalidFunctionError

try:
    await flow_manager.set_node("node", {
        "functions": [{
            "type": "function",
            "function": {
                "name": "invalid_function"
            }
        }]
    })
except InvalidFunctionError as e:
    print(f"Invalid function: {e}")
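Since FlowError is described as the base for all flow-related errors, handlers for the specific exceptions should come before a catch-all for FlowError. The sketch below uses local stand-in classes so it runs without the library. That the three specific exceptions subclass FlowError is an assumption drawn from that description, and `classify` is a hypothetical helper.

```python
# Local stand-ins that mirror the assumed hierarchy.
class FlowError(Exception):
    pass


class FlowInitializationError(FlowError):
    pass


class FlowTransitionError(FlowError):
    pass


class InvalidFunctionError(FlowError):
    pass


def classify(exc: Exception) -> str:
    """Show which except clause handles a given exception."""
    try:
        raise exc
    except FlowTransitionError:
        return "transition"      # specific handlers first
    except InvalidFunctionError:
        return "function"
    except FlowError:
        return "flow"            # catch-all for any other flow error


labels = [
    classify(FlowTransitionError("no such node")),
    classify(InvalidFunctionError("unknown function")),
    classify(FlowInitializationError("not ready")),
]
```

If the `except FlowError` clause came first, it would also catch the more specific exceptions and the later clauses would never run.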