Overview
BaseAgent is the abstract base class that all agents inherit from. It handles agent lifecycle, parent-child relationships, bus communication, and task coordination.
Agents that return a pipeline from build_pipeline() run a Pipecat pipeline. Agents that don’t override build_pipeline() operate purely through bus messages (e.g. coordinators, orchestrators).
from pipecat_subagents.agents import BaseAgent
Configuration
name
Unique name for this agent. Used for bus message routing and registry lookup.
bus
The AgentBus for inter-agent communication.
active
Whether the agent starts active. When False, the agent waits for an activate_agent() call before on_activated fires.
bridged
Optional[tuple[str, ...]]
default:"None"
Bridge configuration for receiving pipeline frames from the bus. None means
not bridged. An empty tuple () means bridged, accepting frames from all
bridges. A tuple of names like ("voice",) means bridged, accepting only
frames from those bridges.
exclude_frames
Optional[tuple[type[Frame], ...]]
default:"None"
Frame types to exclude from bus forwarding when bridged. Lifecycle frames
(StartFrame, EndFrame, CancelFrame, StopFrame) are always excluded.
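The three states of bridged (None, an empty tuple, a tuple of names) are easy to mix up. The following is a minimal sketch of the acceptance rule as described above. The helper accepts_frames_from is hypothetical and written only for illustration; it is not part of the pipecat_subagents API.

```python
from typing import Optional


def accepts_frames_from(bridged: Optional[tuple[str, ...]], bridge_name: str) -> bool:
    """Model the documented `bridged` semantics (illustrative helper, not library code)."""
    if bridged is None:
        # None: the agent is not bridged and takes no pipeline frames from the bus.
        return False
    if len(bridged) == 0:
        # Empty tuple: bridged, accepting frames from every bridge.
        return True
    # Named bridges: accept only frames that come from one of the listed bridges.
    return bridge_name in bridged


print(accepts_frames_from(None, "voice"))
print(accepts_frames_from((), "voice"))
print(accepts_frames_from(("voice",), "video"))
```

Note that () and None are deliberately different: the empty tuple opts in to all bridges, while None opts out entirely.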
Properties
bus
The bus instance for agent communication.
active
Whether this agent is currently active.
parent
agent.parent -> Optional[str]
The name of the parent agent, or None if this is a root agent.
registry
agent.registry -> Optional[AgentRegistry]
The shared agent registry, if set by a runner.
bridged
Whether this agent is bridged (receives pipeline frames from the bus).
started_at
agent.started_at -> Optional[float]
Unix timestamp when this agent became ready, or None if not yet started.
children
agent.children -> list[BaseAgent]
The list of child agents added via add_agent().
pipeline_task
agent.pipeline_task -> PipelineTask
The PipelineTask for this agent. Raises RuntimeError if the pipeline task has not been created yet.
task_id
agent.task_id -> Optional[str]
The ID of the task this agent is currently working on, or None if idle.
task_groups
agent.task_groups -> dict[str, TaskGroup]
Active task groups launched by this agent, keyed by task_id.
Lifecycle Hooks
Override these methods to react to lifecycle events. Always call super() when overriding.
on_ready
async def on_ready(self) -> None
Called once when the agent’s pipeline starts and the agent is ready to operate.
on_finished
async def on_finished(self) -> None
Called when the agent’s pipeline has finished.
on_error
async def on_error(self, error: str, fatal: bool) -> None
Called when a pipeline error occurs. Override to handle errors (e.g. propagate via send_error(), fail a running task, or log and recover).
| Parameter | Type | Description |
|---|---|---|
| error | str | Description of the error |
| fatal | bool | Whether the error is unrecoverable |
on_activated
async def on_activated(self, args: Optional[dict]) -> None
Called when this agent is activated. Override to react to activation.
| Parameter | Type | Description |
|---|---|---|
| args | Optional[dict] | Optional arguments from the caller |
on_deactivated
async def on_deactivated(self) -> None
Called when this agent is deactivated.
on_agent_ready
async def on_agent_ready(self, data: AgentReadyData) -> None
Called when another agent is ready to receive messages. For local root agents this fires automatically. For remote agents it fires only for agents watched via watch_agent(). For child agents it fires only on the parent.
| Parameter | Type | Description |
|---|---|---|
| data | AgentReadyData | Information about the ready agent |
on_agent_error
async def on_agent_error(self, data: AgentErrorData) -> None
Called when a child agent reports an error.
| Parameter | Type | Description |
|---|---|---|
| data | AgentErrorData | Information about the error |
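The override pattern for lifecycle hooks (override, then call super()) could look roughly like the sketch below. StandInAgent is a hypothetical stand-in for BaseAgent so the example runs without a bus or pipeline; only the hook names and signatures are taken from this reference, and the events list exists purely to make the call order visible.

```python
import asyncio
from typing import Optional


class StandInAgent:
    """Hypothetical stand-in for BaseAgent; only the hook names mirror the reference."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.events: list[str] = []

    async def on_ready(self) -> None:
        self.events.append("base:on_ready")

    async def on_activated(self, args: Optional[dict]) -> None:
        self.events.append("base:on_activated")

    async def on_error(self, error: str, fatal: bool) -> None:
        self.events.append("base:on_error")


class GreeterAgent(StandInAgent):
    async def on_ready(self) -> None:
        await super().on_ready()  # keep the base-class behavior
        self.events.append("greeter:ready")

    async def on_activated(self, args: Optional[dict]) -> None:
        await super().on_activated(args)
        topic = (args or {}).get("topic", "general")  # args may be None
        self.events.append(f"greeter:activated:{topic}")

    async def on_error(self, error: str, fatal: bool) -> None:
        await super().on_error(error, fatal)
        # Recover from non-fatal errors; record fatal ones separately.
        self.events.append("greeter:fatal" if fatal else "greeter:recovered")


async def demo() -> list[str]:
    agent = GreeterAgent("greeter")
    await agent.on_ready()
    await agent.on_activated({"topic": "billing"})
    await agent.on_error("tts timeout", fatal=False)
    return agent.events


print(asyncio.run(demo()))
```

With the real BaseAgent the hooks are invoked by the framework rather than called by hand as demo() does here.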
Agent Management
add_agent
async def add_agent(self, agent: BaseAgent) -> None
Register a child agent under this parent. The child’s lifecycle (end, cancel) is automatically managed by this parent agent.
| Parameter | Type | Description |
|---|---|---|
| agent | BaseAgent | The child agent instance to add |
activate_agent
async def activate_agent(
    self,
    agent_name: str,
    *,
    args: Optional[AgentActivationArgs] = None,
) -> None
Activate an agent by name. The target agent’s on_activated hook will be called with the provided arguments.
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_name | str | | Name of the agent to activate |
| args | Optional[AgentActivationArgs] | None | Arguments forwarded to on_activated |
deactivate_agent
async def deactivate_agent(self, agent_name: str) -> None
Deactivate an agent by name. The target agent’s on_deactivated hook will be called.
| Parameter | Type | Description |
|---|---|---|
| agent_name | str | Name of the agent to deactivate |
handoff_to
async def handoff_to(
    self,
    agent_name: str,
    *,
    args: Optional[AgentActivationArgs] = None,
) -> None
Hand off to another agent. Deactivates this agent and activates the target. For independent control, use activate_agent() and deactivate_agent() directly.
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_name | str | | Name of the agent to hand off to |
| args | Optional[AgentActivationArgs] | None | Arguments forwarded to on_activated |
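To show how handoff_to() relates to activate_agent() and deactivate_agent(), here is a minimal sketch built on a hypothetical StandInAgent. A shared dict plays the role of the bus and registry, plain dicts stand in for AgentActivationArgs, and the deactivate-then-activate order is an assumption: the reference states only that both steps happen.

```python
import asyncio
from typing import Optional


class StandInAgent:
    """Hypothetical stand-in for BaseAgent; a shared dict replaces the bus."""

    def __init__(self, name: str, agents: dict, active: bool = False) -> None:
        self.name = name
        self.active = active
        self.last_args: Optional[dict] = None
        self._agents = agents
        agents[name] = self

    async def on_activated(self, args: Optional[dict]) -> None:
        self.last_args = args

    async def on_deactivated(self) -> None:
        self.last_args = None

    async def activate_agent(self, agent_name: str, *, args: Optional[dict] = None) -> None:
        target = self._agents[agent_name]
        target.active = True
        await target.on_activated(args)

    async def deactivate_agent(self, agent_name: str) -> None:
        target = self._agents[agent_name]
        target.active = False
        await target.on_deactivated()

    async def handoff_to(self, agent_name: str, *, args: Optional[dict] = None) -> None:
        # Assumed order: deactivate this agent first, then activate the target.
        await self.deactivate_agent(self.name)
        await self.activate_agent(agent_name, args=args)


async def demo() -> tuple[bool, bool, Optional[dict]]:
    agents: dict = {}
    greeter = StandInAgent("greeter", agents, active=True)
    support = StandInAgent("support", agents)
    await greeter.handoff_to("support", args={"reason": "billing"})
    return greeter.active, support.active, support.last_args


print(asyncio.run(demo()))
```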
watch_agent
async def watch_agent(self, agent_name: str) -> None
Request notification when an agent registers. If the agent is already registered, on_agent_ready fires immediately. Otherwise it fires when the agent eventually registers.
| Parameter | Type | Description |
|---|---|---|
| agent_name | str | Name of the agent to watch for |
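The two timing cases for watch_agent() (already registered versus registered later) can be modelled with a small sketch. StandInRegistry and its watch() and register() methods are hypothetical and do not reflect the real AgentRegistry interface; they only illustrate when the on_agent_ready callback would fire.

```python
import asyncio
from typing import Awaitable, Callable

Callback = Callable[[str], Awaitable[None]]


class StandInRegistry:
    """Hypothetical registry that models the documented notification timing."""

    def __init__(self) -> None:
        self._registered: set[str] = set()
        self._watchers: dict[str, list[Callback]] = {}

    async def watch(self, agent_name: str, callback: Callback) -> None:
        if agent_name in self._registered:
            # Already registered: notify immediately.
            await callback(agent_name)
        else:
            # Not yet registered: remember the watcher for later.
            self._watchers.setdefault(agent_name, []).append(callback)

    async def register(self, agent_name: str) -> None:
        self._registered.add(agent_name)
        for callback in self._watchers.pop(agent_name, []):
            await callback(agent_name)


async def demo() -> list[str]:
    log: list[str] = []

    async def on_agent_ready(agent_name: str) -> None:
        log.append(f"ready:{agent_name}")

    registry = StandInRegistry()
    await registry.register("stt")
    await registry.watch("stt", on_agent_ready)  # fires immediately
    await registry.watch("llm", on_agent_ready)  # deferred until registration
    log.append("registering:llm")
    await registry.register("llm")
    return log


print(asyncio.run(demo()))
```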
end
async def end(self, *, reason: Optional[str] = None) -> None
Request a graceful end of the session.
| Parameter | Type | Default | Description |
|---|---|---|---|
| reason | Optional[str] | None | Human-readable reason for ending |
cancel
async def cancel(self) -> None
Request an immediate cancellation of all agents.
Task Coordination
request_task
async def request_task(
    self,
    agent_name: str,
    *,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
) -> str
Send a task request to a single agent (fire-and-forget). Waits for the agent to be ready before sending the request. Does not wait for the task to complete; use callbacks (on_task_response, on_task_completed) or task() for that.
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_name | str | | Name of the agent to send the task to |
| name | Optional[str] | None | Task name for routing to a named @task handler |
| payload | Optional[dict] | None | Structured data describing the work |
| timeout | Optional[float] | None | Timeout in seconds; auto-cancels after this duration |
Returns: The generated task_id.
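The fire-and-forget contract of request_task() means the returned task_id is available before any response exists, and the result arrives later through a callback. The sketch below models that with a hypothetical StandInRequester: the worker is a local coroutine, the uuid-based ID format is an assumption, and the simplified on_task_response takes a plain dict where the real hook receives a BusTaskResponseMessage.

```python
import asyncio
import uuid
from typing import Optional


class StandInRequester:
    """Hypothetical stand-in for a requesting agent; no real bus is involved."""

    def __init__(self) -> None:
        self.responses: dict[str, dict] = {}
        self._pending: set[asyncio.Task] = set()

    async def request_task(self, agent_name: str, *, payload: Optional[dict] = None) -> str:
        task_id = uuid.uuid4().hex  # assumed ID format, for illustration only
        # Schedule the worker and return at once, without awaiting its result.
        job = asyncio.create_task(self._run_worker(task_id, agent_name, payload or {}))
        self._pending.add(job)
        job.add_done_callback(self._pending.discard)
        return task_id

    async def _run_worker(self, task_id: str, agent_name: str, payload: dict) -> None:
        await asyncio.sleep(0)  # stand-in for the worker doing its work
        await self.on_task_response(task_id, {"agent": agent_name, "echo": payload})

    async def on_task_response(self, task_id: str, response: dict) -> None:
        self.responses[task_id] = response

    async def drain(self) -> None:
        # Wait for every scheduled worker to finish (demo helper).
        await asyncio.gather(*self._pending)


async def demo() -> tuple[bool, dict]:
    requester = StandInRequester()
    task_id = await requester.request_task("worker", payload={"n": 1})
    pending_at_return = task_id not in requester.responses
    await requester.drain()
    return pending_at_return, requester.responses[task_id]


print(asyncio.run(demo()))
```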
task
def task(
    self,
    agent_name: str,
    *,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
) -> TaskContext
Create a single-agent task context manager. Waits for the agent to be ready, sends a task request, and waits for the response on exit. Supports async for inside the block to receive intermediate events.
| Parameter | Type | Default | Description |
|---|---|---|---|
| agent_name | str | | Name of the agent to send the task to |
| name | Optional[str] | None | Task name for routing to a named @task handler |
| payload | Optional[dict] | None | Structured data describing the work |
| timeout | Optional[float] | None | Timeout in seconds |
Returns: A TaskContext to use with async with.
async with self.task("worker", payload=data) as t:
    async for event in t:
        if event.type == TaskGroupEvent.UPDATE:
            print(event.data)
print(t.response)
request_task_group
async def request_task_group(
    self,
    *agent_names: str,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
    cancel_on_error: bool = True,
) -> str
Send a task request to multiple agents (fire-and-forget). Waits for all agents to be ready before sending requests.
| Parameter | Type | Default | Description |
|---|---|---|---|
| *agent_names | str | | Names of the agents to send the task to |
| name | Optional[str] | None | Task name for routing to named @task handlers |
| payload | Optional[dict] | None | Structured data describing the work |
| timeout | Optional[float] | None | Timeout in seconds |
| cancel_on_error | bool | True | Whether to cancel the group if a worker errors |
Returns: The generated task_id shared by all agents in the group.
task_group
def task_group(
    self,
    *agent_names: str,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
    cancel_on_error: bool = True,
) -> TaskGroupContext
Create a task group context manager. Waits for agents to be ready, sends task requests, and waits for all responses on exit. Supports async for inside the block to receive intermediate events.
| Parameter | Type | Default | Description |
|---|---|---|---|
| *agent_names | str | | Names of the agents to send the task to |
| name | Optional[str] | None | Task name for routing to named @task handlers |
| payload | Optional[dict] | None | Structured data describing the work |
| timeout | Optional[float] | None | Timeout in seconds |
| cancel_on_error | bool | True | Whether to cancel the group if a worker errors |
Returns: A TaskGroupContext to use with async with.
async with self.task_group("w1", "w2", payload=data) as tg:
    async for event in tg:
        if event.type == TaskGroupEvent.UPDATE:
            print(f"{event.agent_name}: {event.data}")
for name, result in tg.responses.items():
    print(name, result)
cancel_task
async def cancel_task(self, task_id: str, *, reason: Optional[str] = None) -> None
Cancel a running task group, or a single-agent task, by its task_id.
| Parameter | Type | Default | Description |
|---|---|---|---|
| task_id | str | | The task identifier to cancel |
| reason | Optional[str] | None | Human-readable reason for cancellation |
request_task_update
async def request_task_update(self, task_id: str, agent_name: str) -> None
Request a progress update from a task agent.
| Parameter | Type | Description |
|---|---|---|
| task_id | str | The task identifier |
| agent_name | str | Name of the agent to request an update from |
send_task_response
async def send_task_response(
    self,
    response: Optional[dict] = None,
    *,
    status: TaskStatus = TaskStatus.COMPLETED,
    urgent: bool = False,
) -> None
Send a task response back to the requester. After sending, the agent is ready to accept a new task.
| Parameter | Type | Default | Description |
|---|---|---|---|
| response | Optional[dict] | None | Result data |
| status | TaskStatus | COMPLETED | Completion status |
| urgent | bool | False | Deliver with system priority |
send_task_update
async def send_task_update(
    self,
    update: Optional[dict] = None,
    *,
    urgent: bool = False,
) -> None
Send a progress update to the requester.
| Parameter | Type | Default | Description |
|---|---|---|---|
| update | Optional[dict] | None | Progress data |
| urgent | bool | False | Deliver with system priority |
send_task_stream_start
async def send_task_stream_start(self, data: Optional[dict] = None) -> None
Begin streaming task results back to the requester.
| Parameter | Type | Default | Description |
|---|---|---|---|
| data | Optional[dict] | None | Optional metadata about the stream |
send_task_stream_data
async def send_task_stream_data(self, data: Optional[dict] = None) -> None
Send a streaming chunk to the requester.
| Parameter | Type | Default | Description |
|---|---|---|---|
| data | Optional[dict] | None | The chunk payload |
send_task_stream_end
async def send_task_stream_end(self, data: Optional[dict] = None) -> None
End the current stream and mark this agent’s task as complete.
| Parameter | Type | Default | Description |
|---|---|---|---|
| data | Optional[dict] | None | Optional final metadata |
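The three streaming calls are meant to be used in sequence: one start, any number of data chunks, one end. A minimal sketch of that sequence follows. Both classes are hypothetical stand-ins wired directly to each other instead of through a bus, the receiving hooks take plain dicts rather than the real BusTaskStream* messages, and the "text", "format", and "count" keys are invented for the example.

```python
import asyncio
from typing import Optional


class StandInRequester:
    """Hypothetical receiver; hook names mirror on_task_stream_*."""

    def __init__(self) -> None:
        self.chunks: list[str] = []
        self.finished = False

    async def on_task_stream_start(self, data: dict) -> None:
        self.chunks = []
        self.finished = False

    async def on_task_stream_data(self, data: dict) -> None:
        self.chunks.append(data["text"])

    async def on_task_stream_end(self, data: dict) -> None:
        self.finished = True


class StandInWorker:
    """Hypothetical sender; method names mirror send_task_stream_*."""

    def __init__(self, requester: StandInRequester) -> None:
        self._requester = requester

    async def send_task_stream_start(self, data: Optional[dict] = None) -> None:
        await self._requester.on_task_stream_start(data or {})

    async def send_task_stream_data(self, data: Optional[dict] = None) -> None:
        await self._requester.on_task_stream_data(data or {})

    async def send_task_stream_end(self, data: Optional[dict] = None) -> None:
        await self._requester.on_task_stream_end(data or {})

    async def stream_words(self, sentence: str) -> None:
        words = sentence.split()
        await self.send_task_stream_start({"format": "text"})
        for word in words:
            await self.send_task_stream_data({"text": word})
        # Ending the stream also marks the task as complete in the real API.
        await self.send_task_stream_end({"count": len(words)})


async def demo() -> tuple[list[str], bool]:
    requester = StandInRequester()
    await StandInWorker(requester).stream_words("hello streaming world")
    return requester.chunks, requester.finished


print(asyncio.run(demo()))
```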
Task Hooks
Override these methods to handle task events. Always call super() when overriding.
on_task_request
async def on_task_request(self, message: BusTaskRequestMessage) -> None
Called when this agent receives a task request. Override to perform work. Use send_task_update() to report progress and send_task_response() to return results.
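A worker that reports progress and then returns a result might be shaped like the sketch below. StandInWorker is a hypothetical stand-in: an outbox list replaces the bus, the hook takes a plain dict where the real one receives a BusTaskRequestMessage, and the "items", "done", "of", and "total" keys are invented for the example.

```python
import asyncio
from typing import Optional


class StandInWorker:
    """Hypothetical stand-in for a worker agent; `outbox` replaces the bus."""

    def __init__(self) -> None:
        self.outbox: list[tuple[str, dict]] = []

    async def send_task_update(self, update: Optional[dict] = None) -> None:
        self.outbox.append(("update", update or {}))

    async def send_task_response(self, response: Optional[dict] = None) -> None:
        self.outbox.append(("response", response or {}))

    async def on_task_request(self, payload: dict) -> None:
        items = payload.get("items", [])
        total = 0
        for index, value in enumerate(items, start=1):
            total += value
            # Report progress after each item.
            await self.send_task_update({"done": index, "of": len(items)})
        # Return the final result; the worker is then free for a new task.
        await self.send_task_response({"total": total})


async def demo() -> list[tuple[str, dict]]:
    worker = StandInWorker()
    await worker.on_task_request({"items": [2, 3]})
    return worker.outbox


print(asyncio.run(demo()))
```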
on_task_response
async def on_task_response(self, message: BusTaskResponseMessage) -> None
Called when a task agent sends a response. Override to process individual results as they arrive.
on_task_update
async def on_task_update(self, message: BusTaskUpdateMessage) -> None
Called when a task agent sends a progress update.
on_task_update_requested
async def on_task_update_requested(self, message: BusTaskUpdateRequestMessage) -> None
Called when the requester asks for a progress update. Override to send back a progress update via send_task_update().
on_task_completed
async def on_task_completed(self, result: TaskGroupResponse) -> None
Called when all agents in a task group have responded.
on_task_error
async def on_task_error(self, message: BusTaskResponseMessage) -> None
Called when a task group is cancelled due to a worker error. Fires when a worker responds with ERROR or FAILED status and cancel_on_error is set.
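The interaction between a worker error and cancel_on_error can be modelled with plain asyncio tasks. The sketch below is not the library's implementation: run_group is a hypothetical function, workers are local coroutines, and the string statuses "error" and "completed" stand in for TaskStatus values.

```python
import asyncio
from typing import Awaitable, Callable

Worker = Callable[[], Awaitable[tuple[str, dict]]]


async def run_group(workers: dict[str, Worker], *, cancel_on_error: bool = True):
    """Hypothetical model of a task group with cancel-on-error behavior."""
    tasks = {asyncio.create_task(fn()): name for name, fn in workers.items()}
    responses: dict[str, dict] = {}
    errored: list[str] = []
    cancelled: list[str] = []
    pending = set(tasks)
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for finished in done:
            status, data = finished.result()
            responses[tasks[finished]] = data
            if status == "error":
                errored.append(tasks[finished])
        if errored and cancel_on_error:
            # A worker failed: cancel everything still running and stop waiting.
            for leftover in pending:
                leftover.cancel()
                cancelled.append(tasks[leftover])
            await asyncio.gather(*pending, return_exceptions=True)
            pending = set()
    return responses, errored, cancelled


async def failing_worker() -> tuple[str, dict]:
    await asyncio.sleep(0)
    return "error", {"detail": "boom"}


async def slow_worker() -> tuple[str, dict]:
    await asyncio.sleep(0.05)
    return "completed", {"value": 42}


async def demo(cancel_on_error: bool):
    group = {"w1": failing_worker, "w2": slow_worker}
    return await run_group(group, cancel_on_error=cancel_on_error)


print(asyncio.run(demo(True)))
print(asyncio.run(demo(False)))
```

With cancel_on_error disabled, the slower worker is allowed to finish and its response is still collected alongside the failed one.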
on_task_stream_start
async def on_task_stream_start(self, message: BusTaskStreamStartMessage) -> None
Called when a task agent begins streaming.
on_task_stream_data
async def on_task_stream_data(self, message: BusTaskStreamDataMessage) -> None
Called for each streaming chunk from a task agent.
on_task_stream_end
async def on_task_stream_end(self, message: BusTaskStreamEndMessage) -> None
Called when a task agent finishes streaming.
on_task_cancelled
async def on_task_cancelled(self, message: BusTaskCancelMessage) -> None
Called when this agent’s task is cancelled by the requester. Override to clean up resources or stop in-progress work.
Pipeline
build_pipeline
async def build_pipeline(self) -> Pipeline
Return this agent’s pipeline. Override to define a processing pipeline. The default returns a no-op pipeline for agents that operate purely through bus messages.
create_pipeline
async def create_pipeline(self, user_pipeline: Pipeline) -> Pipeline
Assemble the final pipeline from the user pipeline. When bridged, wraps the pipeline with bus edge processors.
| Parameter | Type | Description |
|---|---|---|
| user_pipeline | Pipeline | The pipeline returned by build_pipeline() |
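As a rough mental model of the wrapping step, the sketch below treats a pipeline as a list of processor names. Everything here is hypothetical: assemble_pipeline is not a library function, real pipelines are Pipecat Pipeline objects, and the names "bus_edge_in" and "bus_edge_out" are invented placeholders for whatever edge processors the library actually inserts.

```python
from typing import Optional


def assemble_pipeline(
    user_pipeline: list[str],
    bridged: Optional[tuple[str, ...]],
) -> list[str]:
    """Hypothetical model of create_pipeline(); processors are plain strings."""
    if bridged is None:
        # Not bridged: the user pipeline is used as-is.
        return list(user_pipeline)
    # Bridged (empty tuple or named bridges): wrap with bus edge processors.
    return ["bus_edge_in", *user_pipeline, "bus_edge_out"]


print(assemble_pipeline(["stt", "llm", "tts"], None))
print(assemble_pipeline(["llm"], ("voice",)))
```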
build_pipeline_task
def build_pipeline_task(self, pipeline: Pipeline) -> PipelineTask
Create the PipelineTask for this agent’s pipeline. Override to customize task parameters (e.g. enable interruptions).
| Parameter | Type | Description |
|---|---|---|
| pipeline | Pipeline | The fully assembled pipeline |
queue_frame
async def queue_frame(
    self,
    frame: Frame,
    direction: FrameDirection = FrameDirection.DOWNSTREAM,
) -> None
Queue a frame into this agent’s pipeline task.
| Parameter | Type | Default | Description |
|---|---|---|---|
| frame | Frame | | The frame to queue |
| direction | FrameDirection | DOWNSTREAM | Direction the frame should travel |
queue_frames
async def queue_frames(
    self,
    frames,
    direction: FrameDirection = FrameDirection.DOWNSTREAM,
) -> None
Queue multiple frames into this agent’s pipeline task.
| Parameter | Type | Default | Description |
|---|---|---|---|
| frames | list[Frame] | | The frames to queue |
| direction | FrameDirection | DOWNSTREAM | Direction the frames should travel |
Bus
send_message
async def send_message(self, message: BusMessage) -> None
Send a message on the bus.
| Parameter | Type | Description |
|---|---|---|
| message | BusMessage | The bus message to send |
send_error
async def send_error(self, error: str) -> None
Report an error on the bus. Child agents send a local-only message to the parent. Root agents broadcast over the network.
| Parameter | Type | Description |
|---|---|---|
| error | str | Description of the error |
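The routing rule for send_error() depends only on whether the agent has a parent. A minimal sketch of that decision follows; error_route and the dict it returns are hypothetical and exist only to make the two cases explicit.

```python
from typing import Optional


def error_route(parent: Optional[str]) -> dict:
    """Hypothetical helper: choose a delivery scope from the agent's parent."""
    if parent is not None:
        # Child agent: local-only message addressed to the parent.
        return {"scope": "local", "target": parent}
    # Root agent: broadcast over the network.
    return {"scope": "network", "target": None}


print(error_route("coordinator"))
print(error_route(None))
```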
on_bus_message
async def on_bus_message(self, message: BusMessage) -> None
Called for every bus message after built-in lifecycle handling. Override to handle custom message types.
| Parameter | Type | Description |
|---|---|---|
| message | BusMessage | The bus message to process |
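Handling a custom message type in on_bus_message() usually comes down to an isinstance check after deferring to the base class. The sketch below illustrates the shape of such an override. StandInMessage, PriceQuote, and StandInAgent are all hypothetical; a real custom message would derive from BusMessage, whose fields are not shown in this reference.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StandInMessage:
    """Hypothetical stand-in for BusMessage."""
    source: str


@dataclass
class PriceQuote(StandInMessage):
    """Hypothetical custom message type."""
    symbol: str
    price: float


class StandInAgent:
    """Hypothetical stand-in for BaseAgent."""

    async def on_bus_message(self, message: StandInMessage) -> None:
        pass


class QuoteAgent(StandInAgent):
    def __init__(self) -> None:
        self.quotes: dict[str, float] = {}

    async def on_bus_message(self, message: StandInMessage) -> None:
        await super().on_bus_message(message)
        # React only to the custom type; ignore everything else.
        if isinstance(message, PriceQuote):
            self.quotes[message.symbol] = message.price


async def demo() -> dict[str, float]:
    agent = QuoteAgent()
    await agent.on_bus_message(StandInMessage(source="other"))
    await agent.on_bus_message(PriceQuote(source="feed", symbol="ACME", price=12.5))
    return agent.quotes


print(asyncio.run(demo()))
```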