import asyncio
import os
import sys
import time

import aiohttp
from loguru import logger

from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.vad.vad_analyzer import VADParams
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main(room_url: str, token: str):
    """Run a Daily voice bot: transcribed user speech -> OpenAI LLM -> Cartesia TTS.

    Joins the given Daily room, wires up a pipecat pipeline (transport input,
    user-context aggregation, LLM, TTS, assistant-context aggregation,
    transport output), and runs it until the participant leaves or the call
    ends.

    Args:
        room_url: URL of the Daily room to join.
        token: Daily meeting token authorizing the bot to join the room.
    """
    # The session is closed automatically when this block exits; no explicit
    # close() call is needed.
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token,
            "Friendly bot",
            DailyParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                transcription_enabled=True,
                # 0.6s of silence before VAD declares end-of-utterance.
                vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.6)),
            ),
        )

        # Shared conversation history; the context aggregators below append
        # user/assistant turns to this same list.
        messages = [
            {
                "role": "system",
                "content": "You are a helpful AI assistant that can switch between two services to showcase the difference in performance and cost: 'openai_realtime' and 'custom'. Respond to user queries and switch services when asked.",
            },
        ]

        llm = OpenAILLMService(
            name="LLM",
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4",
        )

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )

        custom_context = OpenAILLMContext(messages=messages)
        context_aggregator_custom = llm.create_context_aggregator(custom_context)

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                context_aggregator_custom.user(),
                llm,
                tts,
                context_aggregator_custom.assistant(),
                transport.output(),  # Transport bot output
            ]
        )

        task = PipelineTask(
            pipeline,
            params=PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            # Give the participant's audio a moment to settle before speaking.
            # NOTE: the original used time.sleep(), which blocks the entire
            # asyncio event loop (stalling audio and transport processing);
            # asyncio.sleep() yields control instead.
            await asyncio.sleep(1.5)
            messages.append(
                {
                    "role": "system",
                    "content": "Introduce yourself.",
                }
            )
            await task.queue_frame(LLMMessagesFrame(messages))

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            await task.queue_frame(EndFrame())

        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            if state == "left":
                await task.queue_frame(EndFrame())

        runner = PipelineRunner()
        await runner.run(task)