Skip to main content

AgentBus

Abstract base for inter-agent and runner-agent communication. Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages.
from pipecat_subagents.bus import AgentBus

Methods

start

async def start(self) -> None
Start dispatch tasks for all registered subscribers.

stop

async def stop(self) -> None
Stop all dispatch tasks.

subscribe

async def subscribe(self, subscriber: BusSubscriber) -> None
Register a subscriber to receive messages from the bus.
ParameterTypeDescription
subscriberBusSubscriberThe subscriber to register

unsubscribe

async def unsubscribe(self, subscriber: BusSubscriber) -> None
Remove a subscriber and cancel its dispatch task.
ParameterTypeDescription
subscriberBusSubscriberThe subscriber to remove

send

async def send(self, message: BusMessage) -> None
Send a message through the bus. Local-only messages are delivered directly to subscribers. All other messages are passed to publish() for transport.
ParameterTypeDescription
messageBusMessageThe bus message to send

publish

@abstractmethod
async def publish(self, message: BusMessage) -> None
Publish a message to the transport. Subclasses implement this for the specific transport (e.g. in-process queue, Redis pub/sub).
ParameterTypeDescription
messageBusMessageThe bus message to publish

on_message_received

def on_message_received(self, message: BusMessage) -> None
Deliver a message to all local subscribers via their priority queues. Called by bus implementations when a message arrives (either from a local send() or from a network transport).

AsyncQueueBus

In-process bus that delivers messages via priority queues. This is the default bus used by AgentRunner when no bus is provided.
from pipecat_subagents.bus import AsyncQueueBus

bus = AsyncQueueBus()
runner = AgentRunner(bus=bus)
Suitable for single-process setups where all agents run in the same Python process.

RedisBus

Distributed agent bus backed by Redis pub/sub for cross-process communication.
Requires the redis extra: pip install pipecat-ai-subagents[redis]
from redis.asyncio import Redis
from pipecat_subagents.bus.network.redis import RedisBus

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="my-session")
runner = AgentRunner(bus=bus)

Configuration

redis
Redis
required
A redis.asyncio.Redis client instance.
serializer
Optional[MessageSerializer]
default:"None"
The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.
channel
str
default:"pipecat:bus"
The Redis pub/sub channel name.

BusBridgeProcessor

Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus. Placed in a transport or session agent’s pipeline to exchange frames with other agents via the AgentBus.
from pipecat_subagents.bus import BusBridgeProcessor

bridge = BusBridgeProcessor(
    bus=runner.bus,
    agent_name="session",
    bridge="voice",
)

pipeline = Pipeline([transport.input(), bridge, transport.output()])

Configuration

bus
AgentBus
required
The AgentBus to exchange frames with.
agent_name
str
required
Name of this agent, used as message source.
target_agent
Optional[str]
default:"None"
When set, only exchange frames with this agent.
bridge
Optional[str]
default:"None"
Optional bridge name for routing. When set, outgoing frames are tagged with this name and only incoming frames with the same bridge name are accepted.
exclude_frames
Optional[tuple[type[Frame], ...]]
default:"None"
Extra frame types that should never cross the bus (on top of lifecycle frames which are always excluded).

BusSubscriber

Mixin for objects that receive messages from an AgentBus. Implementors override on_bus_message() to handle incoming messages.
from pipecat_subagents.bus.subscriber import BusSubscriber

class MySubscriber(BusSubscriber):
    async def on_bus_message(self, message: BusMessage) -> None:
        ...