# The Worker Bus

> How workers communicate through the shared message bus.

## What is the bus?

The worker bus is the communication backbone of every multi-worker system. All workers connect to the same bus and exchange typed messages for frame routing, lifecycle events, and job coordination.

Think of it as an internal event bus -- workers publish messages and other workers receive them through their subscriptions. The bus handles priority queuing so that urgent messages (like cancellations) are delivered before queued data.
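
To make the priority behavior concrete, here is a minimal sketch built on the standard library's `asyncio.PriorityQueue`. It is not Pipecat's implementation: the priority constants, `QueuedMessage`, and `drain_in_priority_order` are hypothetical names chosen for illustration.

```python
import asyncio
import itertools
from dataclasses import dataclass, field
from typing import Any

# Hypothetical priority levels: lower numbers are delivered first.
SYSTEM_PRIORITY = 0
DATA_PRIORITY = 1

# Monotonic counter used as a tie-breaker so that messages with the
# same priority keep their original (FIFO) order.
_sequence = itertools.count()


@dataclass(order=True)
class QueuedMessage:
    priority: int
    sequence: int
    payload: Any = field(compare=False)  # payload is never compared


async def drain_in_priority_order(messages: list[tuple[int, str]]) -> list[str]:
    """Enqueue (priority, payload) pairs, then return payloads in delivery order."""
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    for priority, payload in messages:
        await queue.put(QueuedMessage(priority, next(_sequence), payload))

    delivered = []
    while not queue.empty():
        delivered.append((await queue.get()).payload)
    return delivered
```

In this sketch, a cancellation enqueued after two data messages is still dequeued first, while the data messages keep their relative order.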

## Bus implementations

Pipecat provides three bus implementations:

### AsyncQueueBus (local)

The default bus, created automatically by `WorkerRunner` when you don't provide one. It uses asyncio queues for in-process communication with no serialization overhead.

```python
runner = WorkerRunner()  # Creates AsyncQueueBus automatically
```

This is all you need for single-process applications where all workers run together.

### RedisBus (distributed)

For distributed setups where workers run in separate processes or on different machines, use `RedisBus`. It uses Redis pub/sub to relay messages across process boundaries.

```python
from redis.asyncio import Redis
from pipecat.bus.network.redis import RedisBus

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus)
```

All processes that share the same Redis channel can exchange messages. The programming model stays the same -- your worker code doesn't change between local and distributed setups.

<Info>`RedisBus` requires the `redis` extra: `uv add "pipecat-ai[redis]"`</Info>

### PgmqBus (distributed)

`PgmqBus` is an alternative distributed bus backed by PGMQ (PostgreSQL Message Queue). Each instance creates its own queue and broadcasts to the queues of its peers.

```python
from pgmq.async_queue import PGMQueue
from pipecat.bus.network.pgmq import PgmqBus

pgmq = PGMQueue(
    host="localhost",
    port="5432",
    database="postgres",
    username="postgres",
    password="...",
    pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus)
```

<Info>`PgmqBus` requires the `pgmq` extra: `uv add "pipecat-ai[pgmq]"`</Info>

## Message types

Messages on the bus fall into four categories:

### Data messages

Normal-priority messages that carry data between workers. The most important one is `BusFrameMessage`, which wraps a Pipecat frame (audio, text, etc.) for transport across the bus.

### System messages

High-priority messages for lifecycle events: activation, deactivation, shutdown, and worker readiness. These are delivered before data messages in the queue.

### Job messages

Messages for coordinating work between workers: job requests, responses, progress updates, streaming, and cancellation.

### Local messages

Some messages are local-only and never cross process boundaries. For example, a child worker's errors stay local to its parent. This keeps internal state from leaking across distributed runners.
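
One way to picture the local-only rule is as a filter applied before a distributed bus forwards anything to the network. The sketch below is illustrative only: `SketchMessage`, its `local_only` flag, and `forward_to_network` are hypothetical names, not Pipecat APIs, and the real bus may mark local messages differently.

```python
from dataclasses import dataclass


@dataclass
class SketchMessage:
    name: str
    local_only: bool = False  # hypothetical marker for messages that must stay in-process


def forward_to_network(outgoing: list[SketchMessage]) -> list[SketchMessage]:
    """Return only the messages that are allowed to leave this process."""
    return [message for message in outgoing if not message.local_only]
```

Every message would still be delivered to in-process subscribers; only the subset returned by the filter would be relayed to other runners.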

## Message routing

Messages have `source` and `target` fields:

* **Targeted messages** (with a specific `target`) are delivered only to the named worker
* **Broadcast messages** (with no `target`) are delivered to all subscribers

The bus handles this routing automatically. When you call `activate_worker("greeter")`, it sends a `BusActivateWorkerMessage` targeted at `"greeter"` -- only that worker receives it.
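
The routing rule above can be sketched in a few lines. This is a simplified model, not the bus's actual code: `RoutedMessage` and `recipients` are hypothetical names, and whether a broadcast is also delivered back to its own source is not specified here, so the sketch simply returns every subscriber.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RoutedMessage:
    source: str
    target: Optional[str] = None  # None means "broadcast"


def recipients(message: RoutedMessage, subscribers: list[str]) -> list[str]:
    """Return the names of the subscribers that should receive the message."""
    if message.target is None:
        # Broadcast: every subscriber receives the message.
        return list(subscribers)
    # Targeted: only the subscriber whose name matches the target.
    return [name for name in subscribers if name == message.target]
```

With subscribers `["main", "greeter", "support"]`, a message targeted at `"greeter"` reaches only that worker, while a message with no target reaches all three.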

## The worker registry

The runner maintains a `WorkerRegistry` that tracks which workers are available. To get notified when a worker is ready, use the `@worker_ready` decorator (or call `watch_workers()` explicitly):

```python
from pipecat.pipeline.base_worker import BaseWorker
from pipecat.pipeline.worker_ready_decorator import worker_ready
from pipecat.registry.types import WorkerReadyData

class MainAgent(BaseWorker):
    @worker_ready(name="greeter")
    async def on_greeter_ready(self, data: WorkerReadyData) -> None:
        await self.activate_worker("greeter")
```

The framework automatically calls `watch_workers()` for each `@worker_ready` handler when the worker starts. If the watched worker is already registered, the handler fires immediately, so you don't need to worry about race conditions.
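
The "fires immediately if already registered" behavior is what removes the race between watching and registering. The toy registry below shows the idea with plain `asyncio`. It is a sketch under assumptions, not the real `WorkerRegistry`: `ToyRegistry`, `watch`, `register`, and `demo` are hypothetical names.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


class ToyRegistry:
    """Hypothetical stand-in for a registry with race-free ready notifications."""

    def __init__(self) -> None:
        self._ready: set[str] = set()
        self._watchers: dict[str, list[Handler]] = defaultdict(list)

    async def watch(self, name: str, handler: Handler) -> None:
        if name in self._ready:
            # Already registered: fire right away instead of waiting forever.
            await handler(name)
        else:
            self._watchers[name].append(handler)

    async def register(self, name: str) -> None:
        self._ready.add(name)
        # Notify everyone who started watching before registration.
        for handler in self._watchers.pop(name, []):
            await handler(name)


async def demo() -> list[str]:
    events: list[str] = []

    async def on_ready(name: str) -> None:
        events.append(f"ready:{name}")

    registry = ToyRegistry()
    await registry.watch("greeter", on_ready)  # watch before registration
    await registry.register("greeter")         # pending watcher fires here
    await registry.watch("greeter", on_ready)  # watch after: fires immediately
    return events
```

Both orderings end with the handler being called exactly once per watch, which is why the handler code does not need to care whether the watched worker started first.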
