Cerebrium is a serverless infrastructure platform that makes it easy for companies to build, deploy and scale AI applications. Cerebrium offers both CPUs and GPUs (H100s, A100s, etc.) with extremely low cold start times, allowing you to build highly performant applications in a cost-efficient manner.

Install the Cerebrium CLI

To get started, let us run the following commands:

  1. Run pip install cerebrium to install the Python package.
  2. Run cerebrium login to authenticate yourself.

If you don’t have a Cerebrium account, you can create one and get started with $30 in free credits.

Create a Cerebrium project

  1. Create a new Cerebrium project:
cerebrium init pipecat-agent
  2. This will create two key files:
  • main.py - Your application entrypoint
  • cerebrium.toml - Configuration for build and environment settings

Update your cerebrium.toml with the necessary configuration:

[cerebrium.hardware]
region = "us-east-1"
provider = "aws"
compute = "CPU"
cpu = 4
memory = 18.0

[cerebrium.dependencies.pip]
torch = ">=2.0.0"
"pipecat-ai[silero, daily, openai, cartesia]" = "latest"
aiohttp = "latest"
torchaudio = "latest"

For our application to work, we need API keys from a few platforms. Navigate to the Secrets section in your Cerebrium dashboard and store the following:

  • OPENAI_API_KEY - We use OpenAI for the LLM. You can get your API key from the OpenAI dashboard
  • DAILY_TOKEN - For WebRTC communication. You can get your token from the Daily dashboard
  • CARTESIA_API_KEY - For text-to-speech services. You can get your API key from the Cartesia dashboard

We access these secrets in our code as if they were normal environment variables. You can swap in any LLM or TTS service you wish to use.
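For example, reading a secret in Python is the same as reading any other environment variable:

import os

openai_key = os.environ.get("OPENAI_API_KEY")    # set in Cerebrium's Secrets dashboard
daily_token = os.environ.get("DAILY_TOKEN")
cartesia_key = os.environ.get("CARTESIA_API_KEY")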

Agent setup

We create a basic pipeline setup in our main.py that combines our LLM, TTS and Daily WebRTC transport layer.

import asyncio
import os
import sys

import aiohttp
from loguru import logger
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.openai import OpenAILLMService
from pipecat.processors.aggregators.openai_llm_context import (
    OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.vad.vad_analyzer import VADParams


logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

async def main(room_url: str, token: str):
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token,
            "Friendly bot",
            DailyParams(
                audio_in_enabled=True,
                audio_in_sample_rate=24000,
                audio_out_enabled=True,
                audio_out_sample_rate=24000,
                transcription_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.6)),
                vad_audio_passthrough=True,
            ),
        )

       
        messages = [
            {
                "role": "system",
                "content": "You are a helpful AI assistant that can switch between two services to showcase the difference in performance and cost: 'openai_realtime' and 'custom'. Respond to user queries and switch services when asked.",
            },
        ]
        
        llm = OpenAILLMService(
            name="LLM",
            api_key=os.environ.get("OPENAI_API_KEY"),
            model="gpt-4",
        )

        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )  

        custom_context = OpenAILLMContext(messages=messages)
        context_aggregator_custom = llm.create_context_aggregator(custom_context)

        pipeline = Pipeline(
            [
                transport.input(),  # Transport user input
                context_aggregator_custom.user(),
                llm,
                tts,
                context_aggregator_custom.assistant(),
                transport.output(),  # Transport bot output
            ]
        )

        task = PipelineTask(
            pipeline,
            PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                enable_usage_metrics=True,
            ),
        )

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            await asyncio.sleep(1.5)  # non-blocking pause before the bot introduces itself
            messages.append(
                {
                    "role": "system",
                    "content": "Introduce yourself.",
                }
            )
            await task.queue_frame(LLMMessagesFrame(messages))

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            await task.queue_frame(EndFrame())

        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            if state == "left":
                await task.queue_frame(EndFrame())

        runner = PipelineRunner()
        await runner.run(task)

First, in our main function, we initialize the Daily transport layer to send and receive audio/video data from the Daily room we connect to. We pass the room_url we want to join as well as a token that authenticates the bot when it joins programmatically. We also set the VAD stop seconds, which is the length of the pause we wait for before the bot responds; in this example we set it to 600 milliseconds (stop_secs=0.6).
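If the bot feels sluggish or too eager, this is the first knob to turn. As a sketch, a snappier (but more interruption-prone) setting would be:

# Respond after 400 ms of silence instead of 600 ms; values that are too low
# risk the bot talking over users who are only pausing mid-sentence.
vad_analyzer = SileroVADAnalyzer(params=VADParams(stop_secs=0.4))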

Next, we connect to our LLM (OpenAI) and our TTS model (Cartesia). By setting transcription_enabled=True we use the STT from Daily itself. The Pipecat framework then handles converting audio data to text and back to audio as frames flow through the pipeline.
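Because Pipecat services share a common interface, swapping a service is typically a one-line change. For example, a hedged sketch of trading quality for cost and latency by switching the OpenAI model (the model name here is a placeholder; use whichever chat model you prefer):

llm = OpenAILLMService(
    name="LLM",
    api_key=os.environ.get("OPENAI_API_KEY"),
    model="gpt-4o-mini",  # placeholder: any OpenAI chat model
)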

We then put this all together as a PipelineTask, which is what Pipecat runs. The makeup of a task is completely customizable, with support for image and vision use cases. Lastly, we add event handlers for when a user joins or leaves the room.

Deploy bot

Deploy your application to Cerebrium:

cerebrium deploy

You will then see that an endpoint is created for your bot at POST <BASE_URL>/main, which you can call with your room_url and token. Let us test it.
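A minimal sketch of calling the endpoint with requests (the Bearer auth header is an assumption; copy the exact endpoint URL and API key from your Cerebrium dashboard):

import os
import requests

# room_url and token come from the Daily helpers in the next section.
response = requests.post(
    "<BASE_URL>/main",  # printed by `cerebrium deploy`
    headers={"Authorization": f"Bearer {os.environ.get('CEREBRIUM_API_KEY')}"},  # assumed Bearer auth
    json={"room_url": room_url, "token": token},
)
print(response.status_code, response.text)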

Test it out

import asyncio
import os
import time

import requests
from loguru import logger

from main import main  # assumes this script sits alongside the main.py defined above


def create_room():
    url = "https://api.daily.co/v1/rooms/"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('DAILY_TOKEN')}",
    }
    data = {
        "properties": {
            "exp": int(time.time()) + 60 * 5,  ##5 mins
            "eject_at_room_exp": True,
        }
    }

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        room_info = response.json()
        token = create_token(room_info["name"])
        if token and "token" in token:
            room_info["token"] = token["token"]
        else:
            logger.error("Failed to create token")
            return {
                "message": "There was an error creating your room",
                "status_code": 500,
            }
        return room_info
    else:
        logger.error(f"Failed to create room: {response.status_code}")
        return {"message": "There was an error creating your room", "status_code": 500}


def create_token(room_name: str):
    url = "https://api.daily.co/v1/meeting-tokens"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('DAILY_TOKEN')}",
    }
    data = {"properties": {"room_name": room_name, "is_owner": True}}

    response = requests.post(url, headers=headers, json=data)
    if response.status_code == 200:
        token_info = response.json()
        return token_info
    else:
        logger.error(f"Failed to create token: {response.status_code}")
        return None

if __name__ == "__main__":
    room_info = create_room()
    print(f"Join room: {room_info['url']}")
    asyncio.run(main(room_info["url"], room_info["token"]))
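Run the script with your DAILY_TOKEN exported; it creates a room, prints a Daily room URL you can open in your browser, and starts the bot so you can talk to it.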

Future Considerations

Since Cerebrium supports both CPU and GPU workloads, the best way to lower your application's latency further is to fetch model weights from the various providers and run the models locally. You can do this for:

  • LLM: Run any open-source model using a framework such as vLLM
  • TTS: Both PlayHT and Deepgram offer TTS models that can be self-hosted
  • STT: Deepgram offers an STT model that can be self-hosted

If you run all three models locally, you should see much better performance; we have been able to get ~300 ms voice-to-voice response times.
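As a sketch of the LLM swap (assuming a vLLM OpenAI-compatible server is already running on localhost:8000, and that your Pipecat version's OpenAILLMService accepts a base_url override):

# Point the existing OpenAILLMService at a locally served open-source model.
llm = OpenAILLMService(
    name="LLM",
    api_key="EMPTY",  # vLLM's server does not require a real key by default
    base_url="http://localhost:8000/v1",  # assumption: local vLLM OpenAI-compatible endpoint
    model="meta-llama/Meta-Llama-3-8B-Instruct",  # whichever model vLLM is serving
)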

Examples