> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# BaseAgent

> Core agent class for the multi-agent framework

## Overview

`BaseAgent` is the abstract base class that all agents inherit from. It handles agent lifecycle, parent-child relationships, bus communication, and task coordination.

Agents that return a pipeline from `build_pipeline()` run a Pipecat pipeline. Agents that don't override `build_pipeline()` operate purely through bus messages (e.g. coordinators, orchestrators).

```python theme={null}
from pipecat_subagents.agents import BaseAgent
```

## Configuration

<ParamField path="name" type="str" required>
  Unique name for this agent. Used for bus message routing and registry lookup.
</ParamField>

<ParamField path="bus" type="AgentBus" required>
  The [`AgentBus`](/api-reference/pipecat-subagents/bus#agentbus) for
  inter-agent communication.
</ParamField>

<ParamField path="active" type="bool" default="True">
  Whether the agent starts active. When `False`, the agent waits for an
  `activate_agent()` call before `on_activated` fires.
</ParamField>

<ParamField path="bridged" type="tuple[str, ...] | None" default="None">
  Bridge configuration for receiving pipeline frames from the bus. `None` means
  not bridged. An empty tuple `()` means bridged, accepting frames from all
  bridges. A tuple of names like `("voice",)` means bridged, accepting only
  frames from those bridges.
</ParamField>

<ParamField path="exclude_frames" type="tuple[type[Frame], ...] | None" default="None">
  Frame types to exclude from bus forwarding when bridged. Lifecycle frames
  (`StartFrame`, `EndFrame`, `CancelFrame`, `StopFrame`) are always excluded.
</ParamField>

## Properties

### bus

```python theme={null}
agent.bus -> AgentBus
```

The bus instance for agent communication.

### active

```python theme={null}
agent.active -> bool
```

Whether this agent is currently active.

### activation\_args

```python theme={null}
agent.activation_args -> Optional[dict]
```

The arguments from the most recent activation, or `None` if the agent is inactive. The value is cleared when the agent is deactivated.

### parent

```python theme={null}
agent.parent -> str | None
```

The name of the parent agent, or `None` if this is a root agent.

### registry

```python theme={null}
agent.registry -> AgentRegistry | None
```

The shared agent registry, if set by a runner.

### bridged

```python theme={null}
agent.bridged -> bool
```

Whether this agent is bridged (receives pipeline frames from the bus).

### ready

```python theme={null}
agent.ready -> bool
```

Whether this agent's pipeline has started and is ready to operate.

### started\_at

```python theme={null}
agent.started_at -> float | None
```

Unix timestamp when this agent became ready, or `None` if not yet started.

### children

```python theme={null}
agent.children -> list[BaseAgent]
```

The list of child agents added via `add_agent()`.

### pipeline\_task

```python theme={null}
agent.pipeline_task -> PipelineTask
```

The `PipelineTask` for this agent. Raises `RuntimeError` if the pipeline task has not been created yet.

### task\_id

```python theme={null}
agent.task_id -> str | None
```

The ID of the task this agent is currently working on, or `None` if idle.

### task\_groups

```python theme={null}
agent.task_groups -> dict[str, TaskGroup]
```

Active task groups launched by this agent, keyed by `task_id`.

## Lifecycle Hooks

Override these methods to react to lifecycle events. Always call `super()` when overriding.

### on\_ready

```python theme={null}
async def on_ready(self) -> None
```

Called once when the agent's pipeline starts and the agent is ready to operate.

### on\_finished

```python theme={null}
async def on_finished(self) -> None
```

Called when the agent's pipeline has finished.

### on\_error

```python theme={null}
async def on_error(self, error: str, fatal: bool) -> None
```

Called when a pipeline error occurs. Override to handle errors (e.g. propagate via `send_error()`, fail a running task, or log and recover).

| Parameter | Type   | Description                        |
| --------- | ------ | ---------------------------------- |
| `error`   | `str`  | Description of the error           |
| `fatal`   | `bool` | Whether the error is unrecoverable |

### on\_activated

```python theme={null}
async def on_activated(self, args: dict | None) -> None
```

Called when this agent is activated. Override to react to activation.

| Parameter | Type   | Description |                                    |
| --------- | ------ | ----------- | ---------------------------------- |
| `args`    | \`dict | None\`      | Optional arguments from the caller |

### on\_deactivated

```python theme={null}
async def on_deactivated(self) -> None
```

Called when this agent is deactivated.

### on\_agent\_ready

```python theme={null}
async def on_agent_ready(self, data: AgentReadyData) -> None
```

Called when another agent is ready to receive messages. For local root agents this fires automatically. For remote agents it fires only for agents watched via `watch_agent()`. For child agents it fires only on the parent.

| Parameter | Type                                                                      | Description                       |
| --------- | ------------------------------------------------------------------------- | --------------------------------- |
| `data`    | [`AgentReadyData`](/api-reference/pipecat-subagents/types#agentreadydata) | Information about the ready agent |

### on\_agent\_error

```python theme={null}
async def on_agent_error(self, data: AgentErrorData) -> None
```

Called when a child agent reports an error.

| Parameter | Type                                                                      | Description                 |
| --------- | ------------------------------------------------------------------------- | --------------------------- |
| `data`    | [`AgentErrorData`](/api-reference/pipecat-subagents/types#agenterrordata) | Information about the error |

## Agent Management

### add\_agent

```python theme={null}
async def add_agent(self, agent: BaseAgent) -> None
```

Register a child agent under this parent. The child's lifecycle (end, cancel) is automatically managed by this parent agent.

| Parameter | Type        | Description                     |
| --------- | ----------- | ------------------------------- |
| `agent`   | `BaseAgent` | The child agent instance to add |

### activate\_agent

```python theme={null}
async def activate_agent(
    self,
    agent_name: str,
    *,
    args: AgentActivationArgs | None = None,
) -> None
```

Activate an agent by name. The target agent's `on_activated` hook will be called with the provided arguments.

| Parameter    | Type                  | Default | Description                   |                                       |
| ------------ | --------------------- | ------- | ----------------------------- | ------------------------------------- |
| `agent_name` | `str`                 |         | Name of the agent to activate |                                       |
| `args`       | \`AgentActivationArgs | None\`  | `None`                        | Arguments forwarded to `on_activated` |

### deactivate\_agent

```python theme={null}
async def deactivate_agent(self, agent_name: str) -> None
```

Deactivate an agent by name. The target agent's `on_deactivated` hook will be called.

| Parameter    | Type  | Description                     |
| ------------ | ----- | ------------------------------- |
| `agent_name` | `str` | Name of the agent to deactivate |

### handoff\_to

```python theme={null}
async def handoff_to(
    self,
    agent_name: str,
    *,
    args: AgentActivationArgs | None = None,
) -> None
```

Hand off to another agent. Deactivates this agent and activates the target. For independent control, use `activate_agent()` and `deactivate_agent()` directly.

| Parameter    | Type                  | Default | Description                      |                                       |
| ------------ | --------------------- | ------- | -------------------------------- | ------------------------------------- |
| `agent_name` | `str`                 |         | Name of the agent to hand off to |                                       |
| `args`       | \`AgentActivationArgs | None\`  | `None`                           | Arguments forwarded to `on_activated` |

### watch\_agent

```python theme={null}
async def watch_agent(self, agent_name: str) -> None
```

Request notification when an agent registers. If the agent is already registered, `on_agent_ready` fires immediately. Otherwise it fires when the agent eventually registers.

| Parameter    | Type  | Description                    |
| ------------ | ----- | ------------------------------ |
| `agent_name` | `str` | Name of the agent to watch for |

### end

```python theme={null}
async def end(self, *, reason: str | None = None) -> None
```

Request a graceful end of the session.

| Parameter | Type  | Default | Description |                                  |
| --------- | ----- | ------- | ----------- | -------------------------------- |
| `reason`  | \`str | None\`  | `None`      | Human-readable reason for ending |

### cancel

```python theme={null}
async def cancel(self) -> None
```

Request an immediate cancellation of all agents.

## Task Coordination

### request\_task

```python theme={null}
async def request_task(
    self,
    agent_name: str,
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
) -> str
```

Send a task request to a single agent (fire-and-forget). Waits for the agent to be ready before sending the request. Does not wait for the task to complete; use callbacks (`on_task_response`, `on_task_completed`) or `task()` for that.

| Parameter    | Type    | Default | Description                           |                                                      |
| ------------ | ------- | ------- | ------------------------------------- | ---------------------------------------------------- |
| `agent_name` | `str`   |         | Name of the agent to send the task to |                                                      |
| `name`       | \`str   | None\`  | `None`                                | Task name for routing to a named `@task` handler     |
| `payload`    | \`dict  | None\`  | `None`                                | Structured data describing the work                  |
| `timeout`    | \`float | None\`  | `None`                                | Timeout in seconds; auto-cancels after this duration |

**Returns:** The generated `task_id`.

### task

```python theme={null}
def task(
    self,
    agent_name: str,
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
) -> TaskContext
```

Create a single-agent task context manager. Waits for the agent to be ready, sends a task request, and waits for the response on exit. Supports `async for` inside the block to receive intermediate events.

| Parameter    | Type    | Default | Description                           |                                                  |
| ------------ | ------- | ------- | ------------------------------------- | ------------------------------------------------ |
| `agent_name` | `str`   |         | Name of the agent to send the task to |                                                  |
| `name`       | \`str   | None\`  | `None`                                | Task name for routing to a named `@task` handler |
| `payload`    | \`dict  | None\`  | `None`                                | Structured data describing the work              |
| `timeout`    | \`float | None\`  | `None`                                | Timeout in seconds                               |

**Returns:** A [`TaskContext`](/api-reference/pipecat-subagents/types#taskcontext) to use with `async with`.

```python theme={null}
async with self.task("worker", payload=data) as t:
    async for event in t:
        if event.type == TaskGroupEvent.UPDATE:
            print(event.data)

print(t.response)
```

### request\_task\_group

```python theme={null}
async def request_task_group(
    self,
    *agent_names: str,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> str
```

Send a task request to multiple agents (fire-and-forget). Waits for all agents to be ready before sending requests.

| Parameter         | Type    | Default | Description                                    |                                                 |
| ----------------- | ------- | ------- | ---------------------------------------------- | ----------------------------------------------- |
| `*agent_names`    | `str`   |         | Names of the agents to send the task to        |                                                 |
| `name`            | \`str   | None\`  | `None`                                         | Task name for routing to named `@task` handlers |
| `payload`         | \`dict  | None\`  | `None`                                         | Structured data describing the work             |
| `timeout`         | \`float | None\`  | `None`                                         | Timeout in seconds                              |
| `cancel_on_error` | `bool`  | `True`  | Whether to cancel the group if a worker errors |                                                 |

**Returns:** The generated `task_id` shared by all agents in the group.

### task\_group

```python theme={null}
def task_group(
    self,
    *agent_names: str,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> TaskGroupContext
```

Create a task group context manager. Waits for agents to be ready, sends task requests, and waits for all responses on exit. Supports `async for` inside the block to receive intermediate events.

| Parameter         | Type    | Default | Description                                    |                                                 |
| ----------------- | ------- | ------- | ---------------------------------------------- | ----------------------------------------------- |
| `*agent_names`    | `str`   |         | Names of the agents to send the task to        |                                                 |
| `name`            | \`str   | None\`  | `None`                                         | Task name for routing to named `@task` handlers |
| `payload`         | \`dict  | None\`  | `None`                                         | Structured data describing the work             |
| `timeout`         | \`float | None\`  | `None`                                         | Timeout in seconds                              |
| `cancel_on_error` | `bool`  | `True`  | Whether to cancel the group if a worker errors |                                                 |

**Returns:** A [`TaskGroupContext`](/api-reference/pipecat-subagents/types#taskgroupcontext) to use with `async with`.

```python theme={null}
async with self.task_group("w1", "w2", payload=data) as tg:
    async for event in tg:
        if event.type == TaskGroupEvent.UPDATE:
            print(f"{event.agent_name}: {event.data}")

for name, result in tg.responses.items():
    print(name, result)
```

### cancel\_task

```python theme={null}
async def cancel_task(self, task_id: str, *, reason: str | None = None) -> None
```

Cancel a running task group.

| Parameter | Type  | Default | Description                   |                                        |
| --------- | ----- | ------- | ----------------------------- | -------------------------------------- |
| `task_id` | `str` |         | The task identifier to cancel |                                        |
| `reason`  | \`str | None\`  | `None`                        | Human-readable reason for cancellation |

### request\_task\_update

```python theme={null}
async def request_task_update(self, task_id: str, agent_name: str) -> None
```

Request a progress update from a task agent.

| Parameter    | Type  | Description                                 |
| ------------ | ----- | ------------------------------------------- |
| `task_id`    | `str` | The task identifier                         |
| `agent_name` | `str` | Name of the agent to request an update from |

### send\_task\_response

```python theme={null}
async def send_task_response(
    self,
    response: dict | None = None,
    *,
    status: TaskStatus = TaskStatus.COMPLETED,
    urgent: bool = False,
) -> None
```

Send a task response back to the requester. After sending, the agent is ready to accept a new task.

| Parameter  | Type                                                              | Default     | Description                  |             |
| ---------- | ----------------------------------------------------------------- | ----------- | ---------------------------- | ----------- |
| `response` | \`dict                                                            | None\`      | `None`                       | Result data |
| `status`   | [`TaskStatus`](/api-reference/pipecat-subagents/types#taskstatus) | `COMPLETED` | Completion status            |             |
| `urgent`   | `bool`                                                            | `False`     | Deliver with system priority |             |

### send\_task\_update

```python theme={null}
async def send_task_update(
    self,
    update: dict | None = None,
    *,
    urgent: bool = False,
) -> None
```

Send a progress update to the requester.

| Parameter | Type   | Default | Description                  |               |
| --------- | ------ | ------- | ---------------------------- | ------------- |
| `update`  | \`dict | None\`  | `None`                       | Progress data |
| `urgent`  | `bool` | `False` | Deliver with system priority |               |

### send\_task\_stream\_start

```python theme={null}
async def send_task_stream_start(self, data: dict | None = None) -> None
```

Begin streaming task results back to the requester.

| Parameter | Type   | Default | Description |                                    |
| --------- | ------ | ------- | ----------- | ---------------------------------- |
| `data`    | \`dict | None\`  | `None`      | Optional metadata about the stream |

### send\_task\_stream\_data

```python theme={null}
async def send_task_stream_data(self, data: dict | None = None) -> None
```

Send a streaming chunk to the requester.

| Parameter | Type   | Default | Description |                   |
| --------- | ------ | ------- | ----------- | ----------------- |
| `data`    | \`dict | None\`  | `None`      | The chunk payload |

### send\_task\_stream\_end

```python theme={null}
async def send_task_stream_end(self, data: dict | None = None) -> None
```

End the current stream and mark this agent's task as complete.

| Parameter | Type   | Default | Description |                         |
| --------- | ------ | ------- | ----------- | ----------------------- |
| `data`    | \`dict | None\`  | `None`      | Optional final metadata |

## Task Hooks

Override these methods to handle task events. Always call `super()` when overriding.

### on\_task\_request

```python theme={null}
async def on_task_request(self, message: BusTaskRequestMessage) -> None
```

Called when this agent receives a task request. Override to perform work. Use `send_task_update()` to report progress and `send_task_response()` to return results.

### on\_task\_response

```python theme={null}
async def on_task_response(self, message: BusTaskResponseMessage) -> None
```

Called when a task agent sends a response. Override to process individual results as they arrive.

### on\_task\_update

```python theme={null}
async def on_task_update(self, message: BusTaskUpdateMessage) -> None
```

Called when a task agent sends a progress update.

### on\_task\_update\_requested

```python theme={null}
async def on_task_update_requested(self, message: BusTaskUpdateRequestMessage) -> None
```

Called when the requester asks for a progress update. Override to send back a progress update via `send_task_update()`.

### on\_task\_completed

```python theme={null}
async def on_task_completed(self, result: TaskGroupResponse) -> None
```

Called when all agents in a task group have responded.

### on\_task\_error

```python theme={null}
async def on_task_error(self, message: BusTaskResponseMessage) -> None
```

Called when a task group is cancelled due to a worker error. Fires when a worker responds with `ERROR` or `FAILED` status and `cancel_on_error` is set.

### on\_task\_stream\_start

```python theme={null}
async def on_task_stream_start(self, message: BusTaskStreamStartMessage) -> None
```

Called when a task agent begins streaming.

### on\_task\_stream\_data

```python theme={null}
async def on_task_stream_data(self, message: BusTaskStreamDataMessage) -> None
```

Called for each streaming chunk from a task agent.

### on\_task\_stream\_end

```python theme={null}
async def on_task_stream_end(self, message: BusTaskStreamEndMessage) -> None
```

Called when a task agent finishes streaming.

### on\_task\_cancelled

```python theme={null}
async def on_task_cancelled(self, message: BusTaskCancelMessage) -> None
```

Called when this agent's task is cancelled by the requester. Override to clean up resources or stop in-progress work.

## Pipeline

### build\_pipeline

```python theme={null}
async def build_pipeline(self) -> Pipeline
```

Return this agent's pipeline. Override to define a processing pipeline. The default returns a no-op pipeline for agents that operate purely through bus messages.

### create\_pipeline

```python theme={null}
async def create_pipeline(self, user_pipeline: Pipeline) -> Pipeline
```

Assemble the final pipeline from the user pipeline. When bridged, wraps the pipeline with bus edge processors.

| Parameter       | Type       | Description                                 |
| --------------- | ---------- | ------------------------------------------- |
| `user_pipeline` | `Pipeline` | The pipeline returned by `build_pipeline()` |

### build\_pipeline\_task

```python theme={null}
def build_pipeline_task(self, pipeline: Pipeline) -> PipelineTask
```

Create the `PipelineTask` for this agent's pipeline. Override to customize task parameters (e.g. enable interruptions).

| Parameter  | Type       | Description                  |
| ---------- | ---------- | ---------------------------- |
| `pipeline` | `Pipeline` | The fully assembled pipeline |

### queue\_frame

```python theme={null}
async def queue_frame(
    self,
    frame: Frame,
    direction: FrameDirection = FrameDirection.DOWNSTREAM,
) -> None
```

Queue a frame into this agent's pipeline task.

| Parameter   | Type             | Default      | Description                       |
| ----------- | ---------------- | ------------ | --------------------------------- |
| `frame`     | `Frame`          |              | The frame to queue                |
| `direction` | `FrameDirection` | `DOWNSTREAM` | Direction the frame should travel |

### queue\_frames

```python theme={null}
async def queue_frames(
    self,
    frames,
    direction: FrameDirection = FrameDirection.DOWNSTREAM,
) -> None
```

Queue multiple frames into this agent's pipeline task.

| Parameter   | Type             | Default      | Description                        |
| ----------- | ---------------- | ------------ | ---------------------------------- |
| `frames`    | `list[Frame]`    |              | The frames to queue                |
| `direction` | `FrameDirection` | `DOWNSTREAM` | Direction the frames should travel |

## Bus

### send\_message

```python theme={null}
async def send_message(self, message: BusMessage) -> None
```

Send a message on the bus.

| Parameter | Type                                                      | Description             |
| --------- | --------------------------------------------------------- | ----------------------- |
| `message` | [`BusMessage`](/api-reference/pipecat-subagents/messages) | The bus message to send |

### send\_error

```python theme={null}
async def send_error(self, error: str) -> None
```

Report an error on the bus. Child agents send a local-only message to the parent. Root agents broadcast over the network.

| Parameter | Type  | Description              |
| --------- | ----- | ------------------------ |
| `error`   | `str` | Description of the error |

### on\_bus\_message

```python theme={null}
async def on_bus_message(self, message: BusMessage) -> None
```

Called for every bus message after built-in lifecycle handling. Override to handle custom message types.

| Parameter | Type                                                      | Description                |
| --------- | --------------------------------------------------------- | -------------------------- |
| `message` | [`BusMessage`](/api-reference/pipecat-subagents/messages) | The bus message to process |
