> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Custom FrameProcessor

> Learn how to write your own custom FrameProcessor

Pipecat's architecture is made up of a Pipeline, FrameProcessors, and Frames. See the [Core Concepts](/pipecat/learn/pipeline) for a full review. From that architecture, recall that FrameProcessors are the workers in the pipeline that receive frames and complete actions based on the frames received.

Pipecat comes with many FrameProcessors built in. These consist of services, like `OpenAILLMService` or `CartesiaTTSService`, utilities, like `LLMTextProcessor`, and other things. Largely, you can build most of your application with these built-in FrameProcessors, but commonly, your application code may require custom frame processing logic. For example, you may want to perform an action as a result of a frame that's pushed in the pipeline.

## Example: MetricsFrame logger

This custom FrameProcessor format and logs MetricsFrames:

```python theme={null}
class MetricsFrameLogger(FrameProcessor):
    """MetricsFrameLogger formats and logs all MetericsFrames"""

    def __init__(self):
        super().__init__()

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            logger.info(f"{frame.name}\n    {format_metrics(frame.data)}")
            await self.push_frame(frame, direction)

        # ALWAYS push all frames
        else:
            # SUPER IMPORTANT: always push every frame!
            await self.push_frame(frame, direction)
```

This frame processor looks for `MetricsFrames`. When it sees one, it formats the data and logs it.

It uses this `format_metrics` function:

```python theme={null}
def format_metrics(metrics, indent=0):
    lines = []
    tab = "\t" * indent

    for metric in metrics:
        lines.append(tab + type(metric).__name__)
        for field, value in vars(metric).items():
            if hasattr(value, "__dict__") and not isinstance(
                value, (str, int, float, bool, type(None))
            ):
                lines.append(f"{tab}\t{field}={type(value).__name__}")
                for k, v in vars(value).items():
                    lines.append(f"{tab}\t\t{k}={repr(v)}")
            else:
                lines.append(f"{tab}\t{field}={repr(value)}")

    return "\n".join(lines)
```

<Note>
  See this [working
  example](https://github.com/pipecat-ai/pipecat/blob/main/examples/features/features-custom-frame-processor.py)
  using the `MetricsFrameLogger` FrameProcessor
</Note>

## Add to a Pipeline

```python theme={null}
# Create and initialize the custom FrameProcessor
metrics_frame_processor = MetricsFrameLogger()

pipeline = Pipeline(
    [
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
        metrics_frame_processor,  # Our custom FrameProcessor that pretty prints metrics frames
    ]
)
```

With this positioning, the `MetricsFrameLogger` FrameProcessor will receive every MetericsFrame in the pipeline.

## Key Requirements

FrameProcessors must inherit from the base `FrameProcessor` class. This ensures that your custom FrameProcessor will correctly handle frames like `StartFrame`, `EndFrame`, `InterruptionFrame` without having to write custom logic for those frames. This inheritance also provides it with the ability to `process_frame()` and `push_frame()`:

* **`process_frame()`** is what allows the FrameProcessor to receive frames and add custom conditional logic based on the frames that are received.
* **`push_frame()`** allows the FrameProcessor to push frames to the pipeline. Normally, frames are pushed DOWNSTREAM, but based on which processors need the output, you can also push UPSTREAM or in both directions.

### Essential Implementation Details

To ensure proper base class inheritance, it's critical to include:

1. **`super().__init__()`** in your `__init__` method
2. **`await super().process_frame(frame, direction)`** in your `process_frame()` method

```python theme={null}
class MyCustomProcessor(FrameProcessor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)  # ✅ Required
        # Your initialization code here

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)  # ✅ Required

        # Your custom frame processing logic here
        if isinstance(frame, SomeSpecificFrame):
            # Handle the frame
            pass

        await self.push_frame(frame, direction)  # ✅ Required - pass frame through
```

## Critical Responsibility: Frame Forwarding

FrameProcessors receive **all** frames that are pushed through the pipeline. This gives them a lot of power, but also a great responsibility. Critically, they must push all frames through the pipeline; if they don't, they block frames from moving through the Pipeline, which will cause issues in how your application functions.

As well as formatting and logging MetricsFrames, `MetricsFrameLogger` also has an `await self.push_frame(frame, direction)` which pushes the frame through to the next processor in the pipeline.

## Frame Direction

When pushing frames, you can specify the direction:

```python theme={null}
# Push downstream (default)
await self.push_frame(frame, FrameDirection.DOWNSTREAM)

# Push upstream
await self.push_frame(frame, FrameDirection.UPSTREAM)
```

Most custom FrameProcessors will push frames downstream, but upstream can be useful for sending control frames or error notifications back up the pipeline.

## Best Practices

1. **Always call the parent methods**: Use `super().__init__()` and `await super().process_frame()`
2. **Forward all frames**: Make sure every frame is pushed through with `await self.push_frame(frame, direction)`
3. **Handle frames conditionally**: Use `isinstance()` checks to handle specific frame types
4. **Use proper error handling**: Wrap risky operations in try/catch blocks
5. **Position carefully in pipeline**: Consider where in the pipeline your processor needs to be to receive the right frames

With these patterns, you can create powerful custom FrameProcessors that extend Pipecat's capabilities for your specific use case.
