Skip to main content

Overview

BaseAgent is the abstract base class that all agents inherit from. It handles agent lifecycle, parent-child relationships, bus communication, and task coordination. Agents that return a pipeline from build_pipeline() run a Pipecat pipeline. Agents that don’t override build_pipeline() operate purely through bus messages (e.g. coordinators, orchestrators).
from pipecat_subagents.agents import BaseAgent

Configuration

name
str
required
Unique name for this agent. Used for bus message routing and registry lookup.
bus
AgentBus
required
The AgentBus for inter-agent communication.
active
bool
default:"True"
Whether the agent starts active. When False, the agent waits for an activate_agent() call before on_activated fires.
bridged
Optional[tuple[str, ...]]
default:"None"
Bridge configuration for receiving pipeline frames from the bus. None means not bridged. An empty tuple () means bridged, accepting frames from all bridges. A tuple of names like ("voice",) means bridged, accepting only frames from those bridges.
exclude_frames
Optional[tuple[type[Frame], ...]]
default:"None"
Frame types to exclude from bus forwarding when bridged. Lifecycle frames (StartFrame, EndFrame, CancelFrame, StopFrame) are always excluded.

Properties

bus

agent.bus -> AgentBus
The bus instance for agent communication.

active

agent.active -> bool
Whether this agent is currently active.

parent

agent.parent -> Optional[str]
The name of the parent agent, or None if this is a root agent.

registry

agent.registry -> Optional[AgentRegistry]
The shared agent registry, if set by a runner.

bridged

agent.bridged -> bool
Whether this agent is bridged (receives pipeline frames from the bus).

started_at

agent.started_at -> Optional[float]
Unix timestamp when this agent became ready, or None if not yet started.

children

agent.children -> list[BaseAgent]
The list of child agents added via add_agent().

pipeline_task

agent.pipeline_task -> PipelineTask
The PipelineTask for this agent. Raises RuntimeError if the pipeline task has not been created yet.

task_id

agent.task_id -> Optional[str]
The ID of the task this agent is currently working on, or None if idle.

task_groups

agent.task_groups -> dict[str, TaskGroup]
Active task groups launched by this agent, keyed by task_id.

Lifecycle Hooks

Override these methods to react to lifecycle events. Always call super() when overriding.

on_ready

async def on_ready(self) -> None
Called once when the agent’s pipeline starts and the agent is ready to operate.

on_finished

async def on_finished(self) -> None
Called when the agent’s pipeline has finished.

on_error

async def on_error(self, error: str, fatal: bool) -> None
Called when a pipeline error occurs. Override to handle errors (e.g. propagate via send_error(), fail a running task, or log and recover).
ParameterTypeDescription
errorstrDescription of the error
fatalboolWhether the error is unrecoverable

on_activated

async def on_activated(self, args: Optional[dict]) -> None
Called when this agent is activated. Override to react to activation.
ParameterTypeDescription
argsOptional[dict]Optional arguments from the caller

on_deactivated

async def on_deactivated(self) -> None
Called when this agent is deactivated.

on_agent_ready

async def on_agent_ready(self, data: AgentReadyData) -> None
Called when another agent is ready to receive messages. For local root agents this fires automatically. For remote agents it fires only for agents watched via watch_agent(). For child agents it fires only on the parent.
ParameterTypeDescription
dataAgentReadyDataInformation about the ready agent

on_agent_error

async def on_agent_error(self, data: AgentErrorData) -> None
Called when a child agent reports an error.
ParameterTypeDescription
dataAgentErrorDataInformation about the error

Agent Management

add_agent

async def add_agent(self, agent: BaseAgent) -> None
Register a child agent under this parent. The child’s lifecycle (end, cancel) is automatically managed by this parent agent.
ParameterTypeDescription
agentBaseAgentThe child agent instance to add

activate_agent

async def activate_agent(
    self,
    agent_name: str,
    *,
    args: Optional[AgentActivationArgs] = None,
) -> None
Activate an agent by name. The target agent’s on_activated hook will be called with the provided arguments.
ParameterTypeDefaultDescription
agent_namestrName of the agent to activate
argsOptional[AgentActivationArgs]NoneArguments forwarded to on_activated

deactivate_agent

async def deactivate_agent(self, agent_name: str) -> None
Deactivate an agent by name. The target agent’s on_deactivated hook will be called.
ParameterTypeDescription
agent_namestrName of the agent to deactivate

handoff_to

async def handoff_to(
    self,
    agent_name: str,
    *,
    args: Optional[AgentActivationArgs] = None,
) -> None
Hand off to another agent. Deactivates this agent and activates the target. For independent control, use activate_agent() and deactivate_agent() directly.
ParameterTypeDefaultDescription
agent_namestrName of the agent to hand off to
argsOptional[AgentActivationArgs]NoneArguments forwarded to on_activated

watch_agent

async def watch_agent(self, agent_name: str) -> None
Request notification when an agent registers. If the agent is already registered, on_agent_ready fires immediately. Otherwise it fires when the agent eventually registers.
ParameterTypeDescription
agent_namestrName of the agent to watch for

end

async def end(self, *, reason: Optional[str] = None) -> None
Request a graceful end of the session.
ParameterTypeDefaultDescription
reasonOptional[str]NoneHuman-readable reason for ending

cancel

async def cancel(self) -> None
Request an immediate cancellation of all agents.

Task Coordination

request_task

async def request_task(
    self,
    agent_name: str,
    *,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
) -> str
Send a task request to a single agent (fire-and-forget). Waits for the agent to be ready before sending the request. Does not wait for the task to complete; use callbacks (on_task_response, on_task_completed) or task() for that.
ParameterTypeDefaultDescription
agent_namestrName of the agent to send the task to
nameOptional[str]NoneTask name for routing to a named @task handler
payloadOptional[dict]NoneStructured data describing the work
timeoutOptional[float]NoneTimeout in seconds; auto-cancels after this duration
Returns: The generated task_id.

task

def task(
    self,
    agent_name: str,
    *,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
) -> TaskContext
Create a single-agent task context manager. Waits for the agent to be ready, sends a task request, and waits for the response on exit. Supports async for inside the block to receive intermediate events.
ParameterTypeDefaultDescription
agent_namestrName of the agent to send the task to
nameOptional[str]NoneTask name for routing to a named @task handler
payloadOptional[dict]NoneStructured data describing the work
timeoutOptional[float]NoneTimeout in seconds
Returns: A TaskContext to use with async with.
async with self.task("worker", payload=data) as t:
    async for event in t:
        if event.type == TaskGroupEvent.UPDATE:
            print(event.data)

print(t.response)

request_task_group

async def request_task_group(
    self,
    *agent_names: str,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
    cancel_on_error: bool = True,
) -> str
Send a task request to multiple agents (fire-and-forget). Waits for all agents to be ready before sending requests.
ParameterTypeDefaultDescription
*agent_namesstrNames of the agents to send the task to
nameOptional[str]NoneTask name for routing to named @task handlers
payloadOptional[dict]NoneStructured data describing the work
timeoutOptional[float]NoneTimeout in seconds
cancel_on_errorboolTrueWhether to cancel the group if a worker errors
Returns: The generated task_id shared by all agents in the group.

task_group

def task_group(
    self,
    *agent_names: str,
    name: Optional[str] = None,
    payload: Optional[dict] = None,
    timeout: Optional[float] = None,
    cancel_on_error: bool = True,
) -> TaskGroupContext
Create a task group context manager. Waits for agents to be ready, sends task requests, and waits for all responses on exit. Supports async for inside the block to receive intermediate events.
ParameterTypeDefaultDescription
*agent_namesstrNames of the agents to send the task to
nameOptional[str]NoneTask name for routing to named @task handlers
payloadOptional[dict]NoneStructured data describing the work
timeoutOptional[float]NoneTimeout in seconds
cancel_on_errorboolTrueWhether to cancel the group if a worker errors
Returns: A TaskGroupContext to use with async with.
async with self.task_group("w1", "w2", payload=data) as tg:
    async for event in tg:
        if event.type == TaskGroupEvent.UPDATE:
            print(f"{event.agent_name}: {event.data}")

for name, result in tg.responses.items():
    print(name, result)

cancel_task

async def cancel_task(self, task_id: str, *, reason: Optional[str] = None) -> None
Cancel a running task group.
ParameterTypeDefaultDescription
task_idstrThe task identifier to cancel
reasonOptional[str]NoneHuman-readable reason for cancellation

request_task_update

async def request_task_update(self, task_id: str, agent_name: str) -> None
Request a progress update from a task agent.
ParameterTypeDescription
task_idstrThe task identifier
agent_namestrName of the agent to request an update from

send_task_response

async def send_task_response(
    self,
    response: Optional[dict] = None,
    *,
    status: TaskStatus = TaskStatus.COMPLETED,
    urgent: bool = False,
) -> None
Send a task response back to the requester. After sending, the agent is ready to accept a new task.
ParameterTypeDefaultDescription
responseOptional[dict]NoneResult data
statusTaskStatusCOMPLETEDCompletion status
urgentboolFalseDeliver with system priority

send_task_update

async def send_task_update(
    self,
    update: Optional[dict] = None,
    *,
    urgent: bool = False,
) -> None
Send a progress update to the requester.
ParameterTypeDefaultDescription
updateOptional[dict]NoneProgress data
urgentboolFalseDeliver with system priority

send_task_stream_start

async def send_task_stream_start(self, data: Optional[dict] = None) -> None
Begin streaming task results back to the requester.
ParameterTypeDefaultDescription
dataOptional[dict]NoneOptional metadata about the stream

send_task_stream_data

async def send_task_stream_data(self, data: Optional[dict] = None) -> None
Send a streaming chunk to the requester.
ParameterTypeDefaultDescription
dataOptional[dict]NoneThe chunk payload

send_task_stream_end

async def send_task_stream_end(self, data: Optional[dict] = None) -> None
End the current stream and mark this agent’s task as complete.
ParameterTypeDefaultDescription
dataOptional[dict]NoneOptional final metadata

Task Hooks

Override these methods to handle task events. Always call super() when overriding.

on_task_request

async def on_task_request(self, message: BusTaskRequestMessage) -> None
Called when this agent receives a task request. Override to perform work. Use send_task_update() to report progress and send_task_response() to return results.

on_task_response

async def on_task_response(self, message: BusTaskResponseMessage) -> None
Called when a task agent sends a response. Override to process individual results as they arrive.

on_task_update

async def on_task_update(self, message: BusTaskUpdateMessage) -> None
Called when a task agent sends a progress update.

on_task_update_requested

async def on_task_update_requested(self, message: BusTaskUpdateRequestMessage) -> None
Called when the requester asks for a progress update. Override to send back a progress update via send_task_update().

on_task_completed

async def on_task_completed(self, result: TaskGroupResponse) -> None
Called when all agents in a task group have responded.

on_task_error

async def on_task_error(self, message: BusTaskResponseMessage) -> None
Called when a task group is cancelled due to a worker error. Fires when a worker responds with ERROR or FAILED status and cancel_on_error is set.

on_task_stream_start

async def on_task_stream_start(self, message: BusTaskStreamStartMessage) -> None
Called when a task agent begins streaming.

on_task_stream_data

async def on_task_stream_data(self, message: BusTaskStreamDataMessage) -> None
Called for each streaming chunk from a task agent.

on_task_stream_end

async def on_task_stream_end(self, message: BusTaskStreamEndMessage) -> None
Called when a task agent finishes streaming.

on_task_cancelled

async def on_task_cancelled(self, message: BusTaskCancelMessage) -> None
Called when this agent’s task is cancelled by the requester. Override to clean up resources or stop in-progress work.

Pipeline

build_pipeline

async def build_pipeline(self) -> Pipeline
Return this agent’s pipeline. Override to define a processing pipeline. The default returns a no-op pipeline for agents that operate purely through bus messages.

create_pipeline

async def create_pipeline(self, user_pipeline: Pipeline) -> Pipeline
Assemble the final pipeline from the user pipeline. When bridged, wraps the pipeline with bus edge processors.
ParameterTypeDescription
user_pipelinePipelineThe pipeline returned by build_pipeline()

build_pipeline_task

def build_pipeline_task(self, pipeline: Pipeline) -> PipelineTask
Create the PipelineTask for this agent’s pipeline. Override to customize task parameters (e.g. enable interruptions).
ParameterTypeDescription
pipelinePipelineThe fully assembled pipeline

queue_frame

async def queue_frame(
    self,
    frame: Frame,
    direction: FrameDirection = FrameDirection.DOWNSTREAM,
) -> None
Queue a frame into this agent’s pipeline task.
ParameterTypeDefaultDescription
frameFrameThe frame to queue
directionFrameDirectionDOWNSTREAMDirection the frame should travel

queue_frames

async def queue_frames(
    self,
    frames,
    direction: FrameDirection = FrameDirection.DOWNSTREAM,
) -> None
Queue multiple frames into this agent’s pipeline task.
ParameterTypeDefaultDescription
frameslist[Frame]The frames to queue
directionFrameDirectionDOWNSTREAMDirection the frames should travel

Bus

send_message

async def send_message(self, message: BusMessage) -> None
Send a message on the bus.
ParameterTypeDescription
messageBusMessageThe bus message to send

send_error

async def send_error(self, error: str) -> None
Report an error on the bus. Child agents send a local-only message to the parent. Root agents broadcast over the network.
ParameterTypeDescription
errorstrDescription of the error

on_bus_message

async def on_bus_message(self, message: BusMessage) -> None
Called for every bus message after built-in lifecycle handling. Override to handle custom message types.
ParameterTypeDescription
messageBusMessageThe bus message to process