Pipecat’s architecture is made up of a Pipeline, FrameProcessors, and Frames. See the Core Concepts for a full review. From that architecture, recall that FrameProcessors are the workers in the pipeline that receive frames and complete actions based on the frames received.

Pipecat comes with many FrameProcessors built in. These consist of services, like OpenAILLMService or CartesiaTTSService, utilities, like UserIdleProcessor, and other things. Largely, you can build most of your application with these built-in FrameProcessors, but commonly, your application code may require custom frame processing logic. For example, you may want to perform an action as a result of a frame that’s pushed in the pipeline.

Example: ImageSyncAggregator

Let’s look at an example custom FrameProcessor that synchronizes images with bot speech:

class ImageSyncAggregator(FrameProcessor):
    def __init__(self, speaking_path: str, waiting_path: str):
        super().__init__()
        self._speaking_image = Image.open(speaking_path)
        self._speaking_image_format = self._speaking_image.format
        self._speaking_image_bytes = self._speaking_image.tobytes()

        self._waiting_image = Image.open(waiting_path)
        self._waiting_image_format = self._waiting_image.format
        self._waiting_image_bytes = self._waiting_image.tobytes()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, BotStartedSpeakingFrame):
            await self.push_frame(
                OutputImageRawFrame(
                    image=self._speaking_image_bytes,
                    size=(1024, 1024),
                    format=self._speaking_image_format,
                )
            )

        elif isinstance(frame, BotStoppedSpeakingFrame):
            await self.push_frame(
                OutputImageRawFrame(
                    image=self._waiting_image_bytes,
                    size=(1024, 1024),
                    format=self._waiting_image_format,
                )
            )

        await self.push_frame(frame)

This example custom FrameProcessor looks for BotStartedSpeakingFrame and BotStoppedSpeakingFrame. When it sees a BotStartedSpeakingFrame, it will show an image that says the bot is speaking. When it sees a BotStoppedSpeakingFrame, it will show an image that says the bot is not speaking.

See this working example using the ImageSyncAggregator FrameProcessor

Adding to a Pipeline

This custom FrameProcessor can be added to a Pipeline just before the transport output:

# Create and initialize the custom FrameProcessor
image_sync_aggregator = ImageSyncAggregator(
    os.path.join(os.path.dirname(__file__), "assets", "speaking.png"),
    os.path.join(os.path.dirname(__file__), "assets", "waiting.png"),
)

pipeline = Pipeline(
    [
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        image_sync_aggregator,  # Our custom FrameProcessor
        transport.output(),
        context_aggregator.assistant(),
    ]
)

With this positioning, the ImageSyncAggregator FrameProcessor will receive the BotStartedSpeakingFrame and BotStoppedSpeakingFrame outputted by the TTS processor and then push its own frame—OutputImageRawFrame—to the output transport.

Key Requirements

FrameProcessors must inherit from the base FrameProcessor class. This ensures that your custom FrameProcessor will correctly handle frames like StartFrame, EndFrame, StartInterruptionFrame without having to write custom logic for those frames. This inheritance also provides it with the ability to process_frame() and push_frame():

  • process_frame() is what allows the FrameProcessor to receive frames and add custom conditional logic based on the frames that are received.
  • push_frame() allows the FrameProcessor to push frames to the pipeline. Normally, frames are pushed DOWNSTREAM, but based on which processors need the output, you can also push UPSTREAM or in both directions.

Essential Implementation Details

To ensure proper base class inheritance, it’s critical to include:

  1. super().__init__() in your __init__ method
  2. await super().process_frame(frame, direction) in your process_frame() method
class MyCustomProcessor(FrameProcessor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)  # ✅ Required
        # Your initialization code here

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)  # ✅ Required

        # Your custom frame processing logic here
        if isinstance(frame, SomeSpecificFrame):
            # Handle the frame
            pass

        await self.push_frame(frame)  # ✅ Required - pass frame through

Critical Responsibility: Frame Forwarding

FrameProcessors receive all frames that are pushed through the pipeline. This gives them a lot of power, but also a great responsibility. Critically, they must push all frames through the pipeline; if they don’t, they block frames from moving through the Pipeline, which will cause issues in how your application functions.

You can see this at work in the ImageSyncAggregator’s process_frame() method. It handles both bot speaking frames and also has an await self.push_frame(frame) which pushes the frame through to the next processor in the pipeline.

Frame Direction

When pushing frames, you can specify the direction:

# Push downstream (default)
await self.push_frame(frame)
await self.push_frame(frame, FrameDirection.DOWNSTREAM)

# Push upstream
await self.push_frame(frame, FrameDirection.UPSTREAM)

Most custom FrameProcessors will push frames downstream, but upstream can be useful for sending control frames or error notifications back up the pipeline.

Best Practices

  1. Always call the parent methods: Use super().__init__() and await super().process_frame()
  2. Forward all frames: Make sure every frame is pushed through with await self.push_frame(frame)
  3. Handle frames conditionally: Use isinstance() checks to handle specific frame types
  4. Use proper error handling: Wrap risky operations in try/catch blocks
  5. Position carefully in pipeline: Consider where in the pipeline your processor needs to be to receive the right frames

With these patterns, you can create powerful custom FrameProcessors that extend Pipecat’s capabilities for your specific use case.