
What is the bus?

The agent bus is the communication backbone of every subagent system. All agents connect to the same bus and exchange typed messages for frame routing, lifecycle events, and task coordination. Think of it as an internal event bus: agents publish messages, and other agents receive them through their subscriptions. The bus also handles priority queuing, so urgent messages (like cancellations) are delivered ahead of data messages that are already waiting in the queue.

Bus implementations

Pipecat Subagents provides two bus implementations:

AsyncQueueBus (local)

The default bus, created automatically by AgentRunner when you don’t provide one. It uses asyncio queues for in-process communication with no serialization overhead.
runner = AgentRunner()  # Creates AsyncQueueBus automatically
This is all you need for single-process applications where all agents run together.
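To make "asyncio queues with no serialization overhead" concrete, here is a minimal sketch of an in-process bus. TinyBus and its methods are hypothetical names used only for illustration; this is not the AsyncQueueBus implementation, just the general idea of handing the same Python object to each subscriber's queue.

```python
import asyncio


class TinyBus:
    """Hypothetical in-process bus; not the library's AsyncQueueBus."""

    def __init__(self) -> None:
        self._queues: list = []

    def subscribe(self) -> asyncio.Queue:
        # Each subscriber gets its own queue to read from.
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.append(queue)
        return queue

    async def publish(self, message: object) -> None:
        # The same object is put on every queue: nothing is copied or encoded.
        for queue in self._queues:
            await queue.put(message)


async def demo() -> bool:
    bus = TinyBus()
    inbox_a, inbox_b = bus.subscribe(), bus.subscribe()
    frame = {"type": "text", "text": "hello"}
    await bus.publish(frame)
    got_a, got_b = await inbox_a.get(), await inbox_b.get()
    # Identity check: both subscribers received the very same object.
    return got_a is frame and got_b is frame


same_object = asyncio.run(demo())
print(same_object)  # True
```

Because subscribers receive references rather than copies, this style of bus only works inside one process.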

RedisBus (distributed)

For distributed setups where agents run in separate processes or on different machines, use RedisBus. It uses Redis pub/sub to relay messages across process boundaries.
from redis.asyncio import Redis
from pipecat_subagents.bus.network.redis import RedisBus

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = AgentRunner(bus=bus)
All processes that share the same Redis channel can exchange messages. The programming model stays the same — your agent code doesn’t change between local and distributed setups.
RedisBus requires the redis extra: pip install "pipecat-ai-subagents[redis]" (the quotes keep shells such as zsh from interpreting the square brackets).
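Redis pub/sub carries bytes, so a message has to be encoded before it is published and decoded on the receiving side. The sketch below shows such a round trip using JSON. The Message fields and the encode/decode helpers are assumptions for illustration; the wire format RedisBus actually uses is not described here.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Message:
    # Hypothetical message shape, not a class from the library.
    source: str
    target: Optional[str]
    payload: str


def encode(message: Message) -> bytes:
    """Turn a message into bytes suitable for a pub/sub channel."""
    return json.dumps(asdict(message)).encode("utf-8")


def decode(raw: bytes) -> Message:
    """Rebuild a message from bytes received on the channel."""
    return Message(**json.loads(raw.decode("utf-8")))


original = Message(source="main", target="greeter", payload="activate")
wire = encode(original)
restored = decode(wire)

print(type(wire).__name__)   # bytes
print(restored == original)  # True
```

This encode/decode step is the serialization cost that the local bus avoids.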

Message types

Messages on the bus fall into three categories: data, system, and task messages. Some messages are also restricted to the local process.

Data messages

Normal-priority messages that carry data between agents. The most important one is BusFrameMessage, which wraps a Pipecat frame (audio, text, etc.) for transport across the bus.

System messages

High-priority messages for lifecycle events: activation, deactivation, shutdown, and agent readiness. These are delivered before data messages in the queue.
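The ordering guarantee can be illustrated with a small sketch built on asyncio.PriorityQueue. The priority constants and the Envelope class are hypothetical, chosen for this example rather than taken from the library. A sequence number keeps messages of equal priority in arrival order.

```python
from __future__ import annotations

import asyncio
import itertools
from dataclasses import dataclass, field

# Hypothetical priority levels; lower numbers are dequeued first.
SYSTEM, DATA = 0, 1


@dataclass(order=True)
class Envelope:
    priority: int
    seq: int  # tie-breaker that preserves FIFO order within a priority
    payload: str = field(compare=False)


async def drain_in_delivery_order(messages: list[tuple[int, str]]) -> list[str]:
    """Enqueue (priority, payload) pairs, then return payloads as delivered."""
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    counter = itertools.count()
    for priority, payload in messages:
        await queue.put(Envelope(priority, next(counter), payload))

    delivered = []
    while not queue.empty():
        delivered.append((await queue.get()).payload)
    return delivered


order = asyncio.run(
    drain_in_delivery_order(
        [(DATA, "frame-1"), (DATA, "frame-2"), (SYSTEM, "shutdown")]
    )
)
print(order)  # ['shutdown', 'frame-1', 'frame-2']
```

The shutdown message was enqueued last but is delivered first, while the two data frames keep their relative order.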

Task messages

Messages for coordinating work between agents: task requests, responses, progress updates, streaming, and cancellation.

Local messages

Some messages are local-only and never cross process boundaries. For example, child agent errors stay local to the parent. This keeps internal state from leaking across distributed runners.
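One way to picture the local-only rule is a dispatch step that always delivers to local subscribers but forwards to the network only when the message is allowed to leave the process. The local_only flag, the dispatch function, and the message kinds below are hypothetical, used only to illustrate the idea.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Message:
    kind: str
    local_only: bool = False  # hypothetical flag, not a documented field


def dispatch(
    message: Message,
    local_inbox: list[Message],
    network_outbox: list[Message],
) -> None:
    """Deliver locally in every case; forward only non-local messages."""
    local_inbox.append(message)
    if not message.local_only:
        network_outbox.append(message)


local_inbox: list[Message] = []
network_outbox: list[Message] = []
for msg in (Message("task_request"), Message("child_error", local_only=True)):
    dispatch(msg, local_inbox, network_outbox)

print([m.kind for m in local_inbox])     # ['task_request', 'child_error']
print([m.kind for m in network_outbox])  # ['task_request']
```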

Message routing

Messages have source and target fields:
  • Targeted messages (with a specific target) are delivered only to the named agent
  • Broadcast messages (with no target) are delivered to all subscribers
The bus handles this routing automatically. When you call activate_agent("greeter"), a BusActivateAgentMessage targeted at "greeter" is sent, so only that agent receives it.
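The routing rule comes down to a single check on the target field. The following sketch uses a hypothetical Message class and recipients helper (neither is from the library) to show which subscribers would receive a targeted message versus a broadcast.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Message:
    source: str
    payload: str
    target: str | None = None  # None means broadcast


def recipients(message: Message, subscribers: list[str]) -> list[str]:
    """Return the subscriber names that should receive `message`."""
    if message.target is None:
        return list(subscribers)  # broadcast: every subscriber
    return [name for name in subscribers if name == message.target]


agents = ["main", "greeter", "support"]
targeted = Message(source="main", payload="activate", target="greeter")
broadcast = Message(source="main", payload="shutdown")

print(recipients(targeted, agents))   # ['greeter']
print(recipients(broadcast, agents))  # ['main', 'greeter', 'support']
```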

The agent registry

The runner maintains an AgentRegistry that tracks which agents are available. To get notified when an agent is ready, use the @agent_ready decorator (or call watch_agent() explicitly):
from pipecat_subagents.agents import agent_ready

class MainAgent(BaseAgent):
    @agent_ready(name="greeter")
    async def on_greeter_ready(self, data: AgentReadyData) -> None:
        await self.activate_agent("greeter")
The framework automatically calls watch_agent() for each @agent_ready handler when the agent starts. If the watched agent is already registered, the handler fires immediately, so you don’t need to worry about race conditions.
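The "fires immediately if already registered" behavior is what removes the ordering problem. Below is a minimal sketch of that pattern with a hypothetical TinyRegistry class. It is not the library's AgentRegistry, and the real handlers are async, but the watch/register logic follows the same idea.

```python
from __future__ import annotations

from typing import Callable


class TinyRegistry:
    """Hypothetical stand-in for a registry; not the library's AgentRegistry."""

    def __init__(self) -> None:
        self._ready: set[str] = set()
        self._watchers: dict[str, list[Callable[[str], None]]] = {}

    def watch(self, name: str, handler: Callable[[str], None]) -> None:
        if name in self._ready:
            handler(name)  # already registered: fire immediately
        else:
            self._watchers.setdefault(name, []).append(handler)

    def register(self, name: str) -> None:
        self._ready.add(name)
        # Notify everyone who started watching before the agent was ready.
        for handler in self._watchers.pop(name, []):
            handler(name)


events: list[str] = []
registry = TinyRegistry()

registry.watch("greeter", lambda n: events.append(f"early:{n}"))  # before ready
registry.register("greeter")
registry.watch("greeter", lambda n: events.append(f"late:{n}"))   # after ready

print(events)  # ['early:greeter', 'late:greeter']
```

Both handlers run exactly once, regardless of whether the watch was placed before or after the agent registered.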