## What is FlowsAgent?
FlowsAgent integrates Pipecat Flows into the subagents framework. It provides structured, node-based conversations where each node defines what the agent says, what functions are available, and where the conversation goes next.
Use FlowsAgent when you need deterministic conversation paths — like booking flows, intake forms, or multi-step processes — that would be hard to control with a free-form LLM prompt. You can have multiple FlowsAgent instances in the same system, each with its own conversation paths, and hand off between them or combine them with LLMAgent for free-form sections.
## Creating a FlowsAgent
Subclass `FlowsAgent` and implement two methods: `build_llm()` (the LLM service the agent uses) and `build_initial_node()` (the node the flow starts from):
```python
import os

from pipecat.services.llm_service import LLMService
from pipecat.services.openai.base_llm import OpenAILLMSettings
from pipecat.services.openai.llm import OpenAILLMService
from pipecat_flows import FlowManager, FlowResult, NodeConfig

from pipecat_subagents.agents.flows_agent import FlowsAgent


class ReservationAgent(FlowsAgent):
    def __init__(self, name, *, bus, context_aggregator):
        super().__init__(name, bus=bus, context_aggregator=context_aggregator)

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(
                system_instruction="You are a reservation assistant. Be friendly and brief.",
            ),
        )

    def build_initial_node(self) -> NodeConfig:
        return {
            "name": "get_party_size",
            "task_messages": [
                {"role": "user", "content": "Ask how many people are in their party."},
            ],
            "functions": [self.collect_party_size],
        }
```
`FlowsAgent` requires a `context_aggregator` parameter. Pass the same `LLMContextAggregatorPair` instance that your main agent uses, so the flows agent can share conversation context.
## Defining node functions
Node functions are async methods that receive a `FlowManager` and any parameters the LLM extracted. They return a tuple of `(result, next_node)`:
```python
async def collect_party_size(
    self, flow_manager: FlowManager, size: int
) -> tuple[FlowResult, NodeConfig]:
    """Record the number of people in the party.

    Args:
        size (int): Number of people (1-12).
    """
    return {"size": size}, self._create_time_node()

def _create_time_node(self) -> NodeConfig:
    return {
        "name": "get_time",
        "task_messages": [
            {"role": "user", "content": "Ask what time they'd like to dine."},
        ],
        "functions": [self.check_availability],
    }
```
Each node defines:

- `name` — an identifier for the node
- `task_messages` — messages injected when entering the node to guide the LLM
- `functions` — the functions available at this node
The LLM collects user input and calls the appropriate function. The function processes the input and returns the next node, creating a directed conversation flow.
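This function-returns-next-node loop can be modeled without the framework. The sketch below is framework-free — plain dicts stand in for `NodeConfig`, and `run_flow` plays the role the LLM and `FlowManager` play in practice; all names in it are illustrative, not part of the pipecat-flows API:

```python
import asyncio

# Framework-free sketch of the node/function loop: plain dicts stand in for
# NodeConfig, and each "function" returns (result, next_node). All names here
# are illustrative, not part of the pipecat-flows API.

async def collect_party_size(size):
    # Record the result and hand back the next node in the flow.
    return {"size": size}, {"name": "get_time", "functions": [check_availability]}

async def check_availability(time):
    return {"time": time, "available": True}, {"name": "end", "functions": []}

async def run_flow(node, scripted_inputs):
    """Walk the flow: at each node, call its function with the 'extracted'
    input, then move to whatever node that function returns."""
    visited = [node["name"]]
    for value in scripted_inputs:
        fn = node["functions"][0]       # in practice, the LLM picks the function
        result, node = await fn(value)  # the function drives the transition
        visited.append(node["name"])
    return visited

initial = {"name": "get_party_size", "functions": [collect_party_size]}
path = asyncio.run(run_flow(initial, [4, "6:00 PM"]))
print(path)  # ['get_party_size', 'get_time', 'end']
```

The point of the sketch: the flow graph is never declared up front — it emerges from which node each function chooses to return.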
## Conditional branching
Node functions can return different next nodes based on results:
```python
async def check_availability(
    self, flow_manager: FlowManager, time: str, party_size: int
) -> tuple[FlowResult, NodeConfig]:
    """Check if the requested time is available.

    Args:
        time (str): Reservation time (e.g. '6:00 PM').
        party_size (int): Number of people.
    """
    available, alternatives = await self._check(party_size, time)
    if available:
        return {"time": time, "available": True}, self._create_confirmation_node()
    else:
        return {"time": time, "available": False, "alternatives": alternatives}, {
            "name": "no_availability",
            "task_messages": [
                {
                    "role": "user",
                    "content": f"Apologize that {time} is not available. "
                    f"Suggest: {', '.join(alternatives)}.",
                },
            ],
            "functions": [self.check_availability, self.end_reservation],
        }
```
## Ending the flow
Use `post_actions` with an `"end_conversation"` action to end the flow:
```python
async def end_reservation(self, flow_manager: FlowManager) -> tuple[None, NodeConfig]:
    """Confirm and end the reservation."""
    return None, {
        "name": "end",
        "task_messages": [
            {"role": "user", "content": "Thank them and say goodbye."},
        ],
        "post_actions": [{"type": "end_conversation"}],
    }
```
## Mixing with LLM agents
A common pattern is combining a `FlowsAgent` with an `LLMAgent` router. The router handles free-form conversation and transfers to the flows agent when structured input is needed:
```python
class RouterAgent(LLMAgent):
    @tool(cancel_on_interruption=False)
    async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str):
        """Transfer to another agent.

        Args:
            agent (str): The agent to transfer to.
            reason (str): Why the transfer is happening.
        """
        await self.handoff_to(
            agent,
            args=LLMAgentActivationArgs(
                messages=[{"role": "user", "content": reason}],
            ),
            result_callback=params.result_callback,
        )
```
The main agent creates both and adds them as children:
```python
@self._transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    router = RouterAgent("router", bus=self.bus, bridged=())
    reservation = ReservationAgent(
        "reservation",
        bus=self.bus,
        context_aggregator=self._context_aggregator,
    )
    await self.add_agent(router)
    await self.add_agent(reservation)
```
## Resuming flows
When a `FlowsAgent` is reactivated (for example, after being handed off and then handed back), it resumes from `build_resume_node()` instead of `build_initial_node()`:
```python
def build_resume_node(self) -> NodeConfig:
    """Node to resume from on reactivation."""
    return self.build_initial_node()  # Start over, or return a different node
```
Override this to control where the flow resumes.
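One way to pick a resume point is from progress the agent recorded before the handoff. The sketch below is framework-free — plain dicts stand in for `NodeConfig`, and the `collected` attribute is a hypothetical place the agent stashed gathered data, not part of the FlowsAgent API:

```python
class ResumableReservationFlow:
    """Framework-free sketch of the resume pattern. Plain dicts stand in
    for NodeConfig; `collected` is a hypothetical attribute where the
    agent stored data gathered before it was handed off."""

    def __init__(self):
        self.collected = {}

    def build_initial_node(self):
        return {"name": "get_party_size"}

    def _create_time_node(self):
        return {"name": "get_time"}

    def build_resume_node(self):
        # Skip straight to asking for a time if the party size is already
        # known; otherwise start the flow over from the beginning.
        if "size" in self.collected:
            return self._create_time_node()
        return self.build_initial_node()

flow = ResumableReservationFlow()
print(flow.build_resume_node()["name"])  # get_party_size: nothing collected yet
flow.collected["size"] = 4
print(flow.build_resume_node()["name"])  # get_time: resume mid-flow
```

The same idea extends to any checkpoint: return whichever node-builder method corresponds to the last completed step.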
## Agent-level tools

You can add tools that are available at every node using the `@tool` decorator:
```python
class ReservationAgent(FlowsAgent):
    @tool
    async def transfer_to_agent(
        self, flow_manager: FlowManager, agent: str, reason: str
    ) -> tuple[FlowResult, NodeConfig]:
        """Transfer to another agent.

        Args:
            agent (str): The agent name.
            reason (str): Transfer reason.
        """
        await self.handoff_to(
            agent,
            args=LLMAgentActivationArgs(
                messages=[{"role": "user", "content": reason}],
            ),
        )
        return {"status": "success"}, self.build_initial_node()
```
Tools decorated with `@tool` on a `FlowsAgent` are available at every node, regardless of the node's `functions` list. This is useful for escape hatches like transferring to another agent.
Install the Flows extra to use `FlowsAgent`:

```shell
pip install pipecat-ai-subagents[flows]
```