This guide walks through creating a voice AI agent that users can reach by dialing a Twilio phone number. We’ll integrate Twilio Media Streams via WebSockets with a Pipecat pipeline to create a fully functional voice AI experience.

Things you’ll need

  • An active Twilio account with at least one phone number
  • A public-facing server or a tunneling service like ngrok
  • API keys for speech-to-text, text-to-speech, and LLM services

Complete example code on GitHub

The full source code for this example is available in the Pipecat repository.

Architecture Overview

When a user dials your Twilio phone number:

  1. Twilio calls your server’s endpoint, which returns TwiML that establishes a WebSocket connection
  2. Twilio streams the caller’s audio to your server over the WebSocket in real time
  3. The Pipecat pipeline transcribes the audio, generates a response, and sends synthesized audio back to Twilio over the same WebSocket
  4. Twilio plays this audio to the caller in real time
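
To make step 2 concrete, here is a minimal sketch of decoding one inbound Media Streams message. The message shape follows Twilio's documented "media" event; the helper name decode_media_message and the sample values are hypothetical, and in the actual example Pipecat's TwilioFrameSerializer handles this for you.

```python
import base64
import json
from typing import Optional


def decode_media_message(raw: str) -> Optional[bytes]:
    """Return the raw audio bytes from a Twilio "media" event, else None."""
    message = json.loads(raw)
    if message.get("event") != "media":
        return None  # "connected", "start", "stop", etc. carry no audio
    # The audio payload is base64-encoded inside the JSON message
    return base64.b64decode(message["media"]["payload"])


# Hypothetical sample message: 160 bytes of mu-law silence (0xFF)
sample = json.dumps({
    "event": "media",
    "streamSid": "MZ00000000000000000000000000000000",
    "media": {"payload": base64.b64encode(b"\xff" * 160).decode("ascii")},
})
audio = decode_media_message(sample)
```

Non-audio events return None here so the caller can decide how to handle them separately.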

Server Implementation

The server needs to:

  1. Serve TwiML in response to Twilio’s HTTP requests
  2. Handle WebSocket connections
  3. Process audio with the Pipecat pipeline

Let’s look at the key components:

FastAPI Server Setup

import json

from fastapi import FastAPI, WebSocket
from starlette.responses import HTMLResponse

app = FastAPI()

@app.post("/")
async def start_call():
    # Return the TwiML that tells Twilio to open a Media Stream to /ws
    with open("templates/streams.xml") as f:
        return HTMLResponse(content=f.read(), media_type="application/xml")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Read initial WebSocket messages; the first one is skipped
    start_data = websocket.iter_text()
    await start_data.__anext__()

    # Second message contains the call details
    call_data = json.loads(await start_data.__anext__())

    # Extract both StreamSid and CallSid
    stream_sid = call_data["start"]["streamSid"]
    call_sid = call_data["start"]["callSid"]

    # Run your Pipecat bot (app.state.testing is assumed to be set at startup)
    await run_bot(websocket, stream_sid, call_sid, app.state.testing)

This server has two main endpoints:

  • POST / - Returns TwiML instructions to Twilio
  • WebSocket /ws - Handles the WebSocket connection for real-time audio
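
The endpoint above assumes the call details always arrive in the second message. As a hedged alternative, the sketch below scans for the message whose event is "start" instead of relying on position. The function name find_stream_ids is hypothetical, and it takes a plain iterable for clarity; inside the WebSocket handler you would loop with async for over websocket.iter_text() instead.

```python
import json
from typing import Iterable, Tuple


def find_stream_ids(messages: Iterable[str]) -> Tuple[str, str]:
    """Return (stream_sid, call_sid) from the first "start" event seen."""
    for raw in messages:
        data = json.loads(raw)
        if data.get("event") == "start":
            start = data["start"]
            return start["streamSid"], start["callSid"]
    raise ValueError("no start event received")


# Hypothetical message sequence resembling the opening of a stream
incoming = [
    json.dumps({"event": "connected", "protocol": "Call"}),
    json.dumps({"event": "start", "start": {"streamSid": "MZ123", "callSid": "CA456"}}),
]
stream_sid, call_sid = find_stream_ids(incoming)
```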

TwiML Configuration

The TwiML tells Twilio to establish a WebSocket connection with your server:

<?xml version="1.0" encoding="UTF-8"?>
<Response>
  <Connect>
    <Stream url="wss://<your server url>/ws"></Stream>
  </Connect>
  <Pause length="40"/>
</Response>

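Replace <your server url> with your server’s publicly accessible domain. Because <Connect><Stream> blocks further TwiML until the WebSocket connection closes, the Pause element takes effect only after the stream ends, keeping the call open for up to 40 seconds before Twilio hangs up. Adjust this value if you need more or less time once the stream has closed.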
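
If you would rather not hand-edit the template for every deployment, one option is to generate the TwiML from the host name. This is a sketch using only the standard library; build_twiml and its parameters are hypothetical names, not part of the example repository.

```python
import xml.etree.ElementTree as ET


def build_twiml(host: str, pause_seconds: int = 40) -> str:
    """Build the <Connect><Stream> TwiML for the given public host."""
    response = ET.Element("Response")
    connect = ET.SubElement(response, "Connect")
    ET.SubElement(connect, "Stream", url=f"wss://{host}/ws")
    ET.SubElement(response, "Pause", length=str(pause_seconds))
    body = ET.tostring(response, encoding="unicode")
    return '<?xml version="1.0" encoding="UTF-8"?>\n' + body


# Hypothetical host name for illustration
twiml = build_twiml("abc123.ngrok.io")
```

The POST / handler could return this string in place of reading templates/streams.xml.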

Pipecat Bot Implementation

The run_bot function creates and connects all the components in the Pipecat pipeline:

async def run_bot(websocket_client: WebSocket, stream_sid: str, call_sid: str, testing: bool):
    # Initialize the Twilio serializer with stream and call SIDs
    serializer = TwilioFrameSerializer(
        stream_sid=stream_sid,
        call_sid=call_sid,
        account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
        auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
    )

    # Create the WebSocket transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=serializer,
        ),
    )

    # Initialize AI services
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True)
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        push_silence_after_stop=testing,
    )

    # Create the initial conversation prompt
    messages = [
        {
            "role": "system",
            "content": "You are an elementary teacher in an audio call. Your output will be converted to audio so don't include special characters in your answers. Respond to what the student said in a short sentence.",
        },
    ]

    # Setup the context aggregator
    context = OpenAILLMContext(messages)
    context_aggregator = llm.create_context_aggregator(context)

    # Create an audio buffer processor to capture conversation audio
    audiobuffer = AudioBufferProcessor(user_continuous_stream=not testing)

    # Build the pipeline
    pipeline = Pipeline(
        [
            transport.input(),  # Websocket input from client
            stt,  # Speech-To-Text
            context_aggregator.user(),
            llm,  # LLM
            tts,  # Text-To-Speech
            transport.output(),  # Websocket output to client
            audiobuffer,  # Used to buffer the audio in the pipeline
            context_aggregator.assistant(),
        ]
    )

    # Create a pipeline task with appropriate parameters
    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=8000,  # Twilio uses 8kHz audio
            audio_out_sample_rate=8000,
            allow_interruptions=True,
        ),
    )

    # Setup event handlers
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Start recording
        await audiobuffer.start_recording()
        # Kickstart the conversation
        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.cancel()

    @audiobuffer.event_handler("on_audio_data")
    async def on_audio_data(buffer, audio, sample_rate, num_channels):
        server_name = f"server_{websocket_client.client.port}"
        await save_audio(server_name, audio, sample_rate, num_channels)

    # Run the pipeline
    runner = PipelineRunner(handle_sigint=False, force_gc=True)
    await runner.run(task)

Key Technical Considerations

Audio Format and Sample Rate

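Twilio Media Streams sends and expects 8 kHz mono audio encoded as μ-law (audio/x-mulaw). The TwilioFrameSerializer converts between that format and the 16-bit PCM used inside the pipeline, so the pipeline itself must run at 8 kHz: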

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        audio_in_sample_rate=8000,  # Twilio's audio format
        audio_out_sample_rate=8000,
        allow_interruptions=True,
    ),
)
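
For intuition about what the 8 kHz setting implies, the sketch below works out chunk sizes and decodes a μ-law byte to a 16-bit PCM sample using the standard G.711 expansion. Twilio's Media Streams documentation specifies 8-bit μ-law on the wire. The 20 ms chunk duration is an assumption for illustration, the function names are hypothetical, and you do not need this code in practice because the serializer does the conversion.

```python
SAMPLE_RATE_HZ = 8000
FRAME_MS = 20  # assumed chunk duration, for illustration only


def frame_bytes(bytes_per_sample: int) -> int:
    """Bytes in one audio chunk at the given sample width."""
    return SAMPLE_RATE_HZ * FRAME_MS // 1000 * bytes_per_sample


def ulaw_to_pcm16(byte: int) -> int:
    """Expand one G.711 mu-law byte to a signed 16-bit PCM sample."""
    u = ~byte & 0xFF               # mu-law bytes are stored bit-inverted
    sign = u & 0x80
    exponent = (u >> 4) & 0x07
    mantissa = u & 0x0F
    magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84
    return -magnitude if sign else magnitude


ulaw_chunk = frame_bytes(1)   # 8-bit mu-law on the wire
pcm_chunk = frame_bytes(2)    # 16-bit PCM inside the pipeline
```

The PCM chunk is twice the size of the μ-law chunk for the same duration, which is why the two formats must not be mixed up when sizing buffers.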

Serialization and Call Control

The TwilioFrameSerializer handles the protocol specifics for communicating with Twilio’s Media Streams:

serializer = TwilioFrameSerializer(
    stream_sid=stream_sid,
    call_sid=call_sid,
    account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
    auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)

When you provide the account_sid and auth_token to the TwilioFrameSerializer, it will automatically end the call via Twilio’s REST API when the pipeline ends. This ensures clean call termination when your bot finishes its conversation.
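
To illustrate the kind of protocol detail a serializer has to handle, here is a sketch of two outbound messages defined by Twilio's Media Streams protocol: "media" to play audio and "clear" to discard audio that is buffered but not yet played (useful when the caller interrupts). The helper names are hypothetical, and this is not Pipecat's actual implementation.

```python
import base64
import json


def media_message(stream_sid: str, ulaw_audio: bytes) -> str:
    """Wrap mu-law audio bytes in a Twilio "media" event."""
    payload = base64.b64encode(ulaw_audio).decode("ascii")
    return json.dumps({
        "event": "media",
        "streamSid": stream_sid,
        "media": {"payload": payload},
    })


def clear_message(stream_sid: str) -> str:
    """Ask Twilio to drop any audio it has buffered for this stream."""
    return json.dumps({"event": "clear", "streamSid": stream_sid})


# Hypothetical stream SID and audio bytes
out = json.loads(media_message("MZ123", b"\xff\xff\xff"))
cleared = json.loads(clear_message("MZ123"))
```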

Voice Activity Detection

The SileroVADAnalyzer helps the pipeline determine when the caller has finished speaking:

params=FastAPIWebsocketParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    add_wav_header=False,
    vad_analyzer=SileroVADAnalyzer(),
    serializer=serializer,
)
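
Silero is a neural model, so its internals are not reproduced here. As a deliberately simplified stand-in, the sketch below uses an energy threshold to show the general idea of end-of-turn detection: a turn is considered finished once speech has been followed by a run of quiet frames. The function name, threshold, and frame counts are all hypothetical.

```python
from typing import Iterable, List


def find_turn_ends(
    frame_energies: Iterable[float],
    threshold: float = 0.1,
    silence_frames: int = 3,
) -> List[int]:
    """Return frame indices at which a speaking turn is judged finished."""
    ends: List[int] = []
    speaking = False
    quiet_run = 0
    for i, energy in enumerate(frame_energies):
        if energy >= threshold:
            speaking = True
            quiet_run = 0          # any speech resets the silence counter
        elif speaking:
            quiet_run += 1
            if quiet_run >= silence_frames:
                ends.append(i)     # enough trailing silence: turn is over
                speaking = False
                quiet_run = 0
    return ends


energies = [0.0, 0.5, 0.6, 0.0, 0.7, 0.0, 0.0, 0.0, 0.0, 0.4, 0.0, 0.0, 0.0]
turn_ends = find_turn_ends(energies)
```

Note how the single quiet frame at index 3 does not end the turn; only a sustained pause does, which is the behavior you want for natural speech.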

Configuring Twilio

To set up your Twilio phone number to use your server:

  1. Purchase a phone number in your Twilio account
  2. Navigate to the Phone Numbers section and select your number
  3. Under “Voice & Fax”, set the webhook for “A Call Comes In” to your server’s URL (e.g., https://your-server.com/)
  4. Make sure the request type is set to HTTP POST
  5. Save your changes

If you’re using ngrok for local development, your webhook URL will look like https://abc123.ngrok.io/. Remember to update your TwiML template with the correct WebSocket URL as well.
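
Since the webhook URL and the WebSocket URL share a host, you can derive one from the other instead of editing both by hand. A minimal sketch, assuming the WebSocket endpoint lives at /ws as in the server above; websocket_url_for is a hypothetical helper name.

```python
from urllib.parse import urlsplit, urlunsplit


def websocket_url_for(webhook_url: str, path: str = "/ws") -> str:
    """Map an http(s) webhook URL to the matching ws(s) URL on the same host."""
    parts = urlsplit(webhook_url)
    scheme = "wss" if parts.scheme == "https" else "ws"
    return urlunsplit((scheme, parts.netloc, path, "", ""))


ws_url = websocket_url_for("https://abc123.ngrok.io/")
```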

Testing Your Implementation

Local Testing Without Phone Calls

The example includes a test client that can simulate phone calls without actually using Twilio:

python server.py -t  # Start server in testing mode
python client.py -u http://localhost:8765 -c 2  # Start 2 test clients

The -t flag puts the server in testing mode, and the test client creates virtual clients that communicate with your server as if they were Twilio Media Streams.

Using the Phone

To test with an actual phone call:

  1. Make sure your server is running and accessible via the internet
  2. Configure Twilio as described above
  3. Dial your Twilio phone number from any phone
  4. You should hear your AI agent respond!

Ending a Conversation

There are two primary ways to end a conversation:

  1. Automatic termination: If you provided Twilio credentials to the TwilioFrameSerializer, the call will be ended automatically when your pipeline ends:
from pipecat.frames.frames import EndFrame

# Queue an EndFrame so the pipeline shuts down after pending frames are processed
await task.queue_frame(EndFrame())
  2. Manual termination: You can also end the call explicitly through Twilio’s REST API. This is useful for implementing custom hang-up logic:
from twilio.rest import Client

client = Client(account_sid, auth_token)
client.calls(call_sid).update(status="completed")

Scaling Considerations

For production deployments, consider:

  1. Multiple concurrent calls: Each bot instance should run in its own process to handle concurrent calls efficiently. The example server can spawn individual bot processes for each call:
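import os
import subprocess

# Pass arguments as a list (no shell) so the SIDs are never interpreted by a shell
proc = subprocess.Popen(
    ["python3", "-m", "bot_twilio", "--stream-sid", stream_sid, "--call-sid", call_sid],
    bufsize=1,
    cwd=os.path.dirname(os.path.abspath(__file__)),
)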
  2. Error handling: Handle dropped WebSocket connections and failures or timeouts from the STT, LLM, and TTS services so that one bad call does not affect others.
  3. Logging and monitoring: Implement detailed logging to track call quality and agent performance.
  4. Security: Add authentication to your endpoints and use environment variables for all sensitive credentials.
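
One way to authenticate the webhook endpoint is to verify the X-Twilio-Signature header that Twilio attaches to its HTTP requests. The sketch below follows Twilio's documented scheme for form-encoded POST requests: append the sorted parameters to the full URL, compute an HMAC-SHA1 with your auth token, and base64-encode the result. The function names and sample values are hypothetical. In production you may prefer the RequestValidator class shipped with the twilio helper library.

```python
import base64
import hashlib
import hmac
from typing import Mapping


def compute_twilio_signature(auth_token: str, url: str, params: Mapping[str, str]) -> str:
    """Compute the expected signature for a form-encoded POST webhook."""
    # Concatenate the URL with each key and value, keys in sorted order
    payload = url + "".join(key + params[key] for key in sorted(params))
    digest = hmac.new(
        auth_token.encode("utf-8"), payload.encode("utf-8"), hashlib.sha1
    ).digest()
    return base64.b64encode(digest).decode("ascii")


def is_valid_twilio_request(
    auth_token: str, url: str, params: Mapping[str, str], signature: str
) -> bool:
    """Compare the received signature with the expected one in constant time."""
    expected = compute_twilio_signature(auth_token, url, params)
    return hmac.compare_digest(expected, signature)


# Hypothetical values for illustration
token = "example-auth-token"
url = "https://your-server.com/"
form = {"CallSid": "CA456", "From": "+15550001111"}
good_signature = compute_twilio_signature(token, url, form)
```

The URL passed in must match exactly what Twilio requested, including scheme and any query string, which matters when the server sits behind a proxy or tunnel.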

Next Steps