Overview
Distributed agents are agents connected to the same bus but running in different processes or on different machines. By default, all agents run in a single process using a local bus. For distributed setups, swap to a network bus — all agents share the same channel and discover each other automatically. Your agent code stays the same.
Distributed agents share a bus. If you need to connect agents on different buses (separate networks, third-party services), see Proxy Agents instead.
Setting up RedisBus
Each process creates its own AgentRunner with a RedisBus connected to the same Redis channel:
from redis.asyncio import Redis

from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner

# Every process must point at the same Redis instance and channel name.
redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="pipecat:my-app")
runner = AgentRunner(bus=bus, handle_sigint=True)
All runners sharing the same channel can exchange messages. Agents discover each other automatically through registry snapshots.
Install the Redis extra: pip install "pipecat-ai-subagents[redis]"
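Because every runner on a channel sees every message, one common way to isolate concurrent sessions is to derive the channel name from a session identifier. The helper below is a minimal sketch of that idea: `session_channel`, the `pipecat` prefix default, and the character whitelist are illustrative assumptions, not part of the library.

```python
import re

# Hypothetical whitelist: anything outside it is replaced with "-".
_UNSAFE = re.compile(r"[^A-Za-z0-9_.-]+")


def session_channel(app: str, session_id: str, prefix: str = "pipecat") -> str:
    """Build a channel name such as 'pipecat:my-app:1234'."""
    cleaned = [_UNSAFE.sub("-", part.strip()) for part in (prefix, app, session_id)]
    if not all(cleaned):
        raise ValueError("prefix, app and session_id must be non-empty")
    return ":".join(cleaned)
```

The result can be passed as the `channel` argument shown above, for example `RedisBus(redis=redis, channel=session_channel("my-app", session_id))`, provided every process in the session computes the same name.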
Example: distributed handoff
This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines.
Process 1: Main transport agent
The main agent owns the transport and bridges frames to the bus. It has no LLM — it waits for a remote greeter agent to register, then activates it.
# main_agent.py
import os

from redis.asyncio import Redis

from pipecat_subagents.agents import BaseAgent, LLMAgentActivationArgs, agent_ready
from pipecat_subagents.bus import BusBridgeProcessor
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner
from pipecat_subagents.types import AgentReadyData

# Pipecat imports (Pipeline, DeepgramSTTService, CartesiaTTSService, LLMContext,
# LLMContextAggregatorPair, LLMUserAggregatorParams, SileroVADAnalyzer) are omitted.


class MainAgent(BaseAgent):
    def __init__(self, name, *, bus, transport):
        super().__init__(name, bus=bus)
        self._transport = transport

    @agent_ready(name="greeter")
    async def on_greeter_ready(self, data: AgentReadyData) -> None:
        # Runs once the remote "greeter" agent has registered on the bus.
        await self.activate_agent(
            "greeter",
            activation_args=LLMAgentActivationArgs(
                messages=[{"role": "user", "content": "Welcome the user."}],
            ),
        )

    async def build_pipeline(self) -> Pipeline:
        stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        tts = CartesiaTTSService(api_key=os.getenv("CARTESIA_API_KEY"))

        context = LLMContext()
        context_aggregator = LLMContextAggregatorPair(
            context,
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )

        # The bridge sits where a local LLM would: it exchanges frames with the bus.
        bridge = BusBridgeProcessor(bus=self.bus, agent_name=self.name)

        return Pipeline([
            self._transport.input(),
            stt,
            context_aggregator.user(),
            bridge,
            tts,
            self._transport.output(),
            context_aggregator.assistant(),
        ])


async def run_bot(transport, runner_args):
    redis = Redis.from_url(runner_args.cli_args.redis_url)
    bus = RedisBus(redis=redis, channel=runner_args.cli_args.channel)
    runner = AgentRunner(bus=bus, handle_sigint=True)

    agent = MainAgent("acme", bus=bus, transport=transport)
    await runner.add_agent(agent)
    await runner.run()
python main_agent.py --redis-url redis://localhost:6379
Process 2 & 3: LLM agents
Each LLM agent runs as a standalone process. It connects to the same Redis channel and waits for activation:
# llm_agent.py
import asyncio
import os

from redis.asyncio import Redis

from pipecat_subagents.agents import LLMAgent, LLMAgentActivationArgs, tool
from pipecat_subagents.bus.network.redis import RedisBus
from pipecat_subagents.runner import AgentRunner

# Pipecat imports (LLMService, OpenAILLMService, OpenAILLMSettings,
# FunctionCallParams) are omitted.


class AcmeLLMAgent(LLMAgent):
    def __init__(self, name, *, bus, system_instruction):
        super().__init__(name, bus=bus, bridged=())
        self._system_instruction = system_instruction

    def build_llm(self) -> LLMService:
        return OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            settings=OpenAILLMSettings(system_instruction=self._system_instruction),
        )

    @tool(cancel_on_interruption=False)
    async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str):
        """Transfer the user to another agent.

        Args:
            agent (str): The target agent.
            reason (str): Transfer reason.
        """
        await self.handoff_to(
            agent,
            activation_args=LLMAgentActivationArgs(messages=[{"role": "user", "content": reason}]),
            result_callback=params.result_callback,
        )


async def main():
    redis = Redis.from_url("redis://localhost:6379")
    bus = RedisBus(redis=redis, channel="pipecat:my-app")

    agent = AcmeLLMAgent("greeter", bus=bus, system_instruction="You are a greeter...")
    runner = AgentRunner(bus=bus, handle_sigint=True)
    await runner.add_agent(agent)
    await runner.run()


asyncio.run(main())
# Run on Machine B
python llm_agent.py greeter --redis-url redis://your-redis-host:6379
# Run on Machine C
python llm_agent.py support --redis-url redis://your-redis-host:6379
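The commands above pass an agent name and a `--redis-url` flag, while the `main()` shown earlier hardcodes both values. One way to bridge that gap is a small `argparse` wrapper. This is a sketch only: `parse_args` and the `--channel` flag for this script are assumptions made for illustration, with defaults taken from the values used elsewhere on this page.

```python
import argparse


def parse_args(argv=None):
    """Parse the command line used to start one LLM agent process."""
    parser = argparse.ArgumentParser(description="Run one LLM agent process.")
    # Positional agent name, e.g. "greeter" or "support".
    parser.add_argument("name", help="Name the agent registers under")
    parser.add_argument("--redis-url", default="redis://localhost:6379")
    # Assumed flag: must match the channel used by the main agent.
    parser.add_argument("--channel", default="pipecat:my-app")
    return parser.parse_args(argv)
```

Inside `main()`, `args = parse_args()` would then supply `args.name`, `args.redis_url`, and `args.channel` in place of the hardcoded strings. The system instruction for each agent name still has to come from somewhere, such as a per-name lookup table.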
How discovery works
Runners exchange registry information automatically over the shared bus. To get notified when an agent is ready, use @agent_ready or watch_agent(). Both behave the same way in local and distributed setups.
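To make the idea concrete, here is a toy, in-memory model of snapshot-based discovery. It is not the library's implementation: `ToyBus`, `ToyRunner`, the `watch` method, and the message shape are all invented for illustration, and the sketch ignores details a real system needs, such as late joiners and agents going away.

```python
from collections import defaultdict
from typing import Callable


class ToyBus:
    """In-memory stand-in for a shared channel: every subscriber sees every message."""

    def __init__(self) -> None:
        self._subscribers: list[Callable[[dict], None]] = []

    def subscribe(self, handler: Callable[[dict], None]) -> None:
        self._subscribers.append(handler)

    def publish(self, message: dict) -> None:
        for handler in list(self._subscribers):
            handler(message)


class ToyRunner:
    """Publishes a snapshot of its local agents and tracks agents seen on the bus."""

    def __init__(self, name: str, bus: ToyBus) -> None:
        self.name = name
        self._bus = bus
        self._local: set[str] = set()
        self._known: set[str] = set()
        self._watchers: dict[str, list[Callable[[str], None]]] = defaultdict(list)
        bus.subscribe(self._on_message)

    def watch(self, agent_name: str, callback: Callable[[str], None]) -> None:
        # Fire immediately if the agent is already known, otherwise wait.
        if agent_name in self._known:
            callback(agent_name)
        else:
            self._watchers[agent_name].append(callback)

    def add_agent(self, agent_name: str) -> None:
        self._local.add(agent_name)
        snapshot = {"type": "registry_snapshot", "runner": self.name,
                    "agents": sorted(self._local)}
        self._bus.publish(snapshot)

    def _on_message(self, message: dict) -> None:
        if message.get("type") != "registry_snapshot":
            return
        for agent_name in message["agents"]:
            if agent_name in self._known:
                continue  # already announced, do not notify twice
            self._known.add(agent_name)
            for callback in self._watchers.pop(agent_name, []):
                callback(agent_name)


bus = ToyBus()
main_runner = ToyRunner("main", bus)
worker_runner = ToyRunner("worker", bus)

seen: list[str] = []
main_runner.watch("greeter", seen.append)  # registered before greeter exists
worker_runner.add_agent("greeter")         # snapshot reaches main_runner
```

After the last line, `seen` contains `"greeter"`: the watcher on one runner was triggered by an agent added on another, which is the behavior the decorator and `watch_agent()` expose in the real library.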
Considerations
- Latency: Redis adds a network hop to bus messages. For latency-sensitive voice applications, keep the main transport agent, its active LLM agent, and the Redis instance geographically close to one another.
- Serialization: RedisBus serializes messages to JSON. Custom frame types need to be registered with the serializer.
- Single channel: All agents on the same channel see all messages. Use different channels for different sessions or applications.
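The serialization point is easier to see with a generic example of why registration is needed: JSON carries only plain data, so the receiver needs a type tag and a lookup table to rebuild the original object. The sketch below uses only the standard library. `register`, `encode`, `decode`, and `TransferNote` are hypothetical names and do not reflect the actual RedisBus serializer API, which this page does not show.

```python
import json
from dataclasses import asdict, dataclass

# Hypothetical registry mapping a type tag to the class that rebuilds it.
_REGISTRY: dict[str, type] = {}


def register(cls):
    """Class decorator: make a dataclass known to encode() and decode()."""
    _REGISTRY[cls.__name__] = cls
    return cls


def encode(obj) -> str:
    """Wrap a registered dataclass instance in a type-tagged JSON envelope."""
    name = type(obj).__name__
    if name not in _REGISTRY:
        raise TypeError(f"{name} is not registered")
    return json.dumps({"type": name, "data": asdict(obj)})


def decode(payload: str):
    """Rebuild the original object from its envelope."""
    envelope = json.loads(payload)
    cls = _REGISTRY[envelope["type"]]
    return cls(**envelope["data"])


@register
@dataclass
class TransferNote:
    agent: str
    reason: str
```

A registered type round-trips, so `decode(encode(TransferNote("support", "billing question")))` compares equal to the original, while an unregistered type fails at `encode` on the sending side instead of arriving as data the receiver cannot rebuild.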