Overview

Distributed agents are agents connected to the same bus but running in different processes or on different machines. By default, all agents run in a single process using a local bus. For distributed setups, swap to a network bus — all agents share the same channel and discover each other automatically. Your agent code stays the same.
Distributed agents share a bus. If you need to connect agents on different buses (separate networks, third-party services), see Proxy Agents instead.

Setting up RedisBus

Each process creates its own AgentRunner with a RedisBus connected to the same Redis channel:
from redis.asyncio import Redis
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = AgentRunner(bus=bus, handle_sigint=True)
All runners sharing the same channel can exchange messages. Agents discover each other automatically through registry snapshots.
Install the Redis extra (quoted so shells such as zsh do not expand the brackets): pip install "pipecat-ai-subagents[redis]"
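To make the "same channel" rule concrete, here is a minimal in-memory sketch of channel fan-out. It is not the RedisBus implementation: `ToyChannelBus` and its methods are hypothetical names invented for illustration, and the only behavior it models is that every subscriber on a channel receives what is published there, while subscribers on other channels receive nothing.

```python
import asyncio
from collections import defaultdict


class ToyChannelBus:
    """In-memory stand-in for a pub/sub bus (hypothetical, not the RedisBus API)."""

    # Shared by all instances: channel name -> list of subscriber inboxes.
    _subscribers = defaultdict(list)

    def __init__(self, channel: str) -> None:
        self.channel = channel
        self.inbox: asyncio.Queue = asyncio.Queue()
        ToyChannelBus._subscribers[channel].append(self.inbox)

    async def publish(self, message: dict) -> None:
        # Fan out to every subscriber on this channel, including the sender.
        for inbox in ToyChannelBus._subscribers[self.channel]:
            await inbox.put(message)


async def demo() -> dict:
    a = ToyChannelBus("pipecat:my-app")
    b = ToyChannelBus("pipecat:my-app")
    other = ToyChannelBus("pipecat:other-app")
    await a.publish({"from": "a", "text": "hello"})
    # Count how many messages each subscriber received.
    return {"a": a.inbox.qsize(), "b": b.inbox.qsize(), "other": other.inbox.qsize()}


counts = asyncio.run(demo())
print(counts)  # {'a': 1, 'b': 1, 'other': 0}
```

Both buses on "pipecat:my-app" receive the message, and the one on "pipecat:other-app" does not, which is why each process must be configured with the same channel name.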

Example: distributed handoff

This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines.

Process 1: Main transport agent

The main agent owns the transport and bridges frames to the bus. It has no LLM — it waits for a remote greeter agent to register, then activates it.
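# main_agent.py
import os

from redis.asyncio import Redis

# Pipecat import paths below match recent releases; adjust them if your
# installed version lays these modules out differently.
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat_subagents.agents import BaseAgent, LLMAgentActivationArgs, agent_ready
from pipecat_subagents.bus import BusBridgeProcessor
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner
from pipecat_subagents.types import AgentReadyData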

class MainAgent(BaseAgent):
    def __init__(self, name, *, bus, transport):
        super().__init__(name, bus=bus)
        self._transport = transport

    @agent_ready(name="greeter")
    async def on_greeter_ready(self, data: AgentReadyData) -> None:
        await self.activate_agent(
            "greeter",
            activation_args=LLMAgentActivationArgs(
                messages=[{"role": "user", "content": "Welcome the user."}],
            ),
        )

    async def build_pipeline(self) -> Pipeline:
        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        tts = CartesiaTTSService(api_key=os.getenv("CARTESIA_API_KEY"))
        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )
        bridge = BusBridgeProcessor(bus=self.bus, agent_name=self.name)

        return Pipeline([
            self._transport.input(),
            stt,
            context_aggregator.user(),
            bridge,
            tts,
            self._transport.output(),
            context_aggregator.assistant(),
        ])


async def run_bot(transport, runner_args):
    redis = Redis.from_url(runner_args.cli_args.redis_url)
    bus = RedisBus(redis=redis, channel=runner_args.cli_args.channel)
    runner = AgentRunner(bus=bus, handle_sigint=True)
    agent = MainAgent("acme", bus=bus, transport=transport)
    await runner.add_agent(agent)
    await runner.run()
python main_agent.py --redis-url redis://localhost:6379

Process 2 & 3: LLM agents

Each LLM agent runs as a standalone process. It connects to the same Redis channel and waits for activation:
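# llm_agent.py
import argparse
import asyncio
import os

from redis.asyncio import Redis

# Pipecat import paths below are assumptions based on recent releases
# (the location of OpenAILLMSettings in particular); adjust them if your
# installed version lays these modules out differently.
from pipecat.services.llm_service import FunctionCallParams, LLMService
from pipecat.services.openai.base_llm import OpenAILLMSettings
from pipecat.services.openai.llm import OpenAILLMService
from pipecat_subagents.agents import LLMAgent, LLMAgentActivationArgs, tool
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner

class AcmeLLMAgent(LLMAgent):
    def __init__(self, name, *, bus, system_instruction):
        super().__init__(name, bus=bus, bridged=())
        self._system_instruction = system_instruction

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(system_instruction=self._system_instruction),
        )

    @tool(cancel_on_interruption=False)
    async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str):
        """Transfer the user to another agent.

        Args:
            agent (str): The target agent.
            reason (str): Transfer reason.
        """
        await self.handoff_to(
            agent,
            activation_args=LLMAgentActivationArgs(messages=[{"role": "user", "content": reason}]),
            result_callback=params.result_callback,
        )


async def main():
    # Read the agent name and Redis URL from the command line so the same
    # script can start either the "greeter" or the "support" agent.
    parser = argparse.ArgumentParser()
    parser.add_argument("name")
    parser.add_argument("--redis-url", default="redis://localhost:6379")
    args = parser.parse_args()

    redis = Redis.from_url(args.redis_url)
    bus = RedisBus(redis=redis, channel="pipecat:my-app")
    # Placeholder prompt; supply real instructions for each agent.
    agent = AcmeLLMAgent(args.name, bus=bus, system_instruction=f"You are the {args.name} agent...")
    runner = AgentRunner(bus=bus, handle_sigint=True)
    await runner.add_agent(agent)
    await runner.run()

asyncio.run(main())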
# Run on Machine B
python llm_agent.py greeter --redis-url redis://your-redis-host:6379

# Run on Machine C
python llm_agent.py support --redis-url redis://your-redis-host:6379

How discovery works

Runners exchange registry information automatically over the shared bus. To be notified when an agent is ready, use @agent_ready or watch_agent(); both behave the same way in local and distributed setups.
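The idea of registry snapshots can be sketched with a small toy model. This is not how pipecat_subagents implements discovery internally: `ToyRegistry`, `watch`, and `apply_snapshot` are hypothetical names, and the sketch assumes only that each runner periodically learns the list of agents known elsewhere and notifies watchers the first time a watched agent appears.

```python
import asyncio
from typing import Awaitable, Callable

ReadyHandler = Callable[[str], Awaitable[None]]


class ToyRegistry:
    """Hypothetical per-runner registry; illustrates snapshot merging only."""

    def __init__(self) -> None:
        self.known: set[str] = set()
        self._watchers: dict[str, list[ReadyHandler]] = {}

    def watch(self, agent_name: str, handler: ReadyHandler) -> None:
        # Register a callback to run when agent_name first becomes known.
        self._watchers.setdefault(agent_name, []).append(handler)

    async def apply_snapshot(self, agent_names: list[str]) -> None:
        # Fire handlers only for agents not seen before, so repeated
        # snapshots do not trigger duplicate notifications.
        for name in agent_names:
            if name in self.known:
                continue
            self.known.add(name)
            for handler in self._watchers.get(name, []):
                await handler(name)


async def demo() -> list[str]:
    activated: list[str] = []

    async def on_greeter_ready(name: str) -> None:
        activated.append(name)

    registry = ToyRegistry()
    registry.watch("greeter", on_greeter_ready)
    await registry.apply_snapshot(["support"])             # greeter not up yet
    await registry.apply_snapshot(["support", "greeter"])  # greeter appears
    await registry.apply_snapshot(["support", "greeter"])  # repeated snapshot
    return activated


activated = asyncio.run(demo())
print(activated)  # ['greeter']
```

The handler runs exactly once, when the watched agent first shows up in a snapshot. That is the behavior the main agent relies on when it waits for the remote greeter before activating it.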

Considerations

  • Latency: Redis adds network overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the Redis instance.
  • Serialization: RedisBus serializes messages to JSON. Custom frame types need to be registered with the serializer.
  • Single channel: All agents on the same channel see all messages. Use different channels for different sessions or applications.
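The serialization and single-channel points can be illustrated with a short sketch. It does not use the RedisBus serializer, whose registration API is not shown here: `register`, `encode`, `decode`, `TranscriptFrame`, and `session_channel` are all hypothetical names. The sketch only shows why a JSON-based bus needs to know about custom types before it can rebuild them on the receiving side, and one possible way to derive a separate channel name per session.

```python
import json
from dataclasses import asdict, dataclass

# Hypothetical type registry: class name -> class.
_TYPES: dict[str, type] = {}


def register(cls):
    """Record a dataclass under its class name so it can be decoded later."""
    _TYPES[cls.__name__] = cls
    return cls


@register
@dataclass
class TranscriptFrame:  # hypothetical custom frame type
    text: str
    user_id: str


def encode(obj) -> str:
    # Wrap the payload in an envelope that names its type.
    name = type(obj).__name__
    if name not in _TYPES:
        raise TypeError(f"{name} is not registered with the serializer")
    return json.dumps({"type": name, "data": asdict(obj)})


def decode(payload: str):
    # Look up the class by name and rebuild the object from its fields.
    envelope = json.loads(payload)
    return _TYPES[envelope["type"]](**envelope["data"])


def session_channel(app: str, session_id: str) -> str:
    """Build a per-session channel name so sessions do not see each other's messages."""
    return f"pipecat:{app}:{session_id}"


frame = TranscriptFrame(text="hello", user_id="u1")
round_tripped = decode(encode(frame))
print(round_tripped == frame)               # True
print(session_channel("my-app", "abc123"))  # pipecat:my-app:abc123
```

An unregistered type fails at encode time in this sketch, which surfaces the problem in the sending process instead of on a remote machine.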