
TaskStatus

from pipecat_subagents.agents.task_context import TaskStatus
Status of a completed task. Inherits from str so values compare naturally with plain strings.
| Value | Description |
| --- | --- |
| TaskStatus.COMPLETED | The task finished successfully |
| TaskStatus.CANCELLED | The task was cancelled by the requester |
| TaskStatus.FAILED | The task failed due to a logical or business error |
| TaskStatus.ERROR | The task encountered an unexpected runtime error |
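
Because the enum inherits from str, a status can be checked against either an enum member or a plain string. The sketch below uses a local stand-in class rather than the library's own; the lowercase string values and the `is_failure` helper are assumptions made for illustration, since the reference above does not list the underlying strings.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Local stand-in for illustration; the string values are assumed."""

    COMPLETED = "completed"
    CANCELLED = "cancelled"
    FAILED = "failed"
    ERROR = "error"


def is_failure(status: str) -> bool:
    """Return True for FAILED or ERROR (hypothetical helper)."""
    return status in (TaskStatus.FAILED, TaskStatus.ERROR)


# A str-based enum member compares equal to its plain string value,
# so callers can pass either form.
assert TaskStatus.COMPLETED == "completed"
assert is_failure("error")
assert not is_failure(TaskStatus.CANCELLED)
```

Separating FAILED from ERROR in a helper like this is only one option; callers that retry on runtime errors but not on business failures would check the two values individually.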

AgentActivationArgs

from pipecat_subagents.agents.base_agent import AgentActivationArgs
Base activation arguments for any agent.
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| metadata | Optional[dict] | None | Optional structured data passed during activation |

Methods

# Create from a dict (ignores unknown keys)
args = AgentActivationArgs.from_dict({"metadata": {...}})

# Convert to a dict (excludes None values)
data = args.to_dict()
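
The two behaviours noted in the comments above (unknown keys are ignored on the way in, None values are dropped on the way out) can be sketched with a plain dataclass. `ActivationArgsSketch` is a hypothetical stand-in written for this example, not the library class, and its internals are an assumption about one reasonable way to get that behaviour.

```python
from dataclasses import asdict, dataclass, fields
from typing import Optional


@dataclass
class ActivationArgsSketch:
    """Hypothetical stand-in, not the library class."""

    metadata: Optional[dict] = None

    @classmethod
    def from_dict(cls, data: dict) -> "ActivationArgsSketch":
        # Keep only keys that match declared fields; ignore the rest.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})

    def to_dict(self) -> dict:
        # Drop fields whose value is None.
        return {k: v for k, v in asdict(self).items() if v is not None}


# "extra" is not a declared field, so it is silently discarded.
args = ActivationArgsSketch.from_dict({"metadata": {"user": "ada"}, "extra": 1})
round_trip = args.to_dict()       # {'metadata': {'user': 'ada'}}
empty = ActivationArgsSketch().to_dict()  # {}
```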

LLMAgentActivationArgs

from pipecat_subagents.agents.llm_agent import LLMAgentActivationArgs
Activation arguments for LLMAgent. Extends AgentActivationArgs.
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| metadata | Optional[dict] | None | Optional structured data passed during activation |
| messages | Optional[list] | None | LLM context messages to inject on activation |
| run_llm | Optional[bool] | None | Whether to run the LLM after appending messages. Defaults to True when messages is set |
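
The run_llm default depends on whether messages is set, which is easier to see in code. The sketch below is a guess at how such a tri-state flag might be resolved; `LLMActivationArgsSketch` and `should_run_llm` are hypothetical names, and treating missing or empty messages as "do not run" is an assumption the reference does not confirm.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LLMActivationArgsSketch:
    """Hypothetical stand-in mirroring the documented fields."""

    metadata: Optional[dict] = None
    messages: Optional[list] = None
    run_llm: Optional[bool] = None


def should_run_llm(args: LLMActivationArgsSketch) -> bool:
    """Resolve the effective run_llm value (hypothetical helper)."""
    if args.run_llm is not None:
        # An explicit True or False always wins.
        return args.run_llm
    # Assumption: with no explicit value, run only when messages were given.
    return bool(args.messages)


with_messages = LLMActivationArgsSketch(messages=[{"role": "user", "content": "hi"}])
suppressed = LLMActivationArgsSketch(messages=with_messages.messages, run_llm=False)

assert should_run_llm(with_messages) is True
assert should_run_llm(suppressed) is False
```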

TaskContext

from pipecat_subagents.agents.task_context import TaskContext
Async context manager and iterator for a single-agent task. Sends a task request on enter, waits for the response on exit. Supports async for to receive intermediate events. On normal completion, the result is available via response. On worker error or timeout, raises TaskError.

Properties

| Property | Type | Description |
| --- | --- | --- |
| task_id | str | The task identifier |
| response | dict | The worker’s response payload |

Usage

async with self.task("worker", payload=data) as t:
    async for event in t:
        print(f"[{event.type}]: {event.data}")

print(t.response)
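
The enter/iterate/exit lifecycle can be mimicked with a small self-contained class. This is only a sketch of the general pattern (an object that is both an async context manager and an async iterator); `TaskContextSketch` is a hypothetical stand-in, a preloaded queue plays the role of the worker, and none of it reflects how the library implements TaskContext internally.

```python
import asyncio
from typing import Optional


class TaskContextSketch:
    """Hypothetical stand-in; a preloaded queue plays the worker."""

    def __init__(self, events: list, result: dict):
        self._queue: asyncio.Queue = asyncio.Queue()
        self._events = events
        self._result = result
        self.response: Optional[dict] = None

    async def __aenter__(self) -> "TaskContextSketch":
        # Stands in for "send the task request": queue the intermediate
        # events, then a None sentinel that marks the end of the stream.
        for event in self._events:
            await self._queue.put(event)
        await self._queue.put(None)
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Stands in for "wait for the response": publish the result only
        # when the body finished without raising.
        if exc_type is None:
            self.response = self._result
        return False  # never swallow exceptions from the body

    def __aiter__(self) -> "TaskContextSketch":
        return self

    async def __anext__(self):
        event = await self._queue.get()
        if event is None:
            raise StopAsyncIteration
        return event


async def main() -> tuple:
    seen = []
    async with TaskContextSketch(["update 1", "update 2"], {"ok": True}) as t:
        async for event in t:
            seen.append(event)
    # The response is read after the block exits, as in the usage above.
    return seen, t.response


seen, response = asyncio.run(main())
```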

TaskGroupContext

from pipecat_subagents.agents.task_context import TaskGroupContext
Async context manager and iterator for structured task group execution. Sends task requests on enter, waits for all responses on exit. Supports async for to receive intermediate events. On normal completion, results are available via responses. On worker error (with cancel_on_error=True) or timeout, raises TaskGroupError.

Properties

| Property | Type | Description |
| --- | --- | --- |
| task_id | str | The shared task identifier for this group |
| responses | dict[str, dict] | Collected responses keyed by agent name |

Usage

async with self.task_group("w1", "w2", payload=data) as tg:
    async for event in tg:
        print(f"{event.agent_name} [{event.type}]: {event.data}")

for name, result in tg.responses.items():
    print(name, result)

TaskGroupResponse

from pipecat_subagents.agents.task_context import TaskGroupResponse
Collected results from a completed task group. Passed to on_task_completed.
| Field | Type | Description |
| --- | --- | --- |
| task_id | str | The shared task identifier |
| responses | dict[str, dict] | Collected responses keyed by agent name |

TaskGroupEvent

from pipecat_subagents.agents.task_context import TaskGroupEvent
An event received from a worker during task group execution.
| Field | Type | Description |
| --- | --- | --- |
| type | str | The event type (see constants below) |
| agent_name | str | The name of the agent that sent the event |
| data | Optional[dict] | Optional event payload |

Event Type Constants

| Constant | Value | Description |
| --- | --- | --- |
| TaskGroupEvent.UPDATE | "update" | Progress update from a worker |
| TaskGroupEvent.STREAM_START | "stream_start" | Worker started streaming |
| TaskGroupEvent.STREAM_DATA | "stream_data" | Streaming data chunk |
| TaskGroupEvent.STREAM_END | "stream_end" | Worker finished streaming |
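
A consumer typically branches on the event type and groups stream chunks by agent_name. The sketch below shows one way to do that with a local stand-in class that carries the documented fields and constant values; `TaskGroupEventSketch`, `collect_streams`, and the "text" key inside data are all hypothetical, since the payload shape is not specified here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskGroupEventSketch:
    """Hypothetical stand-in with the documented fields and constants."""

    type: str
    agent_name: str
    data: Optional[dict] = None

    # Unannotated class attributes, so they are not dataclass fields.
    UPDATE = "update"
    STREAM_START = "stream_start"
    STREAM_DATA = "stream_data"
    STREAM_END = "stream_end"


def collect_streams(events: list) -> dict:
    """Join each agent's stream chunks into one string (hypothetical helper)."""
    buffers: dict = {}
    for event in events:
        if event.type == TaskGroupEventSketch.STREAM_START:
            buffers[event.agent_name] = []
        elif event.type == TaskGroupEventSketch.STREAM_DATA:
            # Assumption: chunks arrive under a "text" key.
            chunk = (event.data or {}).get("text", "")
            buffers.setdefault(event.agent_name, []).append(chunk)
        # UPDATE and STREAM_END carry nothing to accumulate in this sketch.
    return {name: "".join(chunks) for name, chunks in buffers.items()}


E = TaskGroupEventSketch
streams = collect_streams([
    E(E.STREAM_START, "w1"),
    E(E.STREAM_START, "w2"),
    E(E.STREAM_DATA, "w1", {"text": "Hel"}),
    E(E.STREAM_DATA, "w2", {"text": "Bye"}),
    E(E.STREAM_DATA, "w1", {"text": "lo"}),
    E(E.UPDATE, "w2", {"progress": 0.5}),
    E(E.STREAM_END, "w1"),
    E(E.STREAM_END, "w2"),
])
```

Keying the buffers by agent name keeps interleaved chunks from different workers apart, which matters because events from a task group arrive on a single iterator.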

AgentReadyData

from pipecat_subagents.types import AgentReadyData
Information about a registered agent. Passed to on_agent_ready and @agent_ready handlers.
| Field | Type | Description |
| --- | --- | --- |
| agent_name | str | The name of the agent |
| runner | str | The name of the runner managing this agent |

AgentErrorData

from pipecat_subagents.types import AgentErrorData
Information about an agent error. Passed to on_agent_error.
| Field | Type | Description |
| --- | --- | --- |
| agent_name | str | The name of the agent that errored |
| error | str | Description of the error |

AgentRegistryEntry

from pipecat_subagents.types import AgentRegistryEntry
Information about an agent in a registry snapshot.
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | | The agent’s name |
| parent | Optional[str] | None | Name of the parent agent, or None for root agents |
| active | bool | False | Whether the agent is currently active |
| bridged | bool | False | Whether the agent is bridged |
| started_at | Optional[float] | None | Unix timestamp when the agent became ready |

AgentRegistry

from pipecat_subagents.registry import AgentRegistry
Tracks all known agents across local and remote runners. Owned by the AgentRunner and shared with its agents.

Properties

| Property | Type | Description |
| --- | --- | --- |
| runner_name | str | The name of the runner that owns this registry |
| local_agents | list[str] | Names of agents registered under this runner |
| remote_agents | list[str] | Names of agents registered under remote runners |

Methods

get

def get(self, agent_name: str) -> Optional[AgentReadyData]
Look up a registered agent by name.

watch

async def watch(self, agent_name: str, handler: WatchHandler) -> None
Watch for a specific agent’s registration. If the agent is already registered, the handler fires immediately.
| Parameter | Type | Description |
| --- | --- | --- |
| agent_name | str | The agent name to watch for |
| handler | Callable[[AgentReadyData], Coroutine] | Async callable invoked with the agent’s data |

register

async def register(self, agent_data: AgentReadyData) -> bool
Register an agent. Returns True if the agent was newly registered, False if already known.
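
The interplay between get, watch, and register can be sketched with an in-memory stand-in. `RegistrySketch` and `ReadyData` are hypothetical classes written for this example; they follow the documented signatures and return values, but details such as keeping a watcher attached after it fires are assumptions, not confirmed library behaviour.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class ReadyData:
    """Hypothetical stand-in for AgentReadyData."""

    agent_name: str
    runner: str


Handler = Callable[[ReadyData], Awaitable[None]]


class RegistrySketch:
    """Hypothetical in-memory registry, not the library class."""

    def __init__(self) -> None:
        self._agents: dict = {}
        self._watchers: dict = {}

    def get(self, agent_name: str) -> Optional[ReadyData]:
        return self._agents.get(agent_name)

    async def watch(self, agent_name: str, handler: Handler) -> None:
        self._watchers.setdefault(agent_name, []).append(handler)
        existing = self._agents.get(agent_name)
        if existing is not None:
            # Already registered: fire immediately.
            await handler(existing)

    async def register(self, agent_data: ReadyData) -> bool:
        if agent_data.agent_name in self._agents:
            return False  # already known
        self._agents[agent_data.agent_name] = agent_data
        for handler in self._watchers.get(agent_data.agent_name, []):
            await handler(agent_data)
        return True  # newly registered


async def main() -> tuple:
    registry = RegistrySketch()
    fired = []

    async def on_ready(data: ReadyData) -> None:
        fired.append(data.agent_name)

    await registry.watch("worker", on_ready)  # not registered yet, waits
    first = await registry.register(ReadyData("worker", "runner-a"))
    second = await registry.register(ReadyData("worker", "runner-a"))
    return first, second, fired, registry.get("missing")


first, second, fired, missing = asyncio.run(main())
```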