Overview

LiveKitTransport provides real-time audio communication capabilities using LiveKit’s open-source WebRTC platform. It supports bidirectional audio streaming, data messaging, participant management, and room event handling for conversational AI applications with the flexibility of self-hosted or cloud infrastructure.

Installation

To use LiveKitTransport, install the required dependencies:
pip install "pipecat-ai[livekit]"
You’ll need a LiveKit server URL and access token for room authentication.
Get started quickly with LiveKit Cloud or self-host LiveKit for full control.

Frames

Input

  • UserAudioRawFrame - Audio data from room participants
  • LiveKitTransportMessageUrgentFrame - Data messages from participants

Output

  • OutputAudioRawFrame - Audio data to room participants
  • LiveKitTransportMessageFrame - Data messages to participants
  • LiveKitTransportMessageUrgentFrame - Urgent messages to participants

Key Features

  • Open Source Platform: Self-hosted or cloud options with full control over infrastructure
  • Data Messaging: Bidirectional data channels for application messaging
  • Participant Management: Built-in participant controls and metadata management
  • Scalable Architecture: Handles multiple concurrent rooms and participants

Usage Example

import os
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.transports.services.livekit import LiveKitTransport, LiveKitParams

async def main():
    # LiveKit connection details (see Room Creation section below)
    url = "wss://your-livekit-server.com"
    token = "your-access-token"  # Generated for specific room/participant
    room_name = "conversation-room"

    # Configure the transport
    transport = LiveKitTransport(
        url=url,
        token=token,
        room_name=room_name,
        params=LiveKitParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        ),
    )

    # Your services (STT, LLM, TTS, etc.)
    # ...

    # Create pipeline
    pipeline = Pipeline([
        transport.input(),              # Receive audio from participants
        # ... your processing chain
        transport.output(),             # Send audio to participants
    ])

    # Event handlers
    @transport.event_handler("on_first_participant_joined")
    async def on_first_participant_joined(transport, participant_id):
        # Start conversation when first participant joins
        await task.queue_frame(TextFrame("Hello! How can I help you today?"))

    @transport.event_handler("on_data_received")
    async def on_data_received(transport, data, participant_id):
        # Handle text messages from participants
        message_data = json.loads(data)
        await task.queue_frames([
            TranscriptionFrame(
                user_id=participant_id,
                text=message_data["message"],
                timestamp=message_data["timestamp"]
            )
        ])

    @transport.event_handler("on_participant_disconnected")
    async def on_participant_disconnected(transport, participant_id):
        await task.cancel()  # End session when participant leaves

    # Run the pipeline
    # ...

Room Creation

LiveKit requires a url, room, and corresponding token for authentication. Pipecat’s development runner provides a helper function to create room URL, token, and room name:
from pipecat.runner.livekit import configure

async def create_room():
    # This helper creates room URL, token, and room name
    url, token, room_name = await configure()
    return url, token, room_name

Methods

LiveKitTransport provides methods for room and participant management:

Room Management

  • get_participants() - Get list of participant IDs in the room
  • send_message() - Send messages to participants
  • send_message_urgent() - Send urgent messages to participants

Audio Control

  • send_audio() - Send audio frames to the room

Participant Management

  • get_participant_metadata() - Get metadata for specific participants
  • set_metadata() - Set metadata for the local participant
  • mute_participant() - Mute a participant’s audio tracks
  • unmute_participant() - Unmute a participant’s audio tracks
For complete method signatures, parameters, and examples, see the LiveKitTransport API reference.

Example Usage

# Get participants in the room
participants = transport.get_participants()
print(f"Active participants: {participants}")

# Send a data message to all participants
await transport.send_message("Welcome to the conversation!")

# Send urgent message to specific participant
await transport.send_message_urgent("Connection issue detected", participant_id="user-123")

# Control participant audio
await transport.mute_participant("user-123")
await transport.unmute_participant("user-123")

# Set bot metadata
await transport.set_metadata('{"role": "assistant", "capabilities": ["voice", "text"]}')

Event Handling

LiveKitTransport provides event callbacks for room and participant management. Register callbacks using the @transport.event_handler() decorator:

Connection Events

  • on_connected - Connected to the LiveKit room
  • on_disconnected - Disconnected from the LiveKit room

Participant Events

  • on_first_participant_joined - First participant joins (useful for starting conversations)
  • on_participant_connected - Any participant joins the room
  • on_participant_disconnected - Participant leaves the room
  • on_participant_left - Participant left (alias for compatibility)

Audio Events

  • on_audio_track_subscribed - Audio track from participant is available
  • on_audio_track_unsubscribed - Audio track is no longer available

Communication Events

  • on_data_received - Data messages received from participants
For complete event details, parameters, and examples, see the LiveKitTransport API reference.

Example Usage

@transport.event_handler("on_participant_connected")
async def on_participant_connected(transport, participant_id):
    print(f"Participant {participant_id} joined the room")

@transport.event_handler("on_audio_track_subscribed")
async def on_audio_track_subscribed(transport, participant_id):
    print(f"Now receiving audio from {participant_id}")

@transport.event_handler("on_data_received")
async def on_data_received(transport, data, participant_id):
    message = json.loads(data.decode())
    print(f"Received message from {participant_id}: {message}")

Additional Notes

  • Deployment Flexibility: Choose between LiveKit Cloud, self-hosted, or hybrid deployments
  • Open Source: Full access to LiveKit server source code for customization
  • Scalability: Horizontal scaling with multiple LiveKit servers
  • Security: End-to-end encryption and configurable authentication
  • Platform Support: Native SDKs for web, iOS, Android, and server applications