Skip to main content

Overview

FlowManager orchestrates conversation flows by managing state transitions, function registration, and message handling across different LLM providers.

Configuration

All parameters are keyword-only.
task
PipelineTask
required
Pipeline task instance used for queueing frames into the pipeline.
llm
LLMService | LLMSwitcher
required
LLM service instance or an LLMSwitcher for switching between LLM providers at runtime. Supports OpenAILLMService and any service that extends it (Groq, Together, Cerebras, DeepSeek, etc.), AnthropicLLMService, GoogleLLMService, and AWSBedrockLLMService.
context_aggregator
Any
required
Context aggregator for managing conversation context. Typically obtained from create_context_aggregator() on the LLM service.
context_strategy
ContextStrategyConfig
Default context strategy for managing conversation context during node transitions. Can be overridden per-node via NodeConfig.context_strategy. See ContextStrategyConfig.
transport
BaseTransport
default:"None"
Transport instance for communication (e.g., DailyTransport). When provided, accessible via the transport property in function and action handlers.
global_functions
List[FlowsFunctionSchema | FlowsDirectFunction]
default:"None"
Functions that will be available at every node. These are registered once during initialization and automatically included alongside node-specific functions. Useful for capabilities like “transfer to human” that should be accessible from any conversation state.

Properties

state

flow_manager.state -> Dict[str, Any]
Shared state dictionary that persists across node transitions. Use this to store and retrieve conversation data such as user preferences, collected information, or any data that needs to be accessible across different nodes.
# Store data
flow_manager.state["user_name"] = "Alice"

# Retrieve data
name = flow_manager.state.get("user_name", "Unknown")

transport

flow_manager.transport -> Optional[BaseTransport]
The transport instance provided during initialization, or None if not set. Use this to interact with the communication platform (e.g., mute participants, access room info).
async def my_handler(args, flow_manager):
    transport = flow_manager.transport
    if transport:
        participants = transport.participants()

current_node

flow_manager.current_node -> Optional[str]
The identifier of the currently active conversation node. Returns None before initialization or if no node has been set.
async def my_handler(args, flow_manager):
    if flow_manager.current_node == "collecting_payment":
        await setup_secure_session(flow_manager)

task

flow_manager.task -> PipelineTask
The pipeline task instance used for frame queueing. Use this for advanced flow control such as queuing custom frames.
async def my_handler(args, flow_manager):
    from pipecat.frames.frames import TTSUpdateSettingsFrame
    await flow_manager.task.queue_frame(
        TTSUpdateSettingsFrame(settings={"voice": "new-voice-id"})
    )

Methods

initialize

await flow_manager.initialize(initial_node: Optional[NodeConfig] = None) -> None
Initialize the flow manager. Must be called before any node transitions can occur.
ParameterTypeDefaultDescription
initial_nodeNodeConfigNoneInitial node configuration. Can also be set later via set_node_from_config().
Raises: FlowInitializationError if initialization fails.
flow_manager = FlowManager(task=task, llm=llm, context_aggregator=context_aggregator)
await flow_manager.initialize(initial_node=create_initial_node())

set_node_from_config

await flow_manager.set_node_from_config(node_config: NodeConfig) -> None
Transition to a new conversation node. Used to manually trigger node transitions. The node name is taken from the name field in the config, or a UUID is generated if not provided.
ParameterTypeDescription
node_configNodeConfigConfiguration for the new node.
Raises: FlowTransitionError if the manager is not initialized. FlowError if node setup fails.
In most cases, prefer returning the next node from a consolidated function handler instead of calling this method directly.
await flow_manager.set_node_from_config({
    "name": "collect_email",
    "task_messages": [{"role": "system", "content": "Ask the user for their email."}],
    "functions": [collect_email_function],
})

get_current_context

flow_manager.get_current_context() -> List[dict]
Get the current conversation context as a list of messages, including system messages, user messages, and assistant responses. Raises: FlowError if the context aggregator is not available.
messages = flow_manager.get_current_context()

register_action

flow_manager.register_action(action_type: str, handler: Callable) -> None
Register a handler for a custom action type. The handler can be either a legacy handler (action) or a modern handler (action, flow_manager).
ParameterTypeDescription
action_typestrString identifier for the action (e.g., "notify_slack").
handlerCallableAsync function that handles the action.
async def notify_slack(action: dict, flow_manager: FlowManager):
    channel = action.get("channel", "#general")
    text = action.get("text", "")
    await slack_client.post_message(channel=channel, text=text)

flow_manager.register_action("notify_slack", notify_slack)
Once registered, the action can be used in node pre_actions or post_actions:
node_config: NodeConfig = {
    "task_messages": [...],
    "pre_actions": [{"type": "notify_slack", "channel": "#support", "text": "New session started"}],
}

Usage

Basic Setup

from pipecat_flows import FlowManager, FlowResult, NodeConfig

async def create_initial_node() -> NodeConfig:
    return {
        "task_messages": [
            {"role": "system", "content": "Greet the user and ask how you can help."}
        ],
        "functions": [help_function],
    }

flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    transport=transport,
)
await flow_manager.initialize(initial_node=await create_initial_node())

Using Global Functions

from pipecat_flows import FlowManager, FlowsFunctionSchema

transfer_function = FlowsFunctionSchema(
    name="transfer_to_human",
    description="Transfer the conversation to a human agent",
    properties={},
    required=[],
    handler=handle_transfer,
)

flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    global_functions=[transfer_function],
)