@tool

Mark an agent method as an LLM tool. On LLMAgent subclasses, decorated methods are automatically registered with the LLM and included in build_tools(). On FlowsAgent subclasses, decorated methods are automatically collected as global functions available at every flow node.
from pipecat_subagents.agents import tool
Can be used with or without arguments:
@tool
async def my_tool(self, params, arg: str):
    ...

@tool(cancel_on_interruption=False, timeout=60)
async def my_tool(self, params, arg: str):
    ...
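
For readers curious how one decorator can support both the bare and the parameterized form, the following is a minimal sketch of the general Python pattern. It is not the pipecat_subagents implementation: `tool_sketch`, the `_tool_options` attribute, and the `Demo` class are hypothetical names used only for illustration.

```python
from typing import Callable, Optional


def tool_sketch(
    func: Optional[Callable] = None,
    *,
    cancel_on_interruption: bool = True,
    timeout: Optional[float] = None,
):
    """Hypothetical stand-in for @tool that only records its options."""

    def decorate(f: Callable) -> Callable:
        # Attach the options to the function so a collector could find them later.
        f._tool_options = {
            "cancel_on_interruption": cancel_on_interruption,
            "timeout": timeout,
        }
        return f

    if func is not None:
        # Bare usage (@tool_sketch): Python passes the function directly.
        return decorate(func)
    # Parameterized usage (@tool_sketch(...)): return the real decorator.
    return decorate


class Demo:
    @tool_sketch
    async def plain(self, params, arg: str):
        ...

    @tool_sketch(cancel_on_interruption=False, timeout=60)
    async def configured(self, params, arg: str):
        ...
```

Making the options keyword-only keeps the two forms unambiguous: a positional argument can only ever be the decorated function.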

Parameters

cancel_on_interruption (bool, default: True)
Whether to cancel this tool call when an interruption occurs. Only applies to LLMAgent tools.
timeout (Optional[float], default: None)
Timeout in seconds for this tool call. When None, the LLM service default applies.
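
The fallback behavior of timeout can be pictured with plain asyncio. This is a conceptual sketch only, not a description of how the library enforces timeouts: `SERVICE_DEFAULT_TIMEOUT`, `run_with_timeout`, and `slow_tool` are hypothetical names, and the default value is made up for the example.

```python
import asyncio
from typing import Any, Awaitable, Optional

# Hypothetical service-wide default, used when a tool sets no timeout of its own.
SERVICE_DEFAULT_TIMEOUT = 0.05


async def run_with_timeout(call: Awaitable[Any], timeout: Optional[float] = None) -> Any:
    """Await a tool call, falling back to the service default when timeout is None."""
    effective = SERVICE_DEFAULT_TIMEOUT if timeout is None else timeout
    try:
        return await asyncio.wait_for(call, timeout=effective)
    except asyncio.TimeoutError:
        # wait_for cancels the pending call before raising.
        return "timed out"


async def slow_tool() -> str:
    await asyncio.sleep(0.2)
    return "done"
```

With these values, `run_with_timeout(slow_tool())` hits the short default, while `run_with_timeout(slow_tool(), timeout=1)` has enough time to finish.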

Method Signature

Tool methods receive the LLM function call parameters object as their first argument (after self), followed by the tool’s declared parameters:
@tool
async def get_weather(self, params, location: str, unit: str = "celsius"):
    """Get the current weather for a location."""
    result = await fetch_weather(location, unit)
    await params.result_callback(result)
Tool methods must await params.result_callback() to return a result to the LLM. The tool schema is generated automatically from the method's parameter names, type annotations, and docstring.
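
To make the idea of signature-driven schemas concrete, here is a rough sketch built on the standard inspect module. The output shape, the `sketch_schema` helper, and the type mapping are assumptions chosen for illustration; the schema the library actually produces may differ.

```python
import inspect
from typing import Any, Dict

# Assumed mapping from Python annotations to JSON-schema-style type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def sketch_schema(method) -> Dict[str, Any]:
    """Derive a simple tool description from a method's signature and docstring."""
    signature = inspect.signature(method)
    properties: Dict[str, Any] = {}
    required = []
    # Skip the first two parameters (self and params); the rest are tool arguments.
    for name, param in list(signature.parameters.items())[2:]:
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": method.__name__,
        "description": inspect.getdoc(method) or "",
        "properties": properties,
        "required": required,
    }


class WeatherAgent:
    async def get_weather(self, params, location: str, unit: str = "celsius"):
        """Get the current weather for a location."""


schema = sketch_schema(WeatherAgent.get_weather)
```

In this sketch, `location` ends up required because it has no default, while `unit` is optional.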

@task

Mark an agent method as a task handler. Decorated methods are automatically collected by BaseAgent at initialization and dispatched when matching task requests arrive.
from pipecat_subagents.agents import task
Can be used with or without arguments:
# Default handler (receives unnamed requests)
@task
async def on_task_request(self, message):
    result = await do_work(message.payload)
    await self.send_task_response(result)

# Named handler (receives only "research" requests)
@task(name="research")
async def on_research(self, message):
    result = await do_research(message.payload)
    await self.send_task_response(result)

# Parallel handler (each request runs concurrently)
@task(name="research", parallel=True)
async def on_research(self, message):
    result = await do_research(message.payload)
    await self.send_task_response(result)

Parameters

name (Optional[str], default: None)
Task name to match. When set, this handler only receives requests with a matching name. When None, it handles all unnamed requests, as well as requests with no matching named handler.
parallel (bool, default: False)
When True, each request runs in a separate asyncio task for concurrent execution.
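
The matching rules described above (named handler first, default handler as fallback, optional concurrent execution) can be sketched in a few lines of asyncio. `DispatchSketch` and its methods are hypothetical and exist only to illustrate the behavior; they are not BaseAgent internals.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Set, Tuple

Handler = Callable[[object], Awaitable[None]]


class DispatchSketch:
    """Hypothetical dispatcher: named handler first, default handler as fallback."""

    def __init__(self) -> None:
        self.handlers: Dict[Optional[str], Tuple[Handler, bool]] = {}
        self.background: Set[asyncio.Task] = set()

    def register(self, handler: Handler, name: Optional[str] = None, parallel: bool = False) -> None:
        self.handlers[name] = (handler, parallel)

    async def dispatch(self, name: Optional[str], payload: object) -> bool:
        # Prefer the handler registered under this name, else the default (None).
        entry = self.handlers.get(name) or self.handlers.get(None)
        if entry is None:
            return False
        handler, parallel = entry
        if parallel:
            # Run in its own asyncio task so later requests are not blocked.
            task = asyncio.create_task(handler(payload))
            self.background.add(task)
            task.add_done_callback(self.background.discard)
        else:
            await handler(payload)
        return True


async def demo() -> list:
    seen = []

    async def default(payload):
        seen.append(("default", payload))

    async def research(payload):
        await asyncio.sleep(0)
        seen.append(("research", payload))

    dispatcher = DispatchSketch()
    dispatcher.register(default)
    dispatcher.register(research, name="research", parallel=True)
    await dispatcher.dispatch("research", 1)   # matches the named handler
    await dispatcher.dispatch("summarize", 2)  # no named match: default handler
    await dispatcher.dispatch(None, 3)         # unnamed request: default handler
    await asyncio.gather(*dispatcher.background)
    return seen
```

Keeping a reference to each background task, as the `background` set does here, prevents the task from being garbage-collected before it finishes.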

Method Signature

Task handler methods receive a BusTaskRequestMessage:
@task(name="analyze")
async def on_analyze(self, message: BusTaskRequestMessage):
    data = message.payload
    # Send progress updates
    await self.send_task_update({"progress": 50})
    # Send final response
    await self.send_task_response({"result": "done"})

@agent_ready

Mark a method as a handler for a specific agent becoming ready. Decorated methods are automatically collected by BaseAgent at initialization. When on_ready fires, the agent calls watch_agent for each decorated handler. When the watched agent registers, the decorated method is called.
from pipecat_subagents.agents import agent_ready
@agent_ready(name="greeter")
async def on_greeter_ready(self, data: AgentReadyData) -> None:
    await self.activate_agent("greeter", args=...)

Parameters

name (str, required)
The name of the agent to watch.

Method Signature

Agent-ready handler methods receive an AgentReadyData instance:
@agent_ready(name="worker")
async def on_worker_ready(self, data: AgentReadyData) -> None:
    print(f"Agent {data.agent_name} is ready on runner {data.runner}")
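
The collect-then-notify flow described for @agent_ready can be illustrated with a small standalone sketch. Everything here is a hypothetical stand-in (`agent_ready_sketch`, `ReadyDataSketch`, `WatcherSketch`, `notify_registered`) and does not reflect the library's internals; it only shows one way decorated methods could be gathered at initialization and invoked when a watched agent registers.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ReadyDataSketch:
    """Hypothetical stand-in for AgentReadyData."""
    agent_name: str
    runner: str


def agent_ready_sketch(*, name: str):
    """Hypothetical stand-in for @agent_ready: tags the method with the watched name."""
    def decorate(func):
        func._watch_agent = name
        return func
    return decorate


class WatcherSketch:
    def __init__(self) -> None:
        self.log = []
        self._ready_handlers = {}
        # Collect every method tagged by the decorator, keyed by watched agent name.
        for attr in dir(self):
            member = getattr(self, attr)
            watched = getattr(member, "_watch_agent", None)
            if watched is not None:
                self._ready_handlers[watched] = member

    async def notify_registered(self, data: ReadyDataSketch) -> None:
        # Called when some agent registers; only a matching handler runs.
        handler = self._ready_handlers.get(data.agent_name)
        if handler is not None:
            await handler(data)

    @agent_ready_sketch(name="worker")
    async def on_worker_ready(self, data: ReadyDataSketch) -> None:
        self.log.append(f"{data.agent_name} ready on {data.runner}")
```

Registrations for agents that nobody watches are simply ignored in this sketch.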