A Pipecat project will often consist of the following:
1. Bot file
E.g. bot.py. Your Pipecat bot / agent, containing all the pipelines that you
want to run in order to communicate with an end-user. A bot file may take some
command line arguments, such as a transport URL and configuration.
2. Bot runner
E.g. bot_runner.py. Typically a basic HTTP service that listens for incoming
user requests and spawns the relevant bot file in response.
You can call these files whatever you like! We use bot.py and
bot_runner.py for simplicity.
There are many ways to approach connecting users to bots. Pipecat is unopinionated about how exactly you should do this, but it’s helpful to put an idea forward.
At a very basic level, it may look something like this:
1
User requests to join session via client / app
Client initiates a HTTP request to a hosted bot runner service.
2
Bot runner handles the request
Authenticates, configures and instantiates everything necessary for the
session to commence (e.g. a new WebSocket channel, or WebRTC room, etc.)
3
Bot runner spawns bot / agent
A new bot process / VM is created for the user to connect with (passing
across any necessary configuration.) Your project may load just one bot
file, contextually swap between multiple, or launch many at once.
4
Bot instantiates and joins session via specified transport credentials
Bot initializes, connects to the session (e.g. locally or via WebSockets,
WebRTC etc) and runs your bot code.
5
Bot runner returns status to client
Once the bot is ready, the runner resolves the HTTP request with details for
the client to connect.
The majority of use-cases require a way to trigger and manage a bot session over the internet. We call these bot runners; a HTTP service that provides a gateway for spawning bots on-demand.
The anatomy of a bot runner service is entirery arbitrary, but at very least will have a method that spawns a new bot process, for example:
import uvicornfrom fastapi import FastAPI, Request, HTTPExceptionfrom fastapi.responses import JSONResponseapp = FastAPI()@app.post("/start_bot")async def start_bot(request: Request) -> JSONResponse: # ... handle / authenticate the request # ... setup the transport session # Spawn a new bot process try: #... create a new bot instance except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to start bot: {e}") # Return a URL for the user to join return JSONResponse({...})if __name__ == "__main__": uvicorn.run( "bot_runner:app", host="0.0.0.0", port=7860 )
This pseudo code defines a /start_bot/ endpoint which listens for incoming user POST requests or webhooks, then configures the session (such as creating rooms on your transport provider) and instantiates a new bot process.
A client will typically require some information regarding the newly spawned bot, such as a web address, so we also return some JSON with the necessary details.
Your transport layer is responsible for sending and receiving audio and video data over the internet.
You will have implemented a transport layer as part of your bot.py pipeline. This may be a service that you want to host and include in your deployment, or it may be a third-party service waiting for peers to connect (such as Daily, or a websocket.)
For this example, we will make use of Daily’s WebRTC transport. This will mean that our bot_runner.py will need to do some configuration when it spawns a new bot:
Create and configure a new Daily room for the session to take place in.
Issue both the bot and the user an authentication token to join the session.
Whatever you use for your transport layer, you’ll likely need to setup some environmental variables and run some custom code before spawning the agent.
A good pattern to work to is the assumption that your bot.py is an encapsulated entity and does not have any knowledge of the bot_runner.py. You should provide the bot everything it needs to operate during instantiation.
Sticking to this approach helps keep things simple and makes it easier to step through debugging (if the bot launched and something goes wrong, you know to look for errors in your bot file.)
Let’s assume we have a fully service-driven bot.py that connects to a WebRTC session, passes audio transcription to GPT4 and returns audio text-to-speech with ElevenLabs.
We’ll also use Silero voice activity detection, to better know when the user has stopped talking.
bot.py
import asyncioimport aiohttpimport osimport sysimport argparsefrom pipecat.pipeline.pipeline import Pipelinefrom pipecat.pipeline.runner import PipelineRunnerfrom pipecat.pipeline.task import PipelineParams, PipelineTaskfrom pipecat.processors.aggregators.llm_response import LLMAssistantResponseAggregator, LLMUserResponseAggregatorfrom pipecat.frames.frames import LLMMessagesFrame, EndFramefrom pipecat.services.openai.llm import OpenAILLMServicefrom pipecat.services.elevenlabs.tts import ElevenLabsTTSServicefrom pipecat.transports.services.daily import DailyParams, DailyTransportfrom pipecat.vad.silero import SileroVADAnalyzerfrom loguru import loggerfrom dotenv import load_dotenvload_dotenv(override=True)logger.remove(0)logger.add(sys.stderr, level="DEBUG")daily_api_key = os.getenv("DAILY_API_KEY", "")daily_api_url = os.getenv("DAILY_API_URL", "https://api.daily.co/v1")async def main(room_url: str, token: str): async with aiohttp.ClientSession() as session: transport = DailyTransport( room_url, token, "Chatbot", DailyParams( api_url=daily_api_url, api_key=daily_api_key, audio_in_enabled=True, audio_out_enabled=True, video_out_enabled=False, vad_analyzer=SileroVADAnalyzer(), transcription_enabled=True, ) ) tts = ElevenLabsTTSService( aiohttp_session=session, api_key=os.getenv("ELEVENLABS_API_KEY", ""), voice_id=os.getenv("ELEVENLABS_VOICE_ID", ""), ) llm = OpenAILLMService( api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o") messages = [ { "role": "system", "content": "You are Chatbot, a friendly, helpful robot. Your output will be converted to audio so don't include special characters other than '!' or '?' in your answers. Respond to what the user said in a creative and helpful way, but keep your responses brief. Start by saying hello.", }, ] tma_in = LLMUserResponseAggregator(messages) tma_out = LLMAssistantResponseAggregator(messages) pipeline = Pipeline([ transport.input(), tma_in, llm, tts, transport.output(), tma_out, ]) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) @transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): transport.capture_participant_transcription(participant["id"]) await task.queue_frames([LLMMessagesFrame(messages)]) @transport.event_handler("on_participant_left") async def on_participant_left(transport, participant, reason): await task.queue_frame(EndFrame()) @transport.event_handler("on_call_state_updated") async def on_call_state_updated(transport, state): if state == "left": await task.queue_frame(EndFrame()) runner = PipelineRunner() await runner.run(task)if __name__ == "__main__": parser = argparse.ArgumentParser(description="Pipecat Bot") parser.add_argument("-u", type=str, help="Room URL") parser.add_argument("-t", type=str, help="Token") config = parser.parse_args() asyncio.run(main(config.u, config.t))
To launch this bot, let’s create a bot_runner.py that:
Creates an API for users to send requests to.
Launches a bot as a subprocess.
bot_runner.py
import osimport argparseimport subprocessfrom pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomObject, DailyRoomProperties, DailyRoomParamsfrom fastapi import FastAPI, Request, HTTPExceptionfrom fastapi.middleware.cors import CORSMiddlewarefrom fastapi.responses import JSONResponse# Load API keys from envfrom dotenv import load_dotenvload_dotenv(override=True)# ------------ Configuration ------------ #MAX_SESSION_TIME = 5 * 60 # 5 minutes# List of require env vars our bot requiresREQUIRED_ENV_VARS = [ 'DAILY_API_KEY', 'OPENAI_API_KEY', 'ELEVENLABS_API_KEY', 'ELEVENLABS_VOICE_ID']daily_rest_helper = DailyRESTHelper( os.getenv("DAILY_API_KEY", ""), os.getenv("DAILY_API_URL", 'https://api.daily.co/v1'))# ----------------- API ----------------- #app = FastAPI()app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])# ----------------- Main ----------------- #@app.post("/start_bot")async def start_bot(request: Request) -> JSONResponse: try: # Grab any data included in the post request data = await request.json() except Exception as e: pass # Create a new Daily WebRTC room for the session to take place in try: params = DailyRoomParams( properties=DailyRoomProperties() ) room: DailyRoomObject = daily_rest_helper.create_room(params=params) except Exception as e: raise HTTPException( status_code=500, detail=f"Unable to provision room {e}") # Give the agent a token to join the session token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME) # Return an error if we were unable to create a room or a token if not room or not token: raise HTTPException( status_code=500, detail=f"Failed to get token for room: {room_url}") try: # Start a new subprocess, passing the room and token to the bot file subprocess.Popen( [f"python3 -m bot -u {room.url} -t {token}"], shell=True, bufsize=1, cwd=os.path.dirname(os.path.abspath(__file__))) except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to start subprocess: {e}") # Grab a token for the user to join with user_token = daily_rest_helper.get_token(room.url, MAX_SESSION_TIME) # Return the room url and user token back to the user return JSONResponse({ "room_url": room.url, "token": user_token, })if __name__ == "__main__": # Check for required environment variables for env_var in REQUIRED_ENV_VARS: if env_var not in os.environ: raise Exception(f"Missing environment variable: {env_var}.") parser = argparse.ArgumentParser(description="Pipecat Bot Runner") parser.add_argument("--host", type=str, default=os.getenv("HOST", "0.0.0.0"), help="Host address") parser.add_argument("--port", type=int, default=os.getenv("PORT", 7860), help="Port number") parser.add_argument("--reload", action="store_true", default=False, help="Reload code on change") config = parser.parse_args() try: import uvicorn uvicorn.run( "bot_runner:app", host=config.host, port=config.port, reload=config.reload ) except KeyboardInterrupt: print("Pipecat runner shutting down...")
Right now, this runner is spawning bot.py as a subprocess. When spawning the process, we pass through the transport room and token as system arguments to our bot, so it knows where to connect.
Subprocesses serve as a great way to test out your bot in the cloud without too much hassle, but depending on the size of the host machine, it will likely not hold up well under load.
Whilst some bots are just simple operators between the transport and third-party AI services (such as OpenAI), others have somewhat CPU-intensive operations, such as running and loading VAD models, so you may find you’re only able to scale this to support up to 5-10 concurrent bots.
Scaling your setup would require virtualizing your bot with it’s own set of system resources, the process of which depends on your cloud provider.
In an ideal world, we’d recommend containerizing your bot and bot runner independently so you can deploy each without any unnecessary dependencies or models.
Most cloud providers will offer a way to deploy various images programmatically, which we explore in the various provider examples in these docs.
For the sake of simplicity defining this pattern, we’re just using one container for everything.