Overview

WebSocket transports provide client and server implementations for real-time, bidirectional communication. They support audio streaming, frame serialization, and connection management, which makes them a good fit for prototyping and for lightweight client-server applications where WebRTC would be overkill.
WebSocket transports are best suited for prototyping and controlled network environments. For production client-server applications, we recommend WebRTC-based transports, which offer more robust network handling, NAT traversal, and media optimization.

Installation

To use WebSocket transports, install the required dependencies:
pip install "pipecat-ai[websocket]"
No additional API keys are required for WebSocket communication.
WebSocket transports use Protobuf serialization by default for efficient binary messaging, but support custom serializers for different protocols.
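
To make the idea of a custom serializer concrete, here is a minimal, self-contained sketch of a JSON-based one. It deliberately does not import Pipecat: `AudioChunk` and `JsonAudioSerializer` are hypothetical stand-ins, and the `serialize`/`deserialize` pair only illustrates the general shape of a serializer. Check the API reference for the actual base class and method signatures a custom serializer must implement.

```python
import base64
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class AudioChunk:
    """Hypothetical stand-in for an audio frame (not a Pipecat class)."""

    audio: bytes
    sample_rate: int
    num_channels: int


class JsonAudioSerializer:
    """Hypothetical text serializer: audio chunk <-> JSON string."""

    def serialize(self, chunk: AudioChunk) -> str:
        # Raw bytes are not valid JSON, so the PCM payload is base64-encoded.
        return json.dumps(
            {
                "type": "audio",
                "sample_rate": chunk.sample_rate,
                "num_channels": chunk.num_channels,
                "audio": base64.b64encode(chunk.audio).decode("ascii"),
            }
        )

    def deserialize(self, data: str) -> Optional[AudioChunk]:
        # Return None for anything this serializer does not understand.
        try:
            message = json.loads(data)
        except json.JSONDecodeError:
            return None
        if not isinstance(message, dict) or message.get("type") != "audio":
            return None
        return AudioChunk(
            audio=base64.b64decode(message["audio"]),
            sample_rate=message["sample_rate"],
            num_channels=message["num_channels"],
        )
```

Base64 makes the payload roughly a third larger than the raw bytes, which is one reason a binary format such as Protobuf is the more efficient default.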

Frames

Input

  • InputAudioRawFrame - Raw audio data from WebSocket peer
  • Frame - Other frame types based on configured serializer

Output

  • OutputAudioRawFrame - Audio data to WebSocket peer (with optional WAV headers)
  • TransportMessageFrame - Application messages to peer
  • TransportMessageUrgentFrame - Urgent messages to peer
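
The optional WAV headers mentioned for OutputAudioRawFrame can be illustrated with the standard library alone. The sketch below wraps raw PCM in a WAV container using Python's `wave` module. The helper name `pcm_to_wav` and the 16-bit sample width are assumptions for illustration. It does not describe how the transport builds its headers internally.

```python
import io
import wave


def pcm_to_wav(pcm: bytes, sample_rate: int, num_channels: int) -> bytes:
    """Wrap raw 16-bit PCM in a WAV container (hypothetical helper)."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(num_channels)
        wav_file.setsampwidth(2)  # 2 bytes per sample = 16-bit audio (assumed)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm)
    # The wave module leaves a caller-supplied buffer open, so it can be read here.
    return buffer.getvalue()
```

A receiver that expects a self-describing audio file (for example a browser audio element) can play such a payload directly, whereas raw PCM requires the receiver to know the sample rate and channel count in advance.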

Key Features

  • Client & Server Support: Both WebSocket client and server implementations
  • Frame Serialization: Configurable serializers including Protobuf by default
  • Audio Timing: Simulates audio device timing for proper streaming flow
  • Session Management: Built-in connection monitoring and timeout handling
  • WAV Header Support: Optional WAV header generation for audio compatibility
  • Single Connection: Server supports one active client at a time (new connections replace existing)
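
The Audio Timing feature exists because a WebSocket has no audio clock: a sender that should behave like a sound device has to pace itself. The following is a rough sketch of that arithmetic, assuming 16-bit PCM. `chunk_duration` and `send_paced` are hypothetical helpers, and the transport's own pacing logic may differ.

```python
import asyncio
import time

BYTES_PER_SAMPLE = 2  # assumes 16-bit PCM


def chunk_duration(num_bytes: int, sample_rate: int, num_channels: int) -> float:
    """Playback time in seconds represented by a chunk of raw PCM."""
    return num_bytes / (sample_rate * num_channels * BYTES_PER_SAMPLE)


async def send_paced(chunks, send, sample_rate: int, num_channels: int) -> None:
    """Send chunks no faster than real time (hypothetical helper).

    `send` is any async callable that accepts one bytes chunk.
    """
    next_send_time = time.monotonic()
    for chunk in chunks:
        delay = next_send_time - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)
        await send(chunk)
        # Advance an absolute deadline so per-chunk sleep errors do not accumulate.
        next_send_time += chunk_duration(len(chunk), sample_rate, num_channels)
```

For example, a 960-byte chunk of mono audio at 24000 Hz holds 480 samples, which is 20 ms of playback time.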

Usage Example

WebSocket Server Transport

The server transport creates a WebSocket server that clients can connect to:
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.network.websocket_server import (
    WebsocketServerParams,
    WebsocketServerTransport,
)

async def run_websocket_server():
    # Create WebSocket server transport
    transport = WebsocketServerTransport(
        host="localhost",
        port=8765,
        params=WebsocketServerParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=ProtobufFrameSerializer(),
            session_timeout=180,  # 3 minutes
        ),
    )

    # Your services (STT, LLM, TTS, etc.) and the LLM context aggregator
    # (`context_aggregator`, used in the handler below) are created here.
    # ...

    # Create pipeline
    pipeline = Pipeline([
        transport.input(),              # Receive data from WebSocket clients
        # ... your processing chain
        transport.output(),             # Send data to WebSocket clients
    ])

    # Create the task that the event handlers and the runner refer to
    task = PipelineTask(pipeline)

    # Event handlers
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, websocket):
        logger.info(f"Client connected from {websocket.remote_address}")
        # Start conversation
        await task.queue_frames([context_aggregator.user().get_context_frame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, websocket):
        logger.info(f"Client disconnected: {websocket.remote_address}")
        await task.cancel()

    @transport.event_handler("on_session_timeout")
    async def on_session_timeout(transport, websocket):
        logger.info(f"Session timeout for {websocket.remote_address}")
        await task.cancel()

    # Run the pipeline
    runner = PipelineRunner()
    await runner.run(task)

WebSocket Client Transport

The client transport connects to an existing WebSocket server:
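from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.transports.network.websocket_client import (
    WebsocketClientParams,
    WebsocketClientTransport,
)

async def run_websocket_client():
    # Create WebSocket client transport
    transport = WebsocketClientTransport(
        uri="ws://localhost:8765",
        params=WebsocketClientParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            serializer=ProtobufFrameSerializer(),
            add_wav_header=True,  # Optional WAV headers
        ),
    )

    # Your pipeline setup
    pipeline = Pipeline([
        transport.input(),
        # ... your processing chain
        transport.output(),
    ])

    # Create the task that the event handlers and the runner refer to
    task = PipelineTask(pipeline)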

    # Event handlers
    @transport.event_handler("on_connected")
    async def on_connected(transport, websocket):
        logger.info("Connected to WebSocket server")

    @transport.event_handler("on_disconnected")
    async def on_disconnected(transport, websocket):
        logger.info("Disconnected from WebSocket server")
        await task.cancel()

    # Run the pipeline
    runner = PipelineRunner()
    await runner.run(task)

Combined Server Setup

For a complete application, you might run a FastAPI web server alongside the WebSocket server, for example to tell clients which WebSocket URL to connect to:
import asyncio
import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.post("/connect")
async def get_websocket_url():
    return {"wsUrl": "ws://localhost:8765"}

async def main():
    # Run both WebSocket bot and web server concurrently
    tasks = [
        run_websocket_server(),  # Your WebSocket bot
        uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=7860)).serve()
    ]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

Event Handling

WebSocket transports provide event callbacks for connection management. Register callbacks using the @transport.event_handler() decorator:

Server Events

  • on_client_connected - Client connects to the server
  • on_client_disconnected - Client disconnects from the server
  • on_session_timeout - Client session times out (if configured)
  • on_websocket_ready - WebSocket server is ready to accept connections

Client Events

  • on_connected - Successfully connected to WebSocket server
  • on_disconnected - Disconnected from WebSocket server
For complete event details, including callback signatures and parameters, see the API reference documentation.

Example Usage

# Server event handling
@server_transport.event_handler("on_websocket_ready")
async def on_server_ready(transport):
    logger.info("WebSocket server ready for connections")

@server_transport.event_handler("on_client_connected")
async def on_client_connected(transport, websocket):
    logger.info(f"New client: {websocket.remote_address}")
    # Initialize conversation or send welcome message

# Client event handling
@client_transport.event_handler("on_connected")
async def on_connected(transport, websocket):
    logger.info("Connected to server")
    # Start sending data or audio

@client_transport.event_handler("on_disconnected")
async def on_disconnected(transport, websocket):
    logger.info("Server connection lost")
    # Handle reconnection or cleanup
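
The "handle reconnection" comment in the `on_disconnected` handler above can be fleshed out with a retry loop that uses exponential backoff. This is only a sketch: `connect` is a hypothetical async callable standing in for whatever re-establishes your client connection, and neither helper is part of the Pipecat API.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


def backoff_delays(base: float, factor: float, max_delay: float, attempts: int) -> List[float]:
    """Delays in seconds that grow geometrically and are capped at max_delay."""
    return [min(base * factor**i, max_delay) for i in range(attempts)]


async def connect_with_retry(
    connect: Callable[[], Awaitable[None]],
    delays: Iterable[float],
) -> bool:
    """Call `connect` once per delay until it succeeds (hypothetical helper).

    Returns True on the first success, or False once every attempt has failed.
    """
    for delay in delays:
        try:
            await connect()
            return True
        except OSError:
            # ConnectionRefusedError and similar network errors subclass OSError.
            await asyncio.sleep(delay)
    return False
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future, while the geometric growth avoids hammering a server that is still restarting.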

Advanced Configuration

Audio Processing

# Server configuration
server_params = WebsocketServerParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    audio_out_sample_rate=24000,
    audio_out_channels=1,
    add_wav_header=False,  # Binary audio data
    vad_analyzer=SileroVADAnalyzer(),
)

# Client configuration
client_params = WebsocketClientParams(
    audio_in_enabled=True,
    audio_out_enabled=True,
    add_wav_header=True,   # WAV headers for compatibility
    serializer=ProtobufFrameSerializer(),
)

Session Management

params = WebsocketServerParams(
    session_timeout=300,  # 5 minute timeout
    # ... other parameters
)

@transport.event_handler("on_session_timeout")
async def handle_timeout(transport, websocket):
    logger.info("Session timed out, cleaning up")
    await task.cancel()
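
Conceptually, a session timeout behaves like a one-shot timer that starts when a client connects and fires a callback unless it is cancelled first. The sketch below models that with plain asyncio. `SessionTimer` is a hypothetical class, and it assumes the timeout is measured from connection time rather than from the last activity. It is not the transport's actual implementation.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class SessionTimer:
    """One-shot timer that awaits a callback after timeout_secs (hypothetical)."""

    def __init__(self, timeout_secs: float, on_timeout: Callable[[], Awaitable[None]]):
        self._timeout_secs = timeout_secs
        self._on_timeout = on_timeout
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Must be called while an event loop is running (e.g. on client connect).
        self._task = asyncio.create_task(self._run())

    def cancel(self) -> None:
        # Call this when the client disconnects before the timeout elapses.
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def _run(self) -> None:
        await asyncio.sleep(self._timeout_secs)
        await self._on_timeout()
```

In this model the callback plays the role of an `on_session_timeout` handler, which typically cancels the pipeline task so the bot does not keep running for an abandoned session.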

Use Cases

Server Transport

  • Voice AI Applications: Host voice assistants that clients connect to
  • Real-time Audio Processing: Server-side audio analysis and response
  • Prototyping: Quick setup for testing conversational AI flows
  • Controlled Environments: Internal networks where WebRTC complexity isn’t needed

Client Transport

  • Bot Clients: Connect bots to existing WebSocket servers
  • Testing Tools: Automated testing of WebSocket-based services
  • Integration: Connect Pipecat to third-party WebSocket APIs
  • Monitoring: Client bots that connect to monitor or interact with services