Overview

MossRetrievalService performs low-latency semantic retrieval against Moss vector indexes and injects the relevant passages into your Pipecat agent’s LLM context. Its query() method returns a MossIndexProcessor that you place in the pipeline to augment each user turn with context-aware search results.

Source Repository

Source code, examples, and issues for the Moss integration

PyPI Package

The pipecat-moss package on PyPI

Moss

Learn more about Moss semantic retrieval

Moss Portal

Create a project and get your project ID and key

Installation

This is a community-maintained package distributed separately from pipecat-ai:
pip install pipecat-moss

Prerequisites

Moss Project Setup

Before using the Moss retrieval service, you need:
  1. Moss Project: Create a project in the Moss Portal
  2. Project Credentials: Get your project ID and project key from the portal
  3. A Moss Index: Create and populate an index before running your pipeline (see the source repository for the index-creation example)

Required Environment Variables

  • MOSS_PROJECT_ID: Your Moss project ID
  • MOSS_PROJECT_KEY: Your Moss project key
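
Because both variables are required, it can help to fail fast when one is missing instead of passing None into the service. The following is a minimal sketch of such a check; the helper name missing_moss_env is hypothetical and is not part of pipecat-moss.

```python
import os

# Names taken from the "Required Environment Variables" list above.
REQUIRED_VARS = ("MOSS_PROJECT_ID", "MOSS_PROJECT_KEY")


def missing_moss_env(environ=None):
    """Return the required Moss variables that are unset or empty.

    Hypothetical helper for illustration; not part of pipecat-moss.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_moss_env()
    if missing:
        raise SystemExit("Missing environment variables: " + ", ".join(missing))
```

Running this check at startup surfaces a configuration problem before the pipeline is built.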

Configuration

MossRetrievalService

project_id
str
default:"None"
Moss project ID. Can be supplied via the MOSS_PROJECT_ID environment variable.
project_key
str
default:"None"
Moss project key. Can be supplied via the MOSS_PROJECT_KEY environment variable.
system_prompt
str
Prefix prepended to the retrieved documents before they are added to the LLM context.

Before the pipeline runs, load the index by awaiting load_index(index_name). Then create a pipeline processor with query(index_name, *, top_k=5, alpha=0.8).

query() parameters

index_name
str
required
Name of the Moss index to retrieve from.
top_k
int
default:"5"
Number of documents to retrieve per query.
alpha
float
default:"0.8"
Weight that blends semantic and keyword scoring, from 0.0 (keyword-only) to 1.0 (semantic-only).
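
To build intuition for alpha, the sketch below assumes the blend is a simple linear interpolation between a semantic score and a keyword score. This page does not state the formula Moss actually uses, so treat blend_scores as a hypothetical illustration of the two endpoints rather than the service's implementation.

```python
def blend_scores(semantic: float, keyword: float, alpha: float = 0.8) -> float:
    """Linearly interpolate between keyword and semantic scores.

    Assumption for illustration only: Moss may combine scores differently.
    alpha=0.0 returns the keyword score, alpha=1.0 the semantic score.
    """
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0.0 and 1.0")
    return alpha * semantic + (1.0 - alpha) * keyword


# A document that matches well semantically but poorly on keywords.
semantic_only = blend_scores(0.9, 0.1, alpha=1.0)
keyword_only = blend_scores(0.9, 0.1, alpha=0.0)
default_blend = blend_scores(0.9, 0.1)  # uses the documented default of 0.8
```

Under this assumption, the default of 0.8 leans heavily on semantic similarity while still letting exact keyword matches contribute.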

Usage

import os
import asyncio
from pipecat_moss import MossRetrievalService
from pipecat.pipeline.pipeline import Pipeline

moss_service = MossRetrievalService(
    project_id=os.getenv("MOSS_PROJECT_ID"),
    project_key=os.getenv("MOSS_PROJECT_KEY"),
    system_prompt="Relevant passages from the Moss knowledge base:\n\n",
)

async def setup_indexes():
    await moss_service.load_index(os.getenv("MOSS_INDEX_NAME"))

asyncio.run(setup_indexes())

# transport, stt, llm, tts, and context_aggregator are created elsewhere in your bot.
pipeline = Pipeline([
    transport.input(),               # audio/user input
    stt,                             # speech to text
    context_aggregator.user(),       # add user text to context
    moss_service.query(os.getenv("MOSS_INDEX_NAME"), top_k=5, alpha=0.8),  # retrieve docs
    llm,                             # LLM generates response
    tts,                             # TTS synthesis
    transport.output(),              # stream audio back to user
    context_aggregator.assistant(),  # store assistant response
])
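
setup_indexes() must complete before the pipeline starts so that the service has loaded the Moss index. The example reads the index name from a MOSS_INDEX_NAME environment variable, which you need to set in addition to the project credentials. See the source repository for a complete working example.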
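
If your bot already runs inside a single async entry point, one option is to await the index load there, immediately before building the pipeline. The sketch below shows only that ordering. StubRetrievalService is a hypothetical stand-in that mimics the load_index() and query() signatures described above so the example runs without Moss credentials; swap in MossRetrievalService in a real bot.

```python
import asyncio


class StubRetrievalService:
    """Hypothetical stand-in for MossRetrievalService; records call order only."""

    def __init__(self):
        self.events = []

    async def load_index(self, index_name):
        await asyncio.sleep(0)  # simulate an awaitable network call
        self.events.append(f"loaded:{index_name}")

    def query(self, index_name, *, top_k=5, alpha=0.8):
        self.events.append(f"query:{index_name}")
        # The real service returns a pipeline processor; a tuple stands in here.
        return ("processor", index_name, top_k, alpha)


async def main(service, index_name):
    # 1. Load the index first, in the same event loop that will run the pipeline.
    await service.load_index(index_name)
    # 2. Only then create the processor that goes into the pipeline.
    processor = service.query(index_name, top_k=5, alpha=0.8)
    # 3. Build and run the pipeline here (omitted in this sketch).
    return processor


service = StubRetrievalService()
processor = asyncio.run(main(service, "demo-index"))
```

Keeping both steps in one coroutine makes the required ordering explicit and avoids starting a separate event loop just for setup.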

Compatibility

Tested with Pipecat v0.0.99. Check the source repository for the latest tested version and changelog.