What are tasks?

Tasks let one agent dispatch work to other agents. While handoff transfers the conversation, tasks are for background work — an agent asks workers to do something and collects their results. Use cases for tasks:
  • Researching a topic from multiple perspectives in parallel
  • Running a code analysis while talking to the user
  • Fetching data from multiple sources simultaneously

Single task

The simplest form is self.task(), which sends a task to one agent and waits for the response:
async with self.task("worker", payload={"question": "What is Pipecat?"}, timeout=30) as t:
    pass  # the response is collected when the block exits

print(t.response)
self.task() is a context manager that handles the full lifecycle: it waits for the agent to be ready, sends the request, and collects the response. Under the hood, this is equivalent to calling request_task() (fire-and-forget) and then handling on_task_response() and on_task_completed() yourself.
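To make the lifecycle concrete, here is a minimal plain-asyncio sketch of the same request/response pattern: create a future, send the payload, and await the worker's reply. This is an illustration of the mechanics only, not the library's implementation; all names in it (`worker`, `inbox`, `reply`) are hypothetical.

```python
import asyncio

# Sketch of the task lifecycle: the requester creates a future, sends the
# payload alongside it, and awaits the worker's response with a timeout.
# None of these names come from pipecat_subagents.

async def worker(inbox: asyncio.Queue) -> None:
    payload, reply = await inbox.get()  # receive the task request
    reply.set_result({"answer": f"Processed {payload['question']!r}"})

async def main() -> dict:
    inbox: asyncio.Queue = asyncio.Queue()
    asyncio.ensure_future(worker(inbox))
    reply: asyncio.Future = asyncio.get_running_loop().create_future()
    await inbox.put(({"question": "What is Pipecat?"}, reply))  # "request"
    return await asyncio.wait_for(reply, timeout=30)            # "collect"

print(asyncio.run(main()))
```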

Task group

When you need to dispatch work to multiple agents in parallel, use self.task_group():
async with self.task_group("worker1", "worker2", "worker3", payload={"topic": "AI safety"}, timeout=30) as tg:
    pass  # all responses are collected when the block exits

for agent_name, response in tg.responses.items():
    print(f"{agent_name}: {response}")
self.task_group() works the same way but for multiple agents: it waits for all of them to be ready, sends a request to each, and collects all responses. Under the hood, this is equivalent to calling request_task_group() and then handling on_task_response() for each agent and a final on_task_completed() yourself. The same payload is sent to every agent. If you need different arguments per agent, you can structure the payload so each worker reads its own key:
async with self.task_group("researcher", "fact_checker", payload={
    "researcher": {"topic": "AI safety", "depth": "detailed"},
    "fact_checker": {"claims": ["AI can self-improve", "AGI is near"]},
}, timeout=30) as tg:
    pass
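With a keyed payload like this, each worker can pull out just its own slice by name. The keying convention (worker name to arguments) is something you choose yourself, not framework behavior; this small sketch shows the lookup with a fallback for workers that have no dedicated entry:

```python
# Sketch: each worker reads only its own slice of a shared payload.
# The name-to-args keying convention is ours, not the framework's.
payload = {
    "researcher": {"topic": "AI safety", "depth": "detailed"},
    "fact_checker": {"claims": ["AI can self-improve", "AGI is near"]},
}

def handle(worker_name: str, payload: dict) -> dict:
    # Fall back to an empty dict if this worker has no dedicated entry
    return payload.get(worker_name, {})

print(handle("researcher", payload)["topic"])  # AI safety
```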

Handling task requests

Workers handle incoming tasks in two ways.

The @task decorator

The @task decorator marks a method as a task handler. The framework automatically dispatches matching requests to it. The worker passes message.task_id when sending the response:
from pipecat_subagents.agents import task

class MyWorker(BaseAgent):
    @task
    async def on_task_handler(self, message: BusTaskRequestMessage):
        result = await self._do_work(message.payload)
        await self.send_task_response(message.task_id, result)
You can use named tasks to route different types of work to different handlers:
class MyWorker(BaseAgent):
    @task(name="research")
    async def on_research(self, message: BusTaskRequestMessage):
        await self.send_task_response(message.task_id, {"answer": "..."})

    @task(name="summarize")
    async def on_summarize(self, message: BusTaskRequestMessage):
        await self.send_task_response(message.task_id, {"summary": "..."})
The requester specifies the task name when dispatching:
async with self.task("worker", name="research", payload={"topic": "AI"}) as t:
    pass
Set parallel=True to allow concurrent execution of multiple requests:
@task(parallel=True)
async def on_task_handler(self, message: BusTaskRequestMessage):
    # Each request runs in its own asyncio task
    await self.send_task_response(message.task_id, {"done": True})

Overriding on_task_request

Alternatively, you can override on_task_request() directly without the @task decorator:
class MyWorker(BaseAgent):
    async def on_task_request(self, message: BusTaskRequestMessage) -> None:
        await super().on_task_request(message)
        result = await self._do_work(message.payload)
        await self.send_task_response(message.task_id, result)
This is useful when you need custom routing logic or want to integrate with an existing pipeline, as shown in the example below.
send_task_response(), send_task_update(), and send_task_stream_*() all require an explicit task_id. This lets a worker handle multiple concurrent tasks — typically with @task(parallel=True) — and respond to each one correctly. For simple handlers, pass message.task_id from the request. For asynchronous responses (see the example below), track the task_id yourself until you’re ready to respond.
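The bookkeeping behind concurrent tasks can be pictured as a map from task_id to a pending result. The sketch below uses a dict of futures to show why an explicit task_id lets responses arrive in any order; it is illustrative only, and the framework's internals may differ.

```python
import asyncio

# Sketch: track multiple in-flight task_ids so each response resolves
# the right request, even when responses arrive out of order.

class TaskTracker:
    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    def start(self, task_id: str) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        self._pending[task_id] = fut
        return fut

    def respond(self, task_id: str, result: dict) -> None:
        # pop() ensures each task_id is resolved at most once
        self._pending.pop(task_id).set_result(result)

async def main() -> list:
    tracker = TaskTracker()
    f1, f2 = tracker.start("t1"), tracker.start("t2")
    tracker.respond("t2", {"done": 2})  # responses may arrive out of order
    tracker.respond("t1", {"done": 1})
    return [await f1, await f2]

print(asyncio.run(main()))
```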

Building a task system

Let’s build a debate system where a moderator dispatches a topic to three workers, each arguing from a different perspective.

Worker agents

Each worker runs its own LLM pipeline. The LLM response arrives asynchronously through an event handler, so the worker tracks the current task ID until it has something to respond with:
from pipecat_subagents.agents import LLMAgent
from pipecat_subagents.bus.messages import BusTaskRequestMessage

class DebateWorker(LLMAgent):
    def __init__(self, name, *, bus, role):
        super().__init__(name, bus=bus)
        self._role = role
        self._current_task_id = ""

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(system_instruction=f"You argue as a {self._role}."),
        )

    async def build_pipeline(self) -> Pipeline:
        llm = self.build_llm()
        context = LLMContext()
        user_agg, assistant_agg = LLMContextAggregatorPair(context)

        @assistant_agg.event_handler("on_assistant_turn_stopped")
        async def on_assistant_turn_stopped(aggregator, message):
            text = message.content
            if self._current_task_id:
                task_id = self._current_task_id
                await self.send_task_response(task_id, {"role": self._role, "text": text})
                self._current_task_id = ""

        return Pipeline([user_agg, llm, assistant_agg])

    async def on_task_request(self, message: BusTaskRequestMessage) -> None:
        await super().on_task_request(message)
        self._current_task_id = message.task_id
        await self.queue_frame(
            LLMMessagesAppendFrame(
                messages=[{"role": "developer", "content": f"Topic: {message.payload['topic']}"}],
                run_llm=True,
            )
        )
The worker:
  1. Receives a task request and stores the task_id
  2. Injects the topic into its LLM context and runs the LLM
  3. When the LLM finishes its turn, the event handler sends the response with the stored task_id

Coordinator agent

The coordinator creates workers, triggers tasks via a tool, and synthesizes results:
class ModeratorAgent(LLMAgent):
    def __init__(self, name, *, bus):
        super().__init__(name, bus=bus, bridged=())
        self._workers = []

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(
                system_instruction=(
                    "You are a debate moderator. When the user gives a topic, "
                    "call the debate tool. Then synthesize the results."
                ),
            ),
        )

    async def on_ready(self) -> None:
        await super().on_ready()
        for role in ("advocate", "critic", "analyst"):
            worker = DebateWorker(role, bus=self.bus, role=role)
            self._workers.append(worker)
            await self.add_agent(worker)

    @tool(cancel_on_interruption=False)
    async def debate(self, params: FunctionCallParams, topic: str):
        """Analyze a topic from multiple perspectives.

        Args:
            topic (str): The topic to debate.
        """
        worker_names = [str(w) for w in self._workers]
        async with self.task_group(*worker_names, payload={"topic": topic}, timeout=30) as tg:
            pass

        result = "\n\n".join(
            f"{r['role'].upper()}: {r['text']}" for r in tg.responses.values()
        )
        await params.result_callback(result)
When the user says “debate whether AI should be regulated,” the flow is:
  1. The LLM calls the debate tool
  2. task_group() sends the topic to all three workers in parallel
  3. Each worker runs its own LLM and responds with its perspective
  4. The moderator collects all responses and returns them to the LLM
  5. The LLM synthesizes a balanced summary and speaks it to the user

Task lifecycle

  1. Request: the parent calls task() or task_group(). The framework waits for workers to be ready, then sends a task request to each.
  2. Process: workers receive the request in on_task_request() and do their work.
  3. Respond: workers call send_task_response(task_id, ...) with their results.
  4. Complete: when all workers respond (or a timeout occurs), the context manager exits and results are available.
The framework also supports fire-and-forget tasks, progress updates, and streaming.
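As a rough illustration of how progress updates can coexist with a final result, here is a plain-asyncio sketch: a worker pushes intermediate updates onto a queue before its final message, with a sentinel marking end of stream. This shows the shape of the pattern only; it is not the library's API, and the `send_task_update` analogy is an assumption.

```python
import asyncio

# Sketch: a worker emits progress updates before its final result;
# None marks end-of-stream. Illustrative only, not the library's API.

async def worker(updates: asyncio.Queue) -> None:
    for pct in (25, 50, 75):
        await updates.put({"progress": pct})  # "send_task_update" analogue
    await updates.put({"result": "done"})     # "send_task_response" analogue
    await updates.put(None)                   # end-of-stream marker

async def main() -> list:
    updates: asyncio.Queue = asyncio.Queue()
    asyncio.ensure_future(worker(updates))
    received = []
    while (msg := await updates.get()) is not None:
        received.append(msg)
    return received

print(asyncio.run(main()))
```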

Task cancellation

Tasks can be cancelled in several ways. The framework handles cleanup automatically and notifies workers so they can stop in-progress work.

Automatic cancellation with context managers

When using task() or task_group(), cancellation happens automatically if the context block raises an exception. This includes tool interruptions (CancelledError) when using cancel_on_interruption=True:
# If an exception or CancelledError is raised inside the block,
# all workers are cancelled automatically
async with self.task_group("w1", "w2", payload=data) as tg:
    # ... if this raises, workers get cancelled
    pass

Worker errors

By default (cancel_on_error=True), if any worker responds with an error status, the remaining workers in the group are cancelled and TaskGroupError is raised:
try:
    async with self.task_group("w1", "w2", payload=data) as tg:
        pass
except TaskGroupError as e:
    # A worker errored -- remaining workers were cancelled
    pass

Manual cancellation with cancel_task()

For fire-and-forget tasks (using request_task()), you manage cancellation yourself by tracking task IDs:
task_ids = []
try:
    task_ids.append(await self.request_task("w1", payload={"job": 1}))
    task_ids.append(await self.request_task("w2", payload={"job": 2}))
    # ...
except asyncio.CancelledError:
    for tid in task_ids:
        await self.cancel_task(tid, reason="tool cancelled")

Handling cancellation on the worker side

When a task is cancelled, the worker’s on_task_cancelled hook fires. The framework automatically sends a CANCELLED response back to the requester, so you only need to override this hook if you have resources to clean up:
class MyWorker(BaseAgent):
    async def on_task_cancelled(self, message: BusTaskCancelMessage) -> None:
        # Optional: clean up resources, stop in-progress work
        logger.info(f"Task {message.task_id} cancelled: {message.reason}")

Agent shutdown

When a worker agent stops with tasks still in flight, it automatically sends a CANCELLED response for each active task so requesters aren’t left waiting on a timeout.
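The idea behind that shutdown behavior can be sketched in a few lines: on stop, walk the set of still-active task IDs and send a CANCELLED response for each. The names here (`active_tasks`, `send`) are hypothetical, chosen only for the illustration.

```python
# Sketch of shutdown flushing: every task still active when the agent
# stops gets a CANCELLED response so requesters don't wait for a timeout.
# All names here are illustrative, not framework API.

def flush_on_stop(active_tasks: set, send) -> None:
    for task_id in sorted(active_tasks):
        send(task_id, {"status": "CANCELLED", "reason": "agent stopped"})
    active_tasks.clear()

sent = []
active = {"t1", "t2"}
flush_on_stop(active, lambda tid, msg: sent.append((tid, msg)))
print(sent)
```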

What’s next

You now understand the core concepts of Pipecat Subagents: agents, the bus, handoff, and tasks. The Fundamentals section covers specific patterns in depth.
