Initialization

Initialize your flow by creating a FlowManager instance and calling initialize() to start the conversation. First, create the FlowManager:
from pipecat_flows import FlowManager

flow_manager = FlowManager(
    task=task,                              # PipelineTask
    llm=llm,                                # LLMService
    context_aggregator=context_aggregator,  # Context aggregator
    transport=transport,                    # Transport
)
Then, initialize by passing the first NodeConfig into the initialize() method:
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    logger.info("Client connected")
    # Kick off the conversation.
    await flow_manager.initialize(create_initial_node())

Cross-Node State

Pipecat Flows supports cross-node state through the flow_manager.state dictionary. This persistent storage lets you share data across nodes throughout the entire conversation:
async def record_favorite_color_and_set_next_node(
    args: FlowArgs, flow_manager: FlowManager
) -> tuple[str, NodeConfig]:
    """Function handler that records the color then sets the next node.

    Here "record" means printing to the console, but any logic could go here:
    writing to a database, making an API call, etc.
    """
    flow_manager.state["color"] = args["color"]  # Cross-node state setting
    print(f"Your favorite color is: {args['color']}")
    return args["color"], create_end_node()
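A later node can read the stored value back from the same dictionary. The sketch below is a minimal, self-contained illustration rather than library code: it uses a SimpleNamespace as a stand-in for a real FlowManager (only the state dictionary is modeled), and the helper name build_end_task_message is hypothetical.

```python
from types import SimpleNamespace


def build_end_task_message(state: dict) -> dict:
    """Build a task message for a later node from cross-node state.

    Hypothetical helper: falls back to a neutral phrase if no color was stored.
    """
    color = state.get("color", "a color they did not share")
    return {
        "role": "system",
        "content": f"Thank the user and mention that their favorite color is {color}.",
    }


# Stand-in for a real FlowManager; only the state dictionary is modeled here.
flow_manager = SimpleNamespace(state={})
flow_manager.state["color"] = "blue"  # What the handler above would store

message = build_end_task_message(flow_manager.state)
print(message["content"])
```

Reading with state.get() and a default keeps a later node from raising a KeyError if an earlier handler never ran.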

Global Functions

Pipecat Flows supports defining functions that are available across all nodes in your flow. They’re defined in the same way as node-specific functions, but are passed into the FlowManager at initialization:
flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    transport=transport,
    global_functions=[global_function_1, global_function_2],  # Cross-node functions
)
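As a sketch of what one such function's handler might look like, the example below reads a value from cross-node state and returns it without transitioning. It assumes the same handler shape as the node-specific handler shown earlier (a result paired with the next node) and that returning None as the next node keeps the conversation in the current node. The name get_recorded_color and the stand-in flow manager are hypothetical.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Optional


async def get_recorded_color(
    args: dict, flow_manager: Any
) -> tuple[dict, Optional[dict]]:
    """Hypothetical global handler: report the color stored in cross-node state."""
    color = flow_manager.state.get("color")
    result = {"color": color, "known": color is not None}
    # Assumption: None as the next node means "stay in the current node".
    return result, None


# Stand-in for a real FlowManager; only the state dictionary is modeled.
demo_manager = SimpleNamespace(state={"color": "green"})
demo_result, demo_next_node = asyncio.run(get_recorded_color({}, demo_manager))
print(demo_result, demo_next_node)
```

With real Pipecat Flows objects, a handler like this would be attached to a function schema and that schema listed in global_functions.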

Usage Example

Here’s an example that ties together all the concepts covered in the guides:
from pipecat_flows import FlowsFunctionSchema, NodeConfig


def create_initial_node() -> NodeConfig:
    """Create the initial node of the flow.

    Define the bot's role and task for the node as well as the function for it to call.
    The function call includes a handler which provides the function call result to
    Pipecat and then transitions to the next node.
    """
    record_favorite_color_func = FlowsFunctionSchema(
        name="record_favorite_color_func",
        description="Record the color the user said is their favorite.",
        required=["color"],
        handler=record_favorite_color_and_set_next_node,
        properties={"color": {"type": "string"}},
    )

    return {
        "name": "initial",
        "role_message": "You are an inquisitive child. Use very simple language. Ask simple questions. You must ALWAYS use one of the available functions to progress the conversation. Your responses will be converted to audio. Avoid outputting special characters and emojis.",
        "task_messages": [
            {
                "role": "system",
                "content": "Say 'Hello world' and ask the user what their favorite color is.",
            }
        ],
        "functions": [record_favorite_color_func],
    }
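
The handler shown earlier transitions to create_end_node(), which is not defined in this section. A minimal sketch of what it could look like follows, written as a plain dictionary so that it runs on its own. The task message wording is illustrative, and the end_conversation post-action is assumed to be available as a built-in action in your version of Pipecat Flows.

```python
def create_end_node() -> dict:
    """Create the final node: thank the user, then end the conversation."""
    return {
        "name": "end",
        "task_messages": [
            {
                "role": "system",
                "content": "Thank the user for answering and end the conversation.",
            }
        ],
        # Assumption: "end_conversation" is a built-in action type.
        "post_actions": [{"type": "end_conversation"}],
    }


end_node = create_end_node()
print(end_node["name"])
```

The end node defines no functions because nothing needs to happen after it; the post-action is what closes the conversation once the bot has spoken.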