Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt

Use this file to discover all available pages before exploring further.

Overview

WorkerRunner runs agents to completion and acts as the host for a multi-agent session. It owns the shared WorkerBus and WorkerRegistry that back the whole session, and responds to bus messages for lifecycle coordination.
from pipecat.workers.runner import WorkerRunner
PipelineRunner is a deprecated alias for WorkerRunner; existing code that uses it keeps working but should migrate to WorkerRunner.
runner = WorkerRunner()

agent = MyAgent("my_agent", ...)
await runner.add_workers(agent)

await runner.run()
Adding an agent with add_workers() attaches it to the runner’s bus and registry, so agents do not take a bus in their constructor.

Configuration

name
str | None
default:"None"
Unique name for this runner. Defaults to a UUID-based name. Must be unique across all runners in a distributed setup.
bus
WorkerBus | None
default:"None"
The WorkerBus instance. Creates an AsyncQueueBus if not provided.
handle_sigint
bool
default:"True"
Whether to automatically handle SIGINT signals for graceful shutdown.
handle_sigterm
bool
default:"False"
Whether to automatically handle SIGTERM signals for graceful shutdown.
force_gc
bool
default:"False"
Whether to force garbage collection after the main worker completes.
loop
asyncio.AbstractEventLoop | None
default:"None"
Event loop to use. If None, uses the current running loop.

Properties

bus

runner.bus -> WorkerBus
The bus this runner hosts, shared across all added agents.

registry

runner.registry -> WorkerRegistry
The shared agent registry this runner owns.

Methods

add_workers

async def add_workers(self, *workers: BaseWorker) -> None
Add one or more agents to this runner. Each agent is attached to the runner’s bus and registry and started in the background. Can be called before or after run(). When called after run() has started, each agent starts immediately; otherwise agents are queued and started during run setup.
ParameterTypeDescription
*workersBaseWorkerOne or more agents to add

run

async def run(self, worker: BaseWorker | None = None, *, auto_end: bool = True) -> None
Run all added agents and block until the runner is stopped. By default (auto_end=True), the runner ends once every root agent has finished. For long-lived hosts that add and remove agents over many sessions, pass auto_end=False so the runner does not exit when no agents are left.
ParameterTypeDefaultDescription
workerBaseWorker | NoneNoneOptional agent to run. Deprecated — register with add_workers() before calling run() instead.
auto_endboolTrueWhen True, the runner ends once every root agent has finished. When False, it blocks until end() or cancel() is called.

end

async def end(self, reason: str | None = None) -> None
Gracefully end all agents and shut down. Ends root agents first; parent agents propagate shutdown to their children automatically. Idempotent.
ParameterTypeDefaultDescription
reasonstr | NoneNoneHuman-readable reason for ending

cancel

async def cancel(self, reason: str | None = None) -> None
Immediately cancel all agents and shut down. Cancels root agents first; parent agents propagate cancellation to their children automatically. Idempotent.
ParameterTypeDefaultDescription
reasonstr | NoneNoneHuman-readable reason for cancelling

Event Handlers

@runner.event_handler("on_ready")
async def on_ready(runner):
    print("All agents started")

@runner.event_handler("on_error")
async def on_error(runner, error):
    print(f"Runner error: {error}")
EventDescription
on_readyFired after the runner has finished setup and any added agents have started
on_errorFired when starting an added agent fails