Overview

Distributed agents are agents connected to the same bus but running in different processes or on different machines. By default, all agents run in a single process using a local bus. For distributed setups, swap to a network bus — all agents share the same channel and discover each other automatically. Your agent code stays the same.
Distributed agents share a bus. If you need to connect agents on different buses (separate networks, third-party services), see Proxy Agents instead.

Setting up RedisBus

Each process creates its own AgentRunner with a RedisBus connected to the same Redis channel:
from redis.asyncio import Redis
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = AgentRunner(bus=bus, handle_sigint=True)
All runners sharing the same channel can exchange messages. Agents discover each other automatically through registry snapshots.
Install the Redis extra: uv add "pipecat-ai-subagents[redis]"
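
Because every runner on a channel can exchange messages with every other runner on it, the channel name decides which processes belong together. The following is a minimal sketch of one way to build channel names consistently across processes. The `channel_for` helper and the `pipecat:<app>[:<session>]` layout are assumptions made for illustration; RedisBus itself only needs a string.

```python
import re
from typing import Optional

# Characters outside this set are replaced so ":" stays an unambiguous separator.
_UNSAFE = re.compile(r"[^A-Za-z0-9_.-]")


def channel_for(app: str, session_id: Optional[str] = None) -> str:
    """Build a channel name such as "pipecat:my-app" or "pipecat:my-app:abc123".

    Hypothetical helper: the naming layout is an assumed convention for this
    sketch, not something the library prescribes.
    """
    parts = ["pipecat", app]
    if session_id is not None:
        parts.append(session_id)
    return ":".join(_UNSAFE.sub("-", part) for part in parts)
```

Each process in the same deployment would pass the same result as `channel=` when constructing its RedisBus.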

Example: distributed handoff

This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines.

Process 1: Main transport agent

The main agent owns the transport and bridges frames to the bus. It has no LLM — it waits for a remote greeter agent to register, then activates it.
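# main_agent.py
import os

from redis.asyncio import Redis

from pipecat_subagents.agents import BaseAgent, LLMAgentActivationArgs, agent_ready
from pipecat_subagents.bus import BusBridgeProcessor
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner
from pipecat_subagents.types import AgentReadyData

# Pipecat imports (Pipeline, DeepgramSTTService, CartesiaTTSService, LLMContext,
# LLMContextAggregatorPair, LLMUserAggregatorParams, SileroVADAnalyzer) are omitted for brevity.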

class MainAgent(BaseAgent):
    def __init__(self, name, *, bus, transport):
        super().__init__(name, bus=bus)
        self._transport = transport

    @agent_ready(name="greeter")
    async def on_greeter_ready(self, data: AgentReadyData) -> None:
        await self.activate_agent(
            "greeter",
            activation_args=LLMAgentActivationArgs(
                messages=[{"role": "user", "content": "Welcome the user."}],
            ),
        )

    async def build_pipeline(self) -> Pipeline:
        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        tts = CartesiaTTSService(api_key=os.getenv("CARTESIA_API_KEY"))
        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )
        bridge = BusBridgeProcessor(bus=self.bus, agent_name=self.name)

        return Pipeline([
            self._transport.input(),
            stt,
            context_aggregator.user(),
            bridge,
            tts,
            self._transport.output(),
            context_aggregator.assistant(),
        ])


async def run_bot(transport, runner_args):
    redis = Redis.from_url(runner_args.cli_args.redis_url)
    bus = RedisBus(redis=redis, channel=runner_args.cli_args.channel)
    runner = AgentRunner(bus=bus, handle_sigint=True)
    agent = MainAgent("acme", bus=bus, transport=transport)
    await runner.add_agent(agent)
    await runner.run()

# Run the main agent
python main_agent.py --redis-url redis://localhost:6379

Process 2 & 3: LLM agents

Each LLM agent runs as a standalone process. It connects to the same Redis channel and waits for activation:
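# llm_agent.py
import asyncio
import os

from redis.asyncio import Redis

from pipecat_subagents.agents import LLMAgent, LLMAgentActivationArgs, tool
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner

# Pipecat imports (LLMService, FunctionCallParams, OpenAILLMService,
# OpenAILLMSettings) are omitted for brevity.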
class AcmeLLMAgent(LLMAgent):
    def __init__(self, name, *, bus, system_instruction):
        super().__init__(name, bus=bus, bridged=())
        self._system_instruction = system_instruction

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(system_instruction=self._system_instruction),
        )

    @tool(cancel_on_interruption=False)
    async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str):
        """Transfer the user to another agent.

        Args:
            agent (str): The target agent.
            reason (str): Transfer reason.
        """
        await self.handoff_to(
            agent,
            activation_args=LLMAgentActivationArgs(messages=[{"role": "user", "content": reason}]),
            result_callback=params.result_callback,
        )


async def main():
    redis = Redis.from_url("redis://localhost:6379")
    bus = RedisBus(redis=redis, channel="pipecat:my-app")
    agent = AcmeLLMAgent("greeter", bus=bus, system_instruction="You are a greeter...")
    runner = AgentRunner(bus=bus, handle_sigint=True)
    await runner.add_agent(agent)
    await runner.run()

if __name__ == "__main__":
    asyncio.run(main())

# Run on Machine B
python llm_agent.py greeter --redis-url redis://your-redis-host:6379

# Run on Machine C
python llm_agent.py support --redis-url redis://your-redis-host:6379
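
The commands above pass an agent name and a `--redis-url` flag, while the `main()` in the listing uses literal values for both. One way to connect the two is a small argument parser. This is a sketch only: the `parse_args` helper, the `--channel` flag, and the default values are assumptions, not part of the original example.

```python
import argparse
from typing import Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse `llm_agent.py <name> [--redis-url URL] [--channel NAME]`.

    Hypothetical helper; the flag names mirror the commands in this guide,
    and the defaults mirror the literals used in the listing.
    """
    parser = argparse.ArgumentParser(description="Run one LLM agent on a shared bus.")
    parser.add_argument("name", help="Agent name to register, e.g. greeter or support")
    parser.add_argument("--redis-url", default="redis://localhost:6379")
    parser.add_argument("--channel", default="pipecat:my-app")
    # argv=None makes argparse read sys.argv[1:], which is what a script wants.
    return parser.parse_args(argv)
```

Inside `main()`, calling `args = parse_args()` and then using `args.name`, `args.redis_url`, and `args.channel` in place of the literals would let one script serve both the greeter and the support process.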

How discovery works

Runners exchange registry information automatically over the shared bus. To get notified when an agent is ready, use @agent_ready or watch_agent(); both work the same way in local and distributed setups.
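
To make the idea concrete, here is a toy, in-memory model of registry-based discovery. It is not how pipecat_subagents implements discovery; the `ToyRegistry` class and its methods are hypothetical and only illustrate the general pattern of matching "ready" watchers against the agent lists that runners share.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Set

ReadyHandler = Callable[[str], Awaitable[None]]


class ToyRegistry:
    """Hypothetical stand-in for what a runner knows about agents on the bus."""

    def __init__(self) -> None:
        self._ready: Set[str] = set()
        self._watchers: Dict[str, List[ReadyHandler]] = {}

    async def watch(self, name: str, handler: ReadyHandler) -> None:
        # If the agent is already known from an earlier snapshot, fire right away.
        if name in self._ready:
            await handler(name)
            return
        self._watchers.setdefault(name, []).append(handler)

    async def apply_snapshot(self, agent_names: List[str]) -> None:
        # A snapshot lists the agents that another runner currently hosts.
        for name in agent_names:
            if name in self._ready:
                continue
            self._ready.add(name)
            for handler in self._watchers.pop(name, []):
                await handler(name)


async def demo() -> List[str]:
    seen: List[str] = []

    async def on_ready(name: str) -> None:
        seen.append(name)

    registry = ToyRegistry()
    await registry.watch("greeter", on_ready)  # watch first, agent appears later
    await registry.apply_snapshot(["greeter", "support"])
    await registry.watch("support", on_ready)  # agent already known
    return seen
```

In this toy model a handler runs whether the watch was registered before or after the agent appeared, which is the property that lets the main agent start before or after the remote greeter process.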

Considerations

  • Latency: Redis adds a network hop to every bus message. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and to the Redis instance.
  • Serialization: RedisBus serializes messages to JSON. Custom frame types need to be registered with the serializer.
  • Single channel: All agents on the same channel see all messages. Use different channels for different sessions or applications.
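
The serialization point can be illustrated with a plain-Python sketch of a JSON codec that only round-trips types it has been told about. The `register`, `encode`, and `decode` helpers and the `TransferNote` type are hypothetical; the actual RedisBus serializer API may look different, so treat this as a model of the constraint rather than a recipe.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Type

# Maps a type name carried in the JSON payload back to the class to rebuild.
_TYPES: Dict[str, Type[Any]] = {}


def register(cls: Type[Any]) -> Type[Any]:
    """Record a dataclass so decode() can rebuild it by name."""
    _TYPES[cls.__name__] = cls
    return cls


def encode(obj: Any) -> str:
    name = type(obj).__name__
    if name not in _TYPES:
        # Unregistered types cannot be rebuilt on the receiving side.
        raise TypeError(f"{name} is not registered")
    return json.dumps({"type": name, "data": asdict(obj)})


def decode(payload: str) -> Any:
    message = json.loads(payload)
    return _TYPES[message["type"]](**message["data"])


@register
@dataclass
class TransferNote:
    """Hypothetical custom message type used only in this sketch."""

    agent: str
    reason: str
```

A receiver in another process can only rebuild `TransferNote` if it has registered the same type, which is why custom types need to be registered wherever messages are decoded.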