Overview

FastAPIWebsocketTransport provides WebSocket support for FastAPI web applications, enabling real-time audio communication over WebSocket connections. It’s primarily designed for telephony integrations with providers like Twilio, Telnyx, and Plivo, supporting bidirectional audio streams with configurable serializers and voice activity detection.
FastAPIWebsocketTransport is best suited for telephony applications and server-side WebSocket integrations. For general client/server applications, we recommend using WebRTC-based transports for more robust network and media handling.

Installation

To use FastAPIWebsocketTransport, install the required dependencies:
pip install "pipecat-ai[websocket]"
No additional API keys are required for the transport itself, but you’ll need credentials for your chosen telephony provider.
This transport is commonly used with telephony providers. See the Frame Serializers documentation for provider-specific setup.

Frames

Input

  • InputAudioRawFrame - Raw audio data from WebSocket client
  • Frame - Other frame types based on configured serializer

Output

  • OutputAudioRawFrame - Audio data to WebSocket client (with optional WAV headers)
  • TransportMessageFrame - Application messages to client
  • TransportMessageUrgentFrame - Urgent messages to client
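
To illustrate what an optional WAV header on outgoing audio involves, here is a minimal sketch that wraps raw PCM bytes in a WAV container using Python's standard wave module. The pcm_to_wav helper is hypothetical and assumes 16-bit PCM; it is not the transport's own implementation.

```python
import io
import wave


def pcm_to_wav(pcm: bytes, sample_rate: int = 8000, channels: int = 1) -> bytes:
    """Wrap raw 16-bit PCM samples in a WAV container (hypothetical helper)."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(channels)
        wav_file.setsampwidth(2)  # 2 bytes per sample = 16-bit PCM (assumption)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm)
    return buffer.getvalue()


# 20 ms of silence at 8 kHz, 16-bit mono: 160 samples * 2 bytes each
silence = b"\x00\x00" * 160
wav_bytes = pcm_to_wav(silence)
```

The result starts with a RIFF/WAVE header followed by the unchanged PCM payload, so a client that expects WAV data can decode each message without knowing the sample rate in advance.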

Key Features

  • Telephony Integration: Optimized for phone call audio streaming with major providers
  • Frame Serialization: Configurable serializers for different telephony protocols
  • Session Management: Built-in connection monitoring and timeout handling
  • Audio Timing: Simulates audio device timing for proper call flow
  • WAV Header Support: Optional WAV header generation for compatibility
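
The audio timing idea can be sketched as follows: a WebSocket accepts data as fast as it is written, so the sender paces chunks to match their playback duration. This is only an illustration under stated assumptions (16-bit PCM, hypothetical chunk_duration and send_paced helpers), not the transport's actual code.

```python
import asyncio
import time


def chunk_duration(num_bytes: int, sample_rate: int = 8000, channels: int = 1) -> float:
    """Seconds of audio contained in num_bytes of 16-bit PCM."""
    return num_bytes / (sample_rate * channels * 2)


async def send_paced(chunks, send, sample_rate: int = 8000) -> None:
    """Send chunks no faster than real time (hypothetical helper)."""
    next_send_time = time.monotonic()
    for chunk in chunks:
        await send(chunk)
        # Schedule the next send for when this chunk would finish playing.
        next_send_time += chunk_duration(len(chunk), sample_rate)
        delay = next_send_time - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
```

Tracking an absolute next_send_time, rather than sleeping a fixed amount after each chunk, keeps small scheduling delays from accumulating over a long call.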

Usage Example

The easiest way to use FastAPIWebsocketTransport for telephony is with Pipecat’s development runner:
import os
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.runner.types import RunnerArguments, WebSocketRunnerArguments
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

async def run_bot(transport):
    """Your core bot logic - works with any transport."""
    # Your services (STT, LLM, TTS, etc.)
    # ...

    # Create pipeline
    pipeline = Pipeline([
        transport.input(),              # Receive audio from caller
        stt,                            # Convert speech to text
        context_aggregator.user(),      # Add user messages to context
        llm,                            # Process text with LLM
        tts,                            # Convert text to speech
        transport.output(),             # Send audio to caller
        context_aggregator.assistant(), # Add assistant responses to context
    ])

    # Event handlers
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Start conversation when caller connects
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.cancel()

    # Run the pipeline
    # ...

async def bot(runner_args: RunnerArguments):
    """Entry point called by the development runner."""
    if isinstance(runner_args, WebSocketRunnerArguments):
        # Auto-detect telephony provider and create appropriate serializer
        from pipecat.runner.utils import parse_telephony_websocket

        transport_type, call_data = await parse_telephony_websocket(runner_args.websocket)

        # Create serializer based on detected provider
        if transport_type == "twilio":
            from pipecat.serializers.twilio import TwilioFrameSerializer
            serializer = TwilioFrameSerializer(
                stream_sid=call_data["stream_id"],
                call_sid=call_data["call_id"],
                account_sid=os.getenv("TWILIO_ACCOUNT_SID"),
                auth_token=os.getenv("TWILIO_AUTH_TOKEN"),
            )
        else:
            # Only Twilio is handled in this example; add other providers here
            raise ValueError(f"Unsupported telephony provider: {transport_type}")

        transport = FastAPIWebsocketTransport(
            websocket=runner_args.websocket,
            params=FastAPIWebsocketParams(
                audio_in_enabled=True,
                audio_out_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                serializer=serializer,
            ),
        )
        await run_bot(transport)

if __name__ == "__main__":
    from pipecat.runner.run import main
    main()
Run your telephony bot with:
python bot.py -t twilio -x your-ngrok-domain.ngrok.io
python bot.py -t telnyx -x your-ngrok-domain.ngrok.io
python bot.py -t plivo -x your-ngrok-domain.ngrok.io
The development runner automatically:
  • Creates FastAPI server with telephony webhook endpoints
  • Sets up WebSocket endpoints for audio streaming
  • Handles provider-specific message parsing and serialization
  • Manages WebSocket connection lifecycle
The -x flag specifies your public domain (like ngrok) that telephony providers can reach for webhooks.

Manual FastAPI Implementation

For custom deployments, you can implement the FastAPI server manually:

1. FastAPI Server Setup

import json

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import HTMLResponse

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/")
async def start_call():
    """Handle telephony provider webhook."""
    # Return TwiML/XML response that establishes WebSocket connection
    xml_response = """<?xml version="1.0" encoding="UTF-8"?>
    <Response>
      <Connect>
        <Stream url="wss://your-domain.com/ws"></Stream>
      </Connect>
      <Pause length="40"/>
    </Response>"""
    return HTMLResponse(content=xml_response, media_type="application/xml")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Handle WebSocket connections for audio streaming."""
    await websocket.accept()

    # Parse initial connection data from provider
    start_data = websocket.iter_text()
    await start_data.__anext__()
    call_data = json.loads(await start_data.__anext__())

    # Extract call information
    stream_sid = call_data["start"]["streamSid"]
    call_sid = call_data["start"]["callSid"]

    await run_bot(websocket, stream_sid, call_sid)
WebSocket message parsing varies based on the telephony provider. Use the parse_telephony_websocket() utility to auto-detect and extract call data.
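
For the Twilio case shown in the server code, the parsing step can be isolated into a small function that validates the message before reading from it. This is a minimal sketch: extract_twilio_call_data is a hypothetical helper, the streamSid and callSid fields are the ones used in the endpoint code, and the returned keys mirror the call_data keys used in the development runner example. Other providers use different message layouts, which this sketch does not cover.

```python
import json


def extract_twilio_call_data(raw_message: str) -> dict:
    """Pull stream and call identifiers out of a Twilio 'start' message."""
    message = json.loads(raw_message)
    if message.get("event") != "start":
        raise ValueError(f"expected a 'start' event, got {message.get('event')!r}")
    start = message["start"]
    return {"stream_id": start["streamSid"], "call_id": start["callSid"]}


# Example payload with placeholder identifiers (not real SIDs)
sample = json.dumps(
    {"event": "start", "start": {"streamSid": "MZ123", "callSid": "CA456"}}
)
call_data = extract_twilio_call_data(sample)
```

Failing early on an unexpected event type gives a clearer error than a KeyError raised deep inside the bot setup.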

2. Bot Implementation

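import json
import os

from fastapi import WebSocket
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.runner import PipelineRunner
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)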

async def run_bot(websocket: WebSocket, stream_sid: str, call_sid: str):
    """Run the Pipecat bot for a specific call."""
    # Create serializer for telephony provider
    serializer = TwilioFrameSerializer(
        stream_sid=stream_sid,
        call_sid=call_sid,
        account_sid=os.getenv("TWILIO_ACCOUNT_SID"),
        auth_token=os.getenv("TWILIO_AUTH_TOKEN"),
    )

    # Create transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=serializer,
            session_timeout=30,  # Optional timeout
        ),
    )

    # Your pipeline setup
    # ...

    # Run the pipeline
    runner = PipelineRunner(handle_sigint=False, force_gc=True)
    await runner.run(task)

Event Handling

FastAPIWebsocketTransport provides event callbacks for connection management. Register callbacks using the @transport.event_handler() decorator:

Connection Events

  • on_client_connected - Client connects to WebSocket endpoint
  • on_client_disconnected - Client disconnects from WebSocket endpoint
  • on_session_timeout - Session timeout occurs (if configured)
For complete event details and parameters, see the FastAPIWebsocketTransport API reference.

Example Usage

from loguru import logger
from pipecat.frames.frames import TextFrame

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, websocket):
    logger.info("Caller connected")
    # Start conversation
    await task.queue_frames([TextFrame("Hello! How can I help you today?")])

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, websocket):
    logger.info("Call ended")
    await task.cancel()

@transport.event_handler("on_session_timeout")
async def on_session_timeout(transport, websocket):
    logger.info("Call timed out")
    # Handle timeout (e.g., play message, end call)
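
The decorator-based registration used in these handlers follows a common pattern, which can be sketched in isolation. The EventEmitter class below is hypothetical and written only to show the mechanics; it is not Pipecat's implementation, and the real transport may dispatch handlers differently.

```python
import asyncio
from collections import defaultdict


class EventEmitter:
    """Toy event registry showing decorator-style handler registration."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def event_handler(self, name):
        def decorator(func):
            # Store the coroutine function under the event name.
            self._handlers[name].append(func)
            return func

        return decorator

    async def emit(self, name, *args):
        # Call every registered handler, passing the emitter first,
        # similar to how handlers above receive the transport.
        for handler in self._handlers[name]:
            await handler(self, *args)
```

Emitting an event with no registered handlers is a no-op in this sketch, so handlers such as on_session_timeout stay optional.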

Telephony Integration

FastAPIWebsocketTransport works with major telephony providers through frame serializers:

Supported Providers

  • Twilio
  • Telnyx
  • Plivo

Provider-Specific Setup

Each provider requires specific configuration and webhook URLs. The development runner handles this automatically, or you can configure the serializer manually:
# Twilio setup
serializer = TwilioFrameSerializer(
    stream_sid="stream_id_from_webhook",
    call_sid="call_id_from_webhook",
    account_sid=os.getenv("TWILIO_ACCOUNT_SID"),
    auth_token=os.getenv("TWILIO_AUTH_TOKEN"),
)

# Telnyx setup
serializer = TelnyxFrameSerializer(
    stream_id="stream_id_from_webhook",
    call_control_id="call_control_id_from_webhook",
    api_key=os.getenv("TELNYX_API_KEY"),
)
See the Frame Serializers documentation for complete provider setup guides.

Advanced Configuration

Audio Processing

params = FastAPIWebsocketParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    audio_in_sample_rate=8000,   # Common for telephony
    audio_out_sample_rate=8000,  # Common for telephony
    add_wav_header=False,        # Depending on provider requirements
    vad_analyzer=SileroVADAnalyzer(),
)

Session Management

params = FastAPIWebsocketParams(
    session_timeout=300,  # 5 minute timeout
    # Other parameters...
)

@transport.event_handler("on_session_timeout")
async def handle_timeout(transport, websocket):
    # Play timeout message before ending call
    await task.queue_frames([
        TTSTextFrame("I haven't heard from you in a while. Goodbye!"),
        EndFrame()
    ])
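
One way to picture the timeout mechanism is a background timer that runs alongside the session and fires a callback if the session is still active when the timer expires. The sketch below uses only asyncio; run_with_session_timeout is a hypothetical helper, and it assumes the timeout is measured from the start of the session, which may differ from how the transport measures it.

```python
import asyncio
import contextlib


async def run_with_session_timeout(session, timeout: float, on_timeout):
    """Await `session`; call `on_timeout` if it is still running after `timeout` seconds."""

    async def monitor():
        await asyncio.sleep(timeout)
        await on_timeout()

    monitor_task = asyncio.create_task(monitor())
    try:
        return await session
    finally:
        # Stop the timer once the session ends, whether or not it fired.
        monitor_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await monitor_task
```

Note that the timer only notifies. As in the handler above, it is up to the callback to decide whether to play a message, queue an EndFrame, or let the call continue.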

Additional Notes

  • Telephony Focus: Optimized for phone call audio with 8kHz sample rates
  • Provider Integration: Works seamlessly with major telephony providers
  • Audio Timing: Simulates real audio device timing for proper call flow
  • Session Management: Built-in timeout handling for abandoned calls
  • Webhook Requirements: Requires publicly accessible endpoints for telephony providers
  • Development: Use ngrok or similar tools for local development with real phone numbers
FastAPIWebsocketTransport is the preferred choice for building phone-based voice AI applications with reliable audio streaming and provider compatibility.