AgentBus

Abstract base for inter-agent and runner-agent communication. Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages.
from pipecat_subagents.bus import AgentBus
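The delivery model described above (one priority queue per subscriber, system messages ahead of data messages) can be illustrated with a small standalone sketch. It does not use or reproduce the library's internals: `SketchBus`, `QueuedMessage`, and the `SYSTEM`/`NORMAL` priority values are hypothetical names chosen for illustration.

```python
import asyncio
import itertools
from dataclasses import dataclass, field

# Hypothetical priority levels; lower numbers are dequeued first.
SYSTEM, NORMAL = 0, 1


@dataclass(order=True)
class QueuedMessage:
    priority: int
    seq: int                             # tie-breaker: FIFO within one priority
    payload: str = field(compare=False)  # payload never takes part in ordering


class SketchBus:
    """Toy bus that keeps one asyncio.PriorityQueue per subscriber name."""

    def __init__(self) -> None:
        self._queues = {}
        self._seq = itertools.count()

    def subscribe(self, name: str) -> None:
        self._queues[name] = asyncio.PriorityQueue()

    def deliver(self, payload: str, priority: int = NORMAL) -> None:
        # Each subscriber receives its own entry in its own queue, so a slow
        # subscriber does not hold back the others.
        seq = next(self._seq)
        for queue in self._queues.values():
            queue.put_nowait(QueuedMessage(priority, seq, payload))

    async def drain(self, name: str) -> list:
        queue = self._queues[name]
        received = []
        while not queue.empty():
            received.append((await queue.get()).payload)
        return received


async def demo() -> dict:
    bus = SketchBus()
    bus.subscribe("a")
    bus.subscribe("b")
    bus.deliver("data-1")
    bus.deliver("data-2")
    bus.deliver("cancel", priority=SYSTEM)  # sent last, dequeued first
    return {name: await bus.drain(name) for name in ("a", "b")}
```

Running `asyncio.run(demo())` shows both subscribers receiving `cancel` before the two data messages, even though it was delivered last.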

Methods

start

async def start(self) -> None
Start dispatch tasks for all registered subscribers.

stop

async def stop(self) -> None
Stop all dispatch tasks.

subscribe

async def subscribe(self, subscriber: BusSubscriber) -> None
Register a subscriber to receive messages from the bus.
Parameter | Type | Description
subscriber | BusSubscriber | The subscriber to register

unsubscribe

async def unsubscribe(self, subscriber: BusSubscriber) -> None
Remove a subscriber and cancel its dispatch task.
Parameter | Type | Description
subscriber | BusSubscriber | The subscriber to remove

send

async def send(self, message: BusMessage) -> None
Send a message through the bus. Local-only messages are delivered directly to subscribers. All other messages are passed to publish() for transport.
Parameter | Type | Description
message | BusMessage | The bus message to send
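The routing rule for send() can be sketched as follows. This is a standalone illustration under stated assumptions, not the library's code: `RoutingBus`, `LoopbackBus`, `SketchMessage`, and the `local_only` flag are hypothetical names, and how the real library marks a message as local-only is not specified here.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class SketchMessage:
    payload: str
    local_only: bool = False  # hypothetical flag name


class RoutingBus(ABC):
    def __init__(self) -> None:
        self.delivered = []  # payloads handed to local subscribers

    async def send(self, message: SketchMessage) -> None:
        if message.local_only:
            # Local-only messages bypass the transport entirely.
            self.on_message_received(message)
        else:
            await self.publish(message)

    @abstractmethod
    async def publish(self, message: SketchMessage) -> None:
        """Hand the message to the transport."""

    def on_message_received(self, message: SketchMessage) -> None:
        self.delivered.append(message.payload)


class LoopbackBus(RoutingBus):
    """In-process transport: publishing loops the message straight back."""

    def __init__(self) -> None:
        super().__init__()
        self.published = []

    async def publish(self, message: SketchMessage) -> None:
        self.published.append(message.payload)
        self.on_message_received(message)


async def demo():
    bus = LoopbackBus()
    await bus.send(SketchMessage("hello"))
    await bus.send(SketchMessage("internal", local_only=True))
    return bus.published, bus.delivered
```

In this sketch only `hello` passes through publish(), while both messages reach local delivery.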

publish

@abstractmethod
async def publish(self, message: BusMessage) -> None
Publish a message to the transport. Subclasses implement this for the specific transport (e.g. in-process queue, Redis pub/sub).
Parameter | Type | Description
message | BusMessage | The bus message to publish

on_message_received

def on_message_received(self, message: BusMessage) -> None
Deliver a message to all local subscribers via their priority queues. Called by bus implementations when a message arrives (either from a local send() or from a network transport).

AsyncQueueBus

In-process bus that delivers messages via priority queues. This is the default bus used by AgentRunner when no bus is provided.
from pipecat_subagents.bus import AsyncQueueBus

bus = AsyncQueueBus()
runner = AgentRunner(bus=bus)
Suitable for single-process setups where all agents run in the same Python process.

RedisBus

Distributed agent bus backed by Redis pub/sub for cross-process communication.
Requires the redis extra: uv add "pipecat-ai-subagents[redis]"
from redis.asyncio import Redis
from pipecat_subagents.bus.network.redis import RedisBus

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="my-session")
runner = AgentRunner(bus=bus)

Configuration

redis
Redis
required
A redis.asyncio.Redis client instance.
serializer
MessageSerializer | None
default:"None"
The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.
channel
str
default:"pipecat:bus"
The Redis pub/sub channel name.

PgmqBus

Distributed agent bus backed by PGMQ (PostgreSQL Message Queue). Implements pub/sub fan-out on top of PGMQ’s point-to-point queue semantics by giving each bus instance its own queue and broadcasting on publish.
Requires the pgmq extra: uv add "pipecat-ai-subagents[pgmq]"
PgmqBus supports two backend modes:
  • DirectPgmqBackend: Calls pgmq.* directly with prefix-based peer discovery. Suitable when bus peers trust each other (single-tenant deployments, internal services).
  • IsolatedPgmqBackend: Uses SECURITY DEFINER Postgres wrappers for isolation. Suitable when bus peers should be isolated from one another and the channel name serves as the capability that grants access to the bus.
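The fan-out idea described at the top of this section (one queue per bus instance, broadcast on publish) can be sketched without a database. Everything below is a hypothetical stand-in: `PointToPointQueues` imitates a queue service in which each message is consumed once, and `FanOutBus` discovers peers by queue-name prefix, loosely mirroring the DirectPgmqBackend description. Whether the real bus also delivers a message to the publishing instance's own queue is an assumption of this sketch.

```python
from collections import deque


class PointToPointQueues:
    """Stand-in queue service: each message is read by exactly one consumer."""

    def __init__(self) -> None:
        self.queues = {}

    def create(self, name: str) -> None:
        self.queues.setdefault(name, deque())

    def send(self, name: str, message: str) -> None:
        self.queues[name].append(message)

    def read(self, name: str):
        queue = self.queues[name]
        return queue.popleft() if queue else None

    def list_with_prefix(self, prefix: str) -> list:
        return [name for name in self.queues if name.startswith(prefix)]


class FanOutBus:
    """Pub/sub on top of point-to-point queues: one queue per bus instance."""

    def __init__(self, service: PointToPointQueues, channel: str, instance_id: str) -> None:
        self.service = service
        self.prefix = f"{channel}_"
        self.queue_name = f"{self.prefix}{instance_id}"
        service.create(self.queue_name)

    def publish(self, message: str) -> None:
        # Broadcast: copy the message into every peer queue on this channel
        # (including our own, which is an assumption of this sketch).
        for peer in self.service.list_with_prefix(self.prefix):
            self.service.send(peer, message)

    def poll(self):
        return self.service.read(self.queue_name)


def demo():
    service = PointToPointQueues()
    a = FanOutBus(service, "pipecat_acme", "a")
    b = FanOutBus(service, "pipecat_acme", "b")
    other = FanOutBus(service, "pipecat_other", "c")
    a.publish("hello")
    return a.poll(), b.poll(), other.poll()
```

Here both instances on `pipecat_acme` receive the message, and the instance on the other channel receives nothing.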

DirectPgmqBackend example

from pgmq.async_queue import PGMQueue
from pipecat_subagents.bus.network.pgmq import PgmqBus

pgmq = PGMQueue(
    host="localhost",
    port="5432",
    database="postgres",
    username="postgres",
    password="...",
    pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat_acme")

IsolatedPgmqBackend example

import asyncpg
from pipecat_subagents.bus.network.pgmq import PgmqBus
from pipecat_subagents.bus.network.pgmq_backends import IsolatedPgmqBackend

pool = await asyncpg.create_pool(
    host="localhost",
    port=5432,
    database="postgres",
    user="postgres",
    password="...",
    min_size=2,
    max_size=4,
)
backend = IsolatedPgmqBackend(pool=pool)
bus = PgmqBus(backend=backend, channel="pipecat_acme")

Configuration

pgmq
PGMQueue | None
default:"None"
An initialized PGMQueue client (call await pgmq.init() before passing). Selects DirectPgmqBackend. Mutually exclusive with backend.
backend
PgmqBackend | None
default:"None"
A backend instance (e.g. IsolatedPgmqBackend or custom). Mutually exclusive with pgmq.
serializer
MessageSerializer | None
default:"None"
The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.
channel
str
default:"pipecat_bus"
Channel name. With DirectPgmqBackend this is sanitized into a queue-name prefix. With IsolatedPgmqBackend it is the bus capability passed to every wrapper call.
visibility_timeout
int
default:"30"
Seconds a read message stays invisible before redelivery.
batch_size
int
default:"10"
Maximum messages to fetch per read.
poll_interval_ms
int
default:"100"
Long-poll check interval in milliseconds. A backend may ignore this value if it does not expose an equivalent setting.
max_poll_seconds
int
default:"5"
Maximum seconds the reader blocks per poll cycle.
Prefer a session-mode pooler when available. Transaction-mode pooling works for direct pgmq.* calls but is fragile around the long-poll inside the SECURITY DEFINER bus_subscribe wrapper used by IsolatedPgmqBackend.

The underlying connection pool must allow at least two concurrent connections (one for the reader’s long-poll, one for publishes).
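The visibility_timeout and batch_size settings above follow the usual visibility-timeout queue semantics: a read hides a message for a fixed period, and the message becomes readable again unless it is deleted first. The sketch below models that behavior in memory with an explicit clock argument. `VisibilityQueue` is a hypothetical class for illustration and says nothing about when PgmqBus itself deletes messages.

```python
class VisibilityQueue:
    """In-memory model of a queue with a visibility timeout (in seconds)."""

    def __init__(self, visibility_timeout: float) -> None:
        self.visibility_timeout = visibility_timeout
        self._messages = {}  # msg_id -> [visible_at, body]
        self._next_id = 1

    def send(self, body: str, now: float) -> int:
        msg_id = self._next_id
        self._next_id += 1
        self._messages[msg_id] = [now, body]  # visible immediately
        return msg_id

    def read(self, now: float, batch_size: int = 10) -> list:
        batch = []
        for msg_id, entry in self._messages.items():
            if len(batch) == batch_size:
                break
            if entry[0] <= now:
                # Hide the message until the timeout elapses.
                entry[0] = now + self.visibility_timeout
                batch.append((msg_id, entry[1]))
        return batch

    def delete(self, msg_id: int) -> None:
        self._messages.pop(msg_id, None)


def demo():
    queue = VisibilityQueue(visibility_timeout=30)
    msg_id = queue.send("hello", now=0)
    first = queue.read(now=0)        # delivered, then hidden until t=30
    hidden = queue.read(now=10)      # still invisible
    redelivered = queue.read(now=30) # timeout elapsed, delivered again
    queue.delete(msg_id)
    after_delete = queue.read(now=100)
    return first, hidden, redelivered, after_delete
```

A shorter timeout redelivers unprocessed messages sooner, at the cost of duplicate deliveries when a handler is merely slow.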

BusBridgeProcessor

Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus. Placed in a transport or session agent’s pipeline to exchange frames with other agents via the AgentBus.
from pipecat_subagents.bus import BusBridgeProcessor

bridge = BusBridgeProcessor(
    bus=runner.bus,
    agent_name="session",
    bridge="voice",
)

pipeline = Pipeline([transport.input(), bridge, transport.output()])

Configuration

bus
AgentBus
required
The AgentBus to exchange frames with.
agent_name
str
required
Name of this agent, used as message source.
target_agent
str | None
default:"None"
When set, only exchange frames with this agent.
bridge
str | None
default:"None"
Optional bridge name for routing. When set, outgoing frames are tagged with this name and only incoming frames with the same bridge name are accepted.
exclude_frames
tuple[type[Frame], ...] | None
default:"None"
Extra frame types that should never cross the bus (on top of lifecycle frames which are always excluded).
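The target_agent and bridge options above act as filters on incoming frames. A minimal sketch of that acceptance logic follows; `SketchFrameMessage` and `accepts` are hypothetical names, and treating an unset option as "accept anything" is an assumption drawn from the "when set" wording rather than confirmed library behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchFrameMessage:
    source: str            # name of the agent that sent the frame
    bridge: Optional[str]  # bridge tag attached by the sender, if any
    frame: str             # placeholder for the actual frame object


def accepts(
    message: SketchFrameMessage,
    *,
    target_agent: Optional[str] = None,
    bridge: Optional[str] = None,
) -> bool:
    """Return True if an incoming frame message passes both filters."""
    # With target_agent set, only frames from that agent are exchanged.
    if target_agent is not None and message.source != target_agent:
        return False
    # With bridge set, only frames tagged with the same bridge name pass.
    if bridge is not None and message.bridge != bridge:
        return False
    return True
```

For example, `accepts(SketchFrameMessage("llm", "video", "f"), bridge="voice")` is rejected because the bridge names differ, while the same message passes when no bridge filter is configured.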

BusSubscriber

Mixin for objects that receive messages from an AgentBus. Implementors override on_bus_message() to handle incoming messages. Concrete subscribers must provide a name property (typically inherited from BaseObject) that uniquely identifies the subscriber on the bus.
from pipecat_subagents.bus.subscriber import BusSubscriber

class MySubscriber(BusSubscriber):
    @property
    def name(self) -> str:
        return "my_subscriber"

    async def on_bus_message(self, message: BusMessage) -> None:
        ...

Properties

name
str
required
Unique name identifying this subscriber on the bus. Built-in subscribers inherit this from BaseObject; custom implementations that extend BusSubscriber directly must provide one.

Methods

on_bus_message
async (message: BusMessage) -> None
Override to handle an incoming bus message.