
Agent types

Pipecat Subagents provides three built-in agent types, each building on the previous:
Type         Purpose
BaseAgent    Foundation for all agents. Owns a pipeline, manages lifecycle, handles tasks.
LLMAgent     Extends BaseAgent with an LLM pipeline and automatic @tool registration.
FlowsAgent   Extends LLMAgent with structured conversation flows via Pipecat Flows.

BaseAgent

Every agent extends BaseAgent. It defines its own pipeline, manages its lifecycle, can hand off control to other agents, and can coordinate tasks with workers.

LLMAgent

LLMAgent extends BaseAgent with an LLM pipeline. You provide the LLM service and define tools via the @tool decorator, and the framework registers those tools automatically.

FlowsAgent

FlowsAgent extends LLMAgent with structured conversation flows via Pipecat Flows. You define nodes and transitions for deterministic conversation paths.
LLMAgent and FlowsAgent are covered in detail in the Fundamentals section. This guide focuses on BaseAgent and the overall system.

The AgentRunner

The AgentRunner is the entry point for every subagent system. It:
  • Creates and manages the shared message bus
  • Starts agent pipelines and manages their lifecycle
  • Tracks agent readiness through a registry
  • Coordinates graceful shutdown
from pipecat_subagents.runner import AgentRunner

runner = AgentRunner()
When you don’t provide a bus, the runner creates an AsyncQueueBus automatically — an in-process bus backed by asyncio queues. For distributed setups, you can pass a RedisBus instead.
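To make "an in-process bus backed by asyncio queues" concrete, here is a minimal standard-library sketch of that idea. It is not the AsyncQueueBus implementation: ToyQueueBus, subscribe(), and publish() are hypothetical names used only for illustration, and the real bus may route messages differently.

```python
import asyncio


class ToyQueueBus:
    """Hypothetical in-process bus: one asyncio.Queue per subscriber."""

    def __init__(self) -> None:
        self._queues = {}

    def subscribe(self, name: str) -> asyncio.Queue:
        # Each subscriber gets its own unbounded queue, so publishing never
        # waits on a slow reader.
        queue = asyncio.Queue()
        self._queues[name] = queue
        return queue

    async def publish(self, sender: str, message: str) -> None:
        # Fan the message out to every subscriber except the sender.
        for name, queue in self._queues.items():
            if name != sender:
                await queue.put((sender, message))


async def demo_bus() -> list:
    bus = ToyQueueBus()
    greeter_inbox = bus.subscribe("greeter")
    support_inbox = bus.subscribe("support")
    await bus.publish("greeter", "user needs help")
    received = [await support_inbox.get()]
    # The sender's own inbox stays empty.
    received.append(greeter_inbox.empty())
    return received


bus_result = asyncio.run(demo_bus())
```

Because everything lives in one event loop, this style of bus needs no serialization or network hop. A distributed bus such as RedisBus trades that simplicity for the ability to reach agents in other processes.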

Your first agent

Here’s the simplest possible subagent system — a single BaseAgent running a complete voice pipeline:
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService, CartesiaTTSSettings
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.base_llm import OpenAILLMSettings
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams

from pipecat_subagents.agents import BaseAgent
from pipecat_subagents.bus import AgentBus
from pipecat_subagents.runner import AgentRunner


class SimpleAgent(BaseAgent):
    def __init__(self, name: str, *, bus: AgentBus, transport: BaseTransport):
        super().__init__(name, bus=bus)
        self._transport = transport

    async def build_pipeline(self) -> Pipeline:
        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            settings=CartesiaTTSSettings(
                voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
            ),
        )
        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(
                system_instruction="You are a helpful voice assistant. Keep responses brief.",
            ),
        )

        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )

        @self._transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            await self.queue_frame(
                LLMMessagesAppendFrame(
                    messages=[{"role": "developer", "content": "Greet the user."}],
                    run_llm=True,
                )
            )

        @self._transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            await self.end()

        return Pipeline(
            [
                self._transport.input(),
                stt,
                context_aggregator.user(),
                llm,
                tts,
                self._transport.output(),
                context_aggregator.assistant(),
            ]
        )
This looks similar to a regular Pipecat pipeline — and that’s the point. A single agent wraps a standard pipeline with lifecycle management.

Running the agent

To run the agent, create a runner, add the agent, and call run():
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    runner = AgentRunner(handle_sigint=runner_args.handle_sigint)
    agent = SimpleAgent("assistant", bus=runner.bus, transport=transport)
    await runner.add_agent(agent)
    await runner.run()
runner.run() blocks until end() or cancel() is called. In this example, the agent calls self.end() when the client disconnects, which lets run() return.
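The blocking behavior can be pictured with a small standard-library sketch, assuming the runner waits on something like an asyncio.Event. ToyRunner is hypothetical and does not reflect how AgentRunner is actually implemented.

```python
import asyncio


class ToyRunner:
    """Hypothetical runner: run() returns only after end() is called."""

    def __init__(self) -> None:
        self._done = asyncio.Event()
        self.log = []

    async def run(self) -> None:
        self.log.append("running")
        await self._done.wait()  # Block here until end() sets the event.
        self.log.append("stopped")

    async def end(self) -> None:
        self._done.set()


async def demo_run() -> list:
    runner = ToyRunner()

    async def disconnect_later() -> None:
        # Stand-in for the client disconnecting some time after startup.
        await asyncio.sleep(0.01)
        runner.log.append("client disconnected")
        await runner.end()

    await asyncio.gather(runner.run(), disconnect_later())
    return runner.log


run_log = asyncio.run(demo_run())
```

The point of the sketch is the ordering: run() stays suspended while the session is live and resumes only once something signals completion.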

Lifecycle hooks

BaseAgent provides hooks you can override to react to lifecycle events. Always call super() in your overrides:
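# Pipecat logs through loguru; swap in your own logger if you prefer.
from loguru import logger

from pipecat_subagents.agents import BaseAgent


class MyAgent(BaseAgent):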
    async def on_ready(self) -> None:
        """Called once when the agent's pipeline is ready."""
        await super().on_ready()
        logger.info(f"Agent {self.name} is ready")

    async def on_finished(self) -> None:
        """Called when the agent's pipeline finishes."""
        await super().on_finished()

    async def on_error(self, error: str, fatal: bool) -> None:
        """Called when a pipeline error occurs."""
        await super().on_error(error, fatal)
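Why the super() call matters: a base class typically does its own bookkeeping inside the hook. The sketch below uses a hypothetical ToyBase class (not BaseAgent, whose internals are not shown here) to illustrate what is lost when an override skips it.

```python
import asyncio


class ToyBase:
    """Hypothetical base class whose hook performs internal bookkeeping."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.ready = False

    async def on_ready(self) -> None:
        # Bookkeeping that subclasses depend on but never see directly.
        self.ready = True


class GoodAgent(ToyBase):
    async def on_ready(self) -> None:
        await super().on_ready()  # Base bookkeeping runs first.
        self.greeting = f"{self.name} is ready"


class ForgetfulAgent(ToyBase):
    async def on_ready(self) -> None:
        # No super() call: the base class never marks this agent ready.
        self.greeting = f"{self.name} is ready"


async def demo_hooks() -> tuple:
    good, forgetful = GoodAgent("good"), ForgetfulAgent("forgetful")
    await good.on_ready()
    await forgetful.on_ready()
    return good.ready, forgetful.ready


hook_result = asyncio.run(demo_hooks())
```

Both subclasses set their greeting, but only GoodAgent ends up with ready set to True. Failures of this kind are silent, which is why the rule is stated as "always".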

Adding child agents

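Agents can have child agents, which is how you build multi-agent systems: the main agent adds LLM agents as children with add_agent(). The handler below is registered inside the main agent, so self refers to the parent: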
@self._transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    greeter = GreeterAgent("greeter", bus=self.bus)
    support = SupportAgent("support", bus=self.bus)
    await self.add_agent(greeter)
    await self.add_agent(support)
When the parent shuts down, children are shut down too. To get notified when a child agent is ready, use @agent_ready or call watch_agent() explicitly.
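The parent/child shutdown relationship can be sketched with plain classes. ToyAgent below is hypothetical: its add_agent() and end() methods only mirror the names used above and say nothing about how BaseAgent implements them, including the order in which children actually stop.

```python
import asyncio


class ToyAgent:
    """Hypothetical agent that cascades shutdown to its children."""

    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self._log = log
        self._children = []

    async def add_agent(self, child: "ToyAgent") -> None:
        self._children.append(child)

    async def end(self) -> None:
        # Stop children first (an arbitrary choice for this sketch), then self.
        for child in self._children:
            await child.end()
        self._log.append(f"{self.name} stopped")


async def demo_children() -> list:
    log = []
    main = ToyAgent("main", log)
    await main.add_agent(ToyAgent("greeter", log))
    await main.add_agent(ToyAgent("support", log))
    await main.end()
    return log


shutdown_log = asyncio.run(demo_children())
```

A single end() call on the parent is enough to stop the whole tree, so the caller never has to track children individually.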

What’s next

A single agent works, but the real power comes when multiple agents coordinate. Next, let’s learn how agents transfer control to each other.

Agent Handoff

Activation, deactivation, and seamless control transfer