
Overview

The terms “aggregators” and “filters” are overloaded in Pipecat and can refer to different components depending on the context. This document provides a high-level overview of the low-level text utilities often used by various pipeline services, such as TTS services. These utilities operate on streaming text, allowing for dynamic processing, transformation, and aggregation of text data as it flows through the system. Conceptually, they are text in, text out, sometimes with additional metadata describing the output.

Aggregators

Aggregators are components that collect and combine text data over time, often buffering input until certain conditions are met (like sentence boundaries or specific patterns). They can modify, enhance, or restructure the text before passing it along. Examples include:
  • SimpleTextAggregator: Buffers text until sentence boundaries are detected. This is the default aggregator used by most TTS services.
  • SkipTagsAggregator: Buffers text until sentence boundaries, while skipping over specified tags so that sentences are not prematurely detected due to characters in between those tags. This is the default aggregator used by Cartesia and Rime in order to skip over custom spell tags.
  • PatternPairAggregator: Buffers text until either a sentence boundary is detected or a complete pattern pair (like XML tags or custom delimiters) is found. Patterns found can either be removed, left in but trigger a registered callback, or aggregated separately. Useful for voice switching, structured content processing, and extracting metadata from LLM outputs.
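To make the pattern-pair idea concrete, here is a small standalone sketch (not the actual PatternPairAggregator implementation) showing how complete XML-like pairs can be pulled out of buffered text while the surrounding prose is kept for normal sentence aggregation:

```python
import re

def extract_pattern_pairs(buffer, start_tag, end_tag):
    """Pull complete start/end pairs out of buffered text.

    Returns (remaining_text, matches): the buffer with the pairs removed,
    plus the content found between each pair. Illustrative only -- the real
    aggregator also handles sentence boundaries and incomplete pairs.
    """
    pattern = re.compile(
        re.escape(start_tag) + r"(.*?)" + re.escape(end_tag), re.DOTALL
    )
    matches = pattern.findall(buffer)
    remaining = pattern.sub("", buffer)
    return remaining, matches

text = "Hello there. <voice>sophia</voice> How are you today?"
remaining, matches = extract_pattern_pairs(text, "<voice>", "</voice>")
# matches captures the tag content ("sophia"); remaining keeps the prose
# so it can still be aggregated into sentences.
```

The real aggregator must also cope with a start tag whose end tag has not arrived yet, which is why it buffers rather than processing each chunk in isolation.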

Usage Pattern

After initial setup, aggregators are used by repeatedly calling their aggregate() method with incoming text chunks. The aggregator processes the text according to its logic and returns aggregated text when appropriate. If no complete aggregation is ready, it returns None; otherwise, it returns an Aggregation object containing the aggregated text along with a type string describing the nature of the aggregation (e.g., “sentence”, “xml”).
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator

aggregator = SimpleTextAggregator()
# streaming_text, get_next_text_chunk(), and process_aggregated_text()
# are placeholders for your own streaming loop.
while streaming_text:
    chunk = get_next_text_chunk()
    aggregation = aggregator.aggregate(chunk)
    if aggregation:
        # e.g. type is "sentence" for sentence-boundary aggregations
        process_aggregated_text(aggregation.text, aggregation.type)

Custom Aggregators

You can create custom aggregators by subclassing the BaseTextAggregator class and implementing your own aggregation logic in the aggregate() method. This allows for tailored text processing to meet specific application needs.
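As an illustration of the kind of logic a custom aggregator might implement, here is a standalone sketch. It mirrors the aggregate()-returns-Aggregation-or-None contract described above, but the base class and the Aggregation shape are simplified stand-ins, not the real Pipecat classes:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Aggregation:
    """Simplified stand-in for Pipecat's aggregation result."""
    text: str
    type: str

class NewlineAggregator:
    """Buffers streamed chunks and flushes whenever a newline arrives.

    Illustrative only -- a real custom aggregator would subclass
    BaseTextAggregator and implement its full interface.
    """

    def __init__(self):
        self._buffer = ""

    def aggregate(self, text: str) -> Optional[Aggregation]:
        self._buffer += text
        if "\n" in self._buffer:
            # Flush everything up to the first newline; keep the rest buffered.
            line, _, rest = self._buffer.partition("\n")
            self._buffer = rest
            return Aggregation(text=line, type="line")
        return None

agg = NewlineAggregator()
assert agg.aggregate("Hello, ") is None  # nothing complete yet
result = agg.aggregate("world!\nNext")
# result.text == "Hello, world!", result.type == "line";
# "Next" stays buffered until another newline arrives.
```

The key design point is that state lives inside the aggregator instance, so callers can keep feeding chunks without tracking partial text themselves.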

Filters

Filters are components that process text data in a streaming fashion, transforming or modifying it as it passes through. Unlike aggregators, filters typically do not buffer text but instead operate on each chunk of text individually. Currently, the only built-in filter is the MarkdownTextFilter, which processes markdown syntax in streaming text, converting it into TTS-friendly plain text.

Usage Pattern

Filters are used by calling their filter() method with incoming text chunks. The filter processes the text and returns the transformed text immediately.
from pipecat.utils.text.markdown_text_filter import MarkdownTextFilter

# Avoid naming the instance "filter", which shadows the built-in filter().
md_filter = MarkdownTextFilter()
while streaming_text:
    chunk = get_next_text_chunk()
    filtered_text = md_filter.filter(chunk)
    process_filtered_text(filtered_text)

Custom Filters

You can create custom filters by subclassing the BaseTextFilter class and implementing your own filtering logic in the filter() method. This allows for specialized text transformations to suit your application’s requirements.
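For example, a custom filter might normalize text so a TTS voice reads it naturally. The sketch below is standalone and simplified (a real implementation would subclass BaseTextFilter); it expands a few common abbreviations in each chunk:

```python
class AbbreviationFilter:
    """Expands common abbreviations for more natural TTS output.

    Illustrative only -- a real custom filter would subclass BaseTextFilter.
    Note: because filters process each chunk independently, an abbreviation
    split across two chunks would not be expanded by this sketch.
    """

    REPLACEMENTS = {"Dr.": "Doctor", "St.": "Street", "etc.": "et cetera"}

    def filter(self, text: str) -> str:
        for abbrev, expansion in self.REPLACEMENTS.items():
            text = text.replace(abbrev, expansion)
        return text

f = AbbreviationFilter()
f.filter("Dr. Smith lives on Main St.")
# -> "Doctor Smith lives on Main Street"
```

The chunk-boundary caveat in the docstring is one reason some transformations are better expressed as aggregators (which buffer) than as filters (which do not).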