New to building conversational flows? Check out our Pipecat Flows guide first.

Installation

Core Types

FlowArgs

FlowArgs
Dict[str, Any]

Type alias for function handler arguments.

FlowResult

FlowResult
TypedDict

Base type for function handler results. Additional fields can be included as needed.

FlowConfig

FlowConfig
TypedDict

Configuration for the entire conversation flow.

NodeConfig

NodeConfig
TypedDict

Configuration for a single node in the flow.

FlowManager

FlowManager
class

Main class for managing conversation flows, supporting both static (configuration-driven) and dynamic (runtime-determined) flows.

Methods

initialize
method

Initialize the flow with starting messages.

set_node
method

Set up a new conversation node.

  • For dynamic flows, the application must advance the conversation using set_node to set up the next node.
  • For static flows, set_node is triggered by a function call that contains a transition_to field.
register_action
method

Register a handler for a custom action type.

State Management

The FlowManager provides a state dictionary for storing conversation data:

Usage Examples

Node Functions
concept

Functions that execute operations within a state and optionally transition to a new state.

from pipecat_flows import FlowArgs, FlowResult

async def process_data(args: FlowArgs) -> FlowResult:
    """Handle data processing within a node."""
    data = args["data"]
    result = await process(data)
    return {
        "status": "success",
        "processed_data": result
    }

# Node configuration with transition
{
    "type": "function",
    "function": {
        "name": "process_data",
        "handler": process_data,
        "description": "Process user data",
        "parameters": {
            "type": "object",
            "properties": {
                "data": {"type": "string"}
            }
        },
        "transition_to": "next_node"  # Optional: Specify next node
    }
}
Edge Functions
concept

Functions that create transitions between nodes. Use transition_to to specify the target node.

# Edge function configuration
{
    "type": "function",
    "function": {
        "name": "next_step",
        "description": "Transition to next node",
        "parameters": {"type": "object", "properties": {}},
        "transition_to": "target_node"  # Required: Specify target node
    }
}

Function Properties

handler
Optional[Callable]

Async function that processes data within a node. Can be specified as:

  • Direct function reference
  • String with __function__: prefix (e.g., "__function__:process_data") to reference a function in the main script
transition_to
Optional[str]

Name of the node to transition to after function execution

Transition Types

Handler Signatures

Return Types

Provider-Specific Formats

You don’t need to handle these format differences manually - use the standard format and the FlowManager will adapt it for your chosen provider.

Built-in Actions

tts_say
action

Speaks text immediately using the TTS service.

"pre_actions": [
    {
        "type": "tts_say",
        "text": "Processing your request..."  # Required
    }
]
end_conversation
action

Ends the conversation and closes the connection.

"post_actions": [
    {
        "type": "end_conversation",
        "text": "Goodbye!"  # Optional farewell message
    }
]

Custom Actions

async def custom_notification(action: dict):
    """Custom action handler."""
    message = action.get("message", "")
    await notify_user(message)

# Register the action

flow_manager.register_action("notify", custom_notification)

# Use in node configuration

"pre_actions": [
    {
        "type": "notify",
        "message": "Important update!"
    }
]

Exceptions

FlowError
exception

Base exception for all flow-related errors.

from pipecat_flows import FlowError

try:
    await flow_manager.set_node("invalid", config)
except FlowError as e:
    print(f"Flow error: {e}")
FlowInitializationError
exception

Raised when flow initialization fails.

from pipecat_flows import FlowInitializationError

try:
    await flow_manager.initialize()
except FlowInitializationError as e:
    print(f"Initialization failed: {e}")

FlowTransitionError
exception

Raised when a state transition fails.

from pipecat_flows import FlowTransitionError

try:
    await flow_manager.set_node("next", node_config)
except FlowTransitionError as e:
    print(f"Transition failed: {e}")

InvalidFunctionError
exception

Raised when an invalid or unavailable function is called.

from pipecat_flows import InvalidFunctionError

try:
await flow_manager.set_node("node", {
  "functions": [{
    "type": "function",
    "function": {
    "name": "invalid_function"
    }
  }]
})

except InvalidFunctionError as e:
    print(f"Invalid function: {e}")

Error Handling Best Practices