WorkerBus
Abstract base for inter-agent and runner-agent communication. Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered ahead of normal data messages.
Methods
start
stop
subscribe
| Parameter | Type | Description |
|---|---|---|
| subscriber | BusSubscriber | The subscriber to register |
unsubscribe
| Parameter | Type | Description |
|---|---|---|
| subscriber | BusSubscriber | The subscriber to remove |
send
publish() for transport.
| Parameter | Type | Description |
|---|---|---|
| message | BusMessage | The bus message to send |
publish
| Parameter | Type | Description |
|---|---|---|
| message | BusMessage | The bus message to publish |
on_message_received
send() or from a network transport).
AsyncQueueBus
In-process bus that delivers messages via priority queues. This is the default bus used by WorkerRunner when no bus is provided.
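The priority-queue delivery described above can be illustrated with a small, self-contained sketch. It does not use the library's classes: `SketchBus`, `SketchSubscriber`, `QueuedMessage`, and the two priority constants are hypothetical names chosen for this example, and the real bus may order or dispatch messages differently. The sketch only shows the idea that each subscriber owns an independent queue and that system messages sort ahead of data messages.

```python
import asyncio
import itertools
from dataclasses import dataclass, field

# Hypothetical priority levels for this sketch: lower numbers are delivered first.
SYSTEM_PRIORITY = 0
DATA_PRIORITY = 1


@dataclass(order=True)
class QueuedMessage:
    priority: int
    sequence: int  # tie-breaker that preserves send order within one priority level
    payload: str = field(compare=False)


class SketchSubscriber:
    """Stand-in subscriber that owns its own priority queue."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.received: list = []

    async def drain(self) -> None:
        # Pop until the queue is empty; system messages come out first.
        while not self.queue.empty():
            message = await self.queue.get()
            self.received.append(message.payload)


class SketchBus:
    """Stand-in bus: every subscriber gets its own copy of each message."""

    def __init__(self) -> None:
        self._subscribers: list = []
        self._counter = itertools.count()

    def subscribe(self, subscriber: SketchSubscriber) -> None:
        self._subscribers.append(subscriber)

    def unsubscribe(self, subscriber: SketchSubscriber) -> None:
        self._subscribers.remove(subscriber)

    async def send(self, payload: str, *, system: bool = False) -> None:
        priority = SYSTEM_PRIORITY if system else DATA_PRIORITY
        sequence = next(self._counter)
        for subscriber in self._subscribers:
            await subscriber.queue.put(QueuedMessage(priority, sequence, payload))


async def demo() -> dict:
    bus = SketchBus()
    first, second = SketchSubscriber("first"), SketchSubscriber("second")
    bus.subscribe(first)
    bus.subscribe(second)
    await bus.send("data-1")
    await bus.send("data-2")
    await bus.send("cancel", system=True)  # sent last, delivered first
    await first.drain()
    await second.drain()
    return {first.name: first.received, second.name: second.received}
```

Running `asyncio.run(demo())` shows both subscribers receiving the same three messages, with the system message ahead of the two data messages even though it was sent last.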
RedisBus
Distributed agent bus backed by Redis pub/sub for cross-process communication.
Requires the redis extra:
uv add "pipecat-ai[redis]"
Configuration
A redis.asyncio.Redis client instance.
The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.
The Redis pub/sub channel name.
PgmqBus
Distributed agent bus backed by PGMQ (PostgreSQL Message Queue). Implements pub/sub fan-out on top of PGMQ’s point-to-point queue semantics by giving each bus instance its own queue and broadcasting on publish.
Requires the pgmq extra:
uv add "pipecat-ai[pgmq]"
- DirectPgmqBackend: Calls pgmq.* directly with prefix-based peer discovery. Suitable when bus peers trust each other (single-tenant deployments, internal services).
- IsolatedPgmqBackend: Uses SECURITY DEFINER Postgres wrappers for isolation. Suitable when bus peers should be isolated and the channel name is the bus capability.
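The fan-out idea (one queue per bus instance, broadcast on publish, peers found by queue-name prefix) can be sketched without a database. Everything below is hypothetical: `FakeQueueStore` is an in-memory stand-in for a point-to-point queue service, not PGMQ, and `FanOutBus` is not the library's PgmqBus. The sketch also assumes the publisher's own queue receives a copy; the source does not say whether the real backends skip the sender.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


class FakeQueueStore:
    """In-memory stand-in for a point-to-point queue service (not PGMQ itself)."""

    def __init__(self) -> None:
        self.queues: Dict[str, Deque[str]] = {}

    def create_queue(self, name: str) -> None:
        self.queues.setdefault(name, deque())

    def list_queues(self, prefix: str) -> List[str]:
        # Prefix-based peer discovery: every queue sharing the prefix is a peer.
        return sorted(name for name in self.queues if name.startswith(prefix))

    def send(self, name: str, message: str) -> None:
        self.queues[name].append(message)

    def read(self, name: str) -> Optional[str]:
        queue = self.queues[name]
        return queue.popleft() if queue else None


class FanOutBus:
    """Each instance owns one queue; publish copies the message to every peer queue."""

    def __init__(self, store: FakeQueueStore, channel: str, instance_id: str) -> None:
        self._store = store
        self._prefix = f"{channel}_"
        self.queue_name = f"{self._prefix}{instance_id}"
        store.create_queue(self.queue_name)

    def publish(self, message: str) -> int:
        peers = self._store.list_queues(self._prefix)
        for queue_name in peers:
            self._store.send(queue_name, message)
        return len(peers)  # number of queues that received a copy

    def poll(self) -> Optional[str]:
        # Point-to-point read: only this instance consumes from its own queue.
        return self._store.read(self.queue_name)


store = FakeQueueStore()
buses = [FanOutBus(store, "agents", instance_id) for instance_id in ("a", "b", "c")]
delivered = buses[0].publish("hello")
```

Because every instance reads only its own queue, each peer sees the message exactly once, which is how a broadcast can be layered on a service that otherwise hands each message to a single consumer.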
DirectPgmqBackend example
IsolatedPgmqBackend example
Configuration
An initialized PGMQueue client (call await pgmq.init() before passing). Selects DirectPgmqBackend. Mutually exclusive with backend.
A backend instance (e.g. IsolatedPgmqBackend or custom). Mutually exclusive with pgmq.
The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.
Channel name. With DirectPgmqBackend this is sanitized into a queue-name prefix. With IsolatedPgmqBackend it is the bus capability passed to every wrapper call.
Seconds a read message stays invisible before redelivery.
Maximum messages to fetch per read.
Long-poll check interval in milliseconds. A backend may ignore this value if it does not expose the setting.
Maximum seconds the reader blocks per poll cycle.
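The visibility-timeout and batch-size settings above follow the usual message-queue pattern: a read hides the message for a while, and if it is not deleted in time it becomes readable again. The sketch below models that behaviour in memory with an explicit clock value so it stays deterministic. `VisibilityQueue` and its method names are hypothetical and are not the PGMQ client API.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class _Entry:
    payload: str
    visible_at: float = 0.0  # the message can be read once now >= visible_at


class VisibilityQueue:
    """In-memory model of read-with-visibility-timeout (not the PGMQ client)."""

    def __init__(self, visibility_timeout: float) -> None:
        self._visibility_timeout = visibility_timeout
        self._entries: Dict[int, _Entry] = {}
        self._next_id = 0

    def send(self, payload: str) -> int:
        message_id = self._next_id
        self._next_id += 1
        self._entries[message_id] = _Entry(payload)
        return message_id

    def read(self, now: float, batch_size: int = 1) -> List[Tuple[int, str]]:
        batch: List[Tuple[int, str]] = []
        for message_id, entry in self._entries.items():
            if len(batch) >= batch_size:
                break
            if entry.visible_at <= now:
                # Hide the message until the timeout elapses.
                entry.visible_at = now + self._visibility_timeout
                batch.append((message_id, entry.payload))
        return batch

    def delete(self, message_id: int) -> None:
        # Acknowledge: a deleted message is never redelivered.
        self._entries.pop(message_id, None)


queue = VisibilityQueue(visibility_timeout=30.0)
queue.send("job-1")
queue.send("job-2")
first_read = queue.read(now=0.0, batch_size=2)    # both messages, now hidden
hidden_read = queue.read(now=10.0, batch_size=2)  # nothing visible yet
queue.delete(first_read[0][0])                    # acknowledge job-1 only
redelivered = queue.read(now=30.0, batch_size=2)  # job-2 comes back
```

A short timeout redelivers quickly after a crashed reader but risks duplicate handling of slow messages; a long timeout has the opposite trade-off.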
Prefer a session-mode pooler when available. Transaction-mode pooling works for direct pgmq.* calls but is fragile around the long-poll inside the SECURITY DEFINER bus_subscribe wrapper used by IsolatedPgmqBackend.
The underlying connection pool must allow at least two concurrent connections (one for the reader’s long-poll, one for publishes).
BusBridgeProcessor
Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus. Placed in a transport or session agent’s pipeline to exchange frames with other agents via the WorkerBus.
Configuration
The WorkerBus to exchange frames with.
Name of the owning agent, used as message source.
When set, only exchange frames with this agent.
Optional bridge name for routing. When set, outgoing frames are tagged with
this name and only incoming frames with the same bridge name are accepted.
Extra frame types that should never cross the bus, in addition to lifecycle frames, which are always excluded.
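The routing rules in this configuration (source tagging, optional target agent, optional bridge name, excluded frame types) can be expressed as a small filter. This is a sketch under stated assumptions, not the processor's implementation: `BridgeFilter`, `Envelope`, and the frame classes are hypothetical stand-ins, and ignoring messages that originate from the owning agent is an assumption made here.

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Type


class Frame:
    """Hypothetical base frame type for this sketch."""


class LifecycleFrame(Frame):
    """Stand-in for lifecycle frames, which never cross the bus."""


class TextFrame(Frame):
    """Stand-in for an ordinary data frame."""


class DebugFrame(Frame):
    """Stand-in for a frame type the user chooses to exclude."""


@dataclass
class Envelope:
    source: str
    frame: Frame
    bridge: Optional[str] = None


class BridgeFilter:
    def __init__(
        self,
        agent_name: str,
        target_agent: Optional[str] = None,
        bridge: Optional[str] = None,
        exclude_frames: Tuple[Type[Frame], ...] = (),
    ) -> None:
        self.agent_name = agent_name
        self.target_agent = target_agent
        self.bridge = bridge
        # Lifecycle frames are always excluded; user-supplied types are added on top.
        self._excluded = (LifecycleFrame, *exclude_frames)

    def wrap_outgoing(self, frame: Frame) -> Optional[Envelope]:
        if isinstance(frame, self._excluded):
            return None  # this frame stays inside the local pipeline
        return Envelope(source=self.agent_name, frame=frame, bridge=self.bridge)

    def accepts_incoming(self, envelope: Envelope) -> bool:
        if envelope.source == self.agent_name:
            return False  # assumption: a bridge ignores its own messages
        if self.target_agent is not None and envelope.source != self.target_agent:
            return False
        if self.bridge is not None and envelope.bridge != self.bridge:
            return False
        return not isinstance(envelope.frame, self._excluded)


bridge_filter = BridgeFilter(
    "agent_a", target_agent="agent_b", bridge="audio", exclude_frames=(DebugFrame,)
)
outgoing = bridge_filter.wrap_outgoing(TextFrame())
```

With a bridge name set, two bridges in the same pair of agents can share one bus without seeing each other's frames, since each accepts only envelopes tagged with its own name.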
BusSubscriber
Mixin for objects that receive messages from a WorkerBus. Implementors override on_bus_message() to handle incoming messages. Concrete subscribers must provide a name property (typically inherited from BaseObject) that uniquely identifies the subscriber on the bus.
Properties
Unique name identifying this subscriber on the bus. Built-in subscribers inherit this from BaseObject; custom implementations that extend BusSubscriber directly must provide one.
Methods
Override to handle an incoming bus message.
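The mixin contract (a unique name plus an overridden on_bus_message()) might look like the sketch below. It deliberately avoids importing the library: `SketchBusSubscriber` is a hypothetical stand-in for BusSubscriber, `deliver` is a hypothetical helper, and treating on_bus_message() as a coroutine is an assumption, since the source does not show its signature.

```python
import asyncio
from typing import Any, List


class SketchBusSubscriber:
    """Stand-in for the BusSubscriber mixin (not the library class)."""

    @property
    def name(self) -> str:
        # Subclasses extending the mixin directly must supply a unique name.
        raise NotImplementedError

    async def on_bus_message(self, message: Any) -> None:
        """Default handler does nothing; subclasses override it."""


class RecordingSubscriber(SketchBusSubscriber):
    def __init__(self, name: str) -> None:
        self._name = name
        self.seen: List[Any] = []

    @property
    def name(self) -> str:
        return self._name

    async def on_bus_message(self, message: Any) -> None:
        self.seen.append(message)


async def deliver(subscribers: List[SketchBusSubscriber], message: Any) -> None:
    # Hypothetical helper: names must be unique because they identify subscribers.
    names = [subscriber.name for subscriber in subscribers]
    if len(names) != len(set(names)):
        raise ValueError("subscriber names must be unique")
    for subscriber in subscribers:
        await subscriber.on_bus_message(message)


recorder = RecordingSubscriber("recorder")
asyncio.run(deliver([recorder], {"type": "greeting", "text": "hi"}))
```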