> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Task Coordination

> Dispatch work to other agents and collect results.

## What are tasks?

Tasks let one agent dispatch work to other agents. While handoff transfers the conversation, tasks are for background work -- an agent asks workers to do something and collects their results.

Use cases for tasks:

* Researching a topic from multiple perspectives in parallel
* Running a code analysis while talking to the user
* Fetching data from multiple sources simultaneously

## Single task

The simplest form is `self.task()`, which sends a task to one agent and waits for the response:

```python theme={null}
async with self.task("worker", payload={"question": "What is Pipecat?"}, timeout=30) as t:
    pass

print(t.response)
```

`self.task()` is a context manager that handles the full lifecycle: it waits for the agent to be ready, sends the request, and collects the response. Under the hood, this is equivalent to calling `request_task()` (fire-and-forget) and then handling `on_task_response()` and `on_task_completed()` yourself.

## Task group

When you need to dispatch work to multiple agents in parallel, use `self.task_group()`:

```python theme={null}
async with self.task_group("worker1", "worker2", "worker3", payload={"topic": "AI safety"}, timeout=30) as tg:
    pass

for agent_name, response in tg.responses.items():
    print(f"{agent_name}: {response}")
```

`self.task_group()` works the same way but for multiple agents: it waits for all of them to be ready, sends a request to each, and collects all responses. Under the hood, this is equivalent to calling `request_task_group()` and then handling `on_task_response()` for each agent and a final `on_task_completed()` yourself.

The same payload is sent to every agent. If you need different arguments per agent, you can structure the payload so each worker reads its own key:

```python theme={null}
async with self.task_group("researcher", "fact_checker", payload={
    "researcher": {"topic": "AI safety", "depth": "detailed"},
    "fact_checker": {"claims": ["AI can self-improve", "AGI is near"]},
}, timeout=30) as tg:
    pass
```

## Handling task requests

Workers handle incoming tasks in two ways.

### The @task decorator

The `@task` decorator marks a method as a task handler. The framework automatically dispatches matching requests to it. The worker passes `message.task_id` when sending the response:

```python theme={null}
from pipecat_subagents.agents import task

class MyWorker(BaseAgent):
    @task
    async def on_task_handler(self, message: BusTaskRequestMessage):
        result = await self._do_work(message.payload)
        await self.send_task_response(message.task_id, result)
```

You can use named tasks to route different types of work to different handlers:

```python theme={null}
class MyWorker(BaseAgent):
    @task(name="research")
    async def on_research(self, message: BusTaskRequestMessage):
        await self.send_task_response(message.task_id, {"answer": "..."})

    @task(name="summarize")
    async def on_summarize(self, message: BusTaskRequestMessage):
        await self.send_task_response(message.task_id, {"summary": "..."})
```

The requester specifies the task name when dispatching:

```python theme={null}
async with self.task("worker", name="research", payload={"topic": "AI"}) as t:
    pass
```

Each request runs in its own asyncio task, so multiple requests to the same handler are executed concurrently without blocking the bus message loop.

### Overriding on\_task\_request

Alternatively, you can override `on_task_request()` directly without the `@task` decorator:

```python theme={null}
class MyWorker(BaseAgent):
    async def on_task_request(self, message: BusTaskRequestMessage) -> None:
        await super().on_task_request(message)
        result = await self._do_work(message.payload)
        await self.send_task_response(message.task_id, result)
```

This is useful when you need custom routing logic or want to integrate with an existing pipeline, as shown in the example below.

<Note>
  `send_task_response()`, `send_task_update()`, and `send_task_stream_*()` all require an explicit `task_id`. This lets a worker handle multiple concurrent tasks and respond to each one correctly. For simple handlers, pass `message.task_id` from the request. For asynchronous responses (see the example below), track the `task_id` yourself until you're ready to respond.
</Note>

## Building a task system

Let's build a debate system where a moderator dispatches a topic to three workers, each arguing from a different perspective.

### Worker agents

Each worker runs its own LLM pipeline. The LLM response arrives asynchronously through an event handler, so the worker tracks the current task ID until it has something to respond with:

```python theme={null}
from pipecat_subagents.agents import LLMAgent
from pipecat_subagents.bus.messages import BusTaskRequestMessage

class DebateWorker(LLMAgent):
    def __init__(self, name, *, bus, role):
        super().__init__(name, bus=bus)
        self._role = role
        self._current_task_id = ""

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(system_instruction=f"You argue as a {self._role}."),
        )

    async def build_pipeline(self) -> Pipeline:
        llm = self.build_llm()
        context = LLMContext()
        user_agg, assistant_agg = LLMContextAggregatorPair(context)

        @assistant_agg.event_handler("on_assistant_turn_stopped")
        async def on_assistant_turn_stopped(aggregator, message):
            text = message.content
            if self._current_task_id:
                task_id = self._current_task_id
                await self.send_task_response(task_id, {"role": self._role, "text": text})
                self._current_task_id = ""

        return Pipeline([user_agg, llm, assistant_agg])

    async def on_task_request(self, message: BusTaskRequestMessage) -> None:
        await super().on_task_request(message)
        self._current_task_id = message.task_id
        await self.queue_frame(
            LLMMessagesAppendFrame(
                messages=[{"role": "developer", "content": f"Topic: {message.payload['topic']}"}],
                run_llm=True,
            )
        )
```

The worker:

1. Receives a task request and stores the `task_id`
2. Injects the topic into its LLM context and runs the LLM
3. When the LLM finishes its turn, the event handler sends the response with the stored `task_id`

### Coordinator agent

The coordinator creates workers, triggers tasks via a tool, and synthesizes results:

```python theme={null}
class ModeratorAgent(LLMAgent):
    def __init__(self, name, *, bus):
        super().__init__(name, bus=bus, bridged=())
        self._workers = []

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(
                system_instruction=(
                    "You are a debate moderator. When the user gives a topic, "
                    "call the debate tool. Then synthesize the results."
                ),
            ),
        )

    async def on_ready(self) -> None:
        await super().on_ready()
        for role in ("advocate", "critic", "analyst"):
            worker = DebateWorker(role, bus=self.bus, role=role)
            self._workers.append(worker)
            await self.add_agent(worker)

    @tool(cancel_on_interruption=False)
    async def debate(self, params: FunctionCallParams, topic: str):
        """Analyze a topic from multiple perspectives.

        Args:
            topic (str): The topic to debate.
        """
        worker_names = [str(w) for w in self._workers]
        async with self.task_group(*worker_names, payload={"topic": topic}, timeout=30) as tg:
            pass

        result = "\n\n".join(
            f"{r['role'].upper()}: {r['text']}" for r in tg.responses.values()
        )
        await params.result_callback(result)
```

When the user says "debate whether AI should be regulated," the moderator:

1. The LLM calls the `debate` tool
2. `task_group()` sends the topic to all three workers in parallel
3. Each worker runs its own LLM and responds with its perspective
4. The moderator collects all responses and returns them to the LLM
5. The LLM synthesizes a balanced summary and speaks it to the user

## Task lifecycle

<Steps>
  <Step title="Request">
    Parent calls `task()` or `task_group()`. The framework waits for workers to be ready, then sends a task request to each.
  </Step>

  <Step title="Process">
    Workers receive the request in `on_task_request()` and do their work.
  </Step>

  <Step title="Respond">
    Workers call `send_task_response(task_id, ...)` with their results.
  </Step>

  <Step title="Complete">
    When all workers respond (or timeout occurs), the context manager exits and results are available.
  </Step>
</Steps>

The framework also supports fire-and-forget tasks, progress updates, and streaming.

## Task cancellation

Tasks can be cancelled in several ways. The framework handles cleanup automatically and notifies workers so they can stop in-progress work.

### Automatic cancellation with context managers

When using `task()` or `task_group()`, cancellation happens automatically if the context block raises an exception. This includes tool interruptions (`CancelledError`) when using `cancel_on_interruption=True`:

```python theme={null}
# If an exception or CancelledError is raised inside the block,
# all workers are cancelled automatically
async with self.task_group("w1", "w2", payload=data) as tg:
    # ... if this raises, workers get cancelled
    pass
```

### Worker errors

By default (`cancel_on_error=True`), if any worker responds with an error status, the remaining workers in the group are cancelled and `TaskGroupError` is raised:

```python theme={null}
try:
    async with self.task_group("w1", "w2", payload=data) as tg:
        pass
except TaskGroupError as e:
    # A worker errored -- remaining workers were cancelled
    pass
```

### Manual cancellation with cancel\_task()

For fire-and-forget tasks (using `request_task()`), you manage cancellation yourself by tracking task IDs:

```python theme={null}
task_ids = []
try:
    task_ids.append(await self.request_task("w1", payload={"job": 1}))
    task_ids.append(await self.request_task("w2", payload={"job": 2}))
    # ...
except asyncio.CancelledError:
    for tid in task_ids:
        await self.cancel_task(tid, reason="tool cancelled")
```

### Handling cancellation on the worker side

When a task is cancelled, the worker's `on_task_cancelled` hook fires. The framework automatically sends a `CANCELLED` response back to the requester, so you only need to override this hook if you have resources to clean up:

```python theme={null}
class MyWorker(BaseAgent):
    async def on_task_cancelled(self, message: BusTaskCancelMessage) -> None:
        # Optional: clean up resources, stop in-progress work
        logger.info(f"Task {message.task_id} cancelled: {message.reason}")
```

### Agent shutdown

When a worker agent stops with tasks still in flight, it automatically sends a `CANCELLED` response for each active task so requesters aren't left waiting on a timeout.

## What's next

You now understand the core concepts of Pipecat Subagents: agents, the bus, handoff, and tasks. The Fundamentals section covers specific patterns in depth.

<Card title="What's Next" icon="arrow-right" href="/subagents/learn/whats-next">
  Explore Fundamentals and advanced patterns
</Card>
