> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Types

> Shared types, enums, and data classes

## JobStatus

```python theme={null}
from pipecat.pipeline.job_context import JobStatus
```

Status of a completed job. Inherits from `str` so values compare naturally with plain strings.

| Value                 | Description                                       |
| --------------------- | ------------------------------------------------- |
| `JobStatus.COMPLETED` | The job finished successfully                     |
| `JobStatus.CANCELLED` | The job was cancelled by the requester            |
| `JobStatus.FAILED`    | The job failed due to a logical or business error |
| `JobStatus.ERROR`     | The job encountered an unexpected runtime error   |

## WorkerActivationArgs

```python theme={null}
from pipecat.pipeline.base_worker import WorkerActivationArgs
```

Base activation arguments for any agent.

| Field      | Type           | Default | Description                                       |
| ---------- | -------------- | ------- | ------------------------------------------------- |
| `metadata` | `dict \| None` | `None`  | Optional structured data passed during activation |

### Methods

```python theme={null}
# Create from a dict (ignores unknown keys)
args = WorkerActivationArgs.from_dict({"metadata": {...}})

# Convert to a dict (excludes None values)
data = args.to_dict()
```

## LLMWorkerActivationArgs

```python theme={null}
from pipecat.workers.llm import LLMWorkerActivationArgs
```

Activation arguments for [`LLMWorker`](/api-reference/server/workers/llm-worker). Extends [`WorkerActivationArgs`](#workeractivationargs).

| Field      | Type           | Default | Description                                                                                |
| ---------- | -------------- | ------- | ------------------------------------------------------------------------------------------ |
| `metadata` | `dict \| None` | `None`  | Optional structured data passed during activation                                          |
| `messages` | `list \| None` | `None`  | LLM context messages to inject on activation                                               |
| `run_llm`  | `bool \| None` | `None`  | Whether to run the LLM after appending messages. Defaults to `True` when `messages` is set |

## JobContext

```python theme={null}
from pipecat.pipeline.job_context import JobContext
```

Async context manager and iterator for a single-agent job. Sends a job request on enter, waits for the response on exit. Supports `async for` to receive intermediate events.

On normal completion, the result is available via `response`. On worker error or timeout, raises [`JobError`](/api-reference/server/workers/exceptions#joberror).

### Properties

| Property   | Type   | Description                   |
| ---------- | ------ | ----------------------------- |
| `job_id`   | `str`  | The job identifier            |
| `response` | `dict` | The worker's response payload |

### Usage

```python theme={null}
async with self.job("worker", payload=data) as j:
    async for event in j:
        print(f"[{event.type}]: {event.data}")

print(j.response)
```

## JobGroupContext

```python theme={null}
from pipecat.pipeline.job_context import JobGroupContext
```

Async context manager and iterator for structured job group execution. Sends job requests on enter, waits for all responses on exit. Supports `async for` to receive intermediate events.

On normal completion, results are available via `responses`. On worker error (with `cancel_on_error=True`) or timeout, raises [`JobGroupError`](/api-reference/server/workers/exceptions#jobgrouperror).

### Properties

| Property    | Type              | Description                              |
| ----------- | ----------------- | ---------------------------------------- |
| `job_id`    | `str`             | The shared job identifier for this group |
| `responses` | `dict[str, dict]` | Collected responses keyed by worker name |

### Usage

```python theme={null}
async with self.job_group("w1", "w2", payload=data) as jg:
    async for event in jg:
        print(f"{event.worker_name} [{event.type}]: {event.data}")

for name, result in jg.responses.items():
    print(name, result)
```

## JobEvent

```python theme={null}
from pipecat.pipeline.job_context import JobEvent
```

An event received from a worker during a single-agent job. Yielded by `async for` over a [`JobContext`](#jobcontext).

| Field  | Type           | Description                          |
| ------ | -------------- | ------------------------------------ |
| `type` | `str`          | The event type (see constants below) |
| `data` | `dict \| None` | Optional event payload               |

### Event Type Constants

| Constant                | Value            | Description                   |
| ----------------------- | ---------------- | ----------------------------- |
| `JobEvent.UPDATE`       | `"update"`       | Progress update from a worker |
| `JobEvent.STREAM_START` | `"stream_start"` | Worker started streaming      |
| `JobEvent.STREAM_DATA`  | `"stream_data"`  | Streaming data chunk          |
| `JobEvent.STREAM_END`   | `"stream_end"`   | Worker finished streaming     |

## JobGroupResponse

```python theme={null}
from pipecat.pipeline.job_context import JobGroupResponse
```

Collected results from a completed job group. Passed to `on_job_completed`.

| Field       | Type              | Description                              |
| ----------- | ----------------- | ---------------------------------------- |
| `job_id`    | `str`             | The shared job identifier                |
| `responses` | `dict[str, dict]` | Collected responses keyed by worker name |

## JobGroup

```python theme={null}
from pipecat.pipeline.job_context import JobGroup
```

Tracks a group of agents launched together by a job request. Exposed via [`BaseWorker.job_groups`](/api-reference/server/workers/base-worker#job_groups) and returned by `create_job_group_and_request_job`.

| Field             | Type                   | Default | Description                                     |
| ----------------- | ---------------------- | ------- | ----------------------------------------------- |
| `job_id`          | `str`                  |         | Shared identifier for all agents in this group  |
| `worker_names`    | `set[str]`             |         | Names of the agents in the group                |
| `responses`       | `dict[str, dict]`      | `{}`    | Collected responses keyed by worker name        |
| `timeout_task`    | `asyncio.Task \| None` | `None`  | Optional task that cancels the group on timeout |
| `cancel_on_error` | `bool`                 | `True`  | Whether to cancel the group if a worker errors  |

### Properties

| Property  | Type   | Description                               |
| --------- | ------ | ----------------------------------------- |
| `is_done` | `bool` | Whether the group has completed or failed |

### Methods

```python theme={null}
# Wait for all agents in the group to respond (raises JobGroupError on failure)
await group.wait()
```

## JobGroupEvent

```python theme={null}
from pipecat.pipeline.job_context import JobGroupEvent
```

An event received from a worker during job group execution. Yielded by `async for` over a [`JobGroupContext`](#jobgroupcontext).

| Field         | Type           | Description                                |
| ------------- | -------------- | ------------------------------------------ |
| `type`        | `str`          | The event type (see constants below)       |
| `worker_name` | `str`          | The name of the worker that sent the event |
| `data`        | `dict \| None` | Optional event payload                     |

### Event Type Constants

| Constant                     | Value            | Description                   |
| ---------------------------- | ---------------- | ----------------------------- |
| `JobGroupEvent.UPDATE`       | `"update"`       | Progress update from a worker |
| `JobGroupEvent.STREAM_START` | `"stream_start"` | Worker started streaming      |
| `JobGroupEvent.STREAM_DATA`  | `"stream_data"`  | Streaming data chunk          |
| `JobGroupEvent.STREAM_END`   | `"stream_end"`   | Worker finished streaming     |

## WorkerReadyData

```python theme={null}
from pipecat.registry.types import WorkerReadyData
```

Information about a registered agent. Passed to `on_worker_ready` and `@worker_ready` handlers.

| Field         | Type  | Description                                |
| ------------- | ----- | ------------------------------------------ |
| `worker_name` | `str` | The name of the agent                      |
| `runner`      | `str` | The name of the runner managing this agent |

## WorkerErrorData

```python theme={null}
from pipecat.registry.types import WorkerErrorData
```

Information about an agent error. Passed to `on_worker_failed`.

| Field         | Type  | Description                        |
| ------------- | ----- | ---------------------------------- |
| `worker_name` | `str` | The name of the agent that errored |
| `error`       | `str` | Description of the error           |

## WorkerRegistryEntry

```python theme={null}
from pipecat.registry.types import WorkerRegistryEntry
```

Information about an agent in a registry snapshot.

| Field        | Type            | Default | Description                                         |
| ------------ | --------------- | ------- | --------------------------------------------------- |
| `name`       | `str`           |         | The agent's name                                    |
| `parent`     | `str \| None`   | `None`  | Name of the parent agent, or `None` for root agents |
| `active`     | `bool`          | `False` | Whether the agent is currently active               |
| `bridged`    | `bool`          | `False` | Whether the agent is bridged                        |
| `started_at` | `float \| None` | `None`  | Unix timestamp when the agent became ready          |

## WorkerRegistry

```python theme={null}
from pipecat.registry import WorkerRegistry
```

Tracks all known agents across local and remote runners. Owned by the [`WorkerRunner`](/api-reference/server/workers/runner) and shared with its agents.

### Properties

| Property         | Type        | Description                                     |
| ---------------- | ----------- | ----------------------------------------------- |
| `runner_name`    | `str`       | The name of the runner that owns this registry  |
| `local_workers`  | `list[str]` | Names of agents registered under this runner    |
| `remote_workers` | `list[str]` | Names of agents registered under remote runners |

### Methods

#### get

```python theme={null}
def get(self, worker_name: str) -> WorkerReadyData | None
```

Look up a registered agent by name.

#### watch

```python theme={null}
async def watch(self, worker_name: str, handler: WatchHandler) -> None
```

Watch for a specific agent's registration. If the agent is already registered, the handler fires immediately.

| Parameter     | Type                                     | Description                                  |
| ------------- | ---------------------------------------- | -------------------------------------------- |
| `worker_name` | `str`                                    | The agent name to watch for                  |
| `handler`     | `Callable[[WorkerReadyData], Coroutine]` | Async callable invoked with the agent's data |

#### register

```python theme={null}
async def register(self, worker_data: WorkerReadyData) -> bool
```

Register an agent. Returns `True` if the agent was newly registered, `False` if already known.
