## Overview
Distributed agents are agents connected to the same bus but running in different processes or on different machines. By default, all agents run in a single process using a local bus. For distributed setups, swap to a network bus — all agents share the same channel and discover each other automatically. Your agent code stays the same.
> **Note**: Distributed agents share a bus. If you need to connect agents on different buses (separate networks, third-party services), see Proxy Agents instead.
## Setting up RedisBus
Each process creates its own AgentRunner with a RedisBus connected to the same Redis channel:
```python
from redis.asyncio import Redis

from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner

redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = AgentRunner(bus=bus, handle_sigint=True)
```
All runners sharing the same channel can exchange messages. Agents discover each other automatically through registry snapshots.
Install the Redis extra: `uv add "pipecat-ai-subagents[redis]"`
## Example: distributed handoff
This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines.
### Process 1: Main transport agent
The main agent owns the transport and bridges frames to the bus. It has no LLM — it waits for a remote greeter agent to register, then activates it.
```python
# main_agent.py
import os

from redis.asyncio import Redis

# Pipecat import paths vary by version; adjust these to match your installation.
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService

from pipecat_subagents.agents import BaseAgent, LLMAgentActivationArgs, agent_ready
from pipecat_subagents.bus import BusBridgeProcessor
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner
from pipecat_subagents.types import AgentReadyData


class MainAgent(BaseAgent):
    def __init__(self, name, *, bus, transport):
        super().__init__(name, bus=bus)
        self._transport = transport

    @agent_ready(name="greeter")
    async def on_greeter_ready(self, data: AgentReadyData) -> None:
        await self.activate_agent(
            "greeter",
            activation_args=LLMAgentActivationArgs(
                messages=[{"role": "user", "content": "Welcome the user."}],
            ),
        )

    async def build_pipeline(self) -> Pipeline:
        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        tts = CartesiaTTSService(api_key=os.getenv("CARTESIA_API_KEY"))

        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )

        bridge = BusBridgeProcessor(bus=self.bus, agent_name=self.name)

        return Pipeline([
            self._transport.input(),
            stt,
            context_aggregator.user(),
            bridge,
            tts,
            self._transport.output(),
            context_aggregator.assistant(),
        ])


async def run_bot(transport, runner_args):
    redis = Redis.from_url(runner_args.cli_args.redis_url)
    bus = RedisBus(redis=redis, channel=runner_args.cli_args.channel)
    runner = AgentRunner(bus=bus, handle_sigint=True)

    agent = MainAgent("acme", bus=bus, transport=transport)
    await runner.add_agent(agent)
    await runner.run()
```
```shell
python main_agent.py --redis-url redis://localhost:6379
```
### Processes 2 & 3: LLM agents
Each LLM agent runs as a standalone process. It connects to the same Redis channel and waits for activation:
```python
# llm_agent.py
import argparse
import asyncio
import os

from redis.asyncio import Redis

# Pipecat import paths vary by version; adjust these (including the module that
# provides OpenAILLMSettings) to match your installation.
from pipecat.services.llm_service import FunctionCallParams, LLMService
from pipecat.services.openai.llm import OpenAILLMService, OpenAILLMSettings

from pipecat_subagents.agents import LLMAgent, LLMAgentActivationArgs, tool
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner


class AcmeLLMAgent(LLMAgent):
    def __init__(self, name, *, bus, system_instruction):
        super().__init__(name, bus=bus, bridged=())
        self._system_instruction = system_instruction

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(system_instruction=self._system_instruction),
        )

    @tool(cancel_on_interruption=False)
    async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str):
        """Transfer the user to another agent.

        Args:
            agent (str): The target agent.
            reason (str): Transfer reason.
        """
        await self.handoff_to(
            agent,
            activation_args=LLMAgentActivationArgs(
                messages=[{"role": "user", "content": reason}],
            ),
            result_callback=params.result_callback,
        )


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("name")
    parser.add_argument("--redis-url", default="redis://localhost:6379")
    args = parser.parse_args()

    redis = Redis.from_url(args.redis_url)
    bus = RedisBus(redis=redis, channel="pipecat:my-app")

    agent = AcmeLLMAgent(args.name, bus=bus, system_instruction="You are a greeter...")
    runner = AgentRunner(bus=bus, handle_sigint=True)
    await runner.add_agent(agent)
    await runner.run()


asyncio.run(main())
```
```shell
# Run on Machine B
python llm_agent.py greeter --redis-url redis://your-redis-host:6379

# Run on Machine C
python llm_agent.py support --redis-url redis://your-redis-host:6379
```
## How discovery works
Runners exchange registry information automatically over the shared bus. To get notified when an agent is ready, use `@agent_ready` or `watch_agent()` — both work the same way whether agents are local or distributed.
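Conceptually, readiness notification amounts to "resolve a waiter when a registry snapshot containing the agent's name arrives over the bus". A stdlib-only sketch of that idea — the `Registry` class here is illustrative, not the library's API:

```python
import asyncio


class Registry:
    """Toy registry: resolve waiters when a named agent appears in a snapshot."""

    def __init__(self):
        self._agents = set()
        self._waiters = {}  # agent name -> list of pending futures

    def wait_for(self, name):
        """Return a future that resolves once `name` is registered."""
        fut = asyncio.get_running_loop().create_future()
        if name in self._agents:
            fut.set_result(name)  # already known: resolve immediately
        else:
            self._waiters.setdefault(name, []).append(fut)
        return fut

    def apply_snapshot(self, snapshot):
        """Merge a registry snapshot received over the bus."""
        for name in snapshot:
            if name not in self._agents:
                self._agents.add(name)
                for fut in self._waiters.pop(name, []):
                    fut.set_result(name)


async def main():
    registry = Registry()
    ready = registry.wait_for("greeter")             # like @agent_ready(name="greeter")
    registry.apply_snapshot({"greeter", "support"})  # snapshot arrives over the bus
    assert await ready == "greeter"


asyncio.run(main())
```

Because snapshots are merged idempotently, it doesn't matter whether the remote agent started before or after the watcher.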
## Considerations
- Latency: Redis adds network overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and to the Redis instance.
- Serialization: `RedisBus` serializes messages to JSON. Custom frame types need to be registered with the serializer.
- Single channel: All agents on the same channel see all messages. Use different channels for different sessions or applications.
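For the serialization point, the usual pattern is a type registry: map a type tag to a frame class so custom frames survive the JSON round trip. A stdlib sketch of that pattern — the names here are illustrative, not the library's serializer API:

```python
import json
from dataclasses import dataclass

# Tag -> frame class, populated by registration.
FRAME_TYPES = {}


def register_frame(tag, cls):
    """Register a custom frame type so it can be decoded off the wire."""
    FRAME_TYPES[tag] = cls


@dataclass
class HandoffFrame:
    target: str
    reason: str


register_frame("handoff", HandoffFrame)


def serialize(tag, frame):
    # Wrap the payload with its type tag so the receiver knows how to decode it.
    return json.dumps({"type": tag, "data": vars(frame)})


def deserialize(raw):
    payload = json.loads(raw)
    cls = FRAME_TYPES[payload["type"]]  # KeyError -> unregistered frame type
    return cls(**payload["data"])


wire = serialize("handoff", HandoffFrame(target="support", reason="billing"))
frame = deserialize(wire)
assert frame == HandoffFrame(target="support", reason="billing")
```

An unregistered type fails loudly at decode time, which is usually what you want: it surfaces a process that is missing a registration instead of silently dropping frames.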