# Startup Timing Observer

> Measure processor startup times and transport readiness during pipeline initialization

The `StartupTimingObserver` measures how long each processor's `start()` method takes during pipeline startup, and tracks transport connection timing. This is useful for diagnosing startup slowness and identifying initialization bottlenecks such as WebSocket connections, API authentication, or model loading.

## Features

* Measures per-processor `start()` duration by tracking `StartFrame` propagation
* Reports total pipeline startup time and per-processor breakdown
* Tracks transport connection milestones (bot connected, client connected)
* Emits `on_startup_timing_report` with processor timing data
* Emits `on_transport_timing_report` with transport connection timing
* Supports filtering to measure only specific processor types
* Excludes internal pipeline processors by default

## Usage

### Basic Startup Monitoring

Add startup monitoring to your pipeline and handle the events:

```python
from pipecat.observers.startup_timing_observer import StartupTimingObserver
from pipecat.pipeline.task import PipelineTask

observer = StartupTimingObserver()

@observer.event_handler("on_startup_timing_report")
async def on_startup_timing_report(observer, report):
    print(f"Total startup: {report.total_duration_secs:.3f}s")
    for timing in report.processor_timings:
        print(f"  {timing.processor_name}: {timing.duration_secs:.3f}s")

@observer.event_handler("on_transport_timing_report")
async def on_transport_timing_report(observer, report):
    # Both fields are Optional[float], so guard before formatting.
    if report.bot_connected_secs is not None:
        print(f"Bot connected: {report.bot_connected_secs:.3f}s")
    if report.client_connected_secs is not None:
        print(f"Client connected: {report.client_connected_secs:.3f}s")

# `pipeline` is your existing Pipeline instance.
task = PipelineTask(
    pipeline,
    observers=[observer],
)
```

### Filtering Processor Types

To measure only specific processor types, pass a `processor_types` tuple:

```python
from pipecat.services.stt_service import STTService
from pipecat.services.tts_service import TTSService

observer = StartupTimingObserver(
    processor_types=(STTService, TTSService)
)
```

## Configuration

<ParamField path="processor_types" type="Tuple[Type[FrameProcessor], ...] | None" default="None">
  Optional tuple of processor types to measure. If `None`, all non-internal
  processors are measured. Internal pipeline processors (`PipelineSource`,
  `Pipeline`) are always excluded.
</ParamField>

## Event Handlers

### on\_startup\_timing\_report

Called once after the pipeline has fully started, with timing data for all measured processors.

```python
@observer.event_handler("on_startup_timing_report")
async def on_startup_timing_report(observer, report):
    # report is a StartupTimingReport
    print(f"Total: {report.total_duration_secs:.3f}s")
    for timing in report.processor_timings:
        print(f"  {timing.processor_name}: {timing.duration_secs:.3f}s")
```

**Report fields (`StartupTimingReport`):**

| Field                 | Type                           | Description                                            |
| --------------------- | ------------------------------ | ------------------------------------------------------ |
| `start_time`          | `float`                        | Unix timestamp when the first processor began starting |
| `total_duration_secs` | `float`                        | Sum of all measured processor `start()` durations      |
| `processor_timings`   | `List[ProcessorStartupTiming]` | Per-processor timing data, in pipeline order           |

**Processor timing fields (`ProcessorStartupTiming`):**

| Field               | Type    | Description                                                            |
| ------------------- | ------- | ---------------------------------------------------------------------- |
| `processor_name`    | `str`   | The name of the processor                                              |
| `start_offset_secs` | `float` | Seconds from the `StartFrame` to when this processor's `start()` began |
| `duration_secs`     | `float` | How long the processor's `start()` took, in seconds                    |
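
As one way to use these fields, the sketch below ranks processors by `duration_secs` to surface the slowest ones. The dataclasses are hypothetical stand-ins that mirror the documented field names so the snippet runs without Pipecat installed, and `slowest_processors` is an illustrative helper, not part of the Pipecat API. In a real handler you would pass the `report` you receive instead of the made-up data.

```python
from dataclasses import dataclass, field
from typing import List


# Hypothetical stand-ins mirroring the documented report fields.
@dataclass
class ProcessorStartupTiming:
    processor_name: str
    start_offset_secs: float
    duration_secs: float


@dataclass
class StartupTimingReport:
    start_time: float
    total_duration_secs: float
    processor_timings: List[ProcessorStartupTiming] = field(default_factory=list)


def slowest_processors(report, n=3):
    """Return up to n timings with the largest duration_secs, slowest first."""
    ranked = sorted(
        report.processor_timings,
        key=lambda timing: timing.duration_secs,
        reverse=True,
    )
    return ranked[:n]


# Made-up numbers, for illustration only.
report = StartupTimingReport(
    start_time=0.0,
    total_duration_secs=1.375,
    processor_timings=[
        ProcessorStartupTiming("STT", 0.0, 0.25),
        ProcessorStartupTiming("LLM", 0.25, 0.125),
        ProcessorStartupTiming("TTS", 0.375, 1.0),
    ],
)

for timing in slowest_processors(report, n=2):
    print(f"{timing.processor_name}: {timing.duration_secs:.3f}s")
```

Because the helper only reads `processor_timings` and `duration_secs`, it could be called from inside an `on_startup_timing_report` handler without changes, assuming the real report exposes the fields listed above.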

### on\_transport\_timing\_report

Called once when the first client connects, with transport connection timing relative to the `StartFrame`.

```python
@observer.event_handler("on_transport_timing_report")
async def on_transport_timing_report(observer, report):
    # report is a TransportTimingReport
    if report.bot_connected_secs is not None:
        print(f"Bot connected: {report.bot_connected_secs:.3f}s")
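    # client_connected_secs is Optional[float], so guard before formatting.
    if report.client_connected_secs is not None:
        print(f"Client connected: {report.client_connected_secs:.3f}s")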
```

**Report fields (`TransportTimingReport`):**

| Field                   | Type              | Description                                                          |
| ----------------------- | ----------------- | -------------------------------------------------------------------- |
| `start_time`            | `float`           | Unix timestamp of the StartFrame (pipeline start)                    |
| `bot_connected_secs`    | `Optional[float]` | Seconds from StartFrame to `BotConnectedFrame` (SFU transports only) |
| `client_connected_secs` | `Optional[float]` | Seconds from StartFrame to first `ClientConnectedFrame`              |

<Note>
  `bot_connected_secs` is only set for SFU transports (Daily, LiveKit, HeyGen,
  Tavus) that emit a `BotConnectedFrame` when the bot joins the room. Non-SFU
  transports (WebSocket, SmallWebRTC) will have this field set to `None`.
</Note>
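
Since either timing field may be `None`, a handler that logs both values needs a fallback. The following is a minimal sketch of one approach. The `TransportTimingReport` dataclass is a hypothetical stand-in that mirrors the documented fields so the snippet runs on its own, and `format_secs` and `describe_transport` are illustrative helper names, not Pipecat functions.

```python
from dataclasses import dataclass
from typing import Optional


# Hypothetical stand-in mirroring the documented TransportTimingReport fields.
@dataclass
class TransportTimingReport:
    start_time: float
    bot_connected_secs: Optional[float] = None
    client_connected_secs: Optional[float] = None


def format_secs(value: Optional[float]) -> str:
    """Format a duration in seconds, or 'n/a' when the milestone was not recorded."""
    return "n/a" if value is None else f"{value:.3f}s"


def describe_transport(report) -> str:
    """Build a one-line summary that is safe for SFU and non-SFU transports."""
    bot = format_secs(report.bot_connected_secs)
    client = format_secs(report.client_connected_secs)
    return f"bot connected: {bot}, client connected: {client}"


# A non-SFU transport: no BotConnectedFrame, so bot_connected_secs stays None.
print(describe_transport(TransportTimingReport(start_time=0.0, client_connected_secs=0.5)))
```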
