What is the agent registry?

The WorkerRegistry tracks all known agents across local and remote runners. The runner owns it and shares it with all of its agents. When an agent becomes ready, it registers itself, and any agents watching for it are notified. In most cases you don’t interact with the registry directly. Instead, you use the @worker_ready decorator or watch_workers() to express interest in a specific agent, and the registry handles the rest.

Watching for agents

The @worker_ready decorator

The @worker_ready decorator is the most common way to watch for an agent. Decorate a method with the agent name, and it fires when that agent registers:
from pipecat.pipeline.base_worker import BaseWorker
from pipecat.pipeline.worker_ready_decorator import worker_ready
from pipecat.registry.types import WorkerReadyData

class MainAgent(BaseWorker):
    @worker_ready(name="greeter")
    async def on_greeter_ready(self, data: WorkerReadyData) -> None:
        await self.activate_worker("greeter")
The framework automatically calls watch_workers() for each @worker_ready handler when the agent starts. If the watched agent is already registered, the handler fires immediately.
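To make the "fires immediately if already registered" behavior concrete, here is a minimal, self-contained sketch of the watch/register pattern. It is not pipecat's WorkerRegistry: ToyRegistry, watch(), and register() are hypothetical names invented for illustration, and the real registry may differ in its details.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


class ToyRegistry:
    """Illustrative stand-in for a registry (hypothetical, not pipecat's API)."""

    def __init__(self) -> None:
        self._ready: set[str] = set()
        self._watchers: dict[str, list[Handler]] = {}

    async def watch(self, name: str, handler: Handler) -> None:
        # Remember the watcher so a later registration can notify it.
        self._watchers.setdefault(name, []).append(handler)
        # If the agent is already registered, fire right away.
        if name in self._ready:
            await handler(name)

    async def register(self, name: str) -> None:
        # Mark the agent as ready and notify everyone watching for it.
        self._ready.add(name)
        for handler in self._watchers.get(name, []):
            await handler(name)


async def demo() -> list[str]:
    events: list[str] = []
    registry = ToyRegistry()

    async def early(name: str) -> None:
        events.append(f"early watcher saw {name}")

    async def late(name: str) -> None:
        events.append(f"late watcher saw {name}")

    await registry.watch("greeter", early)  # watching before the agent is ready
    await registry.register("greeter")      # notifies the early watcher
    await registry.watch("greeter", late)   # agent already ready: fires at once
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch both watchers end up being called exactly once, regardless of whether they started watching before or after the agent registered. That ordering independence is the property the decorator relies on.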

watch_workers()

For dynamic cases where you don’t know the agent name at class definition time, call watch_workers() directly. The best place to do this is in start(), after calling super().start():
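from pipecat.pipeline.base_worker import BaseWorker
from pipecat.registry.types import WorkerReadyData

class MainAgent(BaseWorker):
    async def start(self) -> None:
        await super().start()
        # _dynamic_worker_names is assumed to be set elsewhere, e.g. from configuration.
        for name in self._dynamic_worker_names:
            await self.watch_workers(name)

    async def on_worker_ready(self, data: WorkerReadyData) -> None:
        await super().on_worker_ready(data)
        # Fallback hook: runs for watched agents without a @worker_ready handler.
        await self.activate_worker(data.worker_name)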
This is useful when agent names come from configuration or are created at runtime. Watched agents that are not handled by a @worker_ready decorator dispatch to the on_worker_ready hook.
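The split between decorated handlers and the fallback hook can be pictured with a small dispatch table. The sketch below is a toy model only: ToyWorker, toy_worker_ready, ToyReadyData, and dispatch() are hypothetical names, and nothing here is taken from pipecat's actual implementation.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyReadyData:
    """Hypothetical stand-in for the readiness payload."""
    worker_name: str
    runner: str


def toy_worker_ready(name: str):
    """Hypothetical decorator: tags a method with the agent name it handles."""
    def mark(func):
        func._ready_for = name
        return func
    return mark


class ToyWorker:
    def __init__(self) -> None:
        self.log: list[str] = []
        self._handlers = {}
        # Walk the class hierarchy and collect tagged methods by agent name.
        for cls in type(self).__mro__:
            for attr, func in vars(cls).items():
                target = getattr(func, "_ready_for", None)
                if target is not None:
                    self._handlers.setdefault(target, getattr(self, attr))

    async def dispatch(self, data: ToyReadyData) -> None:
        # A named handler wins; otherwise fall back to the generic hook.
        handler = self._handlers.get(data.worker_name, self.on_worker_ready)
        await handler(data)

    async def on_worker_ready(self, data: ToyReadyData) -> None:
        self.log.append(f"hook:{data.worker_name}")


class MainWorker(ToyWorker):
    @toy_worker_ready(name="greeter")
    async def on_greeter_ready(self, data: ToyReadyData) -> None:
        self.log.append(f"decorator:{data.worker_name}")


async def demo() -> list[str]:
    worker = MainWorker()
    await worker.dispatch(ToyReadyData("greeter", "runner-a"))  # decorated handler
    await worker.dispatch(ToyReadyData("billing", "runner-a"))  # fallback hook
    return worker.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here "greeter" reaches the decorated method while "billing", which has no dedicated handler, reaches the generic hook.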

How discovery works

Local agents

When all agents run in the same process, discovery is straightforward. An agent registers in the shared registry when its pipeline is ready, and watchers are notified immediately.

Distributed agents

In distributed setups (agents across different processes connected to the same bus), runners exchange registry snapshots automatically. When a remote agent becomes ready, its runner broadcasts a registry message over the bus. Other runners update their local registry, and any matching watchers fire.
Only root agents (added via runner.add_workers()) are discoverable across runners. Child agents (added via parent.add_workers()) are not broadcast and remain invisible to other runners.
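The following sketch models the distributed flow described above with an in-memory bus: a runner broadcasts a snapshot of its root agents, other runners merge it, and matching watchers fire. All names (ToyBus, ToyRunner, mark_ready, merge) are hypothetical, and the message format is an assumption. The real bus protocol is not shown in this document.

```python
class ToyBus:
    """In-memory stand-in for the shared bus (hypothetical)."""

    def __init__(self) -> None:
        self.runners: list["ToyRunner"] = []

    def broadcast(self, sender: "ToyRunner", snapshot: dict[str, str]) -> None:
        # Deliver the snapshot to every runner except the sender.
        for runner in self.runners:
            if runner is not sender:
                runner.merge(snapshot)


class ToyRunner:
    def __init__(self, name: str, bus: ToyBus) -> None:
        self.name = name
        self.bus = bus
        self.registry: dict[str, str] = {}  # agent name -> owning runner
        self.roots: set[str] = set()        # agents added directly to this runner
        self.watched: set[str] = set()
        self.fired: list[str] = []
        bus.runners.append(self)

    def mark_ready(self, agent: str, root: bool) -> None:
        self.registry[agent] = self.name
        self._notify(agent)
        if root:
            # Only root agents are included in the snapshot sent to other runners.
            self.roots.add(agent)
            self.bus.broadcast(self, {a: self.name for a in self.roots})

    def merge(self, snapshot: dict[str, str]) -> None:
        # Add newly seen remote agents and fire any matching watchers.
        for agent, owner in snapshot.items():
            if agent not in self.registry:
                self.registry[agent] = owner
                self._notify(agent)

    def _notify(self, agent: str) -> None:
        if agent in self.watched:
            self.fired.append(agent)


def demo() -> tuple[ToyRunner, ToyRunner]:
    bus = ToyBus()
    runner_a = ToyRunner("runner-a", bus)
    runner_b = ToyRunner("runner-b", bus)
    runner_b.watched = {"greeter", "helper"}

    runner_a.mark_ready("greeter", root=True)   # broadcast to runner-b
    runner_a.mark_ready("helper", root=False)   # child agent: stays local
    return runner_a, runner_b


if __name__ == "__main__":
    a, b = demo()
    print(b.fired, b.registry)
```

In this model runner-b learns about "greeter" but never about "helper", which mirrors the rule that child agents remain invisible to other runners.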

Agent readiness data

When a watcher fires, it receives a WorkerReadyData object:
@worker_ready(name="greeter")
async def on_greeter_ready(self, data: WorkerReadyData) -> None:
    print(data.worker_name)  # "greeter"
    print(data.runner)       # Name of the runner managing the agent

Uniqueness

Agent names must be unique within a registry. If the same name is registered from two different runners, the registry logs a warning. In distributed setups, choose unique names across all runners to avoid conflicts.
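One way to catch conflicts before deployment is to check the planned agent names for every runner up front. The helper below is a hypothetical pre-flight check, not part of the framework. It assumes you can express your deployment as a mapping from runner name to the agent names it will register.

```python
from collections import defaultdict


def find_name_conflicts(plan: dict[str, list[str]]) -> dict[str, list[str]]:
    """Return agent names claimed by more than one runner, with their runners."""
    owners: dict[str, set[str]] = defaultdict(set)
    for runner, agents in plan.items():
        for agent in agents:
            owners[agent].add(runner)
    # Keep only names that appear under two or more runners.
    return {
        agent: sorted(runners)
        for agent, runners in owners.items()
        if len(runners) > 1
    }


if __name__ == "__main__":
    plan = {
        "runner-a": ["greeter", "billing"],
        "runner-b": ["greeter", "support"],
    }
    print(find_name_conflicts(plan))
```

An empty result means every name is claimed by exactly one runner. A simple convention that tends to avoid conflicts is prefixing agent names with something unique to the runner or service.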