What is the bus?

The worker bus is the communication backbone of every multi-worker system. All workers connect to the same bus and exchange typed messages for frame routing, lifecycle events, and job coordination. Think of it as an internal event bus: workers publish messages, and other workers receive them through their subscriptions. The bus also handles priority queuing, so urgent messages (like cancellations) are delivered ahead of data that is already waiting in the queue.

Bus implementations

Pipecat provides three bus implementations:

AsyncQueueBus (local)

The default bus, created automatically by WorkerRunner when you don’t provide one. It uses asyncio queues for in-process communication with no serialization overhead.
runner = WorkerRunner()  # Creates AsyncQueueBus automatically
This is all you need for single-process applications where all workers run together.

RedisBus (distributed)

For distributed setups where workers run in separate processes or on different machines, use RedisBus. It uses Redis pub/sub to relay messages across process boundaries.
from redis.asyncio import Redis
from pipecat.bus.network.redis import RedisBus

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus)
All processes that share the same Redis channel can exchange messages. The programming model stays the same: your worker code doesn’t change between local and distributed setups.
RedisBus requires the redis extra: uv add "pipecat-ai[redis]"

PgmqBus (distributed)

An alternative distributed bus backed by PGMQ (PostgreSQL Message Queue). Each instance creates its own queue and broadcasts to peer queues.
from pgmq.async_queue import PGMQueue
from pipecat.bus.network.pgmq import PgmqBus

pgmq = PGMQueue(
    host="localhost",
    port="5432",
    database="postgres",
    username="postgres",
    password="...",
    pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus)
PgmqBus requires the pgmq extra: uv add "pipecat-ai[pgmq]"

Message types

Messages on the bus fall into four categories:

Data messages

Normal-priority messages that carry data between workers. The most important one is BusFrameMessage, which wraps a Pipecat frame (audio, text, etc.) for transport across the bus.

System messages

High-priority messages for lifecycle events: activation, deactivation, shutdown, and worker readiness. These are delivered before data messages in the queue.
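The ordering described above can be illustrated with a small sketch. It uses Python's standard asyncio.PriorityQueue and is not Pipecat's actual implementation; the priority constants and the send and drain helpers are hypothetical names chosen for this example.

```python
import asyncio
import itertools

# Hypothetical priority levels: a lower number is delivered first.
SYSTEM_PRIORITY = 0
DATA_PRIORITY = 1

# Monotonic counter so messages of equal priority keep their send order.
_sequence = itertools.count()


async def send(queue: asyncio.PriorityQueue, priority: int, message: str) -> None:
    """Enqueue a message tagged with its priority and arrival order."""
    await queue.put((priority, next(_sequence), message))


async def drain(queue: asyncio.PriorityQueue) -> list[str]:
    """Pop every queued message in delivery order."""
    delivered = []
    while not queue.empty():
        _, _, message = await queue.get()
        delivered.append(message)
    return delivered


async def demo() -> list[str]:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # Two data messages are queued first, then a system message arrives.
    await send(queue, DATA_PRIORITY, "audio frame 1")
    await send(queue, DATA_PRIORITY, "audio frame 2")
    await send(queue, SYSTEM_PRIORITY, "shutdown")
    return await drain(queue)


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # ['shutdown', 'audio frame 1', 'audio frame 2']
```

The sequence counter is a design choice for the sketch: without it, two messages with the same priority would be compared by their payloads, and their relative order would no longer match the order in which they were sent.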

Job messages

Messages for coordinating work between workers: job requests, responses, progress updates, streaming, and cancellation.

Local messages

Some messages are local-only and never cross process boundaries. For example, child worker errors stay local to the parent. This keeps internal state from leaking across distributed runners.
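One way to picture the local-only rule is a bus that always delivers in-process but forwards to the network only when a message is not marked local. This is a sketch under assumptions: the LocalAwareMessage class, its local_only flag, and the deliver function are hypothetical and do not reflect how Pipecat marks or filters local messages.

```python
from dataclasses import dataclass


@dataclass
class LocalAwareMessage:
    """Hypothetical bus message; `local_only` is an illustrative flag."""
    kind: str
    local_only: bool = False


def deliver(
    message: LocalAwareMessage,
    local_inbox: list[LocalAwareMessage],
    network_outbox: list[LocalAwareMessage],
) -> None:
    """Deliver in-process always; forward to the network only if allowed."""
    local_inbox.append(message)
    if not message.local_only:
        # Only messages that may leave the process are queued for the network.
        network_outbox.append(message)


local_inbox: list[LocalAwareMessage] = []
network_outbox: list[LocalAwareMessage] = []

deliver(LocalAwareMessage(kind="frame"), local_inbox, network_outbox)
deliver(LocalAwareMessage(kind="child-error", local_only=True), local_inbox, network_outbox)

print([m.kind for m in local_inbox])     # ['frame', 'child-error']
print([m.kind for m in network_outbox])  # ['frame']
```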

Message routing

Messages have source and target fields:
  • Targeted messages (with a specific target) are delivered only to the named worker
  • Broadcast messages (with no target) are delivered to all subscribers
The bus handles this routing automatically. When you call activate_worker("greeter"), it sends a BusActivateWorkerMessage targeted at "greeter", so only that worker receives it.
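The targeted-versus-broadcast rule can be sketched in a few lines. The RoutedMessage class and the recipients function below are hypothetical stand-ins, not Pipecat classes, and the sketch assumes a broadcast also reaches the sender, which the real bus may handle differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RoutedMessage:
    """Hypothetical stand-in for a bus message; not a Pipecat class."""
    source: str
    payload: str
    target: Optional[str] = None  # None means broadcast


def recipients(message: RoutedMessage, subscribers: list[str]) -> list[str]:
    """Return the subscriber names that should receive the message."""
    if message.target is None:
        # Broadcast: every subscriber gets a copy.
        return list(subscribers)
    # Targeted: only the named subscriber, if it is connected.
    return [name for name in subscribers if name == message.target]


subscribers = ["main", "greeter", "support"]
activate = RoutedMessage(source="main", payload="activate", target="greeter")
announce = RoutedMessage(source="main", payload="shutdown")

print(recipients(activate, subscribers))  # ['greeter']
print(recipients(announce, subscribers))  # ['main', 'greeter', 'support']
```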

The worker registry

The runner maintains a WorkerRegistry that tracks which workers are available. To get notified when a worker is ready, use the @worker_ready decorator (or call watch_workers() explicitly):
from pipecat.pipeline.base_worker import BaseWorker
from pipecat.pipeline.worker_ready_decorator import worker_ready
from pipecat.registry.types import WorkerReadyData

class MainAgent(BaseWorker):
    @worker_ready(name="greeter")
    async def on_greeter_ready(self, data: WorkerReadyData) -> None:
        await self.activate_worker("greeter")
The framework automatically calls watch_workers() for each @worker_ready handler when the worker starts. If the watched worker is already registered, the handler fires immediately, so you don’t need to worry about race conditions.
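To see why firing immediately removes the race, consider a minimal registry sketch. TinyRegistry and its watch and register methods are hypothetical and are not Pipecat's WorkerRegistry; the sketch only illustrates the idea that a watcher is notified whether it subscribes before or after the worker registers.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


class TinyRegistry:
    """Hypothetical registry sketch; not Pipecat's WorkerRegistry."""

    def __init__(self) -> None:
        self._ready: set[str] = set()
        self._watchers: dict[str, list[Handler]] = {}

    async def watch(self, name: str, handler: Handler) -> None:
        """Call handler once `name` is ready, now or later."""
        self._watchers.setdefault(name, []).append(handler)
        if name in self._ready:
            # Already registered: fire right away so the watcher cannot miss it.
            await handler(name)

    async def register(self, name: str) -> None:
        """Mark `name` as ready and notify everyone watching it."""
        self._ready.add(name)
        for handler in self._watchers.get(name, []):
            await handler(name)


async def demo() -> list[str]:
    events: list[str] = []
    registry = TinyRegistry()

    async def early(name: str) -> None:
        events.append(f"early watcher saw {name}")

    async def late(name: str) -> None:
        events.append(f"late watcher saw {name}")

    await registry.watch("greeter", early)  # subscribed before registration
    await registry.register("greeter")      # the early watcher fires here
    await registry.watch("greeter", late)   # the late watcher fires immediately
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # ['early watcher saw greeter', 'late watcher saw greeter']
```

Both watchers end up notified exactly once, regardless of whether they subscribed before or after the worker registered.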