> ## Documentation Index
> Fetch the complete documentation index at: https://docs.pipecat.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Metrics

> Learn how to monitor performance and LLM/TTS usage with Pipecat.

When developing real-time, multimodal AI applications, monitoring two key
factors is crucial: performance (latency) and LLM/TTS usage. Performance impacts
user experience, while usage can affect operational costs. Pipecat offers
built-in metrics for both, which can be enabled with straightforward
configuration options.

## Enabling performance metrics

Set `enable_metrics=True` in `PipelineParams` when creating a task:

```python Example config theme={null}
task = PipelineTask(
            pipeline,
            params=PipelineParams(
                ...
                enable_metrics=True,
                ...
            ),
        )
```

Once enabled, Pipecat logs the following metrics:

| Metric           | Description                                                                      |
| ---------------- | -------------------------------------------------------------------------------- |
| TTFB             | Time To First Byte in seconds                                                    |
| Processing Time  | Time taken by the service to respond in seconds                                  |
| Text Aggregation | Time from the first LLM token to the first complete sentence (TTS services only) |

```console Sample output theme={null}
AnthropicLLMService#0 TTFB: 0.8378312587738037
CartesiaTTSService#0 text aggregation time: 0.2134
CartesiaTTSService#0 processing time: 0.0005071163177490234
CartesiaTTSService#0 TTFB: 0.17177796363830566
AnthropicLLMService#0 processing time: 2.4927797317504883
```

### Limiting TTFB responses

If you only want the **first** TTFB measurement for each service, you can
optionally pass `report_only_initial_ttfb=True` in `PipelineParams`:

```python Example config theme={null}
task = PipelineTask(
            pipeline,
            params=PipelineParams(
                ...
                enable_metrics=True,
                report_only_initial_ttfb=True,
                ...
            ),
        )
```

> **Note:** `enable_metrics=True` is required for this setting to have an
> effect.

### Disabling initial empty metrics

By default, Pipecat sends an initial `MetricsFrame` with zero values for all
services when the pipeline starts. To disable this behavior:

```python Example config theme={null}
task = PipelineTask(
            pipeline,
            params=PipelineParams(
                ...
                enable_metrics=True,
                send_initial_empty_metrics=False,
                ...
            ),
        )
```

## Enabling LLM/TTS Usage Metrics

Set `enable_usage_metrics=True` in PipelineParams when creating a task:

```python Example config theme={null}
task = PipelineTask(
            pipeline,
            params=PipelineParams(
                ...
                enable_usage_metrics=True,
                ...
            ),
        )
```

Pipecat will log the following as applicable:

| Metric    | Description                                 |
| --------- | ------------------------------------------- |
| LLM Usage | Number of prompt and completion tokens used |
| TTS Usage | Number of characters processed              |

```console Sample output theme={null}
CartesiaTTSService#0 usage characters: 65
AnthropicLLMService#0 prompt tokens: 104, completion tokens: 53
```

> **Note:** Usage metrics are recorded per interaction and do not represent
> running totals.

## Capturing Metrics Data

When metrics are enabled, Pipecat emits a `MetricsFrame` for each interaction. The `MetricsFrame` contains a list of metrics data objects, which can include:

* `TTFBMetricsData` — Time To First Byte
* `ProcessingMetricsData` — Processing time
* `LLMUsageMetricsData` — LLM token usage
* `TTSUsageMetricsData` — TTS character usage
* `TextAggregationMetricsData` — Sentence aggregation latency (TTS)
* `TurnMetricsData` — Turn completion predictions

You can access the metrics data by either adding a custom [FrameProcessor](/pipecat/fundamentals/custom-frame-processor) to your pipeline or adding an [observer](/api-reference/server/utilities/observers/observer-pattern) to monitor `MetricsFrame`s.

### Example: Using MetricsLogObserver

The simplest way to log metrics is with the built-in `MetricsLogObserver`. Pass it as an observer when creating your `PipelineTask`:

```python theme={null}
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver

task = PipelineTask(
    pipeline,
    params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
    observers=[MetricsLogObserver()],
)
```

You can filter which metrics types are logged by passing `include_metrics`:

```python theme={null}
from pipecat.metrics.metrics import LLMUsageMetricsData, TTSUsageMetricsData
from pipecat.observers.loggers.metrics_log_observer import MetricsLogObserver

observers = [
    MetricsLogObserver(
        include_metrics={LLMUsageMetricsData, TTSUsageMetricsData}
    )
]
```

### Example: Using a Custom FrameProcessor

Create a custom FrameProcessor to handle metrics data. Here's an example Metrics Processor that can be added to your pipeline after the TTS processor.

```python theme={null}
from pipecat.frames.frames import MetricsFrame
from pipecat.metrics.metrics import (
    LLMUsageMetricsData,
    ProcessingMetricsData,
    TextAggregationMetricsData,
    TTFBMetricsData,
    TTSUsageMetricsData,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class MetricsLogger(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, MetricsFrame):
            for d in frame.data:
                if isinstance(d, TTFBMetricsData):
                    print(f"!!! MetricsFrame: {frame}, ttfb: {d.value}")
                elif isinstance(d, ProcessingMetricsData):
                    print(f"!!! MetricsFrame: {frame}, processing: {d.value}")
                elif isinstance(d, LLMUsageMetricsData):
                    tokens = d.value
                    print(
                        f"!!! MetricsFrame: {frame}, prompt_tokens: {tokens.prompt_tokens}, completion_tokens: {tokens.completion_tokens}"
                    )
                elif isinstance(d, TextAggregationMetricsData):
                    print(f"!!! MetricsFrame: {frame}, text aggregation: {d.value}")
                elif isinstance(d, TTSUsageMetricsData):
                    print(f"!!! MetricsFrame: {frame}, characters: {d.value}")
        await self.push_frame(frame, direction)
```

## Metrics Data Reference

All metrics data classes inherit from `MetricsData`, which includes `processor` (the name of the processor that generated the metric) and an optional `model` field.

### TTFBMetricsData

Time To First Byte — measures how long until the first byte of a response is received from a service.

| Field   | Type    | Description                 |
| ------- | ------- | --------------------------- |
| `value` | `float` | TTFB measurement in seconds |

### ProcessingMetricsData

Measures the total time taken by a service to process a request.

| Field   | Type    | Description                            |
| ------- | ------- | -------------------------------------- |
| `value` | `float` | Processing time measurement in seconds |

### TextAggregationMetricsData

Measures the time from the first LLM token to the first complete sentence, representing the latency cost of sentence aggregation in the TTS pipeline.

| Field   | Type    | Description                 |
| ------- | ------- | --------------------------- |
| `value` | `float` | Aggregation time in seconds |

### LLMUsageMetricsData

Token usage for an LLM interaction. The `value` field is an `LLMTokenUsage` object with:

| Field                         | Type            | Description                                  |
| ----------------------------- | --------------- | -------------------------------------------- |
| `prompt_tokens`               | `int`           | Number of tokens in the input prompt         |
| `completion_tokens`           | `int`           | Number of tokens in the generated completion |
| `total_tokens`                | `int`           | Total tokens used (prompt + completion)      |
| `cache_read_input_tokens`     | `Optional[int]` | Tokens read from cache, if applicable        |
| `cache_creation_input_tokens` | `Optional[int]` | Tokens used to create cache entries          |
| `reasoning_tokens`            | `Optional[int]` | Reasoning tokens (for reasoning models)      |

### TTSUsageMetricsData

Character usage for a TTS interaction.

| Field   | Type  | Description                           |
| ------- | ----- | ------------------------------------- |
| `value` | `int` | Number of characters processed by TTS |

### TurnMetricsData

Metrics from turn completion prediction, emitted by turn analyzers like Krisp Viva Turn and Smart Turn.

| Field                    | Type    | Description                                                                                |
| ------------------------ | ------- | ------------------------------------------------------------------------------------------ |
| `is_complete`            | `bool`  | Whether the turn is predicted to be complete                                               |
| `probability`            | `float` | Confidence probability of the prediction                                                   |
| `e2e_processing_time_ms` | `float` | End-to-end processing time in ms, from VAD speech-to-silence transition to turn completion |

## Related Observers

In addition to `MetricsLogObserver`, Pipecat provides observers that track higher-level conversational metrics.

### StartupTimingObserver

Measures the time taken by each processor to start up.

```python theme={null}
from pipecat.observers.startup_timing_observer import StartupTimingObserver

startup_observer = StartupTimingObserver()

@observer.event_handler("on_startup_timing_report")
async def on_startup_timing_report(observer, report):
    print(f"Total startup: {report.total_duration_secs:.3f}s")
    for timing in report.processor_timings:
        print(f"  {timing.processor_name}: {timing.duration_secs:.3f}s")
```

Additionally, it tracks the time taken to connect to the transport and the time taken to connect to the client.

```python theme={null}
@observer.event_handler("on_transport_timing_report")
async def on_transport_timing_report(observer, report):
    if report.bot_connected_secs is not None:
        print(f"Bot connected: {report.bot_connected_secs:.3f}s")
    print(f"Client connected: {report.client_connected_secs:.3f}s")
```

### UserBotLatencyObserver

Measures the time between when a user stops speaking and when the bot starts speaking.

```python theme={null}
from pipecat.observers.user_bot_latency_observer import UserBotLatencyObserver

latency_observer = UserBotLatencyObserver()

@latency_observer.event_handler("on_latency_measured")
async def on_latency_measured(observer, latency_seconds):
    print(f"User-to-bot latency: {latency_seconds:.3f}s")

task = PipelineTask(pipeline, observers=[latency_observer])
```

### TurnTrackingObserver

Tracks conversation turns, emitting events when turns start and end. Handles interruptions and configurable timeouts.

```python theme={null}
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver

turn_observer = TurnTrackingObserver(turn_end_timeout_secs=2.5)

@turn_observer.event_handler("on_turn_started")
async def on_turn_started(observer, turn_count):
    print(f"Turn {turn_count} started")

@turn_observer.event_handler("on_turn_ended")
async def on_turn_ended(observer, turn_count, duration, was_interrupted):
    status = "interrupted" if was_interrupted else "completed"
    print(f"Turn {turn_count} {status} after {duration:.2f}s")

task = PipelineTask(pipeline, observers=[turn_observer])
```
