Overview

BaseWorker is the abstract base class that all agents inherit from. It handles agent lifecycle, parent-child relationships, bus communication, and job coordination. A BaseWorker connects to a WorkerBus, registers itself in the shared registry, accepts activation and deactivation, and exchanges job requests and responses with other agents. Agents that need to run a Pipecat pipeline use a concrete subclass such as LLMWorker; a plain BaseWorker operates purely through bus messages, which suits coordinators and orchestrators.
from pipecat.pipeline.base_worker import BaseWorker, WorkerActivationArgs

Configuration

name
str | None
default:"None"
Unique name for this agent, used for bus message routing and registry lookup. If None, an auto-generated name is used (useful for instances that don’t participate in inter-agent communication).
active
bool
default:"True"
Whether the agent starts active. When False, the agent waits for an activate_worker() call before on_activated fires.
The bus is not passed in the constructor. It is provided by the runner when the agent is registered with WorkerRunner.add_workers(), which calls attach() internally.

Properties

bus

agent.bus -> WorkerBus
The bus this agent is attached to. Raises RuntimeError if accessed before attach() has been called.

active

agent.active -> bool
Whether this agent is currently active.

activation_args

agent.activation_args -> dict | None
The arguments from the most recent activation, or None if the agent is inactive. The value is cleared when the agent is deactivated.

parent

agent.parent -> str | None
The name of the parent agent, or None if this is a root agent.

registry

agent.registry -> WorkerRegistry
The shared agent registry this agent is attached to. Raises RuntimeError if accessed before attach() has been called.

started_at

agent.started_at -> float | None
Unix timestamp when this agent became ready, or None if not yet started.

bridged

agent.bridged -> bool
Whether this agent is bridged onto the bus (receives pipeline frames from the bus). Always False on BaseWorker; subclasses such as PipelineWorker override it.

children

agent.children -> list[BaseWorker]
The list of child agents added via add_workers().

active_jobs

agent.active_jobs -> dict[str, BusJobRequestMessage]
Active job requests this agent is currently working on, keyed by job_id.

job_groups

agent.job_groups -> dict[str, JobGroup]
Active job groups launched by this agent, keyed by job_id.

Lifecycle Hooks

Override these methods to react to lifecycle events. Always call super() when overriding.

on_activated

async def on_activated(self, args: dict | None) -> None
Called when this agent is activated. Override to react to activation.

| Parameter | Type | Description |
| --- | --- | --- |
| args | dict \| None | Optional arguments from the caller |

on_deactivated

async def on_deactivated(self) -> None
Called when this agent is deactivated.
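
As a rough illustration of the two hooks above, the sketch below overrides on_activated and on_deactivated and calls super() in each. StubBaseWorker is a hypothetical stand-in defined only so the snippet runs without Pipecat installed; its bookkeeping (setting active, storing and clearing activation_args) is an assumption modeled on the property descriptions on this page, and GreeterWorker and the user key are invented for the example.

```python
from __future__ import annotations

import asyncio


class StubBaseWorker:
    """Hypothetical stand-in for BaseWorker; NOT the Pipecat class."""

    def __init__(self, name: str, *, active: bool = True) -> None:
        self.name = name
        self.active = active
        self.activation_args: dict | None = None

    async def on_activated(self, args: dict | None) -> None:
        # Assumed base bookkeeping: mark active and remember the arguments.
        self.active = True
        self.activation_args = args

    async def on_deactivated(self) -> None:
        # Activation arguments are cleared when the agent is deactivated.
        self.active = False
        self.activation_args = None


class GreeterWorker(StubBaseWorker):
    """Example subclass that reacts to activation and deactivation."""

    def __init__(self, name: str) -> None:
        super().__init__(name, active=False)
        self.log: list[str] = []

    async def on_activated(self, args: dict | None) -> None:
        await super().on_activated(args)  # keep the base behavior
        user = (args or {}).get("user", "there")
        self.log.append(f"hello {user}")

    async def on_deactivated(self) -> None:
        await super().on_deactivated()  # keep the base behavior
        self.log.append("bye")


async def main() -> GreeterWorker:
    worker = GreeterWorker("greeter")
    await worker.on_activated({"user": "Ada"})
    await worker.on_deactivated()
    return worker


worker = asyncio.run(main())
print(worker.log)  # ['hello Ada', 'bye']
```

In a real agent the hooks are invoked by the framework in response to activation messages; the direct calls in main() only simulate that.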

on_worker_ready

async def on_worker_ready(self, data: WorkerReadyData) -> None
Called when another agent is ready to receive messages. For local root agents this fires automatically. For remote agents it fires only for agents watched via watch_workers(). For child agents it fires only on the parent.

| Parameter | Type | Description |
| --- | --- | --- |
| data | WorkerReadyData | Information about the ready agent |

on_worker_failed

async def on_worker_failed(self, data: WorkerErrorData) -> None
Called when a child agent reports an error.

| Parameter | Type | Description |
| --- | --- | --- |
| data | WorkerErrorData | Information about the error |

Agent Management

add_workers

async def add_workers(self, *workers: BaseWorker, watch: bool = True) -> None
Register one or more child agents under this parent. Each child’s lifecycle (end, cancel) is automatically managed by this parent agent. By default the children are also watched, so the parent receives on_worker_ready when each one starts.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| *workers | BaseWorker |  | One or more child agent instances to add |
| watch | bool | True | Whether to watch each newly added child for on_worker_ready |

activate_worker

async def activate_worker(
    self,
    worker_name: str,
    *,
    args: WorkerActivationArgs | None = None,
    deactivate_self: bool = False,
) -> None
Activate an agent by name. The target agent’s on_activated hook will be called with the provided arguments. To hand off (deactivate this agent and activate the target), pass deactivate_self=True.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| worker_name | str |  | Name of the agent to activate |
| args | WorkerActivationArgs \| None | None | Arguments forwarded to on_activated |
| deactivate_self | bool | False | Whether to deactivate this agent before activating the target |

deactivate_worker

async def deactivate_worker(self, worker_name: str) -> None
Deactivate an agent by name. The target agent’s on_deactivated hook will be called.

| Parameter | Type | Description |
| --- | --- | --- |
| worker_name | str | Name of the agent to deactivate |

watch_workers

async def watch_workers(self, *worker_names: str) -> None
Request notification when one or more agents register. For each name: if the agent is already registered, on_worker_ready fires immediately. Otherwise it fires when the agent eventually registers.

| Parameter | Type | Description |
| --- | --- | --- |
| *worker_names | str | Names of the agents to watch for |
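
The "fire immediately if already registered, otherwise fire on registration" rule described above can be sketched with a toy registry. ToyRegistry, its watch and register methods, and the callback shape are all hypothetical and exist only for this illustration; they are not the WorkerRegistry API.

```python
from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

ReadyCallback = Callable[[str], Awaitable[None]]


class ToyRegistry:
    """Hypothetical registry illustrating watch semantics; NOT WorkerRegistry."""

    def __init__(self) -> None:
        self._registered: set[str] = set()
        self._watchers: dict[str, list[ReadyCallback]] = defaultdict(list)

    async def watch(self, name: str, callback: ReadyCallback) -> None:
        if name in self._registered:
            await callback(name)  # already registered: notify right away
        else:
            self._watchers[name].append(callback)  # notify on registration

    async def register(self, name: str) -> None:
        self._registered.add(name)
        # Notify everyone who was waiting for this name, then forget them.
        for callback in self._watchers.pop(name, []):
            await callback(name)


async def main() -> list[str]:
    events: list[str] = []

    async def on_ready(name: str) -> None:
        events.append(name)

    registry = ToyRegistry()
    await registry.register("early")
    await registry.watch("early", on_ready)  # fires immediately
    await registry.watch("late", on_ready)   # deferred until registration
    await registry.register("late")          # fires now
    return events


events = asyncio.run(main())
print(events)  # ['early', 'late']
```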

end

async def end(self, *, reason: str | None = None) -> None
Request a graceful end of the session.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| reason | str \| None | None | Human-readable reason for ending |

cancel

async def cancel(self, *, reason: str | None = None) -> None
Request an immediate cancellation of all agents.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| reason | str \| None | None | Human-readable reason for cancelling |

wait

async def wait(self) -> None
Wait for this agent to finish.

Job Coordination

request_job

async def request_job(
    self,
    worker_name: str,
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
) -> str
Send a job request to a single agent (fire-and-forget). Waits for the agent to be ready before sending the request. Does not wait for the job to complete; use callbacks (on_job_response, on_job_completed) or job() for that.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| worker_name | str |  | Name of the agent to send the job to |
| name | str \| None | None | Job name for routing to a named @job handler |
| payload | dict \| None | None | Structured data describing the work |
| timeout | float \| None | None | Timeout in seconds; auto-cancels after this duration |

Returns: The generated job_id.

job

def job(
    self,
    worker_name: str,
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
) -> JobContext
Create a single-agent job context manager. Waits for the agent to be ready, sends a job request, and waits for the response on exit. Supports async for inside the block to receive intermediate events.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| worker_name | str |  | Name of the agent to send the job to |
| name | str \| None | None | Job name for routing to a named @job handler |
| payload | dict \| None | None | Structured data describing the work |
| timeout | float \| None | None | Timeout in seconds |

Returns: A JobContext to use with async with.
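# Runs inside a BaseWorker method; data is the payload dict for the job.
async with self.job("worker", payload=data) as j:
    # Intermediate events arrive while the job is still running.
    async for event in j:
        if event.type == JobEvent.UPDATE:
            print(event.data)

# The final response is available once the block has exited.
print(j.response)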

request_job_group

async def request_job_group(
    self,
    *worker_names: str,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> str
Send a job request to multiple agents (fire-and-forget). Waits for all agents to be ready before sending requests.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| *worker_names | str |  | Names of the agents to send the job to |
| name | str \| None | None | Job name for routing to named @job handlers |
| payload | dict \| None | None | Structured data describing the work |
| timeout | float \| None | None | Timeout in seconds |
| cancel_on_error | bool | True | Whether to cancel the group if a worker errors |

Returns: The generated job_id shared by all agents in the group.

job_group

def job_group(
    self,
    *worker_names: str,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> JobGroupContext
Create a job group context manager. Waits for agents to be ready, sends job requests, and waits for all responses on exit. Supports async for inside the block to receive intermediate events.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| *worker_names | str |  | Names of the agents to send the job to |
| name | str \| None | None | Job name for routing to named @job handlers |
| payload | dict \| None | None | Structured data describing the work |
| timeout | float \| None | None | Timeout in seconds |
| cancel_on_error | bool | True | Whether to cancel the group if a worker errors |

Returns: A JobGroupContext to use with async with.
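# Runs inside a BaseWorker method; data is the payload dict sent to every worker.
async with self.job_group("w1", "w2", payload=data) as jg:
    # Intermediate events from any worker arrive while the group is running.
    async for event in jg:
        if event.type == JobGroupEvent.UPDATE:
            print(f"{event.worker_name}: {event.data}")

# All responses are available once the block has exited.
for name, result in jg.responses.items():
    print(name, result)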
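
To make the effect of cancel_on_error more concrete, here is a minimal sketch of the same idea in plain asyncio: several pieces of work run side by side, and the first failure optionally cancels whatever is still pending. This is not how Pipecat implements job groups; run_group, the status strings, and the two sample coroutines are assumptions made up for the illustration.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


async def run_group(
    jobs: dict[str, Callable[[], Awaitable[dict]]],
    *,
    cancel_on_error: bool = True,
) -> dict[str, str]:
    """Run one coroutine per worker name and report a status for each."""
    tasks = {name: asyncio.ensure_future(make()) for name, make in jobs.items()}
    # Stop waiting at the first failure only when cancel_on_error is set.
    when = asyncio.FIRST_EXCEPTION if cancel_on_error else asyncio.ALL_COMPLETED
    _, pending = await asyncio.wait(list(tasks.values()), return_when=when)
    for task in pending:
        task.cancel()
    if pending:
        # Let the cancelled tasks finish unwinding before inspecting them.
        await asyncio.gather(*pending, return_exceptions=True)

    statuses: dict[str, str] = {}
    for name, task in tasks.items():
        if task.cancelled():
            statuses[name] = "cancelled"
        elif task.exception() is not None:
            statuses[name] = "error"
        else:
            statuses[name] = "completed"
    return statuses


async def slow_ok() -> dict:
    await asyncio.sleep(0.2)
    return {"ok": True}


async def fails_fast() -> dict:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


jobs = {"w1": slow_ok, "w2": fails_fast}
cancelled = asyncio.run(run_group(jobs, cancel_on_error=True))
tolerant = asyncio.run(run_group(jobs, cancel_on_error=False))
print(cancelled)  # {'w1': 'cancelled', 'w2': 'error'}
print(tolerant)   # {'w1': 'completed', 'w2': 'error'}
```

With cancel_on_error=False the slower worker is allowed to finish even though its sibling failed, which is the trade-off the flag controls.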

cancel_job_group

async def cancel_job_group(self, job_id: str, *, reason: str | None = None) -> None
Cancel a running job group.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str |  | The job identifier to cancel |
| reason | str \| None | None | Human-readable reason for cancellation |

request_job_update

async def request_job_update(self, job_id: str, worker_name: str) -> None
Request a progress update from a worker agent.

| Parameter | Type | Description |
| --- | --- | --- |
| job_id | str | The job identifier |
| worker_name | str | Name of the agent to request an update from |

send_job_response

async def send_job_response(
    self,
    job_id: str,
    response: dict | None = None,
    *,
    status: JobStatus = JobStatus.COMPLETED,
    urgent: bool = False,
) -> None
Send a job response back to the requester. After sending, the job is removed from the set of active jobs.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str |  | The job being responded to |
| response | dict \| None | None | Result data |
| status | JobStatus | COMPLETED | Completion status |
| urgent | bool | False | Deliver with system priority |

send_job_update

async def send_job_update(
    self,
    job_id: str,
    update: dict | None = None,
    *,
    urgent: bool = False,
) -> None
Send a progress update to the requester.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str |  | The job being updated |
| update | dict \| None | None | Progress data |
| urgent | bool | False | Deliver with system priority |

send_job_stream_start

async def send_job_stream_start(self, job_id: str, data: dict | None = None) -> None
Begin streaming job results back to the requester.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str |  | The job being streamed |
| data | dict \| None | None | Optional metadata about the stream |

send_job_stream_data

async def send_job_stream_data(self, job_id: str, data: dict | None = None) -> None
Send a streaming chunk to the requester.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str |  | The job being streamed |
| data | dict \| None | None | The chunk payload |

send_job_stream_end

async def send_job_stream_end(self, job_id: str, data: dict | None = None) -> None
End the current stream and mark this agent’s job as complete.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| job_id | str |  | The job being streamed |
| data | dict \| None | None | Optional final metadata |
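
The three streaming calls above are meant to be used as a start, data, end sequence. The sketch below shows that ordering with a hypothetical RecordingWorker that mirrors the three method signatures but only records what would be sent; the stream_words helper and the payload keys (total, index, word, sent) are invented for the example.

```python
from __future__ import annotations

import asyncio


class RecordingWorker:
    """Hypothetical stand-in for a worker; records calls instead of using a bus."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str, dict | None]] = []

    async def send_job_stream_start(self, job_id: str, data: dict | None = None) -> None:
        self.sent.append(("start", job_id, data))

    async def send_job_stream_data(self, job_id: str, data: dict | None = None) -> None:
        self.sent.append(("data", job_id, data))

    async def send_job_stream_end(self, job_id: str, data: dict | None = None) -> None:
        self.sent.append(("end", job_id, data))


async def stream_words(worker: RecordingWorker, job_id: str, text: str) -> None:
    """Stream each word of text as its own chunk, bracketed by start and end."""
    words = text.split()
    await worker.send_job_stream_start(job_id, {"total": len(words)})
    for index, word in enumerate(words):
        await worker.send_job_stream_data(job_id, {"index": index, "word": word})
    await worker.send_job_stream_end(job_id, {"sent": len(words)})


recorder = RecordingWorker()
asyncio.run(stream_words(recorder, "job-42", "hello streaming world"))
kinds = [kind for kind, _, _ in recorder.sent]
print(kinds)  # ['start', 'data', 'data', 'data', 'end']
```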

create_job_group_and_request_job

async def create_job_group_and_request_job(
    self,
    worker_names: list[str],
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> JobGroup
Wait for agents to be ready, create a job group, and send requests. Does not wait for the group to complete; call group.wait() or use job_group() for that. Used internally by job() and job_group().
Returns: The created JobGroup.

Job Hooks

Override these methods to handle job events. Always call super() when overriding.

on_job_request

async def on_job_request(self, message: BusJobRequestMessage) -> None
Called when this agent receives a job request that does not match a named @job handler. Override to perform work. Use send_job_update() to report progress and send_job_response() to return results.
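
A handler following the pattern just described might look roughly like the sketch below: report progress with send_job_update(), then finish with send_job_response(). StubJobRequest and StubWorker are hypothetical stand-ins so the snippet runs without Pipecat; the receive method, the outbox list, and the payload keys are assumptions, while removing the job from active_jobs after the response follows the send_job_response description on this page.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class StubJobRequest:
    """Hypothetical stand-in for BusJobRequestMessage (job_id and payload only)."""

    job_id: str
    payload: dict | None = None


class StubWorker:
    """Hypothetical stand-in for BaseWorker that records outgoing messages."""

    def __init__(self) -> None:
        self.active_jobs: dict[str, StubJobRequest] = {}
        self.outbox: list[tuple[str, str, dict | None]] = []

    async def receive(self, message: StubJobRequest) -> None:
        # Assumed dispatch step: track the job, then call the hook.
        self.active_jobs[message.job_id] = message
        await self.on_job_request(message)

    async def on_job_request(self, message: StubJobRequest) -> None:
        pass

    async def send_job_update(self, job_id: str, update: dict | None = None) -> None:
        self.outbox.append(("update", job_id, update))

    async def send_job_response(self, job_id: str, response: dict | None = None) -> None:
        self.outbox.append(("response", job_id, response))
        self.active_jobs.pop(job_id, None)  # the job is no longer active


class WordCounter(StubWorker):
    async def on_job_request(self, message: StubJobRequest) -> None:
        await super().on_job_request(message)
        text = (message.payload or {}).get("text", "")
        await self.send_job_update(message.job_id, {"stage": "counting"})
        await self.send_job_response(message.job_id, {"words": len(text.split())})


async def main() -> WordCounter:
    worker = WordCounter()
    await worker.receive(StubJobRequest("job-1", {"text": "a b c"}))
    return worker


worker = asyncio.run(main())
print(worker.outbox)
```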

on_job_response

async def on_job_response(self, message: BusJobResponseMessage) -> None
Called when a worker agent sends a response. Override to process individual results as they arrive.

on_job_update

async def on_job_update(self, message: BusJobUpdateMessage) -> None
Called when a worker agent sends a progress update.

on_job_update_requested

async def on_job_update_requested(self, message: BusJobUpdateRequestMessage) -> None
Called when the requester asks for a progress update. Override to send back a progress update via send_job_update().

on_job_completed

async def on_job_completed(self, result: JobGroupResponse) -> None
Called when all agents in a job group have responded.

on_job_error

async def on_job_error(self, message: BusJobResponseMessage) -> None
Called when a job group is cancelled due to a worker error. Fires when a worker responds with ERROR or FAILED status and cancel_on_error is set.

on_job_stream_start

async def on_job_stream_start(self, message: BusJobStreamStartMessage) -> None
Called when a worker agent begins streaming.

on_job_stream_data

async def on_job_stream_data(self, message: BusJobStreamDataMessage) -> None
Called for each streaming chunk from a worker agent.

on_job_stream_end

async def on_job_stream_end(self, message: BusJobStreamEndMessage) -> None
Called when a worker agent finishes streaming.

on_job_cancelled

async def on_job_cancelled(self, message: BusJobCancelMessage) -> None
Called when this agent’s job is cancelled by the requester. Override to clean up resources or stop in-progress work.

Bus

send_bus_message

async def send_bus_message(self, message: BusMessage) -> None
Send a message on the bus.

| Parameter | Type | Description |
| --- | --- | --- |
| message | BusMessage | The bus message to send |

send_bus_error_message

async def send_bus_error_message(self, error: str) -> None
Report an error on the bus. Child agents send a local-only message to the parent. Root agents broadcast over the network.

| Parameter | Type | Description |
| --- | --- | --- |
| error | str | Description of the error |

on_bus_message

async def on_bus_message(self, message: BusMessage) -> None
Called for every bus message after built-in lifecycle handling. Override to handle custom message types.

| Parameter | Type | Description |
| --- | --- | --- |
| message | BusMessage | The bus message to process |

Decorators

@job

Mark an agent method as a job handler. Decorated methods are automatically collected at initialization and dispatched when matching job requests arrive. Each request runs in its own asyncio task so the bus message loop is never blocked.
from pipecat.pipeline.job_decorator import job
The decorator requires a name argument:
@job(name="research")
async def on_research(self, message):
    result = await do_research(message.payload)
    await self.send_job_response(message.job_id, result)


@job(name="write", sequential=True)
async def on_write(self, message):
    result = await do_write(message.payload)
    await self.send_job_response(message.job_id, result)
name
str
required
Job name to match. The handler only receives requests with a matching name.
sequential
bool
default:"False"
When True, requests with this name run one at a time in FIFO order, and the time a request spends waiting in the queue counts against the requester’s timeout. When False (the default), multiple requests run concurrently.
Job handler methods receive a BusJobRequestMessage.
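
The difference between sequential and concurrent dispatch can be sketched with plain asyncio. The example below is only one possible way to get this behavior (one task per request, with an asyncio.Lock serializing the handler body in sequential mode) and makes no claim about how the @job decorator is implemented; ConcurrencyProbe and dispatch are hypothetical names.

```python
from __future__ import annotations

import asyncio


class ConcurrencyProbe:
    """Records how many simulated handler calls overlap and their finish order."""

    def __init__(self) -> None:
        self.running = 0
        self.peak = 0
        self.finished: list[int] = []

    async def handle(self, request_id: int) -> None:
        self.running += 1
        self.peak = max(self.peak, self.running)
        await asyncio.sleep(0.01)  # simulated work
        self.finished.append(request_id)
        self.running -= 1


async def dispatch(request_ids: list[int], *, sequential: bool) -> ConcurrencyProbe:
    probe = ConcurrencyProbe()
    lock = asyncio.Lock()  # asyncio.Lock wakes waiters in FIFO order

    async def run_one(request_id: int) -> None:
        if sequential:
            async with lock:  # only one handler body runs at a time
                await probe.handle(request_id)
        else:
            await probe.handle(request_id)

    # One task per request, so the dispatching loop itself never blocks.
    tasks = [asyncio.create_task(run_one(i)) for i in request_ids]
    await asyncio.gather(*tasks)
    return probe


sequential_probe = asyncio.run(dispatch([1, 2, 3], sequential=True))
concurrent_probe = asyncio.run(dispatch([1, 2, 3], sequential=False))
print(sequential_probe.peak, sequential_probe.finished)  # 1 [1, 2, 3]
print(concurrent_probe.peak)  # 3
```

In sequential mode the third request waits for the first two to finish, which is why queueing time matters when the requester sets a timeout.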

@worker_ready

Mark a method as a handler for a specific agent becoming ready. Decorated methods are collected at initialization; when the agent starts, it calls watch_workers for each handler, and the method is called when the watched agent registers.
from pipecat.pipeline.worker_ready_decorator import worker_ready
@worker_ready(name="greeter")
async def on_greeter_ready(self, data: WorkerReadyData) -> None:
    await self.activate_worker("greeter", args=...)
name
str
required
The name of the agent to watch.
The handler receives a WorkerReadyData instance.