What is the bus?
The agent bus is the communication backbone of every subagent system. All agents connect to the same bus and exchange typed messages for frame routing, lifecycle events, and task coordination.
Think of it as an internal event bus — agents publish messages and other agents receive them through their subscriptions. The bus handles priority queuing so that urgent messages (like cancellations) are delivered before queued data.
Bus implementations
Pipecat Subagents provides two bus implementations:
AsyncQueueBus (local)
The default bus, created automatically by AgentRunner when you don’t provide one. It uses asyncio queues for in-process communication with no serialization overhead.
runner = AgentRunner() # Creates AsyncQueueBus automatically
This is all you need for single-process applications where all agents run together.
RedisBus (distributed)
For distributed setups where agents run in separate processes or on different machines, use RedisBus. It uses Redis pub/sub to relay messages across process boundaries.
from redis.asyncio import Redis
from pipecat_subagents.bus.network.redis import RedisBus
redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = AgentRunner(bus=bus)
All processes that share the same Redis channel can exchange messages. The programming model stays the same — your agent code doesn’t change between local and distributed setups.
RedisBus requires the redis extra: pip install pipecat-ai-subagents[redis]
Message types
Messages on the bus fall into three categories:
Data messages
Normal-priority messages that carry data between agents. The most important one is BusFrameMessage, which wraps a Pipecat frame (audio, text, etc.) for transport across the bus.
System messages
High-priority messages for lifecycle events: activation, deactivation, shutdown, and agent readiness. These are delivered before data messages in the queue.
Task messages
Messages for coordinating work between agents: task requests, responses, progress updates, streaming, and cancellation.
Local messages
Some messages are local-only and never cross process boundaries. For example, child agent errors stay local to the parent. This keeps internal state from leaking across distributed runners.
Message routing
Messages have source and target fields:
- Targeted messages (with a specific
target) are delivered only to the named agent
- Broadcast messages (with no
target) are delivered to all subscribers
The bus handles this routing automatically. When you call activate_agent("greeter"), it sends a BusActivateAgentMessage targeted at "greeter" — only that agent receives it.
The agent registry
The runner maintains an AgentRegistry that tracks which agents are available. To get notified when an agent is ready, use the @agent_ready decorator (or call watch_agent() explicitly):
from pipecat_subagents.agents import agent_ready
class MainAgent(BaseAgent):
@agent_ready(name="greeter")
async def on_greeter_ready(self, data: AgentReadyData) -> None:
await self.activate_agent("greeter")
The framework automatically calls watch_agent() for each @agent_ready handler when the agent starts. If the watched agent is already registered, the handler fires immediately, so you don’t need to worry about race conditions.