Distributed agents are agents connected to the same bus but running in different processes or on different machines. By default, all agents run in a single process using a local bus. For distributed setups, swap to a network bus — all agents share the same channel and discover each other automatically. Your agent code stays the same.
Distributed agents share a bus. If you need to connect agents on different
buses (separate networks, third-party services), see Proxy
Agents instead.
This example splits the two-agent handoff across separate processes. The main agent handles transport on one machine, and LLM agents run independently on other machines.
Each LLM agent runs as a standalone process. It connects to the same Redis channel and waits for activation. Subclass LLMWorker to host @tool methods and pass the LLM service in the constructor:
# llm.pyimport argparseimport asyncioimport osfrom redis.asyncio import Redisfrom pipecat.bus.network.redis import RedisBusfrom pipecat.workers.runner import WorkerRunnerfrom pipecat.services.llm_service import FunctionCallParamsfrom pipecat.services.openai.llm import OpenAILLMServicefrom pipecat.workers.llm import LLMWorker, LLMWorkerActivationArgs, toolclass AcmeLLMAgent(LLMWorker): def __init__(self, name: str, *, system_instruction: str, watch: list[str]): llm = OpenAILLMService( api_key=os.environ["OPENAI_API_KEY"], settings=OpenAILLMService.Settings(system_instruction=system_instruction), ) super().__init__(name, llm=llm, bridged=()) self._watch = watch async def start(self) -> None: """Watch sibling agents so handoff knows when they are available.""" await super().start() await self.watch_workers(*self._watch) @tool(cancel_on_interruption=False) async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str): """Transfer the user to another agent. Args: agent (str): The target agent. reason (str): Transfer reason. """ await self.activate_worker( agent, args=LLMWorkerActivationArgs( messages=[{"role": "developer", "content": reason}] ), deactivate_self=True, result_callback=params.result_callback, )async def main_async(): parser = argparse.ArgumentParser() parser.add_argument("worker", choices=["greeter", "support"]) parser.add_argument("--redis-url", default="redis://localhost:6379") parser.add_argument("--channel", default="pipecat:my-app") args = parser.parse_args() redis = Redis.from_url(args.redis_url) bus = RedisBus(redis=redis, channel=args.channel) agent = AcmeLLMAgent( args.worker, system_instruction="You are a greeter..." if args.worker == "greeter" else "You are support...", watch=["support"] if args.worker == "greeter" else ["greeter"], ) runner = WorkerRunner(bus=bus, handle_sigint=True) await runner.add_workers(agent) await runner.run()asyncio.run(main_async())
# Run on Machine Bpython llm.py greeter --redis-url redis://your-redis-host:6379# Run on Machine Cpython llm.py support --redis-url redis://your-redis-host:6379
Each LLM agent watches its sibling so that, when the user is transferred, the target is already known to be available. You can do the same with the @worker_ready decorator on a worker subclass to react automatically when a specific agent registers.
Runners exchange registry information automatically over the shared bus. To get notified when an agent is ready, watch it with runner.registry.watch(...), call watch_workers() from inside a worker, or use the @worker_ready decorator — they all work the same way locally and distributed.
Latency: Network buses add overhead. For latency-sensitive voice applications, keep the main transport agent and its active LLM agent geographically close to each other and the bus server (Redis or PostgreSQL).
Serialization: Both RedisBus and PgmqBus serialize messages to JSON. Custom frame types need to be registered with the serializer.
Single channel: All agents on the same channel see all messages. Use different channels for different sessions or applications.