import asyncio
import os
import sys
import time

import aiohttp
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import EndFrame, LLMMessagesAppendFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.daily.transport import DailyParams, DailyTransport
# Replace loguru's default stderr handler with one that logs at DEBUG level.
# logger.remove() with no arguments drops every existing handler; unlike
# logger.remove(0) it never raises ValueError if handler id 0 is already gone.
# At import time only the default handler exists, so the effect is identical.
logger.remove()
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token: str):
    """Run a Daily voice bot: Daily transport -> OpenAI LLM -> Cartesia TTS.

    Joins the given Daily room, wires a pipecat pipeline (audio in ->
    context aggregation -> LLM -> TTS -> audio out), greets the first
    participant, and shuts down when the participant leaves or the call ends.

    Args:
        room_url: URL of the Daily room to join.
        token: Daily meeting token granting access to the room.
    """
    # The async context manager closes the session automatically on exit,
    # so no explicit session.close() is needed at the end.
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token,
            "Friendly bot",
            DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                transcription_enabled=True,
                # stop_secs=0.6: wait 0.6 s of silence before treating the
                # user's turn as finished.
                vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.6)),
            ),
        )
        messages = [
            {
                "role": "system",
                "content": "You are a helpful AI assistant that can switch between two services to showcase the difference in performance and cost: 'openai_realtime' and 'custom'. Respond to user queries and switch services when asked.",
            },
        ]
        llm = OpenAILLMService(
            name="LLM",
            api_key=os.environ.get("OPENAI_API_KEY"),
            model="gpt-4",
        )
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )
        custom_context = OpenAILLMContext(messages=messages)
        context_aggregator_custom = llm.create_context_aggregator(custom_context)

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                context_aggregator_custom.user(),
                llm,
                tts,
                context_aggregator_custom.assistant(),
                transport.output(),  # Transport bot output
            ]
        )
        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            # NOTE(review): current pipecat examples `await` this call —
            # confirm whether this pipecat version returns a coroutine here.
            transport.capture_participant_transcription(participant["id"])
            # BUGFIX: time.sleep(1.5) would block the whole asyncio event
            # loop (stalling audio and pipeline processing); use the
            # non-blocking asyncio.sleep instead.
            await asyncio.sleep(1.5)
            await task.queue_frame(
                LLMMessagesAppendFrame(
                    [
                        {
                            "role": "system",
                            "content": "Introduce yourself.",
                        }
                    ],
                    run_llm=True,
                )
            )

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            # End the pipeline once the human participant leaves.
            await task.queue_frame(EndFrame())

        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            # "left" means the bot itself is no longer in the call.
            if state == "left":
                await task.queue_frame(EndFrame())

        runner = PipelineRunner()
        await runner.run(task)