Overview
AgentRunner manages agent pipelines, coordinates startup and shutdown, and responds to bus messages. It owns the shared AgentRegistry and AgentBus for all agents in the system.
from pipecat_subagents.runner import AgentRunner
runner = AgentRunner()
agent = MyAgent("my_agent", bus=runner.bus, ...)
await runner.add_agent(agent)
await runner.run()
Configuration
name
Optional[str]
default:"None"
Name for this runner. Defaults to a UUID-based name. Must be unique across all runners in a distributed setup.
bus
Optional[AgentBus]
default:"None"
Whether to handle SIGINT for graceful shutdown.
Properties
bus
runner.bus -> AgentBus
The bus instance for agent communication. Pass this to agents when creating them.
registry
runner.registry -> AgentRegistry
The shared agent registry.
Methods
add_agent
async def add_agent(self, agent: BaseAgent) -> None
Add an agent to this runner. Can be called before or after run(). When called after run() has started, the agent’s pipeline task is created and started immediately.
| Parameter | Type | Description |
|---|---|---|
| agent | BaseAgent | The agent to add |
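The timing rule for add_agent() can be illustrated with a toy model. This is a minimal sketch, not the library's implementation: `ToyAgent`, `ToyRunner`, and `start_all()` are hypothetical stand-ins introduced only for illustration, and the real AgentRunner creates and starts a pipeline task rather than setting a flag.

```python
import asyncio


class ToyAgent:
    """Hypothetical stand-in for an agent; only tracks whether it was started."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.started = False

    async def start(self) -> None:
        self.started = True


class ToyRunner:
    """Illustrative model of add_agent() timing, not the real AgentRunner."""

    def __init__(self) -> None:
        self._agents: list[ToyAgent] = []
        self._running = False

    async def add_agent(self, agent: ToyAgent) -> None:
        self._agents.append(agent)
        # Once the runner is live, a late agent is started right away.
        if self._running:
            await agent.start()

    async def start_all(self) -> None:
        # Stands in for the startup phase of run() (assumed name).
        self._running = True
        for agent in self._agents:
            await agent.start()


async def demo() -> tuple[bool, bool, bool]:
    runner = ToyRunner()
    early, late = ToyAgent("early"), ToyAgent("late")
    await runner.add_agent(early)
    started_before_run = early.started  # captured before the runner starts
    await runner.start_all()
    await runner.add_agent(late)  # added after startup
    return started_before_run, early.started, late.started
```

Calling `asyncio.run(demo())` shows that the early agent waits for startup, while the late agent is started inside add_agent() itself.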
run
async def run(self) -> None
Start all agents and block until shutdown. Starts all registered agents, fires the on_ready event handler, then blocks until end() or cancel() is called. New agents can be added dynamically via add_agent() after run() has started.
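The start, notify, then block sequence described for run() could be modeled with an `asyncio.Event`. The sketch below is an assumption about the general shape, not AgentRunner's actual code: `ToyRunner` and its `log` list are hypothetical, and agents are reduced to names.

```python
import asyncio
from typing import Optional


class ToyRunner:
    """Illustrative model of the run() lifecycle, not the real AgentRunner."""

    def __init__(self, agent_names: list[str]) -> None:
        self.log: list[str] = []
        self._agent_names = agent_names
        self._shutdown = asyncio.Event()

    async def run(self) -> None:
        for name in self._agent_names:
            self.log.append(f"start:{name}")  # start every registered agent
        self.log.append("on_ready")  # then fire the ready event
        await self._shutdown.wait()  # block until end() is called
        self.log.append("stopped")

    async def end(self, reason: Optional[str] = None) -> None:
        self._shutdown.set()  # releases the blocked run() call


async def demo() -> list[str]:
    runner = ToyRunner(["a", "b"])
    task = asyncio.create_task(runner.run())
    await asyncio.sleep(0)  # let run() reach its blocking wait
    assert not task.done()  # run() is still blocked at this point
    runner.log.append("calling end")
    await runner.end(reason="demo finished")
    await task
    return runner.log
```

Because run() blocks, code that needs to call end() or add_agent() later has to live in another task or in an event handler, which is why the demo wraps run() in `asyncio.create_task`.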
end
async def end(self, reason: Optional[str] = None) -> None
Gracefully end all agents and shut down. Ends root agents first; parent agents propagate shutdown to their children automatically. Idempotent.
| Parameter | Type | Default | Description |
|---|---|---|---|
| reason | Optional[str] | None | Human-readable reason for ending |
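The shutdown order and idempotency of end() can be pictured with a small agent tree. This is a hedged sketch under stated assumptions: `ToyAgent` and `ToyRunner` are hypothetical, and the real library propagates shutdown through its own mechanisms rather than direct method calls.

```python
import asyncio
from typing import Optional


class ToyAgent:
    """Hypothetical agent node that forwards end() to its children."""

    def __init__(self, name: str, children: Optional[list["ToyAgent"]] = None) -> None:
        self.name = name
        self.children = children or []

    async def end(self, log: list[str]) -> None:
        log.append(self.name)
        for child in self.children:  # the parent propagates shutdown
            await child.end(log)


class ToyRunner:
    """Illustrative model of end(), not the real AgentRunner."""

    def __init__(self, roots: list[ToyAgent]) -> None:
        self.roots = roots
        self.ended: list[str] = []
        self._ending = False

    async def end(self, reason: Optional[str] = None) -> None:
        if self._ending:  # repeat calls are no-ops (idempotent)
            return
        self._ending = True
        for root in self.roots:  # the runner only touches root agents
            await root.end(self.ended)


async def demo() -> list[str]:
    worker = ToyAgent("worker")
    parent = ToyAgent("parent", [worker])
    solo = ToyAgent("solo")
    runner = ToyRunner([parent, solo])
    await runner.end(reason="done")
    await runner.end(reason="called twice")  # has no further effect
    return runner.ended
```

In this model each agent is ended exactly once even though end() is called twice, and the child is reached only through its parent.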
cancel
async def cancel(self, reason: Optional[str] = None) -> None
Immediately cancel all agents and shut down. Cancels root agents first; parent agents propagate cancellation to their children automatically. Idempotent.
| Parameter | Type | Default | Description |
|---|---|---|---|
| reason | Optional[str] | None | Human-readable reason for cancelling |
Event Handlers
@runner.event_handler("on_ready")
async def on_ready():
    print("All agents started")

@runner.event_handler("on_error")
async def on_error(error: str):
    print(f"Runner error: {error}")
| Event | Description |
|---|---|
| on_ready | Fired after all registered agents have been started |
| on_error | Fired when the runner encounters an error |
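For readers unfamiliar with decorator-based registration, the following sketch shows one way an `event_handler("...")` decorator could store and later invoke async handlers. `ToyEmitter` and `emit()` are hypothetical names used only here, and the handler arguments mirror the two events in the table above; the real runner's internals may differ.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[..., Awaitable[None]]


class ToyEmitter:
    """Illustrative decorator-based registry, not the library's implementation."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def event_handler(self, event: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self._handlers[event].append(func)  # remember the handler
            return func  # leave the decorated function unchanged

        return decorator

    async def emit(self, event: str, *args: Any) -> None:
        # Call every handler registered for this event, in order.
        for handler in self._handlers[event]:
            await handler(*args)


async def demo() -> list[str]:
    emitter = ToyEmitter()
    seen: list[str] = []

    @emitter.event_handler("on_ready")
    async def on_ready() -> None:
        seen.append("ready")

    @emitter.event_handler("on_error")
    async def on_error(error: str) -> None:
        seen.append(f"error: {error}")

    await emitter.emit("on_ready")
    await emitter.emit("on_error", "bus disconnected")
    return seen
```

The decorator returns the original function, so a handler stays callable on its own, which can be convenient in unit tests.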