This guide walks through creating a voice AI agent that users can reach by dialing a Twilio phone number. We’ll integrate Twilio Media Streams via WebSockets with a Pipecat pipeline to create a fully functional voice AI experience.
Things you’ll need
An active Twilio account with at least one phone number
A public-facing server or a tunneling service like ngrok
API keys for speech-to-text, text-to-speech, and LLM services
Complete example code on GitHub: the full source code for this example is available in the Pipecat repository.
Architecture Overview
When a user dials your Twilio phone number:
Twilio calls your server’s endpoint, which returns TwiML that establishes a WebSocket connection
Twilio sends real-time audio data over the WebSocket, and your server processes it using the Pipecat pipeline
Your Pipecat pipeline processes the incoming audio and generates a response, which is sent back to Twilio as audio data
Twilio plays this audio to the caller in real-time
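For reference, the messages Twilio sends over the WebSocket in steps 2 and 3 are JSON. The shapes below are abridged sketches based on Twilio's Media Streams documentation, with placeholder values: the "start" message carries the stream and call identifiers your server extracts, and each "media" message carries a chunk of base64-encoded μ-law audio.

# Abridged sketches of Twilio Media Streams messages (values are placeholders)
start_message = {
    "event": "start",
    "start": {
        "streamSid": "MZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
        "callSid": "CAXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
        "mediaFormat": {"encoding": "audio/x-mulaw", "sampleRate": 8000, "channels": 1},
    },
}

media_message = {
    "event": "media",
    "media": {"payload": "<base64-encoded mu-law audio chunk>"},
}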
Server Implementation
The server needs to:
Serve TwiML in response to Twilio’s HTTP requests
Handle WebSocket connections
Process audio with the Pipecat pipeline
Let’s look at the key components:
FastAPI Server Setup
import json

from fastapi import FastAPI, WebSocket
from starlette.responses import HTMLResponse

app = FastAPI()


@app.post("/")
async def start_call():
    return HTMLResponse(
        content=open("templates/streams.xml").read(),
        media_type="application/xml",
    )


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Read the initial WebSocket messages; the first is Twilio's "connected" event
    start_data = websocket.iter_text()
    await start_data.__anext__()

    # The second message contains the call details
    call_data = json.loads(await start_data.__anext__())

    # Extract both the StreamSid and CallSid
    stream_sid = call_data["start"]["streamSid"]
    call_sid = call_data["start"]["callSid"]

    # Run your Pipecat bot
    await run_bot(websocket, stream_sid, call_sid, app.state.testing)
This server has two main endpoints:
POST / - Returns TwiML instructions to Twilio
WebSocket /ws - Handles the WebSocket connection for real-time audio
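To serve both endpoints during development, you can run the app with uvicorn. This is a minimal sketch; the host and port are assumptions (port 8765 matches the test client command shown later), and the example repository may wire up its entry point differently.

# Minimal sketch for running the FastAPI app locally (assumed port 8765)
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8765)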
TwiML Configuration
The TwiML tells Twilio to establish a WebSocket connection with your server:
<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="wss://<your server url>/ws"></Stream>
  </Connect>
  <Pause length="40"/>
</Response>
Replace <your server url> with your server's publicly accessible domain. The Pause element keeps the call alive for a maximum of 40 seconds. Adjust this value based on your expected conversation length.
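If you prefer to build the TwiML in code rather than serving a static XML file, the Twilio Python helper library can generate an equivalent response. This is an optional variation rather than part of the example repository; the WebSocket URL and 40-second pause mirror the template above.

from twilio.twiml.voice_response import Connect, VoiceResponse


def build_streams_twiml(ws_url: str) -> str:
    # Produces the same TwiML as templates/streams.xml
    response = VoiceResponse()
    connect = Connect()
    connect.stream(url=ws_url)  # e.g. "wss://your-server.com/ws"
    response.append(connect)
    response.pause(length=40)  # keep the call alive for up to 40 seconds
    return str(response)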
Pipecat Bot Implementation
The run_bot
function creates and connects all the components in the Pipecat pipeline:
async def run_bot(websocket_client: WebSocket, stream_sid: str, call_sid: str, testing: bool):
    # Initialize the Twilio serializer with stream and call SIDs
    serializer = TwilioFrameSerializer(
        stream_sid=stream_sid,
        call_sid=call_sid,
        account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
        auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
    )

    # Create the WebSocket transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=serializer,
        ),
    )

    # Initialize AI services
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        push_silence_after_stop=testing,
    )

    # Create the initial conversation prompt
    messages = [
        {
            "role": "system",
            "content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short sentence.",
        },
    ]

    # Set up the context aggregator
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    # Create an audio buffer processor to capture conversation audio
    audiobuffer = AudioBufferProcessor(user_continuous_stream=not testing)

    # Build the pipeline
    pipeline = Pipeline(
        [
            transport.input(),  # WebSocket input from client
            stt,  # Speech-to-Text
            context_aggregator.user(),
            llm,  # LLM
            tts,  # Text-to-Speech
            transport.output(),  # WebSocket output to client
            audiobuffer,  # Used to buffer the audio in the pipeline
            context_aggregator.assistant(),
        ]
    )

    # Create a pipeline task with appropriate parameters
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=8000,  # Twilio uses 8kHz audio
            audio_out_sample_rate=8000,
            allow_interruptions=True,
        ),
    )

    # Set up event handlers
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Start recording
        await audiobuffer.start_recording()
        # Kick off the conversation
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.cancel()

    @audiobuffer.event_handler("on_audio_data")
    async def on_audio_data(buffer, audio, sample_rate, num_channels):
        server_name = f"server_{websocket_client.client.port}"
        await save_audio(server_name, audio, sample_rate, num_channels)

    # Run the pipeline
    runner = PipelineRunner(handle_sigint=False, force_gc=True)
    await runner.run(task)
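The on_audio_data handler calls a save_audio helper that isn't shown above. A minimal sketch of such a helper, assuming the buffer yields raw 16-bit PCM bytes (which is what AudioBufferProcessor produces) and that the aiofiles package is installed for non-blocking writes, could look like this; the helper in the example repository may differ.

import datetime
import io
import wave

import aiofiles


async def save_audio(server_name: str, audio: bytes, sample_rate: int, num_channels: int):
    if len(audio) == 0:
        return

    # Wrap the raw 16-bit PCM bytes in a WAV container and write it to disk
    filename = f"{server_name}_recording_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    with io.BytesIO() as buffer:
        with wave.open(buffer, "wb") as wf:
            wf.setsampwidth(2)  # 16-bit samples
            wf.setnchannels(num_channels)
            wf.setframerate(sample_rate)
            wf.writeframes(audio)
        async with aiofiles.open(filename, "wb") as file:
            await file.write(buffer.getvalue())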
Key Technical Considerations
Twilio Media Streams carries 8kHz mono audio, μ-law encoded over the WebSocket; the TwilioFrameSerializer converts it to and from the 16-bit PCM your pipeline works with. Make sure your pipeline's sample rates match:
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        audio_in_sample_rate=8000,  # Twilio's audio format
        audio_out_sample_rate=8000,
        allow_interruptions=True,
    ),
)
Serialization and Call Control
The TwilioFrameSerializer handles the protocol specifics for communicating with Twilio's Media Streams:
serializer = TwilioFrameSerializer(
    stream_sid=stream_sid,
    call_sid=call_sid,
    account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
    auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
When you provide the account_sid and auth_token to the TwilioFrameSerializer, it will automatically end the call via Twilio's REST API when the pipeline ends. This ensures clean call termination when your bot finishes its conversation.
Voice Activity Detection
The Silero VAD analyzer helps determine when a user has finished speaking:
params = FastAPIWebsocketParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    add_wav_header=False,
    vad_analyzer=SileroVADAnalyzer(),
    serializer=serializer,
)
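If the default turn detection feels too eager or too slow, the analyzer's behavior can usually be tuned. The sketch below assumes a Pipecat version that exposes VADParams (import paths and parameter names may differ between releases), and the 1.0-second stop threshold is purely illustrative, not a recommendation from the example.

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams

# Wait for a longer stretch of silence before deciding the user has finished speaking
vad_analyzer = SileroVADAnalyzer(params=VADParams(stop_secs=1.0))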
Configuring Twilio
To set up your Twilio phone number to use your server:
Purchase a phone number in your Twilio account
Navigate to the Phone Numbers section and select your number
Under “Voice & Fax”, set the webhook for “A Call Comes In” to your server’s URL (e.g., https://your-server.com/)
Make sure the request type is set to HTTP POST
Save your changes
If you’re using ngrok for local development, your webhook URL will look like https://abc123.ngrok.io/. Remember to update your TwiML template with the correct WebSocket URL as well.
Testing Your Implementation
Local Testing Without Phone Calls
The example includes a test client that can simulate phone calls without actually using Twilio:
python server.py -t                              # Start server in testing mode
python client.py -u http://localhost:8765 -c 2   # Start 2 test clients
The -t flag puts the server in testing mode, and the test client creates virtual clients that communicate with your server as if they were Twilio Media Streams.
Using the Phone
To test with an actual phone call:
Make sure your server is running and accessible via the internet
Configure Twilio as described above
Dial your Twilio phone number from any phone
You should hear your AI agent respond!
Ending a Conversation
There are two primary ways to end a conversation:
Automatic termination: If you provided Twilio credentials to the TwilioFrameSerializer, the call will be ended automatically when your pipeline ends:
# End the conversation programmatically
await task.queue_frame(EndFrame())
Manual termination: You can also end the call explicitly through Twilio's REST API. This is useful for implementing custom hang-up logic:
from twilio.rest import Client

client = Client(account_sid, auth_token)
client.calls(call_sid).update(status="completed")
Scaling Considerations
For production deployments, consider:
Multiple concurrent calls: Each bot instance should run in its own process to handle concurrent calls efficiently. The example server can spawn a separate bot process for each call:
proc = subprocess.Popen(
    [f"python3 -m bot_twilio --stream-sid {stream_sid} --call-sid {call_sid}"],
    shell=True,
    bufsize=1,
    cwd=os.path.dirname(os.path.abspath(__file__)),
)
Error handling: Add robust error handling for network issues, service outages, etc.
Logging and monitoring: Implement detailed logging to track call quality and agent performance.
Security: Add authentication to your endpoints and use environment variables for all sensitive credentials.
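One concrete way to address the security point above is to verify Twilio's request signature before returning TwiML. The sketch below uses the Twilio helper library's RequestValidator; wiring it into the FastAPI handler exactly this way (reading the form body and the X-Twilio-Signature header, reusing the app and HTMLResponse from the server setup earlier) is an assumption for illustration, not something the example repository does.

import os

from fastapi import HTTPException, Request
from twilio.request_validator import RequestValidator

validator = RequestValidator(os.getenv("TWILIO_AUTH_TOKEN", ""))


@app.post("/")
async def start_call(request: Request):
    # Verify that the request really came from Twilio
    form = await request.form()
    signature = request.headers.get("X-Twilio-Signature", "")
    if not validator.validate(str(request.url), dict(form), signature):
        raise HTTPException(status_code=403, detail="Invalid Twilio signature")

    return HTMLResponse(
        content=open("templates/streams.xml").read(),
        media_type="application/xml",
    )

Note that behind a proxy or tunnel such as ngrok, the URL your server sees may differ from the public URL Twilio signed, so you may need to reconstruct it from forwarding headers before validating.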
Next Steps