
# BaseWorker

> Core agent class for the multi-agent framework

## Overview

`BaseWorker` is the abstract base class that all agents inherit from. It handles agent lifecycle, parent-child relationships, bus communication, and job coordination.

A `BaseWorker` connects to a [`WorkerBus`](/api-reference/server/bus/bus#workerbus), registers itself in the shared registry, accepts activation and deactivation, and exchanges job requests and responses with other agents. Agents that need to run a Pipecat pipeline use a concrete subclass such as [`LLMWorker`](/api-reference/server/workers/llm-worker). `BaseWorker` itself operates purely through bus messages, which suits coordinators and orchestrators.

```python theme={null}
from pipecat.pipeline.base_worker import BaseWorker, WorkerActivationArgs
```

## Configuration

<ParamField path="name" type="str | None" default="None">
  Unique name for this agent, used for bus message routing and registry lookup.
  If `None`, an auto-generated name is used (useful for instances that don't
  participate in inter-agent communication).
</ParamField>

<ParamField path="active" type="bool" default="True">
  Whether the agent starts active. When `False`, the agent waits for an
  `activate_worker()` call before `on_activated` fires.
</ParamField>

<Note>
  The bus is not passed in the constructor. It is provided by the runner when
  the agent is registered with
  [`WorkerRunner.add_workers()`](/api-reference/server/workers/runner#add_workers),
  which calls `attach()` internally.
</Note>

## Properties

### bus

```python theme={null}
agent.bus -> WorkerBus
```

The bus this agent is attached to. Raises `RuntimeError` if accessed before `attach()` has been called.
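The attach-before-use contract can be pictured with a small stand-in class. This is only a sketch, not Pipecat's implementation: `SketchBus`, `SketchWorker`, the `_bus` attribute, and `bus_available` are hypothetical names introduced for illustration.

```python
from typing import Optional


class SketchBus:
    """Hypothetical stand-in for a bus; it carries no real behavior."""


class SketchWorker:
    """Hypothetical worker showing a property that guards on attach()."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._bus: Optional[SketchBus] = None  # unset until attach() runs

    def attach(self, bus: SketchBus) -> None:
        # Assumption: the runner would be the one calling this.
        self._bus = bus

    @property
    def bus(self) -> SketchBus:
        if self._bus is None:
            raise RuntimeError(f"worker {self.name!r} is not attached to a bus")
        return self._bus


def bus_available(worker: SketchWorker) -> bool:
    """Return True if worker.bus can be read without raising."""
    try:
        worker.bus
    except RuntimeError:
        return False
    return True
```

Code that may run before the runner has attached the worker can use the same `try`/`except RuntimeError` pattern instead of reading `bus` unconditionally.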

### active

```python theme={null}
agent.active -> bool
```

Whether this agent is currently active.

### activation\_args

```python theme={null}
agent.activation_args -> dict | None
```

The arguments from the most recent activation, or `None` if the agent is inactive. The value is cleared when the agent is deactivated.

### parent

```python theme={null}
agent.parent -> str | None
```

The name of the parent agent, or `None` if this is a root agent.

### registry

```python theme={null}
agent.registry -> WorkerRegistry
```

The shared agent registry this agent is attached to. Raises `RuntimeError` if accessed before `attach()` has been called.

### started\_at

```python theme={null}
agent.started_at -> float | None
```

Unix timestamp when this agent became ready, or `None` if not yet started.

### bridged

```python theme={null}
agent.bridged -> bool
```

Whether this agent is bridged onto the bus (receives pipeline frames from the bus). Always `False` on `BaseWorker`; subclasses such as `PipelineWorker` override it.

### children

```python theme={null}
agent.children -> list[BaseWorker]
```

The list of child agents added via `add_workers()`.

### active\_jobs

```python theme={null}
agent.active_jobs -> dict[str, BusJobRequestMessage]
```

Active job requests this agent is currently working on, keyed by `job_id`.

### job\_groups

```python theme={null}
agent.job_groups -> dict[str, JobGroup]
```

Active job groups launched by this agent, keyed by `job_id`.

## Lifecycle Hooks

Override these methods to react to lifecycle events. When overriding, always call the parent implementation through `super()`.

### on\_activated

```python theme={null}
async def on_activated(self, args: dict | None) -> None
```

Called when this agent is activated. Override to react to activation.

| Parameter | Type           | Description                        |
| --------- | -------------- | ---------------------------------- |
| `args`    | `dict \| None` | Optional arguments from the caller |

### on\_deactivated

```python theme={null}
async def on_deactivated(self) -> None
```

Called when this agent is deactivated.
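As a rough illustration of overriding both hooks while still calling `super()`, the sketch below uses a hypothetical `SketchBase` class in place of `BaseWorker`. The `activate()` and `deactivate()` driver methods are assumptions made for the example. Only the hook names and the "arguments are cleared on deactivation" behavior come from this page.

```python
import asyncio
from typing import Optional


class SketchBase:
    """Hypothetical base class that mirrors the documented hook names."""

    def __init__(self) -> None:
        self.active = False
        self.activation_args: Optional[dict] = None

    async def activate(self, args: Optional[dict] = None) -> None:
        # Hypothetical driver: record state, then fire the hook.
        self.active = True
        self.activation_args = args
        await self.on_activated(args)

    async def deactivate(self) -> None:
        self.active = False
        self.activation_args = None  # cleared on deactivation
        await self.on_deactivated()

    async def on_activated(self, args: Optional[dict]) -> None:
        pass

    async def on_deactivated(self) -> None:
        pass


class Greeter(SketchBase):
    def __init__(self) -> None:
        super().__init__()
        self.log: list = []

    async def on_activated(self, args: Optional[dict]) -> None:
        await super().on_activated(args)  # keep the base behavior
        self.log.append(("activated", args))

    async def on_deactivated(self) -> None:
        await super().on_deactivated()
        self.log.append(("deactivated", None))


async def run_lifecycle() -> Greeter:
    worker = Greeter()
    await worker.activate({"topic": "billing"})
    await worker.deactivate()
    return worker
```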

### on\_worker\_ready

```python theme={null}
async def on_worker_ready(self, data: WorkerReadyData) -> None
```

Called when another agent is ready to receive messages. For local root agents this fires automatically. For remote agents it fires only for agents watched via `watch_workers()`. For child agents it fires only on the parent.

| Parameter | Type                                                                     | Description                       |
| --------- | ------------------------------------------------------------------------ | --------------------------------- |
| `data`    | [`WorkerReadyData`](/api-reference/server/workers/types#workerreadydata) | Information about the ready agent |

### on\_worker\_failed

```python theme={null}
async def on_worker_failed(self, data: WorkerErrorData) -> None
```

Called when a child agent reports an error.

| Parameter | Type                                                                     | Description                 |
| --------- | ------------------------------------------------------------------------ | --------------------------- |
| `data`    | [`WorkerErrorData`](/api-reference/server/workers/types#workererrordata) | Information about the error |

## Agent Management

### add\_workers

```python theme={null}
async def add_workers(self, *workers: BaseWorker, watch: bool = True) -> None
```

Register one or more child agents under this parent. Each child's lifecycle (end, cancel) is automatically managed by this parent agent. By default the children are also watched, so the parent receives `on_worker_ready` when each one starts.

| Parameter  | Type         | Default | Description                                                   |
| ---------- | ------------ | ------- | ------------------------------------------------------------- |
| `*workers` | `BaseWorker` |         | One or more child agent instances to add                      |
| `watch`    | `bool`       | `True`  | Whether to watch each newly added child for `on_worker_ready` |

### activate\_worker

```python theme={null}
async def activate_worker(
    self,
    worker_name: str,
    *,
    args: WorkerActivationArgs | None = None,
    deactivate_self: bool = False,
) -> None
```

Activate an agent by name. The target agent's `on_activated` hook will be called with the provided arguments. To hand off (deactivate this agent and activate the target), pass `deactivate_self=True`.

| Parameter         | Type                           | Default | Description                                                   |
| ----------------- | ------------------------------ | ------- | ------------------------------------------------------------- |
| `worker_name`     | `str`                          |         | Name of the agent to activate                                 |
| `args`            | `WorkerActivationArgs \| None` | `None`  | Arguments forwarded to `on_activated`                         |
| `deactivate_self` | `bool`                         | `False` | Whether to deactivate this agent before activating the target |
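The hand-off behavior of `deactivate_self=True` can be sketched with a toy model. Everything here is hypothetical: `SketchWorker` and its shared `peers` mapping stand in for real workers and the bus, and only the ordering (deactivate the caller, then activate the target) is taken from the table above.

```python
import asyncio


class SketchWorker:
    """Hypothetical worker that tracks only a name and an active flag."""

    def __init__(self, name: str, peers: dict, active: bool = False) -> None:
        self.name = name
        self.active = active
        self.peers = peers  # hypothetical shared name -> worker mapping
        peers[name] = self

    async def activate_worker(
        self, worker_name: str, *, deactivate_self: bool = False
    ) -> None:
        if deactivate_self:
            self.active = False  # step aside before the target takes over
        self.peers[worker_name].active = True


async def hand_off() -> dict:
    peers: dict = {}
    greeter = SketchWorker("greeter", peers, active=True)
    SketchWorker("support", peers)
    await greeter.activate_worker("support", deactivate_self=True)
    return {name: worker.active for name, worker in peers.items()}
```

Without `deactivate_self=True`, both workers in this model would end up active, which is the non-hand-off case.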

### deactivate\_worker

```python theme={null}
async def deactivate_worker(self, worker_name: str) -> None
```

Deactivate an agent by name. The target agent's `on_deactivated` hook will be called.

| Parameter     | Type  | Description                     |
| ------------- | ----- | ------------------------------- |
| `worker_name` | `str` | Name of the agent to deactivate |

### watch\_workers

```python theme={null}
async def watch_workers(self, *worker_names: str) -> None
```

Request notification when one or more agents register. For each name: if the agent is already registered, `on_worker_ready` fires immediately. Otherwise it fires when the agent eventually registers.

| Parameter       | Type  | Description                      |
| --------------- | ----- | -------------------------------- |
| `*worker_names` | `str` | Names of the agents to watch for |
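The two notification paths (immediate for already-registered agents, deferred otherwise) might look like the following. This is a minimal sketch under stated assumptions: `SketchRegistry`, `watch`, and `register` are hypothetical and do not reflect how the real registry is implemented.

```python
import asyncio


class SketchRegistry:
    """Hypothetical registry that notifies watchers when a name registers."""

    def __init__(self) -> None:
        self.registered: set = set()
        self.watchers: dict = {}

    async def watch(self, name: str, callback) -> None:
        if name in self.registered:
            await callback(name)  # already registered: notify right away
        else:
            self.watchers.setdefault(name, []).append(callback)

    async def register(self, name: str) -> None:
        self.registered.add(name)
        for callback in self.watchers.pop(name, []):
            await callback(name)  # deferred notification


async def demo_watch() -> list:
    events: list = []
    registry = SketchRegistry()

    async def on_ready(name: str) -> None:
        events.append(name)

    await registry.register("early")
    await registry.watch("early", on_ready)  # fires immediately
    await registry.watch("late", on_ready)   # nothing happens yet
    events.append("--")                      # marker between the two phases
    await registry.register("late")          # fires now
    return events
```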

### end

```python theme={null}
async def end(self, *, reason: str | None = None) -> None
```

Request a graceful end of the session.

| Parameter | Type          | Default | Description                      |
| --------- | ------------- | ------- | -------------------------------- |
| `reason`  | `str \| None` | `None`  | Human-readable reason for ending |

### cancel

```python theme={null}
async def cancel(self, *, reason: str | None = None) -> None
```

Request an immediate cancellation of all agents.

| Parameter | Type          | Default | Description                          |
| --------- | ------------- | ------- | ------------------------------------ |
| `reason`  | `str \| None` | `None`  | Human-readable reason for cancelling |

### wait

```python theme={null}
async def wait(self) -> None
```

Wait for this agent to finish.

## Job Coordination

### request\_job

```python theme={null}
async def request_job(
    self,
    worker_name: str,
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
) -> str
```

Send a job request to a single agent (fire-and-forget). Waits for the agent to be ready before sending the request. Does not wait for the job to complete; use callbacks (`on_job_response`, `on_job_completed`) or `job()` for that.

| Parameter     | Type            | Default | Description                                          |
| ------------- | --------------- | ------- | ---------------------------------------------------- |
| `worker_name` | `str`           |         | Name of the agent to send the job to                 |
| `name`        | `str \| None`   | `None`  | Job name for routing to a named `@job` handler       |
| `payload`     | `dict \| None`  | `None`  | Structured data describing the work                  |
| `timeout`     | `float \| None` | `None`  | Timeout in seconds; auto-cancels after this duration |

**Returns:** The generated `job_id`.
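To make the `timeout` semantics concrete, here is a standalone sketch of "cancel the work if it runs past the deadline" using plain `asyncio`. It does not call Pipecat at all: `slow_job` and `run_with_timeout` are hypothetical helpers, and the status strings are illustrative rather than real `JobStatus` values.

```python
import asyncio
from typing import Optional


async def slow_job(delay: float, result: dict) -> dict:
    """Hypothetical unit of work that finishes after `delay` seconds."""
    await asyncio.sleep(delay)
    return result


async def run_with_timeout(coro, timeout: Optional[float]):
    """Await `coro`, cancelling it if `timeout` seconds pass first."""
    try:
        # wait_for cancels the awaited coroutine when the timeout expires;
        # a timeout of None waits indefinitely.
        result = await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        return ("cancelled", None)
    return ("completed", result)
```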

### job

```python theme={null}
def job(
    self,
    worker_name: str,
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
) -> JobContext
```

Create a context manager for a single-agent job. Used with `async with`, it waits for the agent to be ready and sends the job request, then waits for the response when the block exits. Iterate with `async for` inside the block to receive intermediate events.

| Parameter     | Type            | Default | Description                                    |
| ------------- | --------------- | ------- | ---------------------------------------------- |
| `worker_name` | `str`           |         | Name of the agent to send the job to           |
| `name`        | `str \| None`   | `None`  | Job name for routing to a named `@job` handler |
| `payload`     | `dict \| None`  | `None`  | Structured data describing the work            |
| `timeout`     | `float \| None` | `None`  | Timeout in seconds                             |

**Returns:** A [`JobContext`](/api-reference/server/workers/types#jobcontext) to use with `async with`.

```python theme={null}
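# Inside a BaseWorker method; `data` is a payload dict prepared by the caller.
async with self.job("worker", payload=data) as j:
    # Intermediate events arrive while the job is running.
    async for event in j:
        if event.type == JobEvent.UPDATE:
            print(event.data)

# The final response is available once the block has exited.
print(j.response)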
```

### request\_job\_group

```python theme={null}
async def request_job_group(
    self,
    *worker_names: str,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> str
```

Send a job request to multiple agents (fire-and-forget). Waits for all agents to be ready before sending requests.

| Parameter         | Type            | Default | Description                                    |
| ----------------- | --------------- | ------- | ---------------------------------------------- |
| `*worker_names`   | `str`           |         | Names of the agents to send the job to         |
| `name`            | `str \| None`   | `None`  | Job name for routing to named `@job` handlers  |
| `payload`         | `dict \| None`  | `None`  | Structured data describing the work            |
| `timeout`         | `float \| None` | `None`  | Timeout in seconds                             |
| `cancel_on_error` | `bool`          | `True`  | Whether to cancel the group if a worker errors |

**Returns:** The generated `job_id` shared by all agents in the group.
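The effect of `cancel_on_error` can be approximated with plain `asyncio` tasks. The sketch below is an analogy, not the framework's code: `run_group`, `failing_worker`, `slow_worker`, and the outcome strings are all hypothetical.

```python
import asyncio


async def run_group(jobs: dict, cancel_on_error: bool = True) -> dict:
    """Run named coroutines together and report one outcome per name."""
    names = {asyncio.ensure_future(coro): name for name, coro in jobs.items()}
    pending = set(names)
    results: dict = {}
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        failed = False
        for task in done:
            if task.cancelled():
                results[names[task]] = "cancelled"
            elif task.exception() is not None:
                results[names[task]] = "error"
                failed = True
            else:
                results[names[task]] = task.result()
        if failed and cancel_on_error:
            for task in pending:
                task.cancel()  # the next wait() collects these as cancelled
    return results


async def failing_worker(delay: float) -> str:
    await asyncio.sleep(delay)
    raise ValueError("worker failed")


async def slow_worker(delay: float, value: str) -> str:
    await asyncio.sleep(delay)
    return value


async def demo_group(cancel_on_error: bool) -> dict:
    jobs = {"w1": failing_worker(0.01), "w2": slow_worker(0.05, "done")}
    return await run_group(jobs, cancel_on_error)
```

With `cancel_on_error=False` in this model, the slower worker is allowed to finish even though its sibling failed.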

### job\_group

```python theme={null}
def job_group(
    self,
    *worker_names: str,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> JobGroupContext
```

Create a context manager for a job group. Used with `async with`, it waits for the agents to be ready and sends the job requests, then waits for all responses when the block exits. Iterate with `async for` inside the block to receive intermediate events.

| Parameter         | Type            | Default | Description                                    |
| ----------------- | --------------- | ------- | ---------------------------------------------- |
| `*worker_names`   | `str`           |         | Names of the agents to send the job to         |
| `name`            | `str \| None`   | `None`  | Job name for routing to named `@job` handlers  |
| `payload`         | `dict \| None`  | `None`  | Structured data describing the work            |
| `timeout`         | `float \| None` | `None`  | Timeout in seconds                             |
| `cancel_on_error` | `bool`          | `True`  | Whether to cancel the group if a worker errors |

**Returns:** A [`JobGroupContext`](/api-reference/server/workers/types#jobgroupcontext) to use with `async with`.

```python theme={null}
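# Inside a BaseWorker method; `data` is a payload dict prepared by the caller.
async with self.job_group("w1", "w2", payload=data) as jg:
    # Each event identifies the worker that produced it.
    async for event in jg:
        if event.type == JobGroupEvent.UPDATE:
            print(f"{event.worker_name}: {event.data}")

# After the block exits, responses are keyed by worker name.
for name, result in jg.responses.items():
    print(name, result)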
```

### cancel\_job\_group

```python theme={null}
async def cancel_job_group(self, job_id: str, *, reason: str | None = None) -> None
```

Cancel a running job group.

| Parameter | Type          | Default | Description                            |
| --------- | ------------- | ------- | -------------------------------------- |
| `job_id`  | `str`         |         | The job identifier to cancel           |
| `reason`  | `str \| None` | `None`  | Human-readable reason for cancellation |

### request\_job\_update

```python theme={null}
async def request_job_update(self, job_id: str, worker_name: str) -> None
```

Request a progress update from a worker agent.

| Parameter     | Type  | Description                                 |
| ------------- | ----- | ------------------------------------------- |
| `job_id`      | `str` | The job identifier                          |
| `worker_name` | `str` | Name of the agent to request an update from |

### send\_job\_response

```python theme={null}
async def send_job_response(
    self,
    job_id: str,
    response: dict | None = None,
    *,
    status: JobStatus = JobStatus.COMPLETED,
    urgent: bool = False,
) -> None
```

Send a job response back to the requester. After sending, the job is removed from the set of active jobs.

| Parameter  | Type                                                         | Default     | Description                  |
| ---------- | ------------------------------------------------------------ | ----------- | ---------------------------- |
| `job_id`   | `str`                                                        |             | The job being responded to   |
| `response` | `dict \| None`                                               | `None`      | Result data                  |
| `status`   | [`JobStatus`](/api-reference/server/workers/types#jobstatus) | `COMPLETED` | Completion status            |
| `urgent`   | `bool`                                                       | `False`     | Deliver with system priority |
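The bookkeeping described above (a job stays in the active set until its response is sent) can be sketched as follows. `SketchRequest`, `SketchWorker`, the `receive` method, and the `sent` list are hypothetical; the real class tracks `BusJobRequestMessage` objects and delivers responses over the bus.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchRequest:
    """Hypothetical stand-in for a job request message."""

    job_id: str
    payload: Optional[dict] = None


class SketchWorker:
    """Hypothetical worker that tracks in-flight jobs by job_id."""

    def __init__(self) -> None:
        self.active_jobs: dict = {}
        self.sent: list = []  # records what would go out on the bus

    async def receive(self, request: SketchRequest) -> None:
        self.active_jobs[request.job_id] = request

    async def send_job_response(
        self, job_id: str, response: Optional[dict] = None, *, status: str = "completed"
    ) -> None:
        self.sent.append((job_id, status, response))
        self.active_jobs.pop(job_id, None)  # the job is no longer active


async def demo_active_jobs() -> tuple:
    worker = SketchWorker()
    await worker.receive(SketchRequest("job-1", {"q": "weather"}))
    before = sorted(worker.active_jobs)
    await worker.send_job_response("job-1", {"answer": "sunny"})
    after = sorted(worker.active_jobs)
    return before, after, worker.sent
```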

### send\_job\_update

```python theme={null}
async def send_job_update(
    self,
    job_id: str,
    update: dict | None = None,
    *,
    urgent: bool = False,
) -> None
```

Send a progress update to the requester.

| Parameter | Type           | Default | Description                  |
| --------- | -------------- | ------- | ---------------------------- |
| `job_id`  | `str`          |         | The job being updated        |
| `update`  | `dict \| None` | `None`  | Progress data                |
| `urgent`  | `bool`         | `False` | Deliver with system priority |

### send\_job\_stream\_start

```python theme={null}
async def send_job_stream_start(self, job_id: str, data: dict | None = None) -> None
```

Begin streaming job results back to the requester.

| Parameter | Type           | Default | Description                        |
| --------- | -------------- | ------- | ---------------------------------- |
| `job_id`  | `str`          |         | The job being streamed             |
| `data`    | `dict \| None` | `None`  | Optional metadata about the stream |

### send\_job\_stream\_data

```python theme={null}
async def send_job_stream_data(self, job_id: str, data: dict | None = None) -> None
```

Send a streaming chunk to the requester.

| Parameter | Type           | Default | Description            |
| --------- | -------------- | ------- | ---------------------- |
| `job_id`  | `str`          |         | The job being streamed |
| `data`    | `dict \| None` | `None`  | The chunk payload      |

### send\_job\_stream\_end

```python theme={null}
async def send_job_stream_end(self, job_id: str, data: dict | None = None) -> None
```

End the current stream and mark this agent's job as complete.

| Parameter | Type           | Default | Description             |
| --------- | -------------- | ------- | ----------------------- |
| `job_id`  | `str`          |         | The job being streamed  |
| `data`    | `dict \| None` | `None`  | Optional final metadata |
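The start, data, end sequence can be modeled with an `asyncio.Queue` standing in for the bus. This is a sketch only: the tuple message shape, `stream_job`, and `collect_stream` are assumptions made for illustration and do not match the real `BusJobStream*Message` types.

```python
import asyncio


async def stream_job(queue: asyncio.Queue, job_id: str, chunks: list) -> None:
    """Worker side: emit start, one data message per chunk, then end."""
    await queue.put(("start", job_id, {"count": len(chunks)}))
    for chunk in chunks:
        await queue.put(("data", job_id, {"text": chunk}))
    await queue.put(("end", job_id, None))


async def collect_stream(queue: asyncio.Queue) -> str:
    """Requester side: join chunk text until the end marker arrives."""
    parts = []
    while True:
        kind, _job_id, data = await queue.get()
        if kind == "data":
            parts.append(data["text"])
        elif kind == "end":
            return "".join(parts)
        # "start" carries only metadata, so it is ignored here


async def demo_stream() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(stream_job(queue, "job-1", ["Hel", "lo", "!"]))
    text = await collect_stream(queue)
    await producer
    return text
```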

### create\_job\_group\_and\_request\_job

```python theme={null}
async def create_job_group_and_request_job(
    self,
    worker_names: list[str],
    *,
    name: str | None = None,
    payload: dict | None = None,
    timeout: float | None = None,
    cancel_on_error: bool = True,
) -> JobGroup
```

Wait for agents to be ready, create a job group, and send requests. Does not wait for the group to complete; call `group.wait()` or use `job_group()` for that. Used internally by `job()` and `job_group()`.

**Returns:** The created [`JobGroup`](/api-reference/server/workers/types#jobgroup).

## Job Hooks

Override these methods to handle job events. When overriding, always call the parent implementation through `super()`.

### on\_job\_request

```python theme={null}
async def on_job_request(self, message: BusJobRequestMessage) -> None
```

Called when this agent receives a job request that does not match a named `@job` handler. Override to perform work. Use `send_job_update()` to report progress and `send_job_response()` to return results.

### on\_job\_response

```python theme={null}
async def on_job_response(self, message: BusJobResponseMessage) -> None
```

Called when a worker agent sends a response. Override to process individual results as they arrive.

### on\_job\_update

```python theme={null}
async def on_job_update(self, message: BusJobUpdateMessage) -> None
```

Called when a worker agent sends a progress update.

### on\_job\_update\_requested

```python theme={null}
async def on_job_update_requested(self, message: BusJobUpdateRequestMessage) -> None
```

Called when the requester asks for a progress update. Override to send back a progress update via `send_job_update()`.

### on\_job\_completed

```python theme={null}
async def on_job_completed(self, result: JobGroupResponse) -> None
```

Called when all agents in a job group have responded.

### on\_job\_error

```python theme={null}
async def on_job_error(self, message: BusJobResponseMessage) -> None
```

Called when a job group is cancelled because of a worker error, that is, when a worker responds with `ERROR` or `FAILED` status and `cancel_on_error` is `True`.

### on\_job\_stream\_start

```python theme={null}
async def on_job_stream_start(self, message: BusJobStreamStartMessage) -> None
```

Called when a worker agent begins streaming.

### on\_job\_stream\_data

```python theme={null}
async def on_job_stream_data(self, message: BusJobStreamDataMessage) -> None
```

Called for each streaming chunk from a worker agent.

### on\_job\_stream\_end

```python theme={null}
async def on_job_stream_end(self, message: BusJobStreamEndMessage) -> None
```

Called when a worker agent finishes streaming.

### on\_job\_cancelled

```python theme={null}
async def on_job_cancelled(self, message: BusJobCancelMessage) -> None
```

Called when this agent's job is cancelled by the requester. Override to clean up resources or stop in-progress work.

## Bus

### send\_bus\_message

```python theme={null}
async def send_bus_message(self, message: BusMessage) -> None
```

Send a message on the bus.

| Parameter | Type                                               | Description             |
| --------- | -------------------------------------------------- | ----------------------- |
| `message` | [`BusMessage`](/api-reference/server/bus/messages) | The bus message to send |

### send\_bus\_error\_message

```python theme={null}
async def send_bus_error_message(self, error: str) -> None
```

Report an error on the bus. Child agents send a local-only message to the parent. Root agents broadcast over the network.

| Parameter | Type  | Description              |
| --------- | ----- | ------------------------ |
| `error`   | `str` | Description of the error |

### on\_bus\_message

```python theme={null}
async def on_bus_message(self, message: BusMessage) -> None
```

Called for every bus message after built-in lifecycle handling. Override to handle custom message types.

| Parameter | Type                                               | Description                |
| --------- | -------------------------------------------------- | -------------------------- |
| `message` | [`BusMessage`](/api-reference/server/bus/messages) | The bus message to process |

## Decorators

### @job

Mark an agent method as a job handler. Decorated methods are automatically collected at initialization and dispatched when matching job requests arrive. Each request runs in its own asyncio task so the bus message loop is never blocked.

```python theme={null}
from pipecat.pipeline.job_decorator import job
```

The decorator requires a `name` argument:

```python theme={null}
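# Methods on a BaseWorker subclass; `do_research` and `do_write` are
# application-defined helpers.
@job(name="research")
async def on_research(self, message):
    result = await do_research(message.payload)
    await self.send_job_response(message.job_id, result)


# sequential=True: "write" requests are handled one at a time.
@job(name="write", sequential=True)
async def on_write(self, message):
    result = await do_write(message.payload)
    await self.send_job_response(message.job_id, result)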
```

<ParamField path="name" type="str" required>
  Job name to match. The handler only receives requests with a matching name.
</ParamField>

<ParamField path="sequential" type="bool" default="False">
  When `True`, requests with this name run one at a time in FIFO order. When
  `False` (the default), multiple requests run concurrently. Time a request
  spends waiting in the queue counts against the requester's timeout.
</ParamField>
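The difference between sequential and concurrent handling can be seen in a small `asyncio` model. This sketch uses an `asyncio.Lock` (which serves waiters in arrival order) as one possible way to serialize requests. Whether the framework uses a lock, a queue, or something else is not stated here, and `run_requests` is a hypothetical helper.

```python
import asyncio


async def run_requests(sequential: bool) -> int:
    """Return the peak number of handler invocations running at once."""
    lock = asyncio.Lock()
    running = 0
    peak = 0

    async def handle() -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulated work
        running -= 1

    async def submit() -> None:
        if sequential:
            async with lock:  # waiters are served in arrival order
                await handle()
        else:
            await handle()

    # Three requests arrive at essentially the same time.
    await asyncio.gather(*(submit() for _ in range(3)))
    return peak
```

In the sequential case the second and third requests spend their first moments waiting for the lock, which is the waiting time that would eat into a requester's timeout.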

Job handler methods receive a [`BusJobRequestMessage`](/api-reference/server/bus/messages#busjobrequestmessage).
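One common way to implement "collect decorated methods at initialization, then dispatch by name" is a marker decorator plus a scan in `__init__`. The sketch below is a hypothetical re-creation, not Pipecat's code: `sketch_job`, `SketchWorker`, `dispatch`, and the `_job_name` attribute are invented for illustration, and the handlers return values instead of sending bus responses.

```python
import asyncio


def sketch_job(*, name: str, sequential: bool = False):
    """Hypothetical marker decorator: tags a function with job metadata."""

    def mark(func):
        func._job_name = name
        func._job_sequential = sequential
        return func

    return mark


class SketchWorker:
    def __init__(self) -> None:
        # Collect every method tagged by the decorator, keyed by job name.
        self.handlers: dict = {}
        for attr in dir(self):
            member = getattr(self, attr)
            job_name = getattr(member, "_job_name", None)
            if callable(member) and job_name is not None:
                self.handlers[job_name] = member

    async def dispatch(self, name, payload):
        handler = self.handlers.get(name)
        if handler is None:
            # No named handler matched: fall back to the generic hook.
            return await self.on_job_request(payload)
        return await handler(payload)

    async def on_job_request(self, payload):
        return ("fallback", payload)


class Researcher(SketchWorker):
    @sketch_job(name="research")
    async def on_research(self, payload):
        return ("research", payload)


async def demo_dispatch() -> list:
    worker = Researcher()
    return [
        await worker.dispatch("research", {"topic": "llamas"}),
        await worker.dispatch(None, {"topic": "other"}),
    ]
```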

### @worker\_ready

Mark a method as a handler for a specific agent becoming ready. Decorated methods are collected at initialization; when the agent starts, it calls `watch_workers` for each handler, and the method is called when the watched agent registers.

```python theme={null}
from pipecat.pipeline.worker_ready_decorator import worker_ready
```

```python theme={null}
@worker_ready(name="greeter")
async def on_greeter_ready(self, data: WorkerReadyData) -> None:
    await self.activate_worker("greeter", args=...)
```

<ParamField path="name" type="str" required>
  The name of the agent to watch.
</ParamField>

The handler receives a [`WorkerReadyData`](/api-reference/server/workers/types#workerreadydata) instance.
