Documentation Index
Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
Use this file to discover all available pages before exploring further.
Overview
BaseWorker is the abstract base class that all agents inherit from. It handles agent lifecycle, parent-child relationships, bus communication, and job coordination.
A BaseWorker connects to a WorkerBus, registers itself in the shared registry, accepts activation/deactivation, and exchanges job requests/responses with other agents. Agents that need to run a Pipecat pipeline use a concrete subclass like LLMWorker, while BaseWorker itself operates purely through bus messages (e.g. coordinators, orchestrators).
from pipecat.pipeline.base_worker import BaseWorker, WorkerActivationArgs
Configuration
Unique name for this agent, used for bus message routing and registry lookup.
If None, an auto-generated name is used (useful for instances that don’t
participate in inter-agent communication).
Whether the agent starts active. When False, the agent waits for an
activate_worker() call before on_activated fires.
The bus is not passed in the constructor. It is provided by the runner when
the agent is registered with
WorkerRunner.add_workers(),
which calls attach() internally.
Properties
bus
The bus this agent is attached to. Raises RuntimeError if accessed before attach() has been called.
active
Whether this agent is currently active.
activation_args
agent.activation_args -> dict | None
The arguments from the most recent activation, or None if the agent is inactive. The value is cleared when the agent is deactivated.
parent
agent.parent -> str | None
The name of the parent agent, or None if this is a root agent.
registry
agent.registry -> WorkerRegistry
The shared agent registry this agent is attached to. Raises RuntimeError if accessed before attach() has been called.
started_at
agent.started_at -> float | None
Unix timestamp when this agent became ready, or None if not yet started.
bridged
Whether this agent is bridged onto the bus (receives pipeline frames from the bus). Always False on BaseWorker; subclasses such as PipelineWorker override it.
children
agent.children -> list[BaseWorker]
The list of child agents added via add_workers().
active_jobs
agent.active_jobs -> dict[str, BusJobRequestMessage]
Active job requests this agent is currently working on, keyed by job_id.
job_groups
agent.job_groups -> dict[str, JobGroup]
Active job groups launched by this agent, keyed by job_id.
Lifecycle Hooks
Override these methods to react to lifecycle events. Always call super() when overriding.
on_activated
async def on_activated(self, args: dict | None) -> None
Called when this agent is activated. Override to react to activation.
| Parameter | Type | Description |
|---|
args | dict | None | Optional arguments from the caller |
on_deactivated
async def on_deactivated(self) -> None
Called when this agent is deactivated.
on_worker_ready
async def on_worker_ready(self, data: WorkerReadyData) -> None
Called when another agent is ready to receive messages. For local root agents this fires automatically. For remote agents it fires only for agents watched via watch_workers(). For child agents it fires only on the parent.
| Parameter | Type | Description |
|---|
data | WorkerReadyData | Information about the ready agent |
on_worker_failed
async def on_worker_failed(self, data: WorkerErrorData) -> None
Called when a child agent reports an error.
| Parameter | Type | Description |
|---|
data | WorkerErrorData | Information about the error |
Agent Management
add_workers
async def add_workers(self, *workers: BaseWorker, watch: bool = True) -> None
Register one or more child agents under this parent. Each child’s lifecycle (end, cancel) is automatically managed by this parent agent. By default the children are also watched, so the parent receives on_worker_ready when each one starts.
| Parameter | Type | Default | Description |
|---|
*workers | BaseWorker | | One or more child agent instances to add |
watch | bool | True | Whether to watch each newly added child for on_worker_ready |
activate_worker
async def activate_worker(
self,
worker_name: str,
*,
args: WorkerActivationArgs | None = None,
deactivate_self: bool = False,
) -> None
Activate an agent by name. The target agent’s on_activated hook will be called with the provided arguments. To hand off (deactivate this agent and activate the target), pass deactivate_self=True.
| Parameter | Type | Default | Description |
|---|
worker_name | str | | Name of the agent to activate |
args | WorkerActivationArgs | None | None | Arguments forwarded to on_activated |
deactivate_self | bool | False | Whether to deactivate this agent before activating the target |
deactivate_worker
async def deactivate_worker(self, worker_name: str) -> None
Deactivate an agent by name. The target agent’s on_deactivated hook will be called.
| Parameter | Type | Description |
|---|
worker_name | str | Name of the agent to deactivate |
watch_workers
async def watch_workers(self, *worker_names: str) -> None
Request notification when one or more agents register. For each name: if the agent is already registered, on_worker_ready fires immediately. Otherwise it fires when the agent eventually registers.
| Parameter | Type | Description |
|---|
*worker_names | str | Names of the agents to watch for |
end
async def end(self, *, reason: str | None = None) -> None
Request a graceful end of the session.
| Parameter | Type | Default | Description |
|---|
reason | str | None | None | Human-readable reason for ending |
cancel
async def cancel(self, *, reason: str | None = None) -> None
Request an immediate cancellation of all agents.
| Parameter | Type | Default | Description |
|---|
reason | str | None | None | Human-readable reason for cancelling |
wait
async def wait(self) -> None
Wait for this agent to finish.
Job Coordination
request_job
async def request_job(
self,
worker_name: str,
*,
name: str | None = None,
payload: dict | None = None,
timeout: float | None = None,
) -> str
Send a job request to a single agent (fire-and-forget). Waits for the agent to be ready before sending the request. Does not wait for the job to complete; use callbacks (on_job_response, on_job_completed) or job() for that.
| Parameter | Type | Default | Description |
|---|
worker_name | str | | Name of the agent to send the job to |
name | str | None | None | Job name for routing to a named @job handler |
payload | dict | None | None | Structured data describing the work |
timeout | float | None | None | Timeout in seconds; auto-cancels after this duration |
Returns: The generated job_id.
job
def job(
self,
worker_name: str,
*,
name: str | None = None,
payload: dict | None = None,
timeout: float | None = None,
) -> JobContext
Create a single-agent job context manager. Waits for the agent to be ready, sends a job request, and waits for the response on exit. Supports async for inside the block to receive intermediate events.
| Parameter | Type | Default | Description |
|---|
worker_name | str | | Name of the agent to send the job to |
name | str | None | None | Job name for routing to a named @job handler |
payload | dict | None | None | Structured data describing the work |
timeout | float | None | None | Timeout in seconds |
Returns: A JobContext to use with async with.
async with self.job("worker", payload=data) as j:
async for event in j:
if event.type == JobEvent.UPDATE:
print(event.data)
print(j.response)
request_job_group
async def request_job_group(
self,
*worker_names: str,
name: str | None = None,
payload: dict | None = None,
timeout: float | None = None,
cancel_on_error: bool = True,
) -> str
Send a job request to multiple agents (fire-and-forget). Waits for all agents to be ready before sending requests.
| Parameter | Type | Default | Description |
|---|
*worker_names | str | | Names of the agents to send the job to |
name | str | None | None | Job name for routing to named @job handlers |
payload | dict | None | None | Structured data describing the work |
timeout | float | None | None | Timeout in seconds |
cancel_on_error | bool | True | Whether to cancel the group if a worker errors |
Returns: The generated job_id shared by all agents in the group.
job_group
def job_group(
self,
*worker_names: str,
name: str | None = None,
payload: dict | None = None,
timeout: float | None = None,
cancel_on_error: bool = True,
) -> JobGroupContext
Create a job group context manager. Waits for agents to be ready, sends job requests, and waits for all responses on exit. Supports async for inside the block to receive intermediate events.
| Parameter | Type | Default | Description |
|---|
*worker_names | str | | Names of the agents to send the job to |
name | str | None | None | Job name for routing to named @job handlers |
payload | dict | None | None | Structured data describing the work |
timeout | float | None | None | Timeout in seconds |
cancel_on_error | bool | True | Whether to cancel the group if a worker errors |
Returns: A JobGroupContext to use with async with.
async with self.job_group("w1", "w2", payload=data) as jg:
async for event in jg:
if event.type == JobGroupEvent.UPDATE:
print(f"{event.worker_name}: {event.data}")
for name, result in jg.responses.items():
print(name, result)
cancel_job_group
async def cancel_job_group(self, job_id: str, *, reason: str | None = None) -> None
Cancel a running job group.
| Parameter | Type | Default | Description |
|---|
job_id | str | | The job identifier to cancel |
reason | str | None | None | Human-readable reason for cancellation |
request_job_update
async def request_job_update(self, job_id: str, worker_name: str) -> None
Request a progress update from a worker agent.
| Parameter | Type | Description |
|---|
job_id | str | The job identifier |
worker_name | str | Name of the agent to request an update from |
send_job_response
async def send_job_response(
self,
job_id: str,
response: dict | None = None,
*,
status: JobStatus = JobStatus.COMPLETED,
urgent: bool = False,
) -> None
Send a job response back to the requester. After sending, the job is removed from the set of active jobs.
| Parameter | Type | Default | Description |
|---|
job_id | str | | The job being responded to |
response | dict | None | None | Result data |
status | JobStatus | COMPLETED | Completion status |
urgent | bool | False | Deliver with system priority |
send_job_update
async def send_job_update(
self,
job_id: str,
update: dict | None = None,
*,
urgent: bool = False,
) -> None
Send a progress update to the requester.
| Parameter | Type | Default | Description |
|---|
job_id | str | | The job being updated |
update | dict | None | None | Progress data |
urgent | bool | False | Deliver with system priority |
send_job_stream_start
async def send_job_stream_start(self, job_id: str, data: dict | None = None) -> None
Begin streaming job results back to the requester.
| Parameter | Type | Default | Description |
|---|
job_id | str | | The job being streamed |
data | dict | None | None | Optional metadata about the stream |
send_job_stream_data
async def send_job_stream_data(self, job_id: str, data: dict | None = None) -> None
Send a streaming chunk to the requester.
| Parameter | Type | Default | Description |
|---|
job_id | str | | The job being streamed |
data | dict | None | None | The chunk payload |
send_job_stream_end
async def send_job_stream_end(self, job_id: str, data: dict | None = None) -> None
End the current stream and mark this agent’s job as complete.
| Parameter | Type | Default | Description |
|---|
job_id | str | | The job being streamed |
data | dict | None | None | Optional final metadata |
create_job_group_and_request_job
async def create_job_group_and_request_job(
self,
worker_names: list[str],
*,
name: str | None = None,
payload: dict | None = None,
timeout: float | None = None,
cancel_on_error: bool = True,
) -> JobGroup
Wait for agents to be ready, create a job group, and send requests. Does not wait for the group to complete; call group.wait() or use job_group() for that. Used internally by job() and job_group().
Returns: The created JobGroup.
Job Hooks
Override these methods to handle job events. Always call super() when overriding.
on_job_request
async def on_job_request(self, message: BusJobRequestMessage) -> None
Called when this agent receives a job request that does not match a named @job handler. Override to perform work. Use send_job_update() to report progress and send_job_response() to return results.
on_job_response
async def on_job_response(self, message: BusJobResponseMessage) -> None
Called when a worker agent sends a response. Override to process individual results as they arrive.
on_job_update
async def on_job_update(self, message: BusJobUpdateMessage) -> None
Called when a worker agent sends a progress update.
on_job_update_requested
async def on_job_update_requested(self, message: BusJobUpdateRequestMessage) -> None
Called when the requester asks for a progress update. Override to send back a progress update via send_job_update().
on_job_completed
async def on_job_completed(self, result: JobGroupResponse) -> None
Called when all agents in a job group have responded.
on_job_error
async def on_job_error(self, message: BusJobResponseMessage) -> None
Called when a job group is cancelled due to a worker error. Fires when a worker responds with ERROR or FAILED status and cancel_on_error is set.
on_job_stream_start
async def on_job_stream_start(self, message: BusJobStreamStartMessage) -> None
Called when a worker agent begins streaming.
on_job_stream_data
async def on_job_stream_data(self, message: BusJobStreamDataMessage) -> None
Called for each streaming chunk from a worker agent.
on_job_stream_end
async def on_job_stream_end(self, message: BusJobStreamEndMessage) -> None
Called when a worker agent finishes streaming.
on_job_cancelled
async def on_job_cancelled(self, message: BusJobCancelMessage) -> None
Called when this agent’s job is cancelled by the requester. Override to clean up resources or stop in-progress work.
Bus
send_bus_message
async def send_bus_message(self, message: BusMessage) -> None
Send a message on the bus.
| Parameter | Type | Description |
|---|
message | BusMessage | The bus message to send |
send_bus_error_message
async def send_bus_error_message(self, error: str) -> None
Report an error on the bus. Child agents send a local-only message to the parent. Root agents broadcast over the network.
| Parameter | Type | Description |
|---|
error | str | Description of the error |
on_bus_message
async def on_bus_message(self, message: BusMessage) -> None
Called for every bus message after built-in lifecycle handling. Override to handle custom message types.
| Parameter | Type | Description |
|---|
message | BusMessage | The bus message to process |
Decorators
@job
Mark an agent method as a job handler. Decorated methods are automatically collected at initialization and dispatched when matching job requests arrive. Each request runs in its own asyncio task so the bus message loop is never blocked.
from pipecat.pipeline.job_decorator import job
The decorator requires a name argument:
@job(name="research")
async def on_research(self, message):
result = await do_research(message.payload)
await self.send_job_response(message.job_id, result)
@job(name="write", sequential=True)
async def on_write(self, message):
result = await do_write(message.payload)
await self.send_job_response(message.job_id, result)
Job name to match. The handler only receives requests with a matching name.
When True, requests with this name run one at a time in FIFO order. When
False (the default), multiple requests run concurrently. The wait time
counts against the requester’s timeout.
Job handler methods receive a BusJobRequestMessage.
@worker_ready
Mark a method as a handler for a specific agent becoming ready. Decorated methods are collected at initialization; when the agent starts, it calls watch_workers for each handler, and the method is called when the watched agent registers.
from pipecat.pipeline.worker_ready_decorator import worker_ready
@worker_ready(name="greeter")
async def on_greeter_ready(self, data: WorkerReadyData) -> None:
await self.activate_worker("greeter", args=...)
The name of the agent to watch.
The handler receives a WorkerReadyData instance.