# FlowManager

> Core orchestration class for managing conversation flows

## Overview

`FlowManager` orchestrates conversation flows by managing state transitions, function registration, and message handling across different LLM providers.

## Configuration

All parameters are keyword-only.

<ParamField path="task" type="PipelineTask" required>
  Pipeline task instance used for queueing frames into the pipeline.
</ParamField>

<ParamField path="llm" type="LLMService | LLMSwitcher" required>
  LLM service instance or an `LLMSwitcher` for switching between providers at
  runtime. Any `LLMService` subclass is supported, including OpenAI-compatible
  services (Groq, Together, Cerebras, DeepSeek, etc.), Anthropic, Google Gemini,
  and AWS Bedrock.
</ParamField>

<ParamField path="context_aggregator" type="Any" required>
  Context aggregator for managing conversation context. Typically created using
  `LLMContextAggregatorPair` from
  `pipecat.processors.aggregators.llm_response_universal`.
</ParamField>

<ParamField path="context_strategy" type="ContextStrategyConfig" default="ContextStrategyConfig(strategy=ContextStrategy.APPEND)">
  Default context strategy for managing conversation context during node
  transitions. Can be overridden per-node via
  [`NodeConfig.context_strategy`](/api-reference/pipecat-flows/types#nodeconfig).
  See
  [ContextStrategyConfig](/api-reference/pipecat-flows/types#contextstrategyconfig).
</ParamField>

<ParamField path="transport" type="BaseTransport" default="None">
  Transport instance for communication (e.g., `DailyTransport`). When provided,
  accessible via the `transport` property in function and action handlers.
</ParamField>

<ParamField path="global_functions" type="list[FlowsFunctionSchema | FlowsDirectFunction]" default="None">
  Functions that will be available at every node. These are registered once
  during initialization and automatically included alongside node-specific
  functions. Useful for capabilities like "transfer to human" that should be
  accessible from any conversation state.
</ParamField>

## Properties

### state

```python theme={null}
flow_manager.state -> dict[str, Any]
```

Shared state dictionary that persists across node transitions. Use this to store and retrieve conversation data such as user preferences, collected information, or any data that needs to be accessible across different nodes.

```python theme={null}
# Store data
flow_manager.state["user_name"] = "Alice"

# Retrieve data
name = flow_manager.state.get("user_name", "Unknown")
```
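
Because `state` is an ordinary dictionary, standard dict idioms work for checking what has been collected so far. The sketch below uses a plain `dict` as a stand-in for `flow_manager.state`; the `missing_fields` helper and the key names are hypothetical and not part of Pipecat Flows.

```python
from typing import Any


def missing_fields(state: dict[str, Any], required: list[str]) -> list[str]:
    """Return the required keys that are absent or falsy in the shared state."""
    return [key for key in required if not state.get(key)]


# A plain dict stands in for flow_manager.state in this sketch.
state: dict[str, Any] = {"user_name": "Alice", "email": ""}

# Hypothetical fields a flow might need before moving on.
remaining = missing_fields(state, ["user_name", "email", "phone"])
print(remaining)  # ['email', 'phone']
```

A handler could run a check like this before deciding which node to return next.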

### transport

```python theme={null}
flow_manager.transport -> BaseTransport | None
```

The transport instance provided during initialization, or `None` if not set. Use this to interact with the communication platform (e.g., mute participants, access room info).

```python theme={null}
async def my_handler(args, flow_manager):
    transport = flow_manager.transport
    if transport:
        participants = transport.participants()
```

### current\_node

```python theme={null}
flow_manager.current_node -> str | None
```

The identifier of the currently active conversation node. Returns `None` before initialization or if no node has been set.

```python theme={null}
async def my_handler(args, flow_manager):
    if flow_manager.current_node == "collecting_payment":
        await setup_secure_session(flow_manager)
```

### task

```python theme={null}
flow_manager.task -> PipelineTask
```

The pipeline task instance used for queueing frames. Use this for advanced flow control, such as queueing custom frames.

```python theme={null}
from pipecat.frames.frames import TTSUpdateSettingsFrame

async def my_handler(args, flow_manager):
    await flow_manager.task.queue_frame(
        TTSUpdateSettingsFrame(settings={"voice": "new-voice-id"})
    )
```

## Methods

### initialize

```python theme={null}
await flow_manager.initialize(initial_node: NodeConfig | None = None) -> None
```

Initialize the flow manager. Must be called before any node transitions can occur.

| Parameter      | Type         | Default | Description                                                                     |
| -------------- | ------------ | ------- | ------------------------------------------------------------------------------- |
| `initial_node` | `NodeConfig` | `None`  | Initial node configuration. Can also be set later via `set_node_from_config()`. |

**Raises:** `FlowInitializationError` if initialization fails.

```python theme={null}
flow_manager = FlowManager(task=task, llm=llm, context_aggregator=context_aggregator)
await flow_manager.initialize(initial_node=create_initial_node())
```

### set\_node\_from\_config

```python theme={null}
await flow_manager.set_node_from_config(node_config: NodeConfig) -> None
```

Manually transition to a new conversation node. The node name is taken from the `name` field of the config; if `name` is omitted, a UUID is generated instead.

| Parameter     | Type         | Description                     |
| ------------- | ------------ | ------------------------------- |
| `node_config` | `NodeConfig` | Configuration for the new node. |

**Raises:** `FlowTransitionError` if the manager is not initialized. `FlowError` if node setup fails.

<Note>
  In most cases, prefer returning the next node from a consolidated function
  handler instead of calling this method directly.
</Note>

```python theme={null}
await flow_manager.set_node_from_config({
    "name": "collect_email",
    "task_messages": [{"role": "developer", "content": "Ask the user for their email."}],
    "functions": [collect_email_function],
})
```
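
The naming rule described above can be pictured as follows. This is only an illustration of the documented behavior, not the library's code; `resolve_node_name` is a hypothetical helper.

```python
import uuid
from typing import Any


def resolve_node_name(node_config: dict[str, Any]) -> str:
    """Use the config's `name` if present, otherwise generate a UUID string."""
    return node_config.get("name") or str(uuid.uuid4())


named = resolve_node_name({"name": "collect_email", "task_messages": []})
unnamed = resolve_node_name({"task_messages": []})

print(named)         # collect_email
print(len(unnamed))  # 36, the length of a canonical UUID string
```

Setting `name` explicitly is likely to make comparisons against `current_node` easier to read, since a generated UUID differs on every run.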

### get\_current\_context

```python theme={null}
flow_manager.get_current_context() -> list[dict]
```

Get the current conversation context as a list of messages, including developer messages, user messages, and assistant responses.

**Raises:** `FlowError` if the context aggregator is not available.

```python theme={null}
messages = flow_manager.get_current_context()
```
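
Since the context is returned as a list of message dictionaries, it can be inspected with ordinary list operations. The sketch below assumes each message carries `role` and `content` keys, which may not hold for every provider or message type. `last_message_from` is a hypothetical helper, and the sample list stands in for the return value of `get_current_context()`.

```python
from typing import Any, Optional


def last_message_from(messages: list[dict[str, Any]], role: str) -> Optional[Any]:
    """Return the content of the most recent message with the given role."""
    for message in reversed(messages):
        if message.get("role") == role:
            return message.get("content")
    return None


# Sample data standing in for flow_manager.get_current_context().
messages = [
    {"role": "developer", "content": "Ask the user for their email."},
    {"role": "user", "content": "It's alice@example.com"},
    {"role": "assistant", "content": "Thanks, Alice."},
]

print(last_message_from(messages, "user"))
```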

### register\_action

```python theme={null}
flow_manager.register_action(action_type: str, handler: Callable) -> None
```

Register a handler for a custom action type. The handler may use either the legacy signature `(action)` or the modern signature `(action, flow_manager)`.

| Parameter     | Type       | Description                                                |
| ------------- | ---------- | ---------------------------------------------------------- |
| `action_type` | `str`      | String identifier for the action (e.g., `"notify_slack"`). |
| `handler`     | `Callable` | Async function that handles the action.                    |

```python theme={null}
async def notify_slack(action: dict, flow_manager: FlowManager):
    channel = action.get("channel", "#general")
    text = action.get("text", "")
    await slack_client.post_message(channel=channel, text=text)

flow_manager.register_action("notify_slack", notify_slack)
```

Once registered, the action can be used in node `pre_actions` or `post_actions`:

```python theme={null}
node_config: NodeConfig = {
    "task_messages": [...],
    "pre_actions": [{"type": "notify_slack", "channel": "#support", "text": "New session started"}],
}
```
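
Supporting both handler shapes means the caller has to decide how many arguments to pass. One way such a dispatch could work is to inspect the handler's signature. The sketch below illustrates that idea with a hypothetical `call_action_handler` helper; it is not a description of how Pipecat Flows implements it.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable


async def call_action_handler(
    handler: Callable[..., Awaitable[None]],
    action: dict[str, Any],
    flow_manager: Any,
) -> None:
    """Call a handler with (action) or (action, flow_manager) based on its arity."""
    param_count = len(inspect.signature(handler).parameters)
    if param_count >= 2:
        await handler(action, flow_manager)  # modern handler
    else:
        await handler(action)  # legacy handler


calls: list[str] = []


async def legacy_handler(action: dict) -> None:
    calls.append(f"legacy:{action['type']}")


async def modern_handler(action: dict, flow_manager: Any) -> None:
    calls.append(f"modern:{action['type']}:{flow_manager}")


async def main() -> None:
    action = {"type": "notify_slack"}
    # A string stands in for a real FlowManager instance in this sketch.
    await call_action_handler(legacy_handler, action, "fm")
    await call_action_handler(modern_handler, action, "fm")


asyncio.run(main())
print(calls)  # ['legacy:notify_slack', 'modern:notify_slack:fm']
```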

## Usage

### Basic Setup

```python theme={null}
from pipecat_flows import FlowManager, NodeConfig

# `task`, `llm`, `context_aggregator`, `transport`, and `help_function`
# are assumed to be defined elsewhere in your application.

def create_initial_node() -> NodeConfig:
    return {
        "task_messages": [
            {"role": "developer", "content": "Greet the user and ask how you can help."}
        ],
        "functions": [help_function],
    }

flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    transport=transport,
)
await flow_manager.initialize(initial_node=create_initial_node())
```

### Using Global Functions

```python theme={null}
from pipecat_flows import FlowManager, FlowsFunctionSchema

transfer_function = FlowsFunctionSchema(
    name="transfer_to_human",
    description="Transfer the conversation to a human agent",
    properties={},
    required=[],
    handler=handle_transfer,  # your handler function, defined elsewhere
)

flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    global_functions=[transfer_function],
)
```
