# PipelineTask

> Manage pipeline execution and lifecycle with PipelineTask

## Overview

`PipelineTask` is the central class for managing pipeline execution. It handles the lifecycle of the pipeline, processes frames in both directions, manages task cancellation, and provides event handlers for monitoring pipeline activity.

## Basic Usage

```python theme={null}
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask

# Create a pipeline
pipeline = Pipeline([...])

# Create a task with the pipeline
task = PipelineTask(pipeline)

# Queue frames for processing
await task.queue_frame(TTSSpeakFrame("Hello, how can I help you today?"))

# Run the pipeline
runner = PipelineRunner()
await runner.run(task)
```

## Constructor Parameters

<ParamField path="pipeline" type="BasePipeline" required>
  The pipeline to execute.
</ParamField>

<ParamField path="params" type="PipelineParams" default="PipelineParams()">
  Configuration parameters for the pipeline. See
  [PipelineParams](/api-reference/server/pipeline/pipeline-params) for details.
</ParamField>

<ParamField path="observers" type="List[BaseObserver]" default="[]">
  List of observers for monitoring pipeline execution. See
  [Observers](/api-reference/server/utilities/observers/observer-pattern) for
  details.
</ParamField>

<ParamField path="clock" type="BaseClock" default="SystemClock()">
  Clock implementation for timing operations.
</ParamField>

<ParamField path="task_manager" type="BaseTaskManager | None" default="None">
  Custom task manager for handling asyncio tasks. If None, a default TaskManager
  is used.
</ParamField>

<ParamField path="check_dangling_tasks" type="bool" default="True">
  Whether to check, when the task finishes, for processor tasks that did not
  finish properly (dangling tasks).
</ParamField>

<ParamField path="idle_timeout_secs" type="float | None" default="300">
  Timeout in seconds before considering the pipeline idle. Set to None to
  disable idle detection. See [Pipeline Idle
  Detection](/api-reference/server/pipeline/pipeline-idle-detection) for
  details.
</ParamField>

<ParamField path="idle_timeout_frames" type="Tuple[Type[Frame], ...]" default="(BotSpeakingFrame, UserSpeakingFrame)">
  Frame types that should prevent the pipeline from being considered idle. See
  [Pipeline Idle
  Detection](/api-reference/server/pipeline/pipeline-idle-detection) for
  details.
</ParamField>

<ParamField path="cancel_on_idle_timeout" type="bool" default="True">
  Whether to automatically cancel the pipeline task when idle timeout is
  reached. See [Pipeline Idle
  Detection](/api-reference/server/pipeline/pipeline-idle-detection) for
  details.
</ParamField>

<ParamField path="enable_tracing" type="bool" default="False">
  Whether to enable OpenTelemetry tracing. See the [OpenTelemetry
  guide](/api-reference/server/utilities/opentelemetry) for details.
</ParamField>

<ParamField path="enable_turn_tracking" type="bool" default="True">
  Whether to enable turn tracking. See the [OpenTelemetry
  guide](/api-reference/server/utilities/opentelemetry) for details.
</ParamField>

<ParamField path="conversation_id" type="str | None" default="None">
  Custom ID for the conversation. If not provided, a UUID is generated. See
  the [OpenTelemetry guide](/api-reference/server/utilities/opentelemetry) for
  details.
</ParamField>

<ParamField path="additional_span_attributes" type="dict | None" default="None">
  Additional attributes to add to the top-level OpenTelemetry conversation span.
  See the [OpenTelemetry guide](/api-reference/server/utilities/opentelemetry)
  for details.
</ParamField>

<ParamField path="app_resources" type="Any" default="None">
  Application-defined bag of resources (database handles, API clients, state,
  etc.) shared across tool handlers. Passed by reference to every function
  handler via `FunctionCallParams.app_resources`. The framework never copies or
  clears this object; the caller retains their handle and can read mutations
  after the task finishes.
</ParamField>

<ParamField path="tool_resources" type="Any" default="None">
  Deprecated alias for `app_resources`. Use `app_resources` in new code.
</ParamField>
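
The `app_resources` behaviour described above (passed by reference, never copied or cleared) can be sketched without running a pipeline. This is a minimal illustration, not Pipecat code: `AppResources` and `record_order` are hypothetical names, and a `SimpleNamespace` stands in for the `FunctionCallParams` object a real function handler would receive.

```python
import asyncio
from dataclasses import dataclass, field
from types import SimpleNamespace


@dataclass
class AppResources:
    """Hypothetical resource bag; any object can be passed as app_resources."""

    orders: list = field(default_factory=list)


async def record_order(params):
    """Tool handler sketch: mutates the shared object it receives by reference."""
    resources = params.app_resources
    resources.orders.append(params.arguments["item"])


async def main():
    resources = AppResources()
    # With Pipecat, the same object would be handed to the task, for example:
    #   task = PipelineTask(pipeline, app_resources=resources)
    # Here a stand-in for FunctionCallParams carries it to the handler instead.
    params = SimpleNamespace(app_resources=resources, arguments={"item": "coffee"})
    await record_order(params)
    # The caller still holds the same object, so the mutation is visible here.
    return resources


print(asyncio.run(main()).orders)  # ['coffee']
```

Because the caller keeps its own handle to the object, state written by handlers can be inspected after the task finishes without any extra plumbing.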

## Methods

### Task Lifecycle Management

<ResponseField name="run()" type="async">
  Starts and manages the pipeline execution until completion or cancellation. Typically called via `PipelineRunner` rather than directly:

  ```python theme={null}
  runner = PipelineRunner()
  await runner.run(task)
  ```
</ResponseField>

<ResponseField name="stop_when_done()" type="async">
  Sends an EndFrame to the pipeline to gracefully stop the task after all queued
  frames have been processed.

  ```python theme={null}
  await task.stop_when_done()
  ```
</ResponseField>

<ResponseField name="cancel()" type="async">
  Stops the running pipeline immediately by sending a CancelFrame.

  ```python theme={null}
  await task.cancel()
  ```
</ResponseField>

<ResponseField name="has_finished()" type="bool">
  Returns whether the task has finished (all processors have stopped).

  ```python theme={null}
  if task.has_finished():
      print("Task is complete")
  ```
</ResponseField>
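
The difference between `stop_when_done()` and `cancel()` can be pictured with a plain `asyncio` queue. The sketch below is a toy model under stated assumptions, not Pipecat's implementation: the `END` sentinel plays the role of an `EndFrame` placed behind the queued work, and `asyncio.Task.cancel()` plays the role of an immediate stop. All names in it are hypothetical.

```python
import asyncio

END = object()  # stand-in for an EndFrame-style sentinel


async def worker(queue, processed):
    """Process items in order until the END sentinel is seen."""
    while True:
        item = await queue.get()
        if item is END:
            return
        await asyncio.sleep(0)  # simulate a little work
        processed.append(item)


async def graceful_stop():
    """Model of stop_when_done(): the sentinel waits behind queued items."""
    queue, processed = asyncio.Queue(), []
    for item in ("a", "b", "c"):
        queue.put_nowait(item)
    task = asyncio.create_task(worker(queue, processed))
    queue.put_nowait(END)
    await task
    return processed


async def immediate_cancel():
    """Model of cancel(): stop right away, leaving queued items unprocessed."""
    queue, processed = asyncio.Queue(), []
    for item in ("a", "b", "c"):
        queue.put_nowait(item)
    task = asyncio.create_task(worker(queue, processed))
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return processed


print(asyncio.run(graceful_stop()))     # ['a', 'b', 'c']
print(asyncio.run(immediate_cancel()))  # []
```

In practice this means `stop_when_done()` suits cases where a final message should still be delivered, while `cancel()` suits cases where pending work should be dropped.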

### Frame Management

<ResponseField name="queue_frame(frame, direction=FrameDirection.DOWNSTREAM)" type="async">
  Queues a single frame to be pushed through the pipeline.

  Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.

  **Parameters:**

  | Parameter   | Type             | Default                     | Description                     |
  | ----------- | ---------------- | --------------------------- | ------------------------------- |
  | `frame`     | `Frame`          | (required)                  | The frame to be processed       |
  | `direction` | `FrameDirection` | `FrameDirection.DOWNSTREAM` | The direction to push the frame |

  ```python theme={null}
  from pipecat.frames.frames import TTSSpeakFrame, UserStoppedSpeakingFrame
  from pipecat.processors.frame_processor import FrameDirection

  # Push a frame downstream (default behavior)
  await task.queue_frame(TTSSpeakFrame("Hello!"))

  # Push a frame upstream from the end of the pipeline
  await task.queue_frame(UserStoppedSpeakingFrame(), direction=FrameDirection.UPSTREAM)
  ```
</ResponseField>

<ResponseField name="queue_frames(frames, direction=FrameDirection.DOWNSTREAM)" type="async">
  Queues multiple frames to be pushed through the pipeline.

  Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.

  **Parameters:**

  | Parameter   | Type                                      | Default                     | Description                             |
  | ----------- | ----------------------------------------- | --------------------------- | --------------------------------------- |
  | `frames`    | `Iterable[Frame] \| AsyncIterable[Frame]` | (required)                  | An iterable or async iterable of frames |
  | `direction` | `FrameDirection`                          | `FrameDirection.DOWNSTREAM` | The direction to push the frames        |

  ```python theme={null}
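  from pipecat.frames.frames import TranscriptionFrame, TTSSpeakFrame, UserStoppedSpeakingFrame
  from pipecat.processors.frame_processor import FrameDirection
  from pipecat.utils.time import time_now_iso8601

  # Push frames downstream (default behavior)
  frames = [TTSSpeakFrame("Hello!"), TTSSpeakFrame("How are you?")]
  await task.queue_frames(frames)

  # Push frames upstream from the end of the pipeline.
  # TranscriptionFrame also takes a user ID and timestamp; "user-1" is a placeholder.
  frames = [
      TranscriptionFrame("user input", user_id="user-1", timestamp=time_now_iso8601()),
      UserStoppedSpeakingFrame(),
  ]
  await task.queue_frames(frames, direction=FrameDirection.UPSTREAM)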
  ```
</ResponseField>
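
To make the two entry points concrete, here is a toy model of the order in which a queued frame would visit the processors. It is an illustration only, not Pipecat code: `Direction` is a stand-in for `FrameDirection`, and the processor names are hypothetical.

```python
from enum import Enum, auto


class Direction(Enum):
    """Stand-in for FrameDirection (illustration only)."""

    DOWNSTREAM = auto()
    UPSTREAM = auto()


def traversal_order(processors, direction):
    """Return the order in which a queued frame would visit the processors."""
    if direction is Direction.DOWNSTREAM:
        # Enters at the beginning of the pipeline and moves toward the end.
        return list(processors)
    # Enters at the end of the pipeline and moves toward the beginning.
    return list(reversed(processors))


pipeline = ["stt", "llm", "tts"]  # hypothetical processor names
print(traversal_order(pipeline, Direction.DOWNSTREAM))  # ['stt', 'llm', 'tts']
print(traversal_order(pipeline, Direction.UPSTREAM))    # ['tts', 'llm', 'stt']
```

In a real pipeline a processor may consume or transform a frame along the way, so a frame does not necessarily reach the opposite end.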

## Event Handlers

`PipelineTask` provides event handlers for monitoring pipeline lifecycle and frame flow. Register a handler by decorating an async function with `@task.event_handler("<event name>")`, where `task` is your `PipelineTask` instance.

| Event                         | Description                                         |
| ----------------------------- | --------------------------------------------------- |
| `on_pipeline_started`         | Pipeline has started processing                     |
| `on_pipeline_finished`        | Pipeline reached a terminal state                   |
| `on_pipeline_error`           | An error frame reached the pipeline task            |
| `on_frame_reached_upstream`   | A filtered frame type reached the pipeline source   |
| `on_frame_reached_downstream` | A filtered frame type reached the pipeline sink     |
| `on_idle_timeout`             | No activity detected within the idle timeout period |

### on\_pipeline\_started

Fired when the `StartFrame` has been processed by all processors in the pipeline. This indicates the pipeline is fully initialized and running.

```python theme={null}
@task.event_handler("on_pipeline_started")
async def on_pipeline_started(task, frame):
    print("Pipeline is running!")
```

**Parameters:**

| Parameter | Type           | Description                        |
| --------- | -------------- | ---------------------------------- |
| `task`    | `PipelineTask` | The pipeline task instance         |
| `frame`   | `StartFrame`   | The start frame that was processed |

### on\_pipeline\_finished

Fired after the pipeline reaches any terminal state. This includes normal completion (`EndFrame`), explicit stop (`StopFrame`), or cancellation (`CancelFrame`). Use this event for cleanup, logging, or post-processing.

```python theme={null}
from pipecat.frames.frames import CancelFrame, EndFrame, StopFrame

@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(task, frame):
    if isinstance(frame, EndFrame):
        print("Pipeline ended normally")
    elif isinstance(frame, CancelFrame):
        print("Pipeline was cancelled")
    elif isinstance(frame, StopFrame):
        print("Pipeline was stopped")
```

**Parameters:**

| Parameter | Type           | Description                                                    |
| --------- | -------------- | -------------------------------------------------------------- |
| `task`    | `PipelineTask` | The pipeline task instance                                     |
| `frame`   | `Frame`        | The terminal frame (`EndFrame`, `StopFrame`, or `CancelFrame`) |

### on\_pipeline\_error

Fired when an `ErrorFrame` reaches the pipeline task (upstream from a processor). If the error is fatal, the pipeline will be cancelled after this handler runs.

```python theme={null}
@task.event_handler("on_pipeline_error")
async def on_pipeline_error(task, frame):
    print(f"Pipeline error: {frame.error}")
    if frame.fatal:
        print("Fatal error — pipeline will be cancelled")
```

**Parameters:**

| Parameter | Type           | Description                        |
| --------- | -------------- | ---------------------------------- |
| `task`    | `PipelineTask` | The pipeline task instance         |
| `frame`   | `ErrorFrame`   | The error frame with error details |

### on\_frame\_reached\_upstream

Fired when a frame of a registered type reaches the pipeline source (the start of the pipeline). You must configure which frame types trigger this event using `set_reached_upstream_filter()` or `add_reached_upstream_filter()`.

```python theme={null}
from pipecat.frames.frames import TranscriptionFrame

# Configure which frame types to monitor
task.set_reached_upstream_filter((TranscriptionFrame,))

@task.event_handler("on_frame_reached_upstream")
async def on_frame_reached_upstream(task, frame):
    print(f"Frame reached upstream: {frame}")
```

**Parameters:**

| Parameter | Type           | Description                                |
| --------- | -------------- | ------------------------------------------ |
| `task`    | `PipelineTask` | The pipeline task instance                 |
| `frame`   | `Frame`        | The frame that reached the pipeline source |

<Note>
  This event fires only for frame types you have explicitly registered. By
  default, no frame types are monitored, which avoids the cost of checking
  every frame when you typically care about only a few types.
</Note>
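
The filter idea can be sketched as a tuple of types checked with `isinstance`. This is a hedged model, not Pipecat's source: the frame classes are stand-ins, `ReachedFilter` is a hypothetical name, and it assumes that the `set_...` methods replace the registered types, that the `add_...` methods extend them, and that subclasses of a registered type also match. The same idea would apply to the downstream filter.

```python
class Frame:
    """Stand-in base class (illustration only)."""


class TextFrame(Frame):
    pass


class TranscriptionFrame(TextFrame):
    pass


class AudioFrame(Frame):
    pass


class ReachedFilter:
    """Toy model of a frame-type filter backed by a tuple of types."""

    def __init__(self):
        self._types = ()  # nothing is monitored by default

    def set(self, types):
        # Assumed behaviour of set_reached_*_filter(): replace the tuple.
        self._types = tuple(types)

    def add(self, types):
        # Assumed behaviour of add_reached_*_filter(): extend the tuple.
        self._types += tuple(types)

    def matches(self, frame):
        # isinstance() with an empty tuple is always False.
        return isinstance(frame, self._types)


frame_filter = ReachedFilter()
print(frame_filter.matches(TranscriptionFrame()))  # False
frame_filter.set((TextFrame,))
print(frame_filter.matches(TranscriptionFrame()))  # True
print(frame_filter.matches(AudioFrame()))          # False
```

Keeping the registered types in a tuple lets a single `isinstance` call cover every monitored type at once.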

### on\_frame\_reached\_downstream

Fired when a frame of a registered type reaches the pipeline sink (the end of the pipeline). You must configure which frame types trigger this event using `set_reached_downstream_filter()` or `add_reached_downstream_filter()`.

```python theme={null}
from pipecat.frames.frames import TTSAudioRawFrame

# Configure which frame types to monitor
task.set_reached_downstream_filter((TTSAudioRawFrame,))

@task.event_handler("on_frame_reached_downstream")
async def on_frame_reached_downstream(task, frame):
    print(f"Frame reached downstream: {frame}")
```

**Parameters:**

| Parameter | Type           | Description                              |
| --------- | -------------- | ---------------------------------------- |
| `task`    | `PipelineTask` | The pipeline task instance               |
| `frame`   | `Frame`        | The frame that reached the pipeline sink |

### on\_idle\_timeout

Fired when no activity frames (as specified by `idle_timeout_frames`) have been received within the idle timeout period. See [Pipeline Idle Detection](/api-reference/server/pipeline/pipeline-idle-detection) for configuration details.

```python theme={null}
@task.event_handler("on_idle_timeout")
async def on_idle_timeout(task):
    print("Pipeline has been idle too long")
    await task.queue_frame(TTSSpeakFrame("Are you still there?"))
```

**Parameters:**

| Parameter | Type           | Description                |
| --------- | -------------- | -------------------------- |
| `task`    | `PipelineTask` | The pipeline task instance |

<Note>
  If `cancel_on_idle_timeout` is `True` (the default), the pipeline will be
  automatically cancelled after this handler runs. Set it to `False` if you want
  to handle idle timeouts yourself.
</Note>
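
One way to handle idle timeouts yourself is to tolerate a few of them before giving up. The sketch below is an assumption-laden example rather than a recommended pattern: `IdleEscalation` and `FakeTask` are hypothetical names, `FakeTask` exists only so the snippet runs without a pipeline, and the example assumes the event fires again after each further idle period when `cancel_on_idle_timeout` is `False`.

```python
import asyncio


class IdleEscalation:
    """Hypothetical helper: cancel the task after several idle timeouts."""

    def __init__(self, max_idle_events=3):
        self.max_idle_events = max_idle_events
        self.idle_events = 0

    async def on_idle_timeout(self, task):
        self.idle_events += 1
        if self.idle_events >= self.max_idle_events:
            # Give up: stop the pipeline immediately.
            await task.cancel()


class FakeTask:
    """Stand-in for PipelineTask so the sketch runs on its own."""

    def __init__(self):
        self.cancelled = False

    async def cancel(self):
        self.cancelled = True


async def simulate(idle_events):
    escalation, task = IdleEscalation(max_idle_events=3), FakeTask()
    for _ in range(idle_events):
        await escalation.on_idle_timeout(task)
    return task.cancelled


print(asyncio.run(simulate(2)))  # False
print(asyncio.run(simulate(3)))  # True

# With Pipecat, the wiring might look like this (not executed here):
#   task = PipelineTask(pipeline, cancel_on_idle_timeout=False)
#   escalation = IdleEscalation()
#
#   @task.event_handler("on_idle_timeout")
#   async def on_idle_timeout(task):
#       await escalation.on_idle_timeout(task)
```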
