> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Bus

> Bus infrastructure for inter-agent messaging

## AgentBus

Abstract base for inter-agent and runner-agent communication. Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages.

```python theme={null}
from pipecat_subagents.bus import AgentBus
```

### Methods

#### start

```python theme={null}
async def start(self) -> None
```

Start dispatch tasks for all registered subscribers.

#### stop

```python theme={null}
async def stop(self) -> None
```

Stop all dispatch tasks.

#### subscribe

```python theme={null}
async def subscribe(self, subscriber: BusSubscriber) -> None
```

Register a subscriber to receive messages from the bus.

| Parameter    | Type            | Description                |
| ------------ | --------------- | -------------------------- |
| `subscriber` | `BusSubscriber` | The subscriber to register |

#### unsubscribe

```python theme={null}
async def unsubscribe(self, subscriber: BusSubscriber) -> None
```

Remove a subscriber and cancel its dispatch task.

| Parameter    | Type            | Description              |
| ------------ | --------------- | ------------------------ |
| `subscriber` | `BusSubscriber` | The subscriber to remove |

#### send

```python theme={null}
async def send(self, message: BusMessage) -> None
```

Send a message through the bus. Local-only messages are delivered directly to subscribers. All other messages are passed to `publish()` for transport.

| Parameter | Type                                                      | Description             |
| --------- | --------------------------------------------------------- | ----------------------- |
| `message` | [`BusMessage`](/api-reference/pipecat-subagents/messages) | The bus message to send |

#### publish

```python theme={null}
@abstractmethod
async def publish(self, message: BusMessage) -> None
```

Publish a message to the transport. Subclasses implement this for the specific transport (e.g. in-process queue, Redis pub/sub).

| Parameter | Type                                                      | Description                |
| --------- | --------------------------------------------------------- | -------------------------- |
| `message` | [`BusMessage`](/api-reference/pipecat-subagents/messages) | The bus message to publish |

#### on\_message\_received

```python theme={null}
def on_message_received(self, message: BusMessage) -> None
```

Deliver a message to all local subscribers via their priority queues. Called by bus implementations when a message arrives (either from a local `send()` or from a network transport).

## AsyncQueueBus

In-process bus that delivers messages via priority queues. This is the default bus used by [`AgentRunner`](/api-reference/pipecat-subagents/agent-runner) when no bus is provided.

```python theme={null}
from pipecat_subagents.bus import AsyncQueueBus

bus = AsyncQueueBus()
runner = AgentRunner(bus=bus)
```

Suitable for single-process setups where all agents run in the same Python process.

## RedisBus

Distributed agent bus backed by Redis pub/sub for cross-process communication.

<Note>Requires the redis extra: `uv add "pipecat-ai-subagents[redis]"`</Note>

```python theme={null}
from redis.asyncio import Redis
from pipecat_subagents.bus.network.redis import RedisBus

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="my-session")
runner = AgentRunner(bus=bus)
```

### Configuration

<ParamField path="redis" type="Redis" required>
  A `redis.asyncio.Redis` client instance.
</ParamField>

<ParamField path="serializer" type="MessageSerializer | None" default="None">
  The
  [`MessageSerializer`](/api-reference/pipecat-subagents/serializers#messageserializer)
  for encoding/decoding messages. Defaults to
  [`JSONMessageSerializer`](/api-reference/pipecat-subagents/serializers#jsonmessageserializer).
</ParamField>

<ParamField path="channel" type="str" default="pipecat:bus">
  The Redis pub/sub channel name.
</ParamField>

## BusBridgeProcessor

Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus. Placed in a transport or session agent's pipeline to exchange frames with other agents via the `AgentBus`.

```python theme={null}
from pipecat_subagents.bus import BusBridgeProcessor

bridge = BusBridgeProcessor(
    bus=runner.bus,
    agent_name="session",
    bridge="voice",
)

pipeline = Pipeline([transport.input(), bridge, transport.output()])
```

### Configuration

<ParamField path="bus" type="AgentBus" required>
  The `AgentBus` to exchange frames with.
</ParamField>

<ParamField path="agent_name" type="str" required>
  Name of this agent, used as message source.
</ParamField>

<ParamField path="target_agent" type="str | None" default="None">
  When set, only exchange frames with this agent.
</ParamField>

<ParamField path="bridge" type="str | None" default="None">
  Optional bridge name for routing. When set, outgoing frames are tagged with
  this name and only incoming frames with the same bridge name are accepted.
</ParamField>

<ParamField path="exclude_frames" type="tuple[type[Frame], ...] | None" default="None">
  Extra frame types that should never cross the bus (on top of lifecycle frames
  which are always excluded).
</ParamField>

## BusSubscriber

Mixin for objects that receive messages from an `AgentBus`. Implementors override `on_bus_message()` to handle incoming messages. Concrete subscribers must provide a `name` property (typically inherited from `BaseObject`) that uniquely identifies the subscriber on the bus.

```python theme={null}
from pipecat_subagents.bus.subscriber import BusSubscriber

class MySubscriber(BusSubscriber):
    @property
    def name(self) -> str:
        return "my_subscriber"

    async def on_bus_message(self, message: BusMessage) -> None:
        ...
```

### Properties

<ParamField path="name" type="str" required>
  Unique name identifying this subscriber on the bus. Built-in subscribers inherit this from `BaseObject`; custom implementations that extend `BusSubscriber` directly must provide one.
</ParamField>

### Methods

<ParamField path="on_bus_message" type="async (message: BusMessage) -> None">
  Override to handle an incoming bus message.
</ParamField>
