Project structure

A Pipecat project will often consist of the following:

1. Bot file

E.g. bot.py. Your Pipecat bot / agent, containing all the pipelines that you want to run in order to communicate with an end-user. A bot file may take some command line arguments, such as a transport URL and configuration.

2. Bot runner

E.g. bot_runner.py. Typically a basic HTTP service that listens for incoming user requests and spawns the relevant bot file in response.

You can call these files whatever you like! We use bot.py and bot_runner.py for simplicity.

Typical user / bot flow

There are many ways to approach connecting users to bots. Pipecat is unopinionated about how exactly you should do this, but it’s helpful to put an idea forward.

At a very basic level, it may look something like this:

1. User requests to join session via client / app

Client initiates an HTTP request to a hosted bot runner service.

2. Bot runner handles the request

Authenticates, configures and instantiates everything necessary for the session to commence (e.g. a new WebSocket channel or WebRTC room).

3. Bot runner spawns bot / agent

A new bot process / VM is created for the user to connect with (passing across any necessary configuration). Your project may load just one bot file, contextually swap between multiple, or launch many at once.

4. Bot instantiates and joins session via specified transport credentials

Bot initializes, connects to the session (e.g. locally or via WebSockets, WebRTC, etc.) and runs your bot code.

5. Bot runner returns status to client

Once the bot is ready, the runner resolves the HTTP request with details for the client to connect.

[Image: basic pipeline]

Bot runner

The majority of use-cases require a way to trigger and manage a bot session over the internet. We call these bot runners: HTTP services that provide a gateway for spawning bots on-demand.

The anatomy of a bot runner service is entirely arbitrary, but at the very least it will have a method that spawns a new bot process, for example:

import uvicorn

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()

@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
    # ... handle / authenticate the request
    # ... setup the transport session

    # Spawn a new bot process
    try:
        # ... create a new bot instance
        pass  # placeholder so this skeleton remains valid Python
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to start bot: {e}")

    # Return a URL for the user to join
    return JSONResponse({...})

if __name__ == "__main__":
    uvicorn.run(
        "bot_runner:app",
        host="0.0.0.0",
        port=7860
    )

This pseudocode defines a /start_bot endpoint which listens for incoming user POST requests or webhooks, then configures the session (such as creating rooms on your transport provider) and instantiates a new bot process.

A client will typically require some information regarding the newly spawned bot, such as a web address, so we also return some JSON with the necessary details.
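To illustrate the client side of this exchange, here is a minimal sketch using only the Python standard library. It assumes the runner exposes the /start_bot endpoint described above and responds with `room_url` and `token` keys, as the full example later in this guide does. The function name `start_bot_session` and the injectable `urlopen` parameter are hypothetical, added here so the request logic can be exercised without a live server.

```python
import json
import urllib.request


def start_bot_session(runner_url, payload=None, urlopen=urllib.request.urlopen):
    """POST to the bot runner and return (room_url, token).

    Assumes the runner answers with JSON containing "room_url" and "token".
    """
    body = json.dumps(payload or {}).encode("utf-8")
    request = urllib.request.Request(
        f"{runner_url.rstrip('/')}/start_bot",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # The runner only responds once the session has been set up,
    # so allow a generous timeout (30 seconds is an arbitrary choice)
    with urlopen(request, timeout=30) as response:
        details = json.loads(response.read().decode("utf-8"))
    return details["room_url"], details["token"]
```

With a runner listening locally, calling `start_bot_session("http://localhost:7860")` would return the details a client needs to join the session.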

Data transport

Your transport layer is responsible for sending and receiving audio and video data over the internet.

You will have implemented a transport layer as part of your bot.py pipeline. This may be a service that you want to host and include in your deployment, or it may be a third-party service waiting for peers to connect (such as Daily, or a WebSocket server).

For this example, we will make use of Daily’s WebRTC transport. This will mean that our bot_runner.py will need to do some configuration when it spawns a new bot:

  1. Create and configure a new Daily room for the session to take place in.
  2. Issue both the bot and the user an authentication token to join the session.

Whatever you use for your transport layer, you’ll likely need to set up some environment variables and run some custom code before spawning the agent.
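One piece of that custom code can be a check that the configuration is complete before any bot is spawned. The sketch below is a minimal illustration: the variable names match the example later in this guide, while `missing_env_vars` and `require_env_vars` are hypothetical helper names and not part of Pipecat.

```python
import os

# Names taken from the example later in this guide; adjust for your services
REQUIRED_ENV_VARS = [
    "DAILY_API_KEY",
    "OPENAI_API_KEY",
    "ELEVENLABS_API_KEY",
    "ELEVENLABS_VOICE_ID",
]


def missing_env_vars(required, environ=None):
    """Return the names in `required` that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


def require_env_vars(required, environ=None):
    """Raise a single error that lists every missing variable."""
    missing = missing_env_vars(required, environ)
    if missing:
        raise RuntimeError(
            f"Missing environment variables: {', '.join(missing)}")
```

Reporting all missing names at once, rather than failing on the first, tends to save a few restart cycles when configuring a new deployment.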

Best practice for bot files

A good pattern to work to is the assumption that your bot.py is an encapsulated entity and does not have any knowledge of the bot_runner.py. You should provide the bot everything it needs to operate during instantiation.

Sticking to this approach helps keep things simple and makes it easier to step through debugging: if the bot launched and something goes wrong, you know to look for errors in your bot file.
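As a rough sketch of that boundary, the runner can build a complete command line and the bot can read everything it needs from its own arguments. The `-u` and `-t` flags follow the example later in this guide. The helper names `build_bot_command` and `parse_bot_args`, and the decision to mark both flags as required, are assumptions made for this illustration.

```python
import argparse
import sys


def build_bot_command(room_url, token):
    """Runner side: the full command line for one bot process."""
    # sys.executable reuses the interpreter that is running the runner
    return [sys.executable, "-m", "bot", "-u", room_url, "-t", token]


def parse_bot_args(argv):
    """Bot side: read everything the bot needs from its own arguments."""
    parser = argparse.ArgumentParser(description="Pipecat Bot")
    parser.add_argument("-u", dest="room_url", required=True, help="Room URL")
    parser.add_argument("-t", dest="token", required=True, help="Token")
    return parser.parse_args(argv)
```

Because the bot only sees its arguments, the same bot file can be launched by hand for debugging with exactly the values the runner would have passed.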


Example

Let’s assume we have a fully service-driven bot.py that connects to a WebRTC session, passes the audio transcription to GPT-4o and returns text-to-speech audio with ElevenLabs.

We’ll also use Silero voice activity detection, to better know when the user has stopped talking.

bot.py
import asyncio
import aiohttp
import os
import sys
import argparse

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer

from loguru import logger

from dotenv import load_dotenv
load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")


async def main(room_url: str, token: str):
    async with aiohttp.ClientSession() as session:
        transport = DailyTransport(
            room_url,
            token,
            "Chatbot",
            DailyParams(
                api_url=daily_api_url,
                api_key=daily_api_key,
                audio_in_enabled=True,
                audio_out_enabled=True,
                camera_out_enabled=False,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                transcription_enabled=True,
            )
        )

        tts = ElevenLabsTTSService(
            aiohttp_session=session,
            api_key=os.getenv("ELEVENLABS_API_KEY", ""),
            voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
        )

        llm = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4o")

        messages = [
            {
                "role": "system",
                "content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
            },
        ]

        tma_in = LLMUserResponseAggregator(messages)
        tma_out = LLMAssistantResponseAggregator(messages)

        pipeline = Pipeline([
            transport.input(),
            tma_in,
            llm,
            tts,
            transport.output(),
            tma_out,
        ])

        task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            transport.capture_participant_transcription(participant["id"])
            await task.queue_frames([LLMMessagesFrame(messages)])

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            await task.queue_frame(EndFrame())

        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            if state == "left":
                await task.queue_frame(EndFrame())

        runner = PipelineRunner()

        await runner.run(task)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Pipecat Bot")
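    # Both values are needed to join the session, so fail fast if either is missing
    parser.add_argument("-u", type=str, required=True, help="Room URL")
    parser.add_argument("-t", type=str, required=True, help="Token")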
    config = parser.parse_args()

    asyncio.run(main(config.u, config.t))

HTTP API

To launch this bot, let’s create a bot_runner.py that:

  • Creates an API for users to send requests to.
  • Launches a bot as a subprocess.
bot_runner.py
import os
import argparse
import subprocess

from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

# Load API keys from env
from dotenv import load_dotenv
load_dotenv(override=True)


# ------------ Configuration ------------ #

MAX_SESSION_TIME = 5 * 60  # 5 minutes
# Environment variables our bot requires
REQUIRED_ENV_VARS = [
    'DAILY_API_KEY',
    'OPENAI_API_KEY',
    'ELEVENLABS_API_KEY',
    'ELEVENLABS_VOICE_ID']

daily_rest_helper = DailyRESTHelper(
    os.getenv("DAILY_API_KEY", ""),
    os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))


# ----------------- API ----------------- #

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

# ----------------- Main ----------------- #


@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
    try:
        # Grab any data included in the POST request
        data = await request.json()
    except Exception:
        # The request body is optional, so fall back to an empty payload
        data = {}

    # Create a new Daily WebRTC room for the session to take place in
    try:
        params = DailyRoomParams(
            properties=DailyRoomProperties()
        )
        room: DailyRoomObject = daily_rest_helper.create_room(params=params)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Unable to provision room {e}")

    # Give the agent a token to join the session
    token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    # Return an error if we were unable to get a token
    if not token:
        raise HTTPException(
            status_code=500, detail=f"Failed to get token for room: {room.url}")

    try:
        # Start a new subprocess, passing the room and token to the bot file
        subprocess.Popen(
            f"python3 -m bot -u {room.url} -t {token}",
            shell=True,
            bufsize=1,
            cwd=os.path.dirname(os.path.abspath(__file__)))
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to start subprocess: {e}")

    # Grab a token for the user to join with
    user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)

    # Return the room url and user token back to the user
    return JSONResponse({
        "room_url": room.url,
        "token": user_token,
    })


if __name__ == "__main__":
    # Check for required environment variables
    for env_var in REQUIRED_ENV_VARS:
        if env_var not in os.environ:
            raise Exception(f"Missing environment variable: {env_var}.")

    parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
    parser.add_argument("--host", type=str,
                        default=os.getenv("HOST", "0.0.0.0"), help="Host address")
    parser.add_argument("--port", type=int,
                        default=os.getenv("PORT", 7860), help="Port number")
    parser.add_argument("--reload", action="store_true",
                        default=False, help="Reload code on change")

    config = parser.parse_args()

    try:
        import uvicorn

        uvicorn.run(
            "bot_runner:app",
            host=config.host,
            port=config.port,
            reload=config.reload
        )

    except KeyboardInterrupt:
        print("Pipecat runner shutting down...")

Dockerfile

Since our bot is just using Python, our Dockerfile can be quite simple:

The bot runner and the bot share one requirements.txt:

requirements.txt
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
python-dotenv

And finally, let’s create a .env file with our service keys:

.env
DAILY_API_KEY=...
OPENAI_API_KEY=...
ELEVENLABS_API_KEY=...
ELEVENLABS_VOICE_ID=...

How it works

Right now, this runner is spawning bot.py as a subprocess. When spawning the process, we pass through the transport room and token as system arguments to our bot, so it knows where to connect.

Subprocesses serve as a great way to test out your bot in the cloud without too much hassle, but depending on the size of the host machine, it will likely not hold up well under load.

Whilst some bots are just simple operators between the transport and third-party AI services (such as OpenAI), others have somewhat CPU-intensive operations, such as running and loading VAD models, so you may find you’re only able to scale this to support up to 5-10 concurrent bots.
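One way to keep a subprocess-based runner within that kind of limit is to track the processes it has spawned and refuse new sessions once the host is full. The following is a minimal sketch using only the standard library. `BotProcessPool` and `MAX_CONCURRENT_BOTS` are hypothetical names, and the limit of 5 is an assumption you would tune for your own host.

```python
import subprocess
import sys

MAX_CONCURRENT_BOTS = 5  # Assumed limit; tune it for your host machine


class BotProcessPool:
    """Tracks spawned bot processes and enforces a concurrency cap."""

    def __init__(self, max_bots=MAX_CONCURRENT_BOTS):
        self.max_bots = max_bots
        self._processes = []

    def active_count(self):
        # poll() returns None while a process is still running,
        # so this drops every process that has already exited
        self._processes = [p for p in self._processes if p.poll() is None]
        return len(self._processes)

    def spawn(self, command):
        """Start a bot process, or raise if the host is at capacity."""
        if self.active_count() >= self.max_bots:
            raise RuntimeError("Bot capacity reached")
        process = subprocess.Popen(command)
        self._processes.append(process)
        return process
```

In an HTTP handler, the `RuntimeError` could be translated into a 503 response so that clients know to retry later. This only protects a single host; it is not a substitute for giving each bot its own resources.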

Scaling your setup would require virtualizing your bot with its own set of system resources, the process of which depends on your cloud provider.

Best practices

In an ideal world, we’d recommend containerizing your bot and bot runner independently so you can deploy each without any unnecessary dependencies or models.

Most cloud providers will offer a way to deploy various images programmatically, which we explore in the various provider examples in these docs.

To keep this pattern simple, we’re just using one container for everything.

Build and run

We should now have a project that contains the following files:

  • bot.py
  • bot_runner.py
  • requirements.txt
  • .env
  • Dockerfile

You can now docker build ... and deploy your container. Of course, you can still work with your bot in local development too:

# Install and activate a virtual env
python -m venv venv
source venv/bin/activate # or OS equivalent

pip install -r requirements.txt

python bot_runner.py --host localhost --reload