Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt

Use this file to discover all available pages before exploring further.

Overview

Distributed agents are agents connected to the same bus but running in different processes or on different machines. By default, all agents run in a single process using a local bus. For distributed setups, swap to a network bus — all agents share the same channel and discover each other automatically. Your agent code stays the same.
Distributed agents share a bus. If you need to connect agents on different buses (separate networks, third-party services), see Proxy Agents instead.

Setting up a distributed bus

Pipecat provides two distributed bus implementations: RedisBus and PgmqBus. Choose based on your infrastructure.

RedisBus

Each process creates its own WorkerRunner with a RedisBus connected to the same Redis channel:
from redis.asyncio import Redis
from pipecat.bus.network.redis import RedisBus
from pipecat.workers.runner import WorkerRunner

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus, handle_sigint=True)
All runners sharing the same channel can exchange messages. Agents discover each other automatically through registry snapshots.
Install the Redis extra: uv add "pipecat-ai[redis]"

PgmqBus

Alternatively, use PgmqBus backed by PostgreSQL Message Queue:
from pgmq.async_queue import PGMQueue
from pipecat.bus.network.pgmq import PgmqBus
from pipecat.workers.runner import WorkerRunner

pgmq = PGMQueue(
    host="localhost",
    port="5432",
    database="postgres",
    username="postgres",
    password="...",
    pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat:my-app")
runner = WorkerRunner(bus=bus, handle_sigint=True)
Install the PGMQ extra: uv add "pipecat-ai[pgmq]"

Example: distributed handoff

This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines.

Process 1: Main transport agent

The main agent owns the transport and bridges frames to the bus. It has no LLM — it waits for the remote greeter agent to register, then activates it.
# main.py
import os

from redis.asyncio import Redis

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.bus import BusBridgeProcessor
from pipecat.bus.network.redis import RedisBus
from pipecat.pipeline.pipeline import Pipeline
from pipecat.workers.runner import WorkerRunner
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.registry.types import WorkerReadyData
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.workers.llm import LLMWorkerActivationArgs

MAIN_NAME = "acme"


async def run_bot(transport, runner_args):
    redis = Redis.from_url(runner_args.cli_args.redis_url)
    bus = RedisBus(redis=redis, channel=runner_args.cli_args.channel)
    runner = WorkerRunner(bus=bus, handle_sigint=runner_args.handle_sigint)

    stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
    tts = CartesiaTTSService(
        api_key=os.environ["CARTESIA_API_KEY"],
        settings=CartesiaTTSService.Settings(
            voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
        ),
    )

    context = LLMContext()
    aggregators = LLMContextAggregatorPair(
        context,
        user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
    )

    bridge = BusBridgeProcessor(
        bus=runner.bus,
        worker_name=MAIN_NAME,
        name=f"{MAIN_NAME}::BusBridge",
    )

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            aggregators.user(),
            bridge,
            tts,
            transport.output(),
            aggregators.assistant(),
        ]
    )

    main = PipelineWorker(
        pipeline,
        name=MAIN_NAME,
        params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
    )

    # The remote greeter may take a moment to register on the bus, so only
    # activate it once both the client is connected and the greeter is ready.
    state = {"client_connected": False, "greeter_ready": False}

    async def maybe_activate():
        if not (state["client_connected"] and state["greeter_ready"]):
            return
        await main.activate_worker(
            "greeter",
            args=LLMWorkerActivationArgs(
                messages=[{"role": "developer", "content": "Welcome the user."}],
            ),
        )

    async def on_greeter_ready(_data: WorkerReadyData) -> None:
        state["greeter_ready"] = True
        await maybe_activate()

    await runner.registry.watch("greeter", on_greeter_ready)

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        state["client_connected"] = True
        await maybe_activate()

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await runner.cancel()

    await runner.add_workers(main)
    await runner.run()
python main.py --redis-url redis://localhost:6379

Process 2 & 3: LLM agents

Each LLM agent runs as a standalone process. It connects to the same Redis channel and waits for activation. Subclass LLMWorker to host @tool methods and pass the LLM service in the constructor:
# llm.py
import argparse
import asyncio
import os

from redis.asyncio import Redis

from pipecat.bus.network.redis import RedisBus
from pipecat.workers.runner import WorkerRunner
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.workers.llm import LLMWorker, LLMWorkerActivationArgs, tool


class AcmeLLMAgent(LLMWorker):
    def __init__(self, name: str, *, system_instruction: str, watch: list[str]):
        llm = OpenAILLMService(
            api_key=os.environ["OPENAI_API_KEY"],
            settings=OpenAILLMService.Settings(system_instruction=system_instruction),
        )
        super().__init__(name, llm=llm, bridged=())
        self._watch = watch

    async def start(self) -> None:
        """Watch sibling agents so handoff knows when they are available."""
        await super().start()
        await self.watch_workers(*self._watch)

    @tool(cancel_on_interruption=False)
    async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str):
        """Transfer the user to another agent.

        Args:
            agent (str): The target agent.
            reason (str): Transfer reason.
        """
        await self.activate_worker(
            agent,
            args=LLMWorkerActivationArgs(
                messages=[{"role": "developer", "content": reason}]
            ),
            deactivate_self=True,
            result_callback=params.result_callback,
        )


async def main_async():
    parser = argparse.ArgumentParser()
    parser.add_argument("worker", choices=["greeter", "support"])
    parser.add_argument("--redis-url", default="redis://localhost:6379")
    parser.add_argument("--channel", default="pipecat:my-app")
    args = parser.parse_args()

    redis = Redis.from_url(args.redis_url)
    bus = RedisBus(redis=redis, channel=args.channel)

    agent = AcmeLLMAgent(
        args.worker,
        system_instruction="You are a greeter..." if args.worker == "greeter" else "You are support...",
        watch=["support"] if args.worker == "greeter" else ["greeter"],
    )

    runner = WorkerRunner(bus=bus, handle_sigint=True)
    await runner.add_workers(agent)
    await runner.run()


asyncio.run(main_async())
# Run on Machine B
python llm.py greeter --redis-url redis://your-redis-host:6379

# Run on Machine C
python llm.py support --redis-url redis://your-redis-host:6379
Each LLM agent watches its sibling so that, when the user is transferred, the target is already known to be available. You can do the same with the @worker_ready decorator on a worker subclass to react automatically when a specific agent registers.

How discovery works

Runners exchange registry information automatically over the shared bus. To get notified when an agent is ready, watch it with runner.registry.watch(...), call watch_workers() from inside a worker, or use the @worker_ready decorator — they all work the same way locally and distributed.

Considerations

  • Latency: Network buses add overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the bus server (Redis or PostgreSQL).
  • Serialization: Both RedisBus and PgmqBus serialize messages to JSON. Custom frame types need to be registered with the serializer.
  • Single channel: All agents on the same channel see all messages. Use different channels for different sessions or applications.