The RTVIProcessor manages bidirectional communication between clients and your Pipecat application. It processes client messages, handles service configuration, executes actions, and coordinates function calls.

Initialization

Add the RTVIProcessor to your pipeline:

from pipecat.processors.rtvi import RTVIProcessor, RTVIConfig, RTVIServiceConfig

# Create the RTVIProcessor
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))

# Add to pipeline
pipeline = Pipeline([
    transport.input(),
    rtvi,
    stt,
    # ... other processors ...
    transport.output()
])

Readiness Protocol

Client Ready State

Clients indicate readiness by sending a client-ready message, triggering the on_client_ready event in the processor:

@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
    # Handle client ready state
    await rtvi.set_bot_ready()
    # Initialize conversation
    await task.queue_frames([...])

Bot Ready State

The server must mark the bot as ready before it can process client messages:

await rtvi.set_bot_ready()

When marked ready, the bot sends a response containing:

  • RTVI protocol version
  • Current service configuration
  • Available actions

Services

Services represent configurable components of your application that clients can interact with.

Registering Services

# 1. Define option handler
async def handle_voice_option(processor, service, option):
    voice_id = option.value
    # Apply configuration change
    logger.info(f"Voice ID updated to: {voice_id}")

# 2. Create RTVIService
voice_service = RTVIService(
    name="voice",
    options=[
        RTVIServiceOption(
            name="voice_id",
            type="string",
            handler=handle_voice_option
        )
    ]
)

# 3. Register with processor
rtvi.register_service(voice_service)

Option Types

Services support multiple data types for configuration:

RTVIServiceOption(
    name="temperature",
    type="number",  # number, string, bool, array, object
    handler=handle_temperature
)

Option handlers receive:

  • The processor instance
  • The service name
  • The option configuration with new value

Actions

Actions are server-side functions that clients can trigger with arguments.

Registering Actions

# 1. Define handler function
async def handle_print_message(processor, service, arguments):
    message = arguments.get("message", "Default message")
    logger.info(f"Print action triggered with message: {message}")
    return True

# 2. Create and register RTVIAction
print_action = RTVIAction(
    service="conversation",
    action="print_message",
    arguments=[
        RTVIActionArgument(name="message", type="string")
    ],
    result="bool",
    handler=handle_print_message
)
rtvi.register_action(print_action)

Action Arguments

Actions can accept typed arguments from clients:

search_action = RTVIAction(
    service="knowledge",
    action="search",
    arguments=[
        RTVIActionArgument(name="query", type="string"),
        RTVIActionArgument(name="limit", type="number")
    ],
    result="array",
    handler=handle_search
)

Function Calls

Handle LLM function calls with client interaction:

await processor.handle_function_call(
    function_name=function_name,
    tool_call_id=tool_call_id,
    arguments=arguments,
    llm=llm,
    context=context,
    result_callback=result_callback
)

The function call process:

  1. LLM requests a function call
  2. Processor notifies client with llm-function-call message
  3. Client executes function and returns result
  4. Result is passed back to LLM via FunctionCallResultFrame
  5. Conversation continues

Error Handling

Send error messages to clients:

# General error
await processor.send_error("Invalid configuration")

# Request-specific error
await processor._send_error_response(request_id, "Invalid action arguments")

Error categories:

  • Configuration errors
  • Action execution errors
  • Function call errors
  • Protocol errors
  • Fatal and non-fatal errors

Bot Control

Manage bot state and handle interruptions:

# Set bot as ready
await processor.set_bot_ready()

# Handle interruptions
await processor.interrupt_bot()

Custom Messaging

Send custom messages from server to client:

from pipecat.frames.frames import RTVIServerMessageFrame

# Send a custom message
frame = RTVIServerMessageFrame(
    data={
        "type": "custom-event",
        "payload": {"key": "value"}
    }
)
await rtvi.push_frame(frame)

The client receives these messages through the onServerMessage callback or serverMessage event.