Skip to main content

Overview

AgentRunner manages agent pipelines, coordinates startup and shutdown, and responds to bus messages. It owns the shared AgentRegistry and AgentBus for all agents in the system.
from pipecat_subagents.runner import AgentRunner
runner = AgentRunner()

agent = MyAgent("my_agent", bus=runner.bus, ...)
await runner.add_agent(agent)

await runner.run()

Configuration

name
Optional[str]
default:"None"
Unique name for this runner. Defaults to a UUID-based name. Must be unique across all runners in a distributed setup.
bus
Optional[AgentBus]
default:"None"
The AgentBus instance. Creates an AsyncQueueBus if not provided.
handle_sigint
bool
default:"True"
Whether to handle SIGINT for graceful shutdown.

Properties

bus

runner.bus -> AgentBus
The bus instance for agent communication. Pass this to agents when creating them.

registry

runner.registry -> AgentRegistry
The shared agent registry.

Methods

add_agent

async def add_agent(self, agent: BaseAgent) -> None
Add an agent to this runner. Can be called before or after run(). When called after run() has started, the agent’s pipeline task is created and started immediately.
ParameterTypeDescription
agentBaseAgentThe agent to add

run

async def run(self) -> None
Start all agents and block until shutdown. Starts all registered agents, fires the on_ready event handler, then blocks until end() or cancel() is called. New agents can be added dynamically via add_agent() after run() has started.

end

async def end(self, reason: Optional[str] = None) -> None
Gracefully end all agents and shut down. Ends root agents first; parent agents propagate shutdown to their children automatically. Idempotent.
ParameterTypeDefaultDescription
reasonOptional[str]NoneHuman-readable reason for ending

cancel

async def cancel(self, reason: Optional[str] = None) -> None
Immediately cancel all agents and shut down. Cancels root agents first; parent agents propagate cancellation to their children automatically. Idempotent.
ParameterTypeDefaultDescription
reasonOptional[str]NoneHuman-readable reason for cancelling

Event Handlers

@runner.event_handler("on_ready")
async def on_ready():
    print("All agents started")

@runner.event_handler("on_error")
async def on_error(error: str):
    print(f"Runner error: {error}")
EventDescription
on_readyFired after all registered agents have been started
on_errorFired when the runner encounters an error