# Types

> Shared types, enums, and data classes

## TaskStatus

```python theme={null}
from pipecat_subagents.agents.task_context import TaskStatus
```

Status of a completed task. Inherits from `str` so values compare naturally with plain strings.

| Value                  | Description                                        |
| ---------------------- | -------------------------------------------------- |
| `TaskStatus.COMPLETED` | The task finished successfully                     |
| `TaskStatus.CANCELLED` | The task was cancelled by the requester            |
| `TaskStatus.FAILED`    | The task failed due to a logical or business error |
| `TaskStatus.ERROR`     | The task encountered an unexpected runtime error   |
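Because the enum inherits from `str`, a status can be compared against either an enum member or a plain string. The following is a minimal sketch of that behavior using a stand-in enum: `DemoTaskStatus`, `is_success`, and the lowercase string values are assumptions made for illustration, not the library's confirmed values.

```python
from enum import Enum


class DemoTaskStatus(str, Enum):
    """Stand-in for TaskStatus; the string values are assumed, not confirmed."""

    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"
    ERROR = "error"


def is_success(status: str) -> bool:
    # Accepts enum members and plain strings alike, because members are str.
    return status == DemoTaskStatus.COMPLETED


assert is_success(DemoTaskStatus.COMPLETED)
assert is_success("completed")
assert not is_success(DemoTaskStatus.FAILED)
```

In practice this means a status that has been serialized to JSON and read back as a string can still be compared against the enum members directly.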

## AgentActivationArgs

```python theme={null}
from pipecat_subagents.agents.base_agent import AgentActivationArgs
```

Base activation arguments for any agent.

| Field      | Type           | Default | Description                                       |
| ---------- | -------------- | ------- | ------------------------------------------------- |
| `metadata` | `dict \| None` | `None`  | Optional structured data passed during activation |

### Methods

```python theme={null}
# Create from a dict (ignores unknown keys)
args = AgentActivationArgs.from_dict({"metadata": {...}})

# Convert to a dict (excludes None values)
data = args.to_dict()
```
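The two comments above describe the round-trip contract: unknown keys are ignored on the way in, and `None` values are dropped on the way out. A minimal sketch of how such a contract could be implemented with a dataclass follows. `DemoActivationArgs` is a hypothetical stand-in, not the library's actual implementation.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, fields
from typing import Any


@dataclass
class DemoActivationArgs:
    """Stand-in for AgentActivationArgs (hypothetical, for illustration)."""

    metadata: dict | None = None

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "DemoActivationArgs":
        # Keep only keys that match declared fields; unknown keys are dropped.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})

    def to_dict(self) -> dict[str, Any]:
        # Omit fields whose value is None.
        return {k: v for k, v in asdict(self).items() if v is not None}


# "extra" is not a declared field, so it is silently ignored.
args = DemoActivationArgs.from_dict({"metadata": {"user": "ada"}, "extra": 1})
assert args.to_dict() == {"metadata": {"user": "ada"}}
assert DemoActivationArgs().to_dict() == {}
```

Filtering on the declared fields lets a subclass that adds fields inherit both methods unchanged.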

## LLMAgentActivationArgs

```python theme={null}
from pipecat_subagents.agents.llm_agent import LLMAgentActivationArgs
```

Activation arguments for [`LLMAgent`](/api-reference/pipecat-subagents/llm-agent). Extends [`AgentActivationArgs`](#agentactivationargs).

| Field      | Type           | Default | Description                                                                                |
| ---------- | -------------- | ------- | ------------------------------------------------------------------------------------------ |
| `metadata` | `dict \| None` | `None`  | Optional structured data passed during activation                                          |
| `messages` | `list \| None` | `None`  | LLM context messages to inject on activation                                               |
| `run_llm`  | `bool \| None` | `None`  | Whether to run the LLM after appending messages. Defaults to `True` when `messages` is set |
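The `run_llm` default depends on whether `messages` is provided. The sketch below shows one way to read that rule. `effective_run_llm` is a hypothetical helper, and two of its behaviors are assumptions: that "set" means "not `None`", and that the result is `False` when neither field is provided. The table only documents the `True` case.

```python
from __future__ import annotations


def effective_run_llm(messages: list | None, run_llm: bool | None) -> bool:
    """Resolve the run_llm default (hypothetical helper, not a library API)."""
    if run_llm is not None:
        return run_llm  # an explicit value always wins
    # Documented: defaults to True when messages is set.
    # Assumption: falls back to False when there are no messages to append.
    return messages is not None


greeting = [{"role": "user", "content": "hello"}]
assert effective_run_llm(greeting, None) is True
assert effective_run_llm(greeting, False) is False
assert effective_run_llm(None, None) is False
```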

## TaskContext

```python theme={null}
from pipecat_subagents.agents.task_context import TaskContext
```

Async context manager and iterator for a single-agent task. Sends a task request on enter, waits for the response on exit. Supports `async for` to receive intermediate events.

On normal completion, the result is available via `response`. On worker error or timeout, raises [`TaskError`](/api-reference/pipecat-subagents/exceptions#taskerror).

### Properties

| Property   | Type   | Description                   |
| ---------- | ------ | ----------------------------- |
| `task_id`  | `str`  | The task identifier           |
| `response` | `dict` | The worker's response payload |

### Usage

```python theme={null}
async with self.task("worker", payload=data) as t:
    async for event in t:
        print(f"[{event.type}]: {event.data}")

print(t.response)
```
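The usage above combines two protocols on one object: an async context manager (request on enter, response on exit) and an async iterator (intermediate events). To make that shape concrete, here is a self-contained sketch built on an in-memory queue. `DemoTask` and its event dictionaries are hypothetical stand-ins. The real `TaskContext` communicates with a worker agent and may behave differently in detail.

```python
from __future__ import annotations

import asyncio


class DemoTask:
    """Stand-in for TaskContext backed by an in-memory queue (hypothetical)."""

    _DONE = object()  # sentinel: no more intermediate events

    def __init__(self, steps: int) -> None:
        self._steps = steps
        self.response: dict | None = None

    async def _work(self) -> None:
        # Simulated worker: emit progress events, then set the final response.
        for i in range(self._steps):
            await self._events.put({"type": "update", "data": {"step": i}})
        await self._events.put(self._DONE)
        self.response = {"steps_done": self._steps}

    async def __aenter__(self) -> "DemoTask":
        # "Send the request": start the worker when the block is entered.
        self._events = asyncio.Queue()
        self._worker = asyncio.create_task(self._work())
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Wait for the worker to finish, even if no events were consumed.
        await self._worker

    def __aiter__(self) -> "DemoTask":
        return self

    async def __anext__(self) -> dict:
        event = await self._events.get()
        if event is self._DONE:
            raise StopAsyncIteration
        return event


async def main() -> tuple[list, dict]:
    seen = []
    async with DemoTask(steps=3) as t:
        async for event in t:
            seen.append(event["data"]["step"])
    return seen, t.response  # response is read after the block has exited


seen, response = asyncio.run(main())
```

Waiting in `__aexit__` is what makes it safe to read `response` after the block: the final result has arrived by the time the `async with` statement finishes.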

## TaskGroupContext

```python theme={null}
from pipecat_subagents.agents.task_context import TaskGroupContext
```

Async context manager and iterator for structured task group execution. Sends task requests on enter, waits for all responses on exit. Supports `async for` to receive intermediate events.

On normal completion, results are available via `responses`. On worker error (with `cancel_on_error=True`) or timeout, raises [`TaskGroupError`](/api-reference/pipecat-subagents/exceptions#taskgrouperror).

### Properties

| Property    | Type              | Description                               |
| ----------- | ----------------- | ----------------------------------------- |
| `task_id`   | `str`             | The shared task identifier for this group |
| `responses` | `dict[str, dict]` | Collected responses keyed by agent name   |

### Usage

```python theme={null}
async with self.task_group("w1", "w2", payload=data) as tg:
    async for event in tg:
        print(f"{event.agent_name} [{event.type}]: {event.data}")

for name, result in tg.responses.items():
    print(name, result)
```

## TaskGroupResponse

```python theme={null}
from pipecat_subagents.agents.task_context import TaskGroupResponse
```

Collected results from a completed task group. Passed to `on_task_completed`.

| Field       | Type              | Description                             |
| ----------- | ----------------- | --------------------------------------- |
| `task_id`   | `str`             | The shared task identifier              |
| `responses` | `dict[str, dict]` | Collected responses keyed by agent name |

## TaskGroupEvent

```python theme={null}
from pipecat_subagents.agents.task_context import TaskGroupEvent
```

An event received from a worker during task group execution.

| Field        | Type           | Description                               |
| ------------ | -------------- | ----------------------------------------- |
| `type`       | `str`          | The event type (see constants below)      |
| `agent_name` | `str`          | The name of the agent that sent the event |
| `data`       | `dict \| None` | Optional event payload                    |

### Event Type Constants

| Constant                      | Value            | Description                   |
| ----------------------------- | ---------------- | ----------------------------- |
| `TaskGroupEvent.UPDATE`       | `"update"`       | Progress update from a worker |
| `TaskGroupEvent.STREAM_START` | `"stream_start"` | Worker started streaming      |
| `TaskGroupEvent.STREAM_DATA`  | `"stream_data"`  | Streaming data chunk          |
| `TaskGroupEvent.STREAM_END`   | `"stream_end"`   | Worker finished streaming     |
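A consumer typically branches on `type` and groups by `agent_name`, since events from several workers can interleave. The sketch below joins each worker's stream chunks into one string. `DemoEvent` and `collect_streams` are hypothetical stand-ins, and the `data["text"]` payload key is an assumption: the payload shape is not specified on this page.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DemoEvent:
    """Stand-in for TaskGroupEvent (hypothetical, for illustration)."""

    type: str
    agent_name: str
    data: dict | None = None


def collect_streams(events: list[DemoEvent]) -> dict[str, str]:
    """Join each agent's stream chunks, assuming text lives in data["text"]."""
    buffers: dict[str, list[str]] = {}
    finished: dict[str, str] = {}
    for event in events:
        if event.type == "stream_start":
            buffers[event.agent_name] = []
        elif event.type == "stream_data":
            chunk = (event.data or {}).get("text", "")
            buffers.setdefault(event.agent_name, []).append(chunk)
        elif event.type == "stream_end":
            finished[event.agent_name] = "".join(buffers.pop(event.agent_name, []))
        # "update" events are progress notices; this sketch ignores them.
    return finished


events = [
    DemoEvent("stream_start", "w1"),
    DemoEvent("stream_start", "w2"),
    DemoEvent("stream_data", "w1", {"text": "Hel"}),
    DemoEvent("update", "w2", {"progress": 0.5}),
    DemoEvent("stream_data", "w2", {"text": "ok"}),
    DemoEvent("stream_data", "w1", {"text": "lo"}),
    DemoEvent("stream_end", "w1"),
    DemoEvent("stream_end", "w2"),
]
result = collect_streams(events)
```

With the real class, the string literals would be replaced by the constants in the table, for example `TaskGroupEvent.STREAM_DATA`.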

## AgentReadyData

```python theme={null}
from pipecat_subagents.types import AgentReadyData
```

Information about a registered agent. Passed to `on_agent_ready` and `@agent_ready` handlers.

| Field        | Type  | Description                                |
| ------------ | ----- | ------------------------------------------ |
| `agent_name` | `str` | The name of the agent                      |
| `runner`     | `str` | The name of the runner managing this agent |

## AgentErrorData

```python theme={null}
from pipecat_subagents.types import AgentErrorData
```

Information about an agent error. Passed to `on_agent_error`.

| Field        | Type  | Description                        |
| ------------ | ----- | ---------------------------------- |
| `agent_name` | `str` | The name of the agent that errored |
| `error`      | `str` | Description of the error           |

## AgentRegistryEntry

```python theme={null}
from pipecat_subagents.types import AgentRegistryEntry
```

Information about an agent in a registry snapshot.

| Field        | Type            | Default | Description                                         |
| ------------ | --------------- | ------- | --------------------------------------------------- |
| `name`       | `str`           |         | The agent's name                                    |
| `parent`     | `str \| None`   | `None`  | Name of the parent agent, or `None` for root agents |
| `active`     | `bool`          | `False` | Whether the agent is currently active               |
| `bridged`    | `bool`          | `False` | Whether the agent is bridged                        |
| `started_at` | `float \| None` | `None`  | Unix timestamp when the agent became ready          |

## AgentRegistry

```python theme={null}
from pipecat_subagents.registry import AgentRegistry
```

Tracks all known agents across local and remote runners. Owned by the [`AgentRunner`](/api-reference/pipecat-subagents/agent-runner) and shared with its agents.

### Properties

| Property        | Type        | Description                                     |
| --------------- | ----------- | ----------------------------------------------- |
| `runner_name`   | `str`       | The name of the runner that owns this registry  |
| `local_agents`  | `list[str]` | Names of agents registered under this runner    |
| `remote_agents` | `list[str]` | Names of agents registered under remote runners |

### Methods

#### get

```python theme={null}
def get(self, agent_name: str) -> AgentReadyData | None
```

Look up a registered agent by name.

#### watch

```python theme={null}
async def watch(self, agent_name: str, handler: WatchHandler) -> None
```

Watch for a specific agent's registration. If the agent is already registered, the handler fires immediately.

| Parameter    | Type                                    | Description                                  |
| ------------ | --------------------------------------- | -------------------------------------------- |
| `agent_name` | `str`                                   | The agent name to watch for                  |
| `handler`    | `Callable[[AgentReadyData], Coroutine]` | Async callable invoked with the agent's data |

#### register

```python theme={null}
async def register(self, agent_data: AgentReadyData) -> bool
```

Register an agent. Returns `True` if the agent was newly registered, `False` if already known.
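The three methods interact: `register` reports whether the agent is new, and `watch` fires either immediately or once a later `register` call arrives. The toy registry below mirrors those documented semantics so the ordering is easy to see. `DemoRegistry` and `DemoAgentData` are hypothetical stand-ins, and treating each watcher as one-shot is an assumption of this sketch, not a statement about the real `AgentRegistry`.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class DemoAgentData:
    """Stand-in for AgentReadyData (hypothetical)."""

    agent_name: str
    runner: str


Handler = Callable[[DemoAgentData], Awaitable[None]]


class DemoRegistry:
    """Toy registry mirroring the documented get/watch/register semantics."""

    def __init__(self) -> None:
        self._agents: dict[str, DemoAgentData] = {}
        self._watchers: dict[str, list[Handler]] = {}

    def get(self, agent_name: str) -> DemoAgentData | None:
        return self._agents.get(agent_name)

    async def watch(self, agent_name: str, handler: Handler) -> None:
        known = self._agents.get(agent_name)
        if known is not None:
            await handler(known)  # already registered: fire immediately
        else:
            self._watchers.setdefault(agent_name, []).append(handler)

    async def register(self, agent_data: DemoAgentData) -> bool:
        if agent_data.agent_name in self._agents:
            return False  # already known
        self._agents[agent_data.agent_name] = agent_data
        # Assumption: pending watchers fire once and are then discarded.
        for handler in self._watchers.pop(agent_data.agent_name, []):
            await handler(agent_data)
        return True


async def main() -> list[str]:
    log: list[str] = []

    async def on_ready(data: DemoAgentData) -> None:
        log.append(f"ready:{data.agent_name}")

    registry = DemoRegistry()
    await registry.watch("worker", on_ready)  # not registered yet, so it waits
    first = await registry.register(DemoAgentData("worker", "runner-a"))
    second = await registry.register(DemoAgentData("worker", "runner-a"))
    await registry.watch("worker", on_ready)  # already registered, fires now
    log.append(f"{first},{second}")
    return log


log = asyncio.run(main())
```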
