TaskStatus
from pipecat_subagents.agents.task_context import TaskStatus
Status of a completed task. Inherits from str so values compare naturally with plain strings.
| Value | Description |
|---|---|
| TaskStatus.COMPLETED | The task finished successfully |
| TaskStatus.CANCELLED | The task was cancelled by the requester |
| TaskStatus.FAILED | The task failed due to a logical or business error |
| TaskStatus.ERROR | The task encountered an unexpected runtime error |
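The string-comparison behavior described above can be sketched with a stand-in enum. This is not the library's definition: the lowercase string values below are assumptions chosen for illustration, and `is_success` is a hypothetical helper.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Stand-in for illustration only; the string values are assumed."""

    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"
    ERROR = "error"


def is_success(status: str) -> bool:
    # Accepts either a TaskStatus member or a plain string,
    # because the enum inherits from str.
    return status == TaskStatus.COMPLETED


print(is_success("completed"))
print(is_success(TaskStatus.FAILED))
```

Mixing `str` into the enum means status values read from JSON or logs can be compared directly, without converting them to enum members first.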
AgentActivationArgs
from pipecat_subagents.agents.base_agent import AgentActivationArgs
Base activation arguments for any agent.
| Field | Type | Default | Description |
|---|---|---|---|
| metadata | `dict \| None` | None | Optional structured data passed during activation |
Methods
# Create from a dict (ignores unknown keys)
args = AgentActivationArgs.from_dict({"metadata": {...}})
# Convert to a dict (excludes None values)
data = args.to_dict()
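To make the two conversion rules concrete (unknown keys ignored on input, `None` values dropped on output), here is a minimal sketch using a plain dataclass. `ActivationArgsSketch` is a hypothetical stand-in and may differ from how the library implements these methods.

```python
from dataclasses import asdict, dataclass, fields
from typing import Any, Optional


@dataclass
class ActivationArgsSketch:
    """Hypothetical stand-in, not the library class."""

    metadata: Optional[dict] = None

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ActivationArgsSketch":
        # Keep only keys that match a declared field; ignore the rest.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})

    def to_dict(self) -> dict[str, Any]:
        # Drop fields whose value is None.
        return {k: v for k, v in asdict(self).items() if v is not None}


args = ActivationArgsSketch.from_dict({"metadata": {"user": "ada"}, "extra": 1})
print(args.to_dict())
```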
LLMAgentActivationArgs
from pipecat_subagents.agents.llm_agent import LLMAgentActivationArgs
Activation arguments for LLMAgent. Extends AgentActivationArgs.
| Field | Type | Default | Description |
|---|---|---|---|
| metadata | `dict \| None` | None | Optional structured data passed during activation |
| messages | `list \| None` | None | LLM context messages to inject on activation |
| run_llm | `bool \| None` | None | Whether to run the LLM after appending messages. Defaults to True when messages is set |
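The `run_llm` default can be read as a small resolution rule. The helper below is a hypothetical sketch of that rule, not library code. The table only states the default when `messages` is set, so the `False` fallback when nothing is set is an assumption.

```python
from typing import Optional


def effective_run_llm(messages: Optional[list], run_llm: Optional[bool]) -> bool:
    """Hypothetical helper mirroring the documented default for run_llm."""
    if run_llm is not None:
        # An explicit value always wins.
        return run_llm
    # Documented: defaults to True when messages is set.
    # Assumption: with no messages there is nothing to run, so False.
    return messages is not None


print(effective_run_llm([{"role": "user", "content": "hi"}], None))
```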
TaskContext
from pipecat_subagents.agents.task_context import TaskContext
Async context manager and iterator for a single-agent task. Sends a task request on enter and waits for the response on exit. Supports `async for` to receive intermediate events.
On normal completion, the result is available via response. On worker error or timeout, raises TaskError.
Properties
| Property | Type | Description |
|---|---|---|
| task_id | str | The task identifier |
| response | dict | The worker’s response payload |
Usage
async with self.task("worker", payload=data) as t:
    async for event in t:
        print(f"[{event.type}]: {event.data}")

# The context waits for the response on exit, so read it after the block
print(t.response)
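The enter/iterate/exit lifecycle can be illustrated with a self-contained stand-in. `TaskContextSketch`, `Event`, and the in-process fake worker are all hypothetical. The real class talks to a worker agent, and its internals are not shown on this page.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:
    type: str
    data: Optional[dict] = None


class TaskContextSketch:
    """Hypothetical stand-in: async context manager plus async iterator."""

    def __init__(self, payload: dict):
        self._payload = payload
        self._events: asyncio.Queue = asyncio.Queue()
        self._worker: Optional[asyncio.Task] = None
        self.response: Optional[dict] = None

    async def _run_worker(self) -> dict:
        # Fake worker: emit two progress events, then return a result.
        for step in (1, 2):
            await self._events.put(Event("update", {"step": step}))
        await self._events.put(None)  # sentinel: no more events
        return {"echo": self._payload}

    async def __aenter__(self) -> "TaskContextSketch":
        # "Send the request" on enter by starting the fake worker.
        self._worker = asyncio.create_task(self._run_worker())
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Wait for the response on exit.
        self.response = await self._worker

    def __aiter__(self) -> "TaskContextSketch":
        return self

    async def __anext__(self) -> Event:
        event = await self._events.get()
        if event is None:
            raise StopAsyncIteration
        return event


async def main() -> tuple[list, dict]:
    seen = []
    async with TaskContextSketch({"q": "ping"}) as t:
        async for event in t:
            seen.append((event.type, event.data))
    # The response is populated once the context has exited.
    return seen, t.response
```

Running `asyncio.run(main())` returns the collected events and the final response. Iterating is optional in this sketch: skipping the `async for` loop still yields a response on exit.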
TaskGroupContext
from pipecat_subagents.agents.task_context import TaskGroupContext
Async context manager and iterator for structured task group execution. Sends task requests on enter and waits for all responses on exit. Supports `async for` to receive intermediate events.
On normal completion, results are available via responses. On worker error (with cancel_on_error=True) or timeout, raises TaskGroupError.
Properties
| Property | Type | Description |
|---|---|---|
| task_id | str | The shared task identifier for this group |
| responses | dict[str, dict] | Collected responses keyed by agent name |
Usage
async with self.task_group("w1", "w2", payload=data) as tg:
    async for event in tg:
        print(f"{event.agent_name} [{event.type}]: {event.data}")

# All responses have been collected once the context exits
for name, result in tg.responses.items():
    print(name, result)
TaskGroupResponse
from pipecat_subagents.agents.task_context import TaskGroupResponse
Collected results from a completed task group. Passed to on_task_completed.
| Field | Type | Description |
|---|---|---|
| task_id | str | The shared task identifier |
| responses | dict[str, dict] | Collected responses keyed by agent name |
TaskGroupEvent
from pipecat_subagents.agents.task_context import TaskGroupEvent
An event received from a worker during task group execution.
| Field | Type | Description |
|---|---|---|
| type | str | The event type (see constants below) |
| agent_name | str | The name of the agent that sent the event |
| data | `dict \| None` | Optional event payload |
Event Type Constants
| Constant | Value | Description |
|---|---|---|
| TaskGroupEvent.UPDATE | "update" | Progress update from a worker |
| TaskGroupEvent.STREAM_START | "stream_start" | Worker started streaming |
| TaskGroupEvent.STREAM_DATA | "stream_data" | Streaming data chunk |
| TaskGroupEvent.STREAM_END | "stream_end" | Worker finished streaming |
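One way to consume these event types is to dispatch on `type` and buffer streamed chunks per agent. The sketch below uses a stand-in class carrying the fields and constant values listed above. `TaskGroupEventSketch`, `collect_streams`, and the `"text"` payload key are hypothetical, since the shape of `data` is not specified here.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class TaskGroupEventSketch:
    """Stand-in with the documented fields and constant values."""

    UPDATE = "update"
    STREAM_START = "stream_start"
    STREAM_DATA = "stream_data"
    STREAM_END = "stream_end"

    type: str
    agent_name: str
    data: Optional[dict] = None


def collect_streams(events: Iterable[TaskGroupEventSketch]) -> dict[str, str]:
    """Join streamed chunks per agent. The "text" key is an assumption."""
    E = TaskGroupEventSketch
    buffers: dict[str, list[str]] = {}
    finished: dict[str, str] = {}
    for event in events:
        if event.type == E.STREAM_START:
            buffers[event.agent_name] = []
        elif event.type == E.STREAM_DATA:
            chunk = (event.data or {}).get("text", "")
            buffers.setdefault(event.agent_name, []).append(chunk)
        elif event.type == E.STREAM_END:
            finished[event.agent_name] = "".join(buffers.pop(event.agent_name, []))
        # UPDATE events carry progress only and are ignored here.
    return finished
```

Keying the buffers by `agent_name` keeps interleaved streams from different workers separate.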
AgentReadyData
from pipecat_subagents.types import AgentReadyData
Information about a registered agent. Passed to on_agent_ready and @agent_ready handlers.
| Field | Type | Description |
|---|---|---|
| agent_name | str | The name of the agent |
| runner | str | The name of the runner managing this agent |
AgentErrorData
from pipecat_subagents.types import AgentErrorData
Information about an agent error. Passed to on_agent_error.
| Field | Type | Description |
|---|---|---|
| agent_name | str | The name of the agent that errored |
| error | str | Description of the error |
AgentRegistryEntry
from pipecat_subagents.types import AgentRegistryEntry
Information about an agent in a registry snapshot.
| Field | Type | Default | Description |
|---|---|---|---|
| name | str | | The agent’s name |
| parent | `str \| None` | None | Name of the parent agent, or None for root agents |
| active | bool | False | Whether the agent is currently active |
| bridged | bool | False | Whether the agent is bridged |
| started_at | `float \| None` | None | Unix timestamp when the agent became ready |
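Because each entry names its parent, a snapshot can be folded into a parent-to-children map. The sketch below assumes a snapshot is an iterable of entries with the fields above. `RegistryEntrySketch` and `children_by_parent` are hypothetical names, and how a snapshot is obtained is not covered here.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class RegistryEntrySketch:
    """Stand-in with the documented fields and defaults."""

    name: str
    parent: Optional[str] = None
    active: bool = False
    bridged: bool = False
    started_at: Optional[float] = None


def children_by_parent(
    entries: Iterable[RegistryEntrySketch],
) -> dict[Optional[str], list[str]]:
    """Group agent names under their parent; root agents land under None."""
    tree: dict[Optional[str], list[str]] = {}
    for entry in entries:
        tree.setdefault(entry.parent, []).append(entry.name)
    return tree


snapshot = [
    RegistryEntrySketch("main", active=True),
    RegistryEntrySketch("w1", parent="main"),
    RegistryEntrySketch("w2", parent="main"),
]
print(children_by_parent(snapshot))
```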
AgentRegistry
from pipecat_subagents.registry import AgentRegistry
Tracks all known agents across local and remote runners. Owned by the AgentRunner and shared with its agents.
Properties
| Property | Type | Description |
|---|---|---|
| runner_name | str | The name of the runner that owns this registry |
| local_agents | list[str] | Names of agents registered under this runner |
| remote_agents | list[str] | Names of agents registered under remote runners |
Methods
get
def get(self, agent_name: str) -> AgentReadyData | None
Look up a registered agent by name.
watch
async def watch(self, agent_name: str, handler: WatchHandler) -> None
Watch for a specific agent’s registration. If the agent is already registered, the handler fires immediately.
| Parameter | Type | Description |
|---|---|---|
| agent_name | str | The agent name to watch for |
| handler | Callable[[AgentReadyData], Coroutine] | Async callable invoked with the agent’s data |
register
async def register(self, agent_data: AgentReadyData) -> bool
Register an agent. Returns True if the agent was newly registered, False if already known.
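The documented semantics of `get`, `watch`, and `register` can be modeled with a small in-memory stand-in. `RegistrySketch` and `ReadyData` are hypothetical, and the real registry also tracks remote runners. Whether a watcher keeps firing after its first call is not stated above, so this sketch treats watchers as one-shot, which is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class ReadyData:
    agent_name: str
    runner: str


Handler = Callable[[ReadyData], Awaitable[None]]


class RegistrySketch:
    """Hypothetical in-memory model of the documented behavior."""

    def __init__(self) -> None:
        self._agents: dict[str, ReadyData] = {}
        self._watchers: dict[str, list[Handler]] = {}

    def get(self, agent_name: str) -> Optional[ReadyData]:
        return self._agents.get(agent_name)

    async def watch(self, agent_name: str, handler: Handler) -> None:
        known = self._agents.get(agent_name)
        if known is not None:
            await handler(known)  # already registered: fire immediately
        else:
            self._watchers.setdefault(agent_name, []).append(handler)

    async def register(self, agent_data: ReadyData) -> bool:
        if agent_data.agent_name in self._agents:
            return False  # already known
        self._agents[agent_data.agent_name] = agent_data
        # Assumption: pending watchers fire once and are then dropped.
        for handler in self._watchers.pop(agent_data.agent_name, []):
            await handler(agent_data)
        return True


async def demo() -> tuple[bool, bool, list]:
    registry = RegistrySketch()
    calls: list = []

    async def on_ready(data: ReadyData) -> None:
        calls.append(data.agent_name)

    await registry.watch("worker", on_ready)  # not registered yet: deferred
    first = await registry.register(ReadyData("worker", "runner-a"))
    second = await registry.register(ReadyData("worker", "runner-a"))
    await registry.watch("worker", on_ready)  # registered: fires right away
    return first, second, calls
```

In `demo`, the first `register` call returns `True` and triggers the deferred watcher, the second returns `False`, and the late `watch` call fires its handler immediately.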