> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Job Coordination

> Dispatch work to other agents and collect results.

## What are jobs?

Jobs let one agent dispatch work to other agents. While handoff transfers the conversation, jobs are for background work -- an agent asks other agents to do something and collects their results.

Use cases for jobs:

* Researching a topic from multiple perspectives in parallel
* Running a code analysis while talking to the user
* Fetching data from multiple sources simultaneously

## Single job

The simplest form is `self.job()`, which sends a job to one agent and waits for the response:

```python theme={null}
async with self.job("worker", payload={"question": "What is Pipecat?"}, timeout=30) as j:
    pass

print(j.response)
```

`self.job()` is a context manager that handles the full lifecycle: it waits for the agent to be ready, sends the request, and collects the response. Under the hood, this is equivalent to calling `request_job()` (fire-and-forget) and then handling `on_job_response()` and `on_job_completed()` yourself.

## Job group

When you need to dispatch work to multiple agents in parallel, use `self.job_group()`:

```python theme={null}
async with self.job_group("worker1", "worker2", "worker3", payload={"topic": "AI safety"}, timeout=30) as jg:
    pass

for worker_name, response in jg.responses.items():
    print(f"{worker_name}: {response}")
```

`self.job_group()` works the same way but for multiple agents: it waits for all of them to be ready, sends a request to each, and collects all responses. Under the hood, this is equivalent to calling `request_job_group()` and then handling `on_job_response()` for each agent and a final `on_job_completed()` yourself.

The same payload is sent to every agent. If you need different arguments per agent, you can structure the payload so each one reads its own key:

```python theme={null}
async with self.job_group("researcher", "fact_checker", payload={
    "researcher": {"topic": "AI safety", "depth": "detailed"},
    "fact_checker": {"claims": ["AI can self-improve", "AGI is near"]},
}, timeout=30) as jg:
    pass
```

## Handling job requests

Agents handle incoming jobs in two ways.

### The @job decorator

The `@job` decorator marks a method as a job handler. The framework automatically dispatches matching requests to it. The decorator requires a `name` argument. The handler responds with `message.job_id`:

```python theme={null}
from pipecat.pipeline.job_decorator import job

class MyWorker(BaseWorker):
    @job(name="process")
    async def on_process(self, message: BusJobRequestMessage):
        result = await self._do_work(message.payload)
        await self.send_job_response(message.job_id, result)
```

You can use different job names to route work to different handlers:

```python theme={null}
class MyWorker(BaseWorker):
    @job(name="research")
    async def on_research(self, message: BusJobRequestMessage):
        await self.send_job_response(message.job_id, {"answer": "..."})

    @job(name="summarize")
    async def on_summarize(self, message: BusJobRequestMessage):
        await self.send_job_response(message.job_id, {"summary": "..."})
```

The requester specifies the job name when dispatching:

```python theme={null}
async with self.job("worker", name="research", payload={"topic": "AI"}) as j:
    pass
```

Each request runs in its own asyncio task, so multiple requests to the same handler are executed concurrently without blocking the bus message loop.

### Overriding on\_job\_request

Alternatively, you can override `on_job_request()` directly without the `@job` decorator:

```python theme={null}
class MyWorker(BaseWorker):
    async def on_job_request(self, message: BusJobRequestMessage) -> None:
        await super().on_job_request(message)
        result = await self._do_work(message.payload)
        await self.send_job_response(message.job_id, result)
```

This is useful when you need custom routing logic or want to integrate with an existing pipeline, as shown in the example below.

<Note>
  `send_job_response()`, `send_job_update()`, and `send_job_stream_*()` all
  require an explicit `job_id`. This lets an agent handle multiple concurrent
  jobs and respond to each one correctly. For simple handlers, pass
  `message.job_id` from the request. For asynchronous responses (see the example
  below), track the `job_id` yourself until you're ready to respond.
</Note>

## Building a job system

Let's build a debate system where a moderator dispatches a topic to three agents, each arguing from a different perspective.

### Worker agents

Each debate agent runs its own LLM pipeline. The LLM response arrives asynchronously through an event handler, so the agent tracks the current job ID until it has something to respond with:

```python theme={null}
from pipecat.workers.llm import LLMContextWorker
from pipecat.bus.messages import BusJobRequestMessage
from pipecat.frames.frames import LLMMessagesAppendFrame

class DebateWorker(LLMContextWorker):
    def __init__(self, role: str):
        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMService.Settings(system_instruction=f"You argue as a {role}."),
        )
        super().__init__(role, llm=llm)
        self._role = role
        self._current_job_id: str | None = None

        @self.assistant_aggregator.event_handler("on_assistant_turn_stopped")
        async def on_assistant_turn_stopped(aggregator, message):
            if self._current_job_id:
                job_id = self._current_job_id
                self._current_job_id = None
                await self.send_job_response(job_id, {"role": self._role, "text": message.content})

    async def on_job_request(self, message: BusJobRequestMessage) -> None:
        await super().on_job_request(message)
        self._current_job_id = message.job_id
        await self.queue_frame(
            LLMMessagesAppendFrame(
                messages=[{"role": "developer", "content": f"Topic: {message.payload['topic']}"}],
                run_llm=True,
            )
        )
```

<Note>
  `LLMContextWorker` extends `LLMWorker` with a built-in `LLMContext` and
  aggregator pair. It builds the pipeline as `[user_aggregator, llm,
      assistant_aggregator]` automatically, so you don't need to wire the context
  plumbing yourself. Access the aggregators via `self.user_aggregator` and
  `self.assistant_aggregator`.
</Note>

The agent:

1. Receives a job request and stores the `job_id`
2. Injects the topic into its LLM context and runs the LLM
3. When the LLM finishes its turn, the event handler sends the response with the stored `job_id`

### Coordinator agent

The moderator triggers jobs via a tool and synthesizes the results:

```python theme={null}
from pipecat.workers.llm import LLMWorker, tool

class ModeratorAgent(LLMWorker):
    @tool(cancel_on_interruption=False)
    async def debate(self, params: FunctionCallParams, topic: str):
        """Analyze a topic from multiple perspectives.

        Args:
            topic (str): The topic to debate.
        """
        async with self.job_group("advocate", "critic", "analyst", payload={"topic": topic}, timeout=30) as jg:
            pass

        result = "\n\n".join(
            f"{r['role'].upper()}: {r['text']}" for r in jg.responses.values()
        )
        await params.result_callback(result)
```

Create the debate agents and the moderator, then register them all with `runner.add_workers(ModeratorAgent("moderator", llm=...), DebateWorker("advocate"), DebateWorker("critic"), DebateWorker("analyst"))`.

When the user says "debate whether AI should be regulated," the moderator:

1. The LLM calls the `debate` tool
2. `job_group()` sends the topic to all three agents in parallel
3. Each agent runs its own LLM and responds with its perspective
4. The moderator collects all responses and returns them to the LLM
5. The LLM synthesizes a balanced summary and speaks it to the user

## Job lifecycle

<Steps>
  <Step title="Request">
    The requester calls `job()` or `job_group()`. The framework waits for agents
    to be ready, then sends a job request to each.
  </Step>

  <Step title="Process">
    Agents receive the request in `on_job_request()` and do their work.
  </Step>

  <Step title="Respond">
    Agents call `send_job_response(job_id, ...)` with their results.
  </Step>

  <Step title="Complete">
    When all agents respond (or timeout occurs), the context manager exits and
    results are available.
  </Step>
</Steps>

The framework also supports fire-and-forget jobs, progress updates, and streaming.

## Job cancellation

Jobs can be cancelled in several ways. The framework handles cleanup automatically and notifies agents so they can stop in-progress work.

### Automatic cancellation with context managers

When using `job()` or `job_group()`, cancellation happens automatically if the context block raises an exception. This includes tool interruptions (`CancelledError`) when using `cancel_on_interruption=True`:

```python theme={null}
# If an exception or CancelledError is raised inside the block,
# all agents are cancelled automatically
async with self.job_group("w1", "w2", payload=data) as jg:
    # ... if this raises, the agents get cancelled
    pass
```

### Worker errors

By default (`cancel_on_error=True`), if any agent responds with an error status, the remaining agents in the group are cancelled and `JobGroupError` is raised:

```python theme={null}
try:
    async with self.job_group("w1", "w2", payload=data) as jg:
        pass
except JobGroupError as e:
    # An agent errored -- remaining agents were cancelled
    pass
```

### Manual cancellation

For fire-and-forget jobs (using `request_job()`), you manage cancellation yourself by tracking job IDs:

```python theme={null}
job_ids = []
try:
    job_ids.append(await self.request_job("w1", payload={"job": 1}))
    job_ids.append(await self.request_job("w2", payload={"job": 2}))
    # ...
except asyncio.CancelledError:
    for jid in job_ids:
        await self.cancel_job_group(jid, reason="tool cancelled")
```

### Handling cancellation on the agent side

When a job is cancelled, the agent's `on_job_cancelled` hook fires. The framework automatically sends a `CANCELLED` response back to the requester, so you only need to override this hook if you have resources to clean up:

```python theme={null}
class MyWorker(BaseWorker):
    async def on_job_cancelled(self, message: BusJobCancelMessage) -> None:
        # Optional: clean up resources, stop in-progress work
        logger.info(f"Job {message.job_id} cancelled: {message.reason}")
```

### Agent shutdown

When an agent stops with jobs still in flight, it automatically sends a `CANCELLED` response for each active job so requesters aren't left waiting on a timeout.

## What's next

You can now build agents, hand off between them, and dispatch jobs. So far everything has run in a single process. Next, let's scale across processes and machines.

<Card title="Distributed Agents" icon="arrow-right" href="/pipecat/learn/distributed-agents">
  Run agents across processes and machines on a shared bus
</Card>
