Overview

ParallelPipeline allows you to create multiple independent processing branches that run simultaneously, sharing input and coordinating output. It’s particularly useful for multi-agent systems, parallel stream processing, and creating redundant service paths.

Each branch receives the same downstream frames, processes them independently, and the results are merged back into a single stream. System frames (like StartFrame and EndFrame) are synchronized across all branches.

Constructor Parameters

*args
List[List[FrameProcessor]]
required

Multiple lists of processors, where each list defines a parallel branch. All branches execute simultaneously when frames flow through the pipeline.

Usage Examples

Multi-Agent Conversation

Create a conversation with two AI agents that can interact with the user independently:

pipeline = Pipeline([
    transport.input(),
    ParallelPipeline(
        # Agent 1: Customer service representative
        [
            stt_1,
            context_aggregator.user_a(),
            llm_agent_1,
            tts_agent_1,
        ],
        # Agent 2: Technical specialist
        [   stt_2,
            context_aggregator.user_b(),
            llm_agent_2,
            tts_agent_2,
        ]
    ),
    transport.output(),
])

Redundant Services with Failover

Set up redundant services with automatic failover:

pipeline = Pipeline([
    transport.input(),
    stt,
    ParallelPipeline(
        # Primary LLM service
        [   gate_primary,
            primary_llm,
            error_detector,
        ],
        # Backup LLM service (used only if primary fails)
        [   gate_backup,
            backup_llm,
            fallback_processor,
        ]
    ),
    tts,
    transport.output(),
])

Cross-Branch Communication

Using Producer/Consumer processors to share data between branches:

# Create producer/consumer pair for cross-branch communication
frame_producer = ProducerProcessor(filter=is_important_frame)
frame_consumer = ConsumerProcessor(producer=frame_producer)

pipeline = Pipeline([
    transport.input(),
    ParallelPipeline(
        # Branch that generates important frames
        [
            stt,
            llm,
            tts,
            frame_producer,  # Share frames with other branch
        ],
        # Branch that consumes those frames
        [
            frame_consumer,  # Receive frames from other branch
            llm,             # Speech to Speech LLM (audio in)
        ]
    ),
    transport.output(),
])

How It Works

  1. ParallelPipeline adds special source and sink processors to each branch
  2. System frames (like StartFrame and EndFrame) are sent to all branches
  3. Other frames flow downstream to all branch sources
  4. Results from each branch are collected at the sinks
  5. The pipeline ensures EndFrames are only passed through after all branches complete