Overview

Pipecat exposes conversation transcripts through turn events. When a user or assistant turn ends, the corresponding event carries the complete transcript for that turn. The key events for collecting transcripts are:
  • on_user_turn_stopped - Provides the user’s complete transcript via UserTurnStoppedMessage
  • on_assistant_turn_stopped - Provides the assistant’s complete transcript via AssistantTurnStoppedMessage

Basic Example

from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    UserTurnStoppedMessage,
    AssistantTurnStoppedMessage,
)

# Create the context aggregator pair (`context` is your existing LLM context object)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)

# Handle user transcriptions
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
    print(f"[USER] {message.content}")

# Handle assistant transcriptions
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
    print(f"[ASSISTANT] {message.content}")

Saving Transcripts to a File

import json
from datetime import datetime

transcript_log = []

@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
    transcript_log.append({
        "role": "user",
        "content": message.content,
        "timestamp": message.timestamp,
        "user_id": message.user_id,
    })

@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
    transcript_log.append({
        "role": "assistant",
        "content": message.content,
        "timestamp": message.timestamp,
    })

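# Save the transcript when the session ends
async def save_transcript():
    # isoformat() contains ":" characters, which are not valid in Windows filenames
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    with open(f"transcript_{stamp}.json", "w", encoding="utf-8") as f:
        json.dump(transcript_log, f, indent=2)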
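
Keeping the whole transcript in memory until the session ends means a crash loses everything collected so far. One alternative, sketched here under the assumption that one JSON object per line (JSON Lines) is an acceptable file format, is to append each turn as it arrives. The helper names `append_turn` and `load_turns` are hypothetical and not part of Pipecat; they use only the standard library and could be called from the turn handlers with `message.content` and `message.timestamp`.

```python
import json
from pathlib import Path


def append_turn(path: Path, role: str, content: str, timestamp: str) -> None:
    """Append one turn to a JSON Lines file (one JSON object per line)."""
    entry = {"role": role, "content": content, "timestamp": timestamp}
    # Opening in append mode means earlier turns are never rewritten.
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry, ensure_ascii=False) + "\n")


def load_turns(path: Path) -> list:
    """Read the file back into a list of turn dictionaries, skipping blank lines."""
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Because each line is a complete JSON document, a partially written session can still be read back up to the last finished turn. Note that the file write is synchronous, so for very high turn rates it may be worth moving it off the event loop.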

Sending Transcripts to an External Service

import aiohttp

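async def send_to_service(role: str, content: str, timestamp: str):
    # A session per call keeps the example short; reuse one session in production
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.example.com/transcripts",
            json={"role": role, "content": content, "timestamp": timestamp},
        ) as response:
            # Raise on 4xx/5xx so failed deliveries are not silently ignored
            response.raise_for_status()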

@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
    await send_to_service("user", message.content, message.timestamp)

@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
    await send_to_service("assistant", message.content, message.timestamp)
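
Awaiting the HTTP request inside the handler ties each turn event to network latency. A possible way to decouple the two, shown here only as a sketch, is to push turns onto an `asyncio.Queue` and deliver them from a background task. `TranscriptForwarder` and the `Sender` type are hypothetical names introduced for this illustration, not Pipecat APIs; the delivery coroutine is passed in, so a function like `send_to_service` could be used as-is.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical type for this sketch: any coroutine that delivers one turn.
Sender = Callable[[str, str, str], Awaitable[None]]


class TranscriptForwarder:
    """Buffers turns in a queue and delivers them from a background task."""

    def __init__(self, send: Sender):
        self._send = send
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Must be called from inside a running event loop.
        self._task = asyncio.create_task(self._worker())

    def enqueue(self, role: str, content: str, timestamp: str) -> None:
        # Returns immediately, so the event handler does not wait on the network.
        self._queue.put_nowait((role, content, timestamp))

    async def _worker(self) -> None:
        while True:
            item = await self._queue.get()
            try:
                await self._send(*item)
            except Exception as exc:
                # A failed delivery is reported and dropped; the worker keeps running.
                print(f"transcript delivery failed: {exc}")
            finally:
                self._queue.task_done()

    async def stop(self) -> None:
        # Wait until every queued turn has been processed, then cancel the worker.
        await self._queue.join()
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
```

In this arrangement the handlers would call `forwarder.enqueue("user", message.content, message.timestamp)` instead of awaiting the request, and `await forwarder.stop()` would run when the session ends. Dropping failed deliveries is a simplification; a retry policy may be more appropriate for a real service.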

Message Types

For details on UserTurnStoppedMessage and AssistantTurnStoppedMessage fields, see Turn Events - Message Types.