Agent types
Pipecat Subagents provides three built-in agent types, each building on the previous:
| Type | Purpose |
| --- | --- |
| BaseAgent | Foundation for all agents. Owns a pipeline, manages lifecycle, handles tasks. |
| LLMAgent | Extends BaseAgent with an LLM pipeline and automatic @tool registration. |
| FlowsAgent | Extends LLMAgent with structured conversation flows via Pipecat Flows. |
BaseAgent
Every agent extends BaseAgent. It defines its own pipeline, manages its lifecycle, can hand off control to other agents, and can coordinate tasks with workers.
LLMAgent
LLMAgent extends BaseAgent with an LLM pipeline. You provide the LLM service and define tools via the @tool decorator — the framework handles the rest.
FlowsAgent
FlowsAgent extends LLMAgent with structured conversation flows via Pipecat Flows. You define nodes and transitions for deterministic conversation paths.
LLMAgent and FlowsAgent are covered in detail in the Fundamentals section. This guide focuses on BaseAgent and the overall system.
The AgentRunner
The AgentRunner is the entry point for every subagent system. It:
- Creates and manages the shared message bus
- Starts agent pipelines and manages their lifecycle
- Tracks agent readiness through a registry
- Coordinates graceful shutdown
```python
from pipecat_subagents.runner import AgentRunner

runner = AgentRunner()
```
When you don’t provide a bus, the runner creates an AsyncQueueBus automatically — an in-process bus backed by asyncio queues. For distributed setups, you can pass a RedisBus instead.
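To make the in-process bus concrete, here is a minimal sketch of the idea behind an asyncio-queue-backed bus. This is not the AsyncQueueBus source or API, just an illustration of the pattern: each subscriber gets its own queue, and publishing delivers a message to the named subscriber's queue.

```python
import asyncio


# Conceptual sketch only -- not the pipecat_subagents AsyncQueueBus API.
class InProcessBus:
    def __init__(self):
        self._queues: dict[str, asyncio.Queue] = {}

    def subscribe(self, agent_name: str) -> asyncio.Queue:
        # Each agent gets its own queue to receive messages on.
        return self._queues.setdefault(agent_name, asyncio.Queue())

    async def publish(self, agent_name: str, message: dict) -> None:
        # Deliver a message to the named agent's queue.
        await self.subscribe(agent_name).put(message)


async def main():
    bus = InProcessBus()
    inbox = bus.subscribe("assistant")
    await bus.publish("assistant", {"type": "ready"})
    return await inbox.get()


print(asyncio.run(main()))  # {'type': 'ready'}
```

A Redis-backed bus follows the same subscribe/publish shape, but messages cross process boundaries, which is what makes distributed setups possible.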
Your first agent
Here’s the simplest possible subagent system — a single BaseAgent running a complete voice pipeline:
```python
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService, CartesiaTTSSettings
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.base_llm import OpenAILLMSettings
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams

from pipecat_subagents.agents import BaseAgent
from pipecat_subagents.bus import AgentBus
from pipecat_subagents.runner import AgentRunner


class SimpleAgent(BaseAgent):
    def __init__(self, name: str, *, bus: AgentBus, transport: BaseTransport):
        super().__init__(name, bus=bus)
        self._transport = transport

    async def build_pipeline(self) -> Pipeline:
        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            settings=CartesiaTTSSettings(
                voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
            ),
        )
        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(
                system_instruction="You are a helpful voice assistant. Keep responses brief.",
            ),
        )

        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )

        @self._transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            await self.queue_frame(
                LLMMessagesAppendFrame(
                    messages=[{"role": "developer", "content": "Greet the user."}],
                    run_llm=True,
                )
            )

        @self._transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            await self.end()

        return Pipeline(
            [
                self._transport.input(),
                stt,
                context_aggregator.user(),
                llm,
                tts,
                self._transport.output(),
                context_aggregator.assistant(),
            ]
        )
```
This looks similar to a regular Pipecat pipeline — and that’s the point. A single agent wraps a standard pipeline with lifecycle management.
Running the agent
To run the agent, create a runner, add the agent, and call run():
```python
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    runner = AgentRunner(handle_sigint=runner_args.handle_sigint)
    agent = SimpleAgent("assistant", bus=runner.bus, transport=transport)

    await runner.add_agent(agent)
    await runner.run()
```
The runner blocks until end() or cancel() is called. In this case, the agent calls self.end() when the client disconnects.
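The blocking behavior described above can be sketched with a plain asyncio event. This mirrors only what the text states (run() blocks until end() is called); it is not the AgentRunner implementation, and MiniRunner is a hypothetical name.

```python
import asyncio


# Sketch of the blocking pattern: run() waits on an event that end() sets.
class MiniRunner:
    def __init__(self):
        self._done = asyncio.Event()

    async def run(self) -> None:
        # Blocks the caller until some agent (or a signal handler) calls end().
        await self._done.wait()

    def end(self) -> None:
        self._done.set()


async def main():
    runner = MiniRunner()
    # Simulate an agent ending the session shortly after startup,
    # e.g. from an on_client_disconnected handler.
    asyncio.get_running_loop().call_later(0.01, runner.end)
    await runner.run()
    return "finished"


print(asyncio.run(main()))
```

Structuring shutdown around a single awaited event is what lets any agent in the system terminate the run, not just the one that started it.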
Lifecycle hooks
BaseAgent provides hooks you can override to react to lifecycle events. Always call super() in your overrides:
```python
from loguru import logger


class MyAgent(BaseAgent):
    async def on_ready(self) -> None:
        """Called once when the agent's pipeline is ready."""
        await super().on_ready()
        logger.info(f"Agent {self.name} is ready")

    async def on_finished(self) -> None:
        """Called when the agent's pipeline finishes."""
        await super().on_finished()

    async def on_error(self, error: str, fatal: bool) -> None:
        """Called when a pipeline error occurs."""
        await super().on_error(error, fatal)
```
Adding child agents
Agents can have child agents. This is how you build multi-agent systems — the main agent adds LLM agents as children:
```python
@self._transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    greeter = GreeterAgent("greeter", bus=self.bus)
    support = SupportAgent("support", bus=self.bus)

    await self.add_agent(greeter)
    await self.add_agent(support)
```
When the parent shuts down, children are shut down too. To get notified when a child agent is ready, use @agent_ready or call watch_agent() explicitly.
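The parent-to-child shutdown propagation can be sketched as follows. The sketch assumes only what the text states (children added to a parent are shut down when the parent shuts down); MiniAgent and its methods are hypothetical names, not the BaseAgent API.

```python
import asyncio


# Conceptual sketch of parent/child shutdown propagation.
class MiniAgent:
    def __init__(self, name: str):
        self.name = name
        self.children: list["MiniAgent"] = []
        self.running = True

    async def add_agent(self, child: "MiniAgent") -> None:
        self.children.append(child)

    async def shutdown(self) -> None:
        # Shut children down first, then the parent itself.
        for child in self.children:
            await child.shutdown()
        self.running = False


async def main():
    parent = MiniAgent("main")
    greeter = MiniAgent("greeter")
    support = MiniAgent("support")
    await parent.add_agent(greeter)
    await parent.add_agent(support)

    await parent.shutdown()
    return [a.running for a in (parent, greeter, support)]


print(asyncio.run(main()))  # [False, False, False]
```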
What’s next
A single agent works, but the real power comes when multiple agents coordinate. Next, let’s learn how agents transfer control to each other.
Next: Agent Handoff (activation, deactivation, and seamless control transfer)