Overview
FlowManager orchestrates conversation flows by managing state transitions, function registration, and message handling across different LLM providers.
Configuration
All parameters are keyword-only.
task
PipelineTask
required
Pipeline task instance used for queueing frames into the pipeline.
llm
LLMService | LLMSwitcher
required
LLM service instance or an LLMSwitcher for switching between LLM providers
at runtime. Supports OpenAILLMService and any service that extends it (Groq,
Together, Cerebras, DeepSeek, etc.), AnthropicLLMService,
GoogleLLMService, and AWSBedrockLLMService.
context_aggregator
Any
required
Context aggregator for managing conversation context. Typically obtained from
create_context_aggregator() on the LLM service.
transport
BaseTransport
default:"None"
Transport instance for communication (e.g., DailyTransport). When provided,
accessible via the transport property in function and action handlers.
global_functions
List[FlowsFunctionSchema | FlowsDirectFunction]
default:"None"
Functions that will be available at every node. These are registered once
during initialization and automatically included alongside node-specific
functions. Useful for capabilities like “transfer to human” that should be
accessible from any conversation state.
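As a rough picture of what "available at every node" means, the sketch below combines a node's own functions with the global ones. It is not the library's implementation: `functions_for_node` is a hypothetical helper, and plain strings stand in for function schemas.

```python
from typing import Any, Dict, List


def functions_for_node(node_config: Dict[str, Any], global_functions: List[str]) -> List[str]:
    """Hypothetical helper: node-specific functions followed by the global ones."""
    node_functions = list(node_config.get("functions", []))
    return node_functions + list(global_functions)


# Strings stand in for FlowsFunctionSchema / direct function objects.
global_functions = ["transfer_to_human"]
greeting_node = {"functions": ["collect_name"]}
payment_node = {"functions": ["collect_card"]}

greeting_functions = functions_for_node(greeting_node, global_functions)
payment_functions = functions_for_node(payment_node, global_functions)
```

In this sketch every node ends up with "transfer_to_human" without listing it in its own config, which is the behavior the parameter description promises.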
Properties
state
flow_manager.state -> Dict[str, Any]
Shared state dictionary that persists across node transitions. Use this to store and retrieve conversation data such as user preferences, collected information, or any data that needs to be accessible across different nodes.
# Store data
flow_manager.state["user_name"] = "Alice"
# Retrieve data
name = flow_manager.state.get("user_name", "Unknown")
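To show state surviving from one handler to another, here is a minimal sketch that runs without Pipecat installed. The `SimpleNamespace` object is a stand-in that models only the `state` dictionary, and the handler names `collect_name` and `confirm_order` are hypothetical.

```python
import asyncio
from types import SimpleNamespace

# Stand-in for a FlowManager: only the `state` dict is modeled here.
flow_manager = SimpleNamespace(state={})


async def collect_name(args, flow_manager):
    # Runs in an early node: remember what the user said.
    flow_manager.state["user_name"] = args["name"]


async def confirm_order(args, flow_manager):
    # Runs in a later node: read the value back, with a fallback.
    name = flow_manager.state.get("user_name", "Unknown")
    return f"Thanks, {name}. Your order is confirmed."


async def main():
    await collect_name({"name": "Alice"}, flow_manager)
    return await confirm_order({}, flow_manager)


message = asyncio.run(main())
```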
transport
flow_manager.transport -> Optional[BaseTransport]
The transport instance provided during initialization, or None if not set. Use this to interact with the communication platform (e.g., mute participants, access room info).
async def my_handler(args, flow_manager):
    transport = flow_manager.transport
    if transport:
        participants = transport.participants()
current_node
flow_manager.current_node -> Optional[str]
The identifier of the currently active conversation node. Returns None before initialization or if no node has been set.
async def my_handler(args, flow_manager):
    if flow_manager.current_node == "collecting_payment":
        await setup_secure_session(flow_manager)
task
flow_manager.task -> PipelineTask
The pipeline task instance used for frame queueing. Use this for advanced flow control such as queuing custom frames.
async def my_handler(args, flow_manager):
    from pipecat.frames.frames import TTSUpdateSettingsFrame
    await flow_manager.task.queue_frame(
        TTSUpdateSettingsFrame(settings={"voice": "new-voice-id"})
    )
Methods
initialize
await flow_manager.initialize(initial_node: Optional[NodeConfig] = None) -> None
Initialize the flow manager. Must be called before any node transitions can occur.
| Parameter | Type | Default | Description |
|---|---|---|---|
| initial_node | NodeConfig | None | Initial node configuration. Can also be set later via set_node_from_config(). |
Raises: FlowInitializationError if initialization fails.
flow_manager = FlowManager(task=task, llm=llm, context_aggregator=context_aggregator)
await flow_manager.initialize(initial_node=create_initial_node())
set_node_from_config
await flow_manager.set_node_from_config(node_config: NodeConfig) -> None
Transition to a new conversation node. Used to manually trigger node transitions. The node name is taken from the name field in the config, or a UUID is generated if not provided.
| Parameter | Type | Description |
|---|---|---|
| node_config | NodeConfig | Configuration for the new node. |
Raises: FlowTransitionError if the manager is not initialized. FlowError if node setup fails.
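The naming rule (use the config's name field, otherwise generate a UUID) can be sketched as follows. `resolve_node_name` is a hypothetical helper written for illustration, and the choice of `uuid.uuid4()` is an assumption; the library may generate its identifier differently.

```python
import uuid


def resolve_node_name(node_config: dict) -> str:
    """Hypothetical helper: prefer the config's name, else a random UUID string."""
    return node_config.get("name") or str(uuid.uuid4())


named = resolve_node_name({"name": "collect_email", "task_messages": []})
unnamed = resolve_node_name({"task_messages": []})
```

Giving nodes explicit names makes checks against current_node readable, since a generated UUID differs on every run.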
In most cases, prefer returning the next node from a consolidated function
handler instead of calling this method directly.
await flow_manager.set_node_from_config({
    "name": "collect_email",
    "task_messages": [{"role": "system", "content": "Ask the user for their email."}],
    "functions": [collect_email_function],
})
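For comparison with the direct call, here is a sketch of the preferred pattern: a handler that returns the next node rather than calling set_node_from_config() itself. It assumes the handler returns a (result, next_node) pair. The node factory, the result fields, and the `SimpleNamespace` stand-in are hypothetical, so the sketch runs without Pipecat installed.

```python
import asyncio
from types import SimpleNamespace


def create_confirmation_node():
    # Hypothetical next node, in the same dict shape used for node configs.
    return {
        "name": "confirm_email",
        "task_messages": [{"role": "system", "content": "Read the email back to the user."}],
        "functions": [],
    }


async def collect_email(args, flow_manager):
    # Record the value, then hand back the next node instead of
    # calling set_node_from_config() directly.
    flow_manager.state["email"] = args["email"]
    result = {"status": "success", "email": args["email"]}
    return result, create_confirmation_node()


flow_manager = SimpleNamespace(state={})  # stand-in that models only `state`
result, next_node = asyncio.run(
    collect_email({"email": "alice@example.com"}, flow_manager)
)
```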
get_current_context
flow_manager.get_current_context() -> List[dict]
Get the current conversation context as a list of messages, including system messages, user messages, and assistant responses.
Raises: FlowError if the context aggregator is not available.
messages = flow_manager.get_current_context()
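Once the message list is in hand, it can be inspected like any list of dicts. The sketch below assumes each message carries "role" and "content" keys, as in the task_messages examples. `last_message_from` is a hypothetical helper, and the hard-coded list is illustrative data standing in for the return value of get_current_context().

```python
from typing import List, Optional


def last_message_from(messages: List[dict], role: str) -> Optional[dict]:
    """Return the most recent message with the given role, or None."""
    for message in reversed(messages):
        if message.get("role") == role:
            return message
    return None


# Illustrative data only; real code would use flow_manager.get_current_context().
messages = [
    {"role": "system", "content": "Greet the user and ask how you can help."},
    {"role": "user", "content": "I need to update my email."},
    {"role": "assistant", "content": "Sure, what is the new address?"},
    {"role": "user", "content": "alice@example.com"},
]

last_user = last_message_from(messages, "user")
last_tool = last_message_from(messages, "tool")
```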
register_action
flow_manager.register_action(action_type: str, handler: Callable) -> None
Register a handler for a custom action type. The handler may use either the legacy signature (action) or the modern signature (action, flow_manager).
| Parameter | Type | Description |
|---|---|---|
| action_type | str | String identifier for the action (e.g., "notify_slack"). |
| handler | Callable | Async function that handles the action. |
async def notify_slack(action: dict, flow_manager: FlowManager):
    channel = action.get("channel", "#general")
    text = action.get("text", "")
    await slack_client.post_message(channel=channel, text=text)
flow_manager.register_action("notify_slack", notify_slack)
Once registered, the action can be used in node pre_actions or post_actions:
node_config: NodeConfig = {
    "task_messages": [...],
    "pre_actions": [{"type": "notify_slack", "channel": "#support", "text": "New session started"}],
}
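One way to picture how a registry could accept both handler signatures is to count the handler's parameters before calling it. This is a sketch under that assumption, not the library's dispatch code: `ActionRegistry`, its `run` method, and the "log_event" action type are all hypothetical.

```python
import asyncio
import inspect


class ActionRegistry:
    """Hypothetical registry that accepts both handler signatures."""

    def __init__(self, flow_manager):
        self._flow_manager = flow_manager
        self._handlers = {}

    def register_action(self, action_type, handler):
        self._handlers[action_type] = handler

    async def run(self, action):
        handler = self._handlers[action["type"]]
        # Two or more parameters: modern (action, flow_manager) signature.
        if len(inspect.signature(handler).parameters) >= 2:
            await handler(action, self._flow_manager)
        else:
            await handler(action)


calls = []


async def legacy_handler(action):
    calls.append(("legacy", action["type"]))


async def modern_handler(action, flow_manager):
    calls.append(("modern", action["type"], flow_manager))


registry = ActionRegistry(flow_manager="fm")  # a string stands in for the manager
registry.register_action("log_event", legacy_handler)
registry.register_action("notify_slack", modern_handler)


async def main():
    await registry.run({"type": "log_event"})
    await registry.run({"type": "notify_slack", "channel": "#support"})


asyncio.run(main())
```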
Usage
Basic Setup
from pipecat_flows import FlowManager, FlowResult, NodeConfig
async def create_initial_node() -> NodeConfig:
    return {
        "task_messages": [
            {"role": "system", "content": "Greet the user and ask how you can help."}
        ],
        "functions": [help_function],
    }
flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    transport=transport,
)
await flow_manager.initialize(initial_node=await create_initial_node())
Using Global Functions
from pipecat_flows import FlowManager, FlowsFunctionSchema
transfer_function = FlowsFunctionSchema(
    name="transfer_to_human",
    description="Transfer the conversation to a human agent",
    properties={},
    required=[],
    handler=handle_transfer,
)
flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    global_functions=[transfer_function],
)