Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt

Use this file to discover all available pages before exploring further.

What are jobs?

Jobs let one agent dispatch work to other agents. While handoff transfers the conversation, jobs are for background work — an agent asks other agents to do something and collects their results. Use cases for jobs:
  • Researching a topic from multiple perspectives in parallel
  • Running a code analysis while talking to the user
  • Fetching data from multiple sources simultaneously

Single job

The simplest form is self.job(), which sends a job to one agent and waits for the response:
async with self.job("worker", payload={"question": "What is Pipecat?"}, timeout=30) as j:
    pass

print(j.response)
self.job() is a context manager that handles the full lifecycle: it waits for the agent to be ready, sends the request, and collects the response. Under the hood, this is equivalent to calling request_job() (fire-and-forget) and then handling on_job_response() and on_job_completed() yourself.

Job group

When you need to dispatch work to multiple agents in parallel, use self.job_group():
async with self.job_group("worker1", "worker2", "worker3", payload={"topic": "AI safety"}, timeout=30) as jg:
    pass

for worker_name, response in jg.responses.items():
    print(f"{worker_name}: {response}")
self.job_group() works the same way but for multiple agents: it waits for all of them to be ready, sends a request to each, and collects all responses. Under the hood, this is equivalent to calling request_job_group() and then handling on_job_response() for each agent and a final on_job_completed() yourself. The same payload is sent to every agent. If you need different arguments per agent, you can structure the payload so each one reads its own key:
async with self.job_group("researcher", "fact_checker", payload={
    "researcher": {"topic": "AI safety", "depth": "detailed"},
    "fact_checker": {"claims": ["AI can self-improve", "AGI is near"]},
}, timeout=30) as jg:
    pass

Handling job requests

Agents handle incoming jobs in two ways.

The @job decorator

The @job decorator marks a method as a job handler. The framework automatically dispatches matching requests to it. The decorator requires a name argument. The handler responds with message.job_id:
from pipecat.pipeline.job_decorator import job

class MyWorker(BaseWorker):
    @job(name="process")
    async def on_process(self, message: BusJobRequestMessage):
        result = await self._do_work(message.payload)
        await self.send_job_response(message.job_id, result)
You can use different job names to route work to different handlers:
class MyWorker(BaseWorker):
    @job(name="research")
    async def on_research(self, message: BusJobRequestMessage):
        await self.send_job_response(message.job_id, {"answer": "..."})

    @job(name="summarize")
    async def on_summarize(self, message: BusJobRequestMessage):
        await self.send_job_response(message.job_id, {"summary": "..."})
The requester specifies the job name when dispatching:
async with self.job("worker", name="research", payload={"topic": "AI"}) as j:
    pass
Each request runs in its own asyncio task, so multiple requests to the same handler are executed concurrently without blocking the bus message loop.

Overriding on_job_request

Alternatively, you can override on_job_request() directly without the @job decorator:
class MyWorker(BaseWorker):
    async def on_job_request(self, message: BusJobRequestMessage) -> None:
        await super().on_job_request(message)
        result = await self._do_work(message.payload)
        await self.send_job_response(message.job_id, result)
This is useful when you need custom routing logic or want to integrate with an existing pipeline, as shown in the example below.
send_job_response(), send_job_update(), and send_job_stream_*() all require an explicit job_id. This lets an agent handle multiple concurrent jobs and respond to each one correctly. For simple handlers, pass message.job_id from the request. For asynchronous responses (see the example below), track the job_id yourself until you’re ready to respond.

Building a job system

Let’s build a debate system where a moderator dispatches a topic to three agents, each arguing from a different perspective.

Worker agents

Each debate agent runs its own LLM pipeline. The LLM response arrives asynchronously through an event handler, so the agent tracks the current job ID until it has something to respond with:
from pipecat.workers.llm import LLMContextWorker
from pipecat.bus.messages import BusJobRequestMessage
from pipecat.frames.frames import LLMMessagesAppendFrame

class DebateWorker(LLMContextWorker):
    def __init__(self, role: str):
        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMService.Settings(system_instruction=f"You argue as a {role}."),
        )
        super().__init__(role, llm=llm)
        self._role = role
        self._current_job_id: str | None = None

        @self.assistant_aggregator.event_handler("on_assistant_turn_stopped")
        async def on_assistant_turn_stopped(aggregator, message):
            if self._current_job_id:
                job_id = self._current_job_id
                self._current_job_id = None
                await self.send_job_response(job_id, {"role": self._role, "text": message.content})

    async def on_job_request(self, message: BusJobRequestMessage) -> None:
        await super().on_job_request(message)
        self._current_job_id = message.job_id
        await self.queue_frame(
            LLMMessagesAppendFrame(
                messages=[{"role": "developer", "content": f"Topic: {message.payload['topic']}"}],
                run_llm=True,
            )
        )
LLMContextWorker extends LLMWorker with a built-in LLMContext and aggregator pair. It builds the pipeline as [user_aggregator, llm, assistant_aggregator] automatically, so you don’t need to wire the context plumbing yourself. Access the aggregators via self.user_aggregator and self.assistant_aggregator.
The agent:
  1. Receives a job request and stores the job_id
  2. Injects the topic into its LLM context and runs the LLM
  3. When the LLM finishes its turn, the event handler sends the response with the stored job_id

Coordinator agent

The moderator triggers jobs via a tool and synthesizes the results:
from pipecat.workers.llm import LLMWorker, tool

class ModeratorAgent(LLMWorker):
    @tool(cancel_on_interruption=False)
    async def debate(self, params: FunctionCallParams, topic: str):
        """Analyze a topic from multiple perspectives.

        Args:
            topic (str): The topic to debate.
        """
        async with self.job_group("advocate", "critic", "analyst", payload={"topic": topic}, timeout=30) as jg:
            pass

        result = "\n\n".join(
            f"{r['role'].upper()}: {r['text']}" for r in jg.responses.values()
        )
        await params.result_callback(result)
Create the debate agents and the moderator, then register them all with runner.add_workers(ModeratorAgent("moderator", llm=...), DebateWorker("advocate"), DebateWorker("critic"), DebateWorker("analyst")). When the user says “debate whether AI should be regulated,” the moderator:
  1. The LLM calls the debate tool
  2. job_group() sends the topic to all three agents in parallel
  3. Each agent runs its own LLM and responds with its perspective
  4. The moderator collects all responses and returns them to the LLM
  5. The LLM synthesizes a balanced summary and speaks it to the user

Job lifecycle

1

Request

The requester calls job() or job_group(). The framework waits for agents to be ready, then sends a job request to each.
2

Process

Agents receive the request in on_job_request() and do their work.
3

Respond

Agents call send_job_response(job_id, ...) with their results.
4

Complete

When all agents respond (or timeout occurs), the context manager exits and results are available.
The framework also supports fire-and-forget jobs, progress updates, and streaming.

Job cancellation

Jobs can be cancelled in several ways. The framework handles cleanup automatically and notifies agents so they can stop in-progress work.

Automatic cancellation with context managers

When using job() or job_group(), cancellation happens automatically if the context block raises an exception. This includes tool interruptions (CancelledError) when using cancel_on_interruption=True:
# If an exception or CancelledError is raised inside the block,
# all agents are cancelled automatically
async with self.job_group("w1", "w2", payload=data) as jg:
    # ... if this raises, the agents get cancelled
    pass

Worker errors

By default (cancel_on_error=True), if any agent responds with an error status, the remaining agents in the group are cancelled and JobGroupError is raised:
try:
    async with self.job_group("w1", "w2", payload=data) as jg:
        pass
except JobGroupError as e:
    # An agent errored -- remaining agents were cancelled
    pass

Manual cancellation

For fire-and-forget jobs (using request_job()), you manage cancellation yourself by tracking job IDs:
job_ids = []
try:
    job_ids.append(await self.request_job("w1", payload={"job": 1}))
    job_ids.append(await self.request_job("w2", payload={"job": 2}))
    # ...
except asyncio.CancelledError:
    for jid in job_ids:
        await self.cancel_job_group(jid, reason="tool cancelled")

Handling cancellation on the agent side

When a job is cancelled, the agent’s on_job_cancelled hook fires. The framework automatically sends a CANCELLED response back to the requester, so you only need to override this hook if you have resources to clean up:
class MyWorker(BaseWorker):
    async def on_job_cancelled(self, message: BusJobCancelMessage) -> None:
        # Optional: clean up resources, stop in-progress work
        logger.info(f"Job {message.job_id} cancelled: {message.reason}")

Agent shutdown

When an agent stops with jobs still in flight, it automatically sends a CANCELLED response for each active job so requesters aren’t left waiting on a timeout.

What’s next

You now understand the core concepts of Pipecat’s multi-agent system: agents, the bus, handoff, and jobs. The Fundamentals section covers specific patterns in depth.

Voice-enabling a client UI

A concrete application of jobs: a UIWorker that sees and drives the user’s GUI over a two-way RTVI interface.

What's Next

Explore Fundamentals and advanced patterns