Project structure
A Pipecat project will often consist of the following:
1. Bot file
E.g. bot.py
. Your Pipecat bot / agent, containing all the pipelines that you
want to run in order to communicate with an end-user. A bot file may take some
command line arguments, such as a transport URL and configuration.
2. Bot runner
E.g. bot_runner.py
. Typically a basic HTTP service that listens for incoming
user requests and spawns the relevant bot file in response.
You can call these files whatever you like! We use bot.py
and
bot_runner.py
for simplicity.
Typical user / bot flow
There are many ways to approach connecting users to bots. Pipecat is unopinionated about how exactly you should do this, but it’s helpful to put an idea forward.
At a very basic level, it may look something like this:
User requests to join session via client / app
Client initiates a HTTP request to a hosted bot runner service.
Bot runner handles the request
Authenticates, configures and instantiates everything necessary for the
session to commence (e.g. a new WebSocket channel, or WebRTC room, etc.)
Bot runner spawns bot / agent
A new bot process / VM is created for the user to connect with (passing
across any necessary configuration.) Your project may load just one bot
file, contextually swap between multiple, or launch many at once.
Bot instantiates and joins sessuib via specified transport credentials
Bot initializes, connects to the session (e.g. locally or via WebSockets,
WebRTC etc) and runs your bot code.
Bot runner returns status to client
Once the bot is ready, the runner resolves the HTTP request with details for
the client to connect.
Bot runner
The majority of use-cases require a way to trigger and manage a bot session over the internet. We call these bot runners; a HTTP service that provides a gateway for spawning bots on-demand.
The anatomy of a bot runner service is entirery arbitrary, but at very least will have a method that spawns a new bot process, for example:
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
try:
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start bot: {e}")
return JSONResponse({...})
if __name__ == "__main__":
uvicorn.run(
"bot_runner:app",
host="0.0.0.0",
port=7860
)
This pseudo code defines a /start_bot/
endpoint which listens for incoming user POST
requests or webhooks, then configures the session (such as creating rooms on your transport provider) and instantiates a new bot process.
A client will typically require some information regarding the newly spawned bot, such as a web address, so we also return some JSON with the necessary details.
Data transport
Your transport layer is responsible for sending and receiving audio and video data over the internet.
You will have implemented a transport layer as part of your bot.py
pipeline. This may be a service that you want to host and include in your deployment, or it may be a third-party service waiting for peers to connect (such as Daily, or a websocket.)
For this example, we will make use of Daily’s WebRTC transport. This will mean that our bot_runner.py
will need to do some configuration when it spawns a new bot:
- Create and configure a new Daily room for the session to take place in.
- Issue both the bot and the user an authentication token to join the session.
Whatever you use for your transport layer, you’ll likely need to setup some environmental variables and run some custom code before spawning the agent.
Best practice for bot files
A good pattern to work to is the assumption that your bot.py
is an encapsulated entity and does not have any knowledge of the bot_runner.py
. You should provide the bot everything it needs to operate during instantiation.
Sticking to this approach helps keep things simple and makes it easier to step through debugging (if the bot launched and something goes wrong, you know to look for errors in your bot file.)
Example
Let’s assume we have a fully service-driven bot.py
that connects to a WebRTC session, passes audio transcription to GPT4 and returns audio text-to-speech with ElevenLabs.
We’ll also use Silero voice activity detection, to better know when the user has stopped talking.
import asyncio
import aiohttp
import os
import sys
import argparse
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregator
from pipecat.frames.frames import LLMMessagesFrame, EndFrame
from pipecat.services.openai import OpenAILLMService
from pipecat.services.elevenlabs import ElevenLabsTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
daily_api_key = os.getenv("DAILY_API_KEY", "")
daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")
async def main(room_url: str, token: str):
async with aiohttp.ClientSession() as session:
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
api_url=daily_api_url,
api_key=daily_api_key,
audio_in_enabled=True,
audio_out_enabled=True,
camera_out_enabled=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
transcription_enabled=True,
)
)
tts = ElevenLabsTTSService(
aiohttp_session=session,
api_key=os.getenv("ELEVENLABS_API_KEY", ""),
voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline([
transport.input(),
tma_in,
llm,
tts,
transport.output(),
tma_out,
])
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipecat Bot")
parser.add_argument("-u", type=str, help="Room URL")
parser.add_argument("-t", type=str, help="Token")
config = parser.parse_args()
asyncio.run(main(config.u, config.t))
HTTP API
To launch this bot, let’s create a bot_runner.py
that:
- Creates an API for users to send requests to.
- Launches a bot as a subprocess.
import os
import argparse
import subprocess
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParams
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
load_dotenv(override=True)
MAX_SESSION_TIME = 5 * 60
REQUIRED_ENV_VARS = [
'DAILY_API_KEY',
'OPENAI_API_KEY',
'ELEVENLABS_API_KEY',
'ELEVENLABS_VOICE_ID']
daily_rest_helper = DailyRESTHelper(
os.getenv("DAILY_API_KEY", ""),
os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)
@app.post("/start_bot")
async def start_bot(request: Request) -> JSONResponse:
try:
data = await request.json()
except Exception as e:
pass
try:
params = DailyRoomParams(
properties=DailyRoomProperties()
)
room: DailyRoomObject = daily_rest_helper.create_room(params=params)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Unable to provision room {e}")
token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
if not room or not token:
raise HTTPException(
status_code=500, detail=f"Failed to get token for room: {room_url}")
try:
subprocess.Popen(
[f"python3 -m bot -u {room.url} -t {token}"],
shell=True,
bufsize=1,
cwd=os.path.dirname(os.path.abspath(__file__)))
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start subprocess: {e}")
user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME)
return JSONResponse({
"room_url": room.url,
"token": user_token,
})
if __name__ == "__main__":
for env_var in REQUIRED_ENV_VARS:
if env_var not in os.environ:
raise Exception(f"Missing environment variable: {env_var}.")
parser = argparse.ArgumentParser(description="Pipecat Bot Runner")
parser.add_argument("--host", type=str,
default=os.getenv("HOST", "0.0.0.0"), help="Host address")
parser.add_argument("--port", type=int,
default=os.getenv("PORT", 7860), help="Port number")
parser.add_argument("--reload", action="store_true",
default=False, help="Reload code on change")
config = parser.parse_args()
try:
import uvicorn
uvicorn.run(
"bot_runner:app",
host=config.host,
port=config.port,
reload=config.reload
)
except KeyboardInterrupt:
print("Pipecat runner shutting down...")
Dockerfile
Since our bot is just using Python, our Dockerfile can be quite simple:
The bot runner and bot requirements.txt
:
pipecat-ai[daily,openai,silero]
fastapi
uvicorn
python-dotenv
And finally, let’s create a .env
file with our service keys
DAILY_API_KEY=...
OPENAI_API_KEY=...
ELEVENLABS_API_KEY=...
ELEVENLABS_VOICE_ID=...
How it works
Right now, this runner is spawning bot.py
as a subprocess. When spawning the process, we pass through the transport room and token as system arguments to our bot, so it knows where to connect.
Subprocesses serve as a great way to test out your bot in the cloud without too much hassle, but depending on the size of the host machine, it will likely not hold up well under load.
Whilst some bots are just simple operators between the transport and third-party AI services (such as OpenAI), others have somewhat CPU-intensive operations, such as running and loading VAD models, so you may find you’re only able to scale this to support up to 5-10 concurrent bots.
Scaling your setup would require virtualizing your bot with it’s own set of system resources, the process of which depends on your cloud provider.
Best practices
In an ideal world, we’d recommend containerizing your bot and bot runner independently so you can deploy each without any unnecessary dependencies or models.
Most cloud providers will offer a way to deploy various images programmatically, which we explore in the various provider examples in these docs.
For the sake of simplicity defining this pattern, we’re just using one container for everything.
Build and run
We should now have a project that contains the following files:
bot.py
bot_runner.py
requirements.txt
.env
Dockerfile
You can now docker build ...
and deploy your container. Of course, you can still work with your bot in local development too:
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python bot_runner.py --host localhost --reload