import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService, CartesiaTTSSettings
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams, LLMService
from pipecat.services.openai.base_llm import OpenAILLMSettings
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat_subagents.agents import BaseAgent, LLMAgent, LLMAgentActivationArgs, agent_ready, tool
from pipecat_subagents.bus import AgentBus, BusBridgeProcessor
from pipecat_subagents.runner import AgentRunner
from pipecat_subagents.types import AgentReadyData
# Load variables from a local .env file; override=True lets .env values
# replace variables that are already set in the process environment.
load_dotenv(override=True)

# Maps a transport key to a zero-argument factory that builds that
# transport's parameters. Factories are lambdas so the parameter objects
# are only constructed for the transport that is actually selected.
# Both transports enable audio input and output.
transport_params = {
    # DailyParams is imported by this file, so Daily sessions get an entry
    # too; without it only the "webrtc" key could be resolved.
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
    ),
}
# --- LLM Agents ---
class AcmeLLMAgent(LLMAgent):
    """Common parent for the Acme LLM agents.

    Supplies the two tools every Acme agent exposes: handing the user off
    to another agent and ending the conversation.
    """

    def __init__(self, name: str, *, bus: AgentBus):
        # An empty ``bridged`` tuple is passed straight through to LLMAgent.
        super().__init__(name, bus=bus, bridged=())

    # NOTE(review): the docstrings of the @tool methods below are kept
    # verbatim; presumably the decorator derives the tool description and
    # argument schema from them -- confirm before rewording.
    @tool(cancel_on_interruption=False)
    async def transfer_to_agent(self, params: FunctionCallParams, agent: str, reason: str):
        """Transfer the user to another agent.
        Args:
        agent (str): The agent to transfer to (e.g. 'greeter', 'support').
        reason (str): Why the user is being transferred.
        """
        logger.info(f"Agent '{self.name}': transferring to '{agent}' ({reason})")

        # The transfer reason is given to the target agent as a user message.
        handoff_context = LLMAgentActivationArgs(
            messages=[{"role": "user", "content": reason}],
        )
        await self.handoff_to(
            agent,
            activation_args=handoff_context,
            result_callback=params.result_callback,
        )

    @tool
    async def end_conversation(self, params: FunctionCallParams, reason: str):
        """End the conversation when the user says goodbye.
        Args:
        reason (str): Why the conversation is ending.
        """
        logger.info(f"Agent '{self.name}': ending conversation ({reason})")

        # Append the reason as a user message with run_llm=True, then end
        # this agent, forwarding the tool's result callback.
        closing_frame = LLMMessagesAppendFrame(
            messages=[{"role": "user", "content": reason}],
            run_llm=True,
        )
        await params.llm.queue_frame(closing_frame)
        await self.end(reason=reason, result_callback=params.result_callback)
class GreeterAgent(AcmeLLMAgent):
    """Front-door agent: welcomes the user and sends product questions to support."""

    def build_llm(self) -> LLMService:
        """Return an OpenAI LLM service configured with the greeter prompt.

        The API key is read from the ``OPENAI_API_KEY`` environment variable.
        """
        openai_key = os.getenv("OPENAI_API_KEY")
        greeter_settings = OpenAILLMSettings(
            system_instruction=(
                "You are a friendly greeter for Acme Corp. The available products "
                "are: the Acme Rocket Boots, the Acme Invisible Paint, and the Acme "
                "Tornado Kit. Ask which one they'd like to learn more about. "
                "When the user picks a product or asks a question about one, "
                "immediately call the transfer_to_agent tool with agent 'support'. "
                "Do not answer product questions yourself. If the user says goodbye, "
                "call the end_conversation tool. Keep responses brief."
            ),
        )
        return OpenAILLMService(api_key=openai_key, settings=greeter_settings)
class SupportAgent(AcmeLLMAgent):
    """Product-support agent: answers product questions and can hand back to the greeter."""

    def build_llm(self) -> LLMService:
        """Return an OpenAI LLM service configured with the support prompt.

        The API key is read from the ``OPENAI_API_KEY`` environment variable.
        """
        openai_key = os.getenv("OPENAI_API_KEY")
        support_settings = OpenAILLMSettings(
            system_instruction=(
                "You are a support agent for Acme Corp. You know about three "
                "products: Acme Rocket Boots ($299, run up to 60 mph), "
                "Acme Invisible Paint ($49 per can, lasts 24 hours), and "
                "Acme Tornado Kit ($199, batteries included). Answer product "
                "questions. If the user wants to browse other products, call "
                "the transfer_to_agent tool with agent 'greeter'. If the user "
                "says goodbye, call the end_conversation tool. Keep responses brief."
            ),
        )
        return OpenAILLMService(api_key=openai_key, settings=support_settings)
# --- Main Transport Agent ---
class AcmeAgent(BaseAgent):
    """Transport-owning agent whose pipeline carries audio to and from the LLM agents."""

    def __init__(self, name: str, *, bus: AgentBus, transport: BaseTransport):
        super().__init__(name, bus=bus)
        self._transport = transport

    @agent_ready(name="greeter")
    async def on_greeter_ready(self, data: AgentReadyData) -> None:
        """Activate the greeter with an opening instruction once it reports ready."""
        opening_prompt = {
            "role": "user",
            "content": "Welcome the user to Acme Corp and ask how you can help.",
        }
        await self.activate_agent(
            "greeter",
            args=LLMAgentActivationArgs(messages=[opening_prompt]),
        )

    def build_pipeline_task(self, pipeline: Pipeline) -> PipelineTask:
        """Wrap ``pipeline`` in a task with RTVI, metrics and usage metrics enabled."""
        task_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
        return PipelineTask(pipeline, enable_rtvi=True, params=task_params)

    async def build_pipeline(self) -> Pipeline:
        """Assemble the audio pipeline and register the transport event handlers.

        Returns:
            A pipeline ordered as: transport input, STT, user context
            aggregator, bus bridge, TTS, transport output, assistant context
            aggregator.
        """
        speech_to_text = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
        text_to_speech = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            settings=CartesiaTTSSettings(
                voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
            ),
        )

        aggregators = LLMContextAggregatorPair(
            LLMContext(),
            user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
        )

        bus_bridge = BusBridgeProcessor(bus=self.bus, agent_name=self.name)

        @self._transport.event_handler("on_client_connected")
        async def on_client_connected(transport, client):
            logger.info("Client connected")
            # Build both LLM agents first, then add them in order.
            llm_agents = (
                GreeterAgent("greeter", bus=self.bus),
                SupportAgent("support", bus=self.bus),
            )
            for llm_agent in llm_agents:
                await self.add_agent(llm_agent)

        @self._transport.event_handler("on_client_disconnected")
        async def on_client_disconnected(transport, client):
            logger.info("Client disconnected")
            await self.cancel()

        processors = [
            self._transport.input(),
            speech_to_text,
            aggregators.user(),
            bus_bridge,
            text_to_speech,
            self._transport.output(),
            aggregators.assistant(),
        ]
        return Pipeline(processors)
# --- Entry Point ---
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Create an agent runner, add the ``acme`` transport agent to it, and run it.

    Args:
        transport: Transport handed to the ``AcmeAgent``.
        runner_args: Runner arguments; ``handle_sigint`` is forwarded to the
            ``AgentRunner``.
    """
    agent_runner = AgentRunner(handle_sigint=runner_args.handle_sigint)
    # The main agent shares the runner's bus.
    await agent_runner.add_agent(
        AcmeAgent("acme", bus=agent_runner.bus, transport=transport)
    )
    await agent_runner.run()
async def bot(runner_args: RunnerArguments):
    """Create the transport for ``runner_args`` and run the bot on it.

    Args:
        runner_args: Runner arguments used both to create the transport
            (together with the module-level ``transport_params``) and to
            run the bot.
    """
    await run_bot(
        await create_transport(runner_args, transport_params),
        runner_args,
    )
if __name__ == "__main__":
    # Script entry point: delegate to pipecat's runner `main`.
    # The import is local to this guard, so it only happens when the file
    # is executed directly rather than imported.
    # NOTE(review): presumably the runner discovers the module-level `bot`
    # coroutine defined in this file -- confirm against the runner docs.
    from pipecat.runner.run import main
    main()