
# Agents and Runner

> Learn about agent types, the AgentRunner, and how to build your first agent.

## Agent types

Pipecat Subagents provides three built-in agent types, each building on the previous:

| Type         | Purpose                                                                       |
| ------------ | ----------------------------------------------------------------------------- |
| `BaseAgent`  | Foundation for all agents. Owns a pipeline, manages lifecycle, handles tasks. |
| `LLMAgent`   | Extends `BaseAgent` with an LLM pipeline and automatic `@tool` registration.  |
| `FlowsAgent` | Extends `LLMAgent` with structured conversation flows via Pipecat Flows.      |

### BaseAgent

Every agent extends `BaseAgent`. A `BaseAgent` defines its own pipeline, manages its own lifecycle, can hand off control to other agents, and can coordinate tasks with workers.

### LLMAgent

`LLMAgent` extends `BaseAgent` with an LLM pipeline. You provide the LLM service and define tools with the `@tool` decorator; the framework registers them automatically.

### FlowsAgent

`FlowsAgent` extends `LLMAgent` with structured conversation flows via Pipecat Flows. You define nodes and transitions for deterministic conversation paths.

<Info>
  `LLMAgent` and `FlowsAgent` are covered in detail in the Fundamentals section. This guide focuses on `BaseAgent` and the overall system.
</Info>

## The AgentRunner

The `AgentRunner` is the entry point for every subagent system. It:

* Creates and manages the shared message bus
* Starts agent pipelines and manages their lifecycle
* Tracks agent readiness through a registry
* Coordinates graceful shutdown

```python
from pipecat_subagents.runner import AgentRunner

runner = AgentRunner()
```

When you don't provide a bus, the runner creates an `AsyncQueueBus` automatically -- an in-process bus backed by asyncio queues. For distributed setups, you can pass a `RedisBus` instead.
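
To make the idea of an in-process bus backed by asyncio queues more concrete, here is a minimal sketch using only the standard library. `ToyQueueBus`, `subscribe`, and `publish` are hypothetical names chosen for illustration; this is not the `AsyncQueueBus` implementation or its API, just one way such a bus could be modeled.

```python
import asyncio


class ToyQueueBus:
    """Hypothetical in-process bus: one asyncio.Queue per subscriber."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = {}

    def subscribe(self, name: str) -> asyncio.Queue:
        # Each subscriber gets its own queue and reads messages from it.
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[name] = queue
        return queue

    async def publish(self, sender: str, message: str) -> None:
        # Fan the message out to every subscriber except the sender.
        for name, queue in self._queues.items():
            if name != sender:
                await queue.put((sender, message))


async def demo() -> list:
    bus = ToyQueueBus()
    greeter_inbox = bus.subscribe("greeter")
    support_inbox = bus.subscribe("support")
    await bus.publish("greeter", "hello")
    # "support" receives the message; the sender's own inbox stays empty.
    return [await support_inbox.get(), greeter_inbox.empty()]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because everything lives in one process and one event loop, no serialization or network transport is involved, which is presumably why a different bus is needed once agents run in separate processes.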

## Your first agent

Here's the simplest possible subagent system -- a single `BaseAgent` running a complete voice pipeline:

```python
import os

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService, CartesiaTTSSettings
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.base_llm import OpenAILLMSettings
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams

from pipecat_subagents.agents import BaseAgent
from pipecat_subagents.bus import AgentBus
from pipecat_subagents.runner import AgentRunner


class SimpleAgent(BaseAgent):
    def __init__(self, name: str, *, bus: AgentBus, transport: BaseTransport):
        super().__init__(name, bus=bus)
        self._transport = transport

    async def build_pipeline(self) -> Pipeline:
        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            settings=CartesiaTTSSettings(
                voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
            ),
        )
        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(
                system_instruction="You are a helpful voice assistant. Keep responses brief.",
            ),
        )

        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )

        @self._transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            await self.queue_frame(
                LLMMessagesAppendFrame(
                    messages=[{"role": "developer", "content": "Greet the user."}],
                    run_llm=True,
                )
            )

        @self._transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            await self.end()

        return Pipeline(
            [
                self._transport.input(),
                stt,
                context_aggregator.user(),
                llm,
                tts,
                self._transport.output(),
                context_aggregator.assistant(),
            ]
        )
```

This looks similar to a regular Pipecat pipeline -- and that's the point. A single agent wraps a standard pipeline with lifecycle management.

## Running the agent

To run the agent, create a runner, add the agent, and call `run()`:

```python
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    runner = AgentRunner(handle_sigint=runner_args.handle_sigint)
    agent = SimpleAgent("assistant", bus=runner.bus, transport=transport)
    await runner.add_agent(agent)
    await runner.run()
```

The runner blocks until `end()` or `cancel()` is called. In this case, the agent calls `self.end()` when the client disconnects.
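
The blocking behavior can be pictured as a coroutine waiting on a shutdown signal. The sketch below uses only `asyncio`; `ToyRunner` and its methods are hypothetical stand-ins for illustration, not the `AgentRunner` implementation.

```python
import asyncio


class ToyRunner:
    """Hypothetical runner: run() blocks until end() is called."""

    def __init__(self) -> None:
        self._done = asyncio.Event()
        self.log: list[str] = []

    async def run(self) -> None:
        self.log.append("running")
        await self._done.wait()  # Block here until end() sets the event.
        self.log.append("stopped")

    async def end(self) -> None:
        self._done.set()


async def demo() -> list[str]:
    runner = ToyRunner()
    run_task = asyncio.create_task(runner.run())
    await asyncio.sleep(0)  # Yield once so run() starts and blocks.
    await runner.end()      # Stands in for the agent ending on disconnect.
    await run_task
    return runner.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```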

## Lifecycle hooks

`BaseAgent` provides hooks you can override to react to lifecycle events. Always call `super()` in your overrides:

```python
from loguru import logger

from pipecat_subagents.agents import BaseAgent


class MyAgent(BaseAgent):
    async def on_ready(self) -> None:
        """Called once when the agent's pipeline is ready."""
        await super().on_ready()
        logger.info(f"Agent {self.name} is ready")

    async def on_finished(self) -> None:
        """Called when the agent's pipeline finishes."""
        await super().on_finished()

    async def on_error(self, error: str, fatal: bool) -> None:
        """Called when a pipeline error occurs."""
        await super().on_error(error, fatal)
```
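
To illustrate why the `super()` call matters, the following sketch assumes the base class hook performs some internal bookkeeping. The source does not describe what `BaseAgent` does inside its hooks, so `ToyBase` and its `ready` flag are hypothetical and exist only to show the pattern.

```python
import asyncio


class ToyBase:
    """Hypothetical base class whose hook does internal bookkeeping."""

    def __init__(self) -> None:
        self.ready = False
        self.events: list[str] = []

    async def on_ready(self) -> None:
        self.ready = True  # State that the rest of the class relies on.
        self.events.append("base")


class GoodAgent(ToyBase):
    async def on_ready(self) -> None:
        await super().on_ready()  # Base bookkeeping runs first.
        self.events.append("good")


class ForgetfulAgent(ToyBase):
    async def on_ready(self) -> None:
        # super() is skipped, so the base state is never updated.
        self.events.append("forgetful")


async def demo() -> tuple:
    good, forgetful = GoodAgent(), ForgetfulAgent()
    await good.on_ready()
    await forgetful.on_ready()
    return good.ready, good.events, forgetful.ready, forgetful.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy model, the override that skips `super()` still runs its own code but leaves the base class believing the agent never became ready.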

## Adding child agents

Agents can have child agents. This is how you build multi-agent systems -- the main agent adds LLM agents as children:

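```python
# Registered inside the parent agent, for example in build_pipeline().
# GreeterAgent and SupportAgent are agent classes defined elsewhere.
@self._transport.event_handler("on_client_connected")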
async def on_client_connected(transport, client):
    greeter = GreeterAgent("greeter", bus=self.bus)
    support = SupportAgent("support", bus=self.bus)
    await self.add_agent(greeter)
    await self.add_agent(support)
```

When the parent shuts down, children are shut down too. To get notified when a child agent is ready, use `@agent_ready` or call `watch_agent()` explicitly.
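
The cascading shutdown can be sketched with a small parent-child tree. `ToyAgent` is a hypothetical class for illustration, and stopping children before the parent is an assumption of this sketch; the source does not state the order in which the framework shuts agents down.

```python
import asyncio


class ToyAgent:
    """Hypothetical agent that tracks children and cascades shutdown."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.children: list["ToyAgent"] = []
        self.stopped = False

    async def add_agent(self, child: "ToyAgent") -> None:
        self.children.append(child)

    async def end(self, order: list[str]) -> None:
        # Assumed order: stop every child first, then the parent itself.
        for child in self.children:
            await child.end(order)
        self.stopped = True
        order.append(self.name)


async def demo() -> list[str]:
    main = ToyAgent("main")
    await main.add_agent(ToyAgent("greeter"))
    await main.add_agent(ToyAgent("support"))
    order: list[str] = []
    await main.end(order)  # One call on the parent stops the whole tree.
    return order


if __name__ == "__main__":
    print(asyncio.run(demo()))
```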

## What's next

A single agent works, but the real power comes when multiple agents coordinate. Next, let's learn how agents transfer control to each other.

<Card title="Agent Handoff" icon="arrow-right" href="/subagents/learn/agent-handoff">
  Activation, deactivation, and seamless control transfer
</Card>
