Pipecat’s architecture is made up of a Pipeline, FrameProcessors, and Frames. See the Core Concepts for a full review. Recall from that architecture that FrameProcessors are the workers in the pipeline: they receive frames and act on them. Pipecat ships with many built-in FrameProcessors, including services, like OpenAILLMService or CartesiaTTSService, and utilities, like UserIdleProcessor. You can build most of your application with these built-in FrameProcessors, but your application will often need custom frame processing logic. For example, you may want to perform an action whenever a particular frame is pushed through the pipeline.

## Example: MetricsFrame logger

This custom FrameProcessor formats and logs MetricsFrames:
```python
from loguru import logger

from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class MetricsFrameLogger(FrameProcessor):
    """Formats and logs all MetricsFrames."""

    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            logger.info(f"{frame.name}\n    {format_metrics(frame.data)}")

        # ALWAYS push every frame, whether or not we acted on it
        await self.push_frame(frame, direction)
```
This frame processor looks for MetricsFrames; when it sees one, it formats the frame’s data and logs it using this `format_metrics` helper:
```python
def format_metrics(metrics, indent=0):
    """Render a list of metrics objects as an indented, human-readable string."""
    lines = []
    tab = "\t" * indent

    for metric in metrics:
        lines.append(tab + type(metric).__name__)
        for field, value in vars(metric).items():
            # Expand nested objects (anything with a __dict__) one level deep
            if hasattr(value, "__dict__") and not isinstance(
                value, (str, int, float, bool, type(None))
            ):
                lines.append(f"{tab}\t{field}={type(value).__name__}")
                for k, v in vars(value).items():
                    lines.append(f"{tab}\t\t{k}={repr(v)}")
            else:
                lines.append(f"{tab}\t{field}={repr(value)}")

    return "\n".join(lines)
```
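
To see the output shape, here's a quick standalone check. The `FakeTTFBMetrics` dataclass is purely illustrative (the real entries in `frame.data` are Pipecat metrics objects), and the snippet assumes `format_metrics` from above is in scope:

```python
from dataclasses import dataclass


@dataclass
class FakeTTFBMetrics:
    """Illustrative stand-in; any object with attributes works with format_metrics."""
    processor: str
    value: float


print(format_metrics([FakeTTFBMetrics(processor="tts", value=0.21)]))
# Prints the class name, then each field on its own tab-indented line:
# FakeTTFBMetrics
#     processor='tts'
#     value=0.21
```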

### Add to a Pipeline

```python
# Create and initialize the custom FrameProcessor
metrics_frame_processor = MetricsFrameLogger()

pipeline = Pipeline(
    [
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
        metrics_frame_processor,  # Our custom FrameProcessor that pretty-prints metrics frames
    ]
)
```

With this positioning, the `MetricsFrameLogger` FrameProcessor will receive every MetricsFrame pushed downstream in the pipeline.
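
Placement determines which frames a processor sees and when. For example, a processor that should react to transcriptions as they leave the STT service belongs immediately after `stt`, not at the end of the pipeline. Here's a minimal sketch; the `TranscriptionLogger` class is hypothetical (not part of Pipecat), and the pipeline reuses the same `transport`, `stt`, `llm`, `tts`, and `context_aggregator` objects as the example above:

```python
from loguru import logger

from pipecat.frames.frames import Frame, TranscriptionFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class TranscriptionLogger(FrameProcessor):
    """Hypothetical processor that logs final user transcriptions."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            logger.info(f"User said: {frame.text}")

        # Always forward every frame
        await self.push_frame(frame, direction)


pipeline = Pipeline(
    [
        transport.input(),
        stt,
        TranscriptionLogger(),  # placed right after stt so it sees TranscriptionFrames first
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ]
)
```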

## Key Requirements

FrameProcessors must inherit from the base `FrameProcessor` class. This ensures that your custom FrameProcessor correctly handles frames like `StartFrame`, `EndFrame`, and `StartInterruptionFrame` without custom logic for each. Inheriting from the base class also gives it two key methods, `process_frame()` and `push_frame()`:

- **`process_frame()`** is how the FrameProcessor receives frames and applies custom conditional logic based on the frame type and direction.
- **`push_frame()`** is how the FrameProcessor pushes frames onward in the pipeline. Normally, frames are pushed DOWNSTREAM, but depending on which processors need the output, you can also push UPSTREAM or in both directions (see the sketch below).
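
The two methods typically work together: a processor reacts to one frame type and may push additional frames of its own. The sketch below is illustrative only (not part of Pipecat); it assumes the goal of having the bot acknowledge the user as soon as they stop speaking, using the standard `UserStoppedSpeakingFrame` and `TTSSpeakFrame` frames:

```python
from pipecat.frames.frames import Frame, TTSSpeakFrame, UserStoppedSpeakingFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class SpeakOnSilence(FrameProcessor):
    """Illustrative: speaks a filler phrase whenever the user stops speaking."""

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        # React to a specific frame type by pushing a *new* frame downstream...
        if isinstance(frame, UserStoppedSpeakingFrame):
            await self.push_frame(TTSSpeakFrame(text="One moment..."), FrameDirection.DOWNSTREAM)

        # ...while still forwarding the original frame.
        await self.push_frame(frame, direction)
```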

### Essential Implementation Details

To ensure proper base class inheritance, it's critical to include:

1. **`super().__init__()`** in your `__init__` method
2. **`await super().process_frame(frame, direction)`** in your `process_frame()` method

```python
class MyCustomProcessor(FrameProcessor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)  # ✅ Required
        # Your initialization code here

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)  # ✅ Required

        # Your custom frame processing logic here
        if isinstance(frame, SomeSpecificFrame):
            # Handle the frame
            pass

        await self.push_frame(frame, direction)  # ✅ Required - pass frame through
```

## Critical Responsibility: Frame Forwarding

FrameProcessors receive every frame that is pushed through the pipeline. That gives them a lot of power, and with it a key responsibility: they must push all frames onward. If they don’t, they block frames from moving through the Pipeline, which will break how your application functions. Note that in addition to formatting and logging MetricsFrames, `MetricsFrameLogger` always calls `await self.push_frame(frame, direction)`, which passes the frame to the next processor in the pipeline.
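
A common mistake is to forward only the frames you handle. The sketch below contrasts that anti-pattern with the fix, using the same imports as `MetricsFrameLogger` above:

```python
class BrokenMetricsLogger(FrameProcessor):
    # ❌ Only MetricsFrames are forwarded; StartFrame, EndFrame, audio, and every
    # other frame stops here, stalling everything downstream of this processor.
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, MetricsFrame):
            logger.info(format_metrics(frame.data))
            await self.push_frame(frame, direction)


class FixedMetricsLogger(FrameProcessor):
    # ✅ Handle the frames you care about, then forward everything.
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, MetricsFrame):
            logger.info(format_metrics(frame.data))
        await self.push_frame(frame, direction)
```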

## Frame Direction

When pushing frames, you can specify the direction:
```python
# Push downstream (default)
await self.push_frame(frame, FrameDirection.DOWNSTREAM)

# Push upstream
await self.push_frame(frame, FrameDirection.UPSTREAM)
```
Most custom FrameProcessors will push frames downstream, but upstream can be useful for sending control frames or error notifications back up the pipeline.
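
For example, a processor that fails while doing its own work can report the failure upstream with an `ErrorFrame` while still letting the original frame continue. This is a sketch under the assumption that you have some fallible operation of your own (the `risky_operation` placeholder below is hypothetical); `ErrorFrame` itself is a standard Pipecat frame:

```python
from pipecat.frames.frames import ErrorFrame, Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class RiskyProcessor(FrameProcessor):
    """Illustrative: reports its own failures upstream without blocking frames."""

    async def risky_operation(self, frame: Frame):
        """Hypothetical placeholder for work that might raise."""
        ...

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        try:
            await self.risky_operation(frame)
        except Exception as e:
            # Notify upstream processors (e.g., the transport) of the failure
            await self.push_frame(ErrorFrame(error=str(e)), FrameDirection.UPSTREAM)

        # The original frame still continues in its original direction
        await self.push_frame(frame, direction)
```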

## Best Practices

1. **Always call the parent methods**: Use `super().__init__()` and `await super().process_frame()`
2. **Forward all frames**: Make sure every frame is pushed through with `await self.push_frame(frame, direction)`
3. **Handle frames conditionally**: Use `isinstance()` checks to handle specific frame types
4. **Use proper error handling**: Wrap risky operations in try/except blocks, as in the Frame Direction sketch above
5. **Position carefully in the pipeline**: Consider where in the pipeline your processor needs to sit to receive the right frames

With these patterns, you can create powerful custom FrameProcessors that extend Pipecat’s capabilities for your specific use case.