Overview

PipelineTask is the central class for managing pipeline execution. It handles the lifecycle of the pipeline, processes frames in both directions, manages task cancellation, and provides event handlers for monitoring pipeline activity.

Basic Usage

from pipecat.frames.frames import TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask

# Create a pipeline
pipeline = Pipeline([...])

# Create a task with the pipeline
task = PipelineTask(pipeline)

# Queue frames for processing
await task.queue_frame(TTSSpeakFrame("Hello, how can I help you today?"))

# Run the pipeline
runner = PipelineRunner()
await runner.run(task)

Constructor Parameters

pipeline
BasePipeline
required

The pipeline to execute.

params
PipelineParams
default:"PipelineParams()"

Configuration parameters for the pipeline. See PipelineParams for details.

observers
List[BaseObserver]
default:"[]"

List of observers for monitoring pipeline execution. See Observers for details.

clock
BaseClock
default:"SystemClock()"

Clock implementation for timing operations.

task_manager
Optional[BaseTaskManager]
default:"None"

Custom task manager for handling asyncio tasks. If None, a default TaskManager is used.

check_dangling_tasks
bool
default:"True"

Whether to check that tasks created by processors have finished properly.

idle_timeout_secs
Optional[float]
default:"300"

Timeout in seconds before considering the pipeline idle. Set to None to disable idle detection. See Pipeline Idle Detection for details.

idle_timeout_frames
Tuple[Type[Frame], ...]
default:"(BotSpeakingFrame, LLMFullResponseEndFrame)"

Frame types that should prevent the pipeline from being considered idle. See Pipeline Idle Detection for details.

cancel_on_idle_timeout
bool
default:"True"

Whether to automatically cancel the pipeline task when idle timeout is reached. See Pipeline Idle Detection for details.
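
To make the interplay of idle_timeout_secs, idle_timeout_frames, and cancel_on_idle_timeout more concrete, the following is a minimal sketch of how an idle watchdog of this kind could work. It is not Pipecat's implementation: IdleWatchdog, its observe() and run() methods, and the stand-in frame classes are hypothetical names invented for illustration, and the code uses only asyncio.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple, Type


class Frame:
    """Hypothetical stand-in for a pipeline frame."""


class BotSpeakingFrame(Frame):
    """Stand-in for a frame type that counts as activity."""


class AudioFrame(Frame):
    """Stand-in for a frame type that does not count as activity."""


class IdleWatchdog:
    """Illustrative idle detector (an assumption, not Pipecat's code)."""

    def __init__(
        self,
        timeout_secs: Optional[float],
        activity_frames: Tuple[Type[Frame], ...],
        on_idle: Callable[[], Awaitable[None]],
    ) -> None:
        self._timeout_secs = timeout_secs
        self._activity_frames = activity_frames
        self._on_idle = on_idle
        self._activity = asyncio.Event()

    def observe(self, frame: Frame) -> None:
        # Only the configured frame types reset the idle timer.
        if isinstance(frame, self._activity_frames):
            self._activity.set()

    async def run(self) -> None:
        if self._timeout_secs is None:
            return  # idle detection disabled
        while True:
            try:
                await asyncio.wait_for(self._activity.wait(), self._timeout_secs)
                self._activity.clear()  # activity seen, restart the timer
            except asyncio.TimeoutError:
                # No activity frame arrived in time; a real task could cancel itself here.
                await self._on_idle()
                return
```

In this sketch, passing None as the timeout turns the watchdog into a no-op, mirroring how idle_timeout_secs=None disables idle detection. Create the watchdog inside a running event loop so that the asyncio.Event is bound correctly on older Python versions.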

Methods

Task Lifecycle Management

run()
async

Starts and manages the pipeline execution until completion or cancellation.

await task.run()

stop_when_done()
async

Sends an EndFrame to the pipeline to gracefully stop the task after all queued frames have been processed.

await task.stop_when_done()

cancel()
async

Stops the running pipeline immediately by sending a CancelFrame.

await task.cancel()

has_finished()
bool

Returns whether the task has finished (all processors have stopped).

if task.has_finished():
    print("Task is complete")
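
The difference between a graceful stop and an immediate cancel can be illustrated with a toy model. The sketch below is not Pipecat's code: ToyTask, the END sentinel (playing the role of an EndFrame), and demo() are hypothetical, and the model captures only the ordering described above, where a graceful stop drains queued frames first and a cancel drops them.

```python
import asyncio
from typing import List

END = object()  # hypothetical sentinel playing the role of an EndFrame


class ToyTask:
    """Toy model of the lifecycle semantics (an assumption, not Pipecat's code)."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._worker = None
        self._finished = False
        self.processed: List[str] = []

    async def queue_frame(self, frame) -> None:
        await self._queue.put(frame)

    async def stop_when_done(self) -> None:
        # The sentinel is queued behind pending frames, so they are processed first.
        await self._queue.put(END)

    async def cancel(self) -> None:
        # Stop right away; frames still in the queue are dropped.
        if self._worker is not None:
            self._worker.cancel()

    def has_finished(self) -> bool:
        return self._finished

    async def run(self) -> None:
        self._worker = asyncio.ensure_future(self._process())
        try:
            await self._worker
        except asyncio.CancelledError:
            pass  # the worker was cancelled; treat this as a normal shutdown
        finally:
            self._finished = True

    async def _process(self) -> None:
        while True:
            frame = await self._queue.get()
            if frame is END:
                return
            await asyncio.sleep(0.01)  # simulate work on the frame
            self.processed.append(frame)


async def demo():
    graceful = ToyTask()
    await graceful.queue_frame("a")
    await graceful.queue_frame("b")
    await graceful.stop_when_done()
    await graceful.run()  # drains "a" and "b", then stops at the sentinel

    cancelled = ToyTask()
    await cancelled.queue_frame("a")
    await cancelled.queue_frame("b")
    running = asyncio.ensure_future(cancelled.run())
    await asyncio.sleep(0)  # yield once so run() can create its worker
    await cancelled.cancel()
    await running
    return graceful, cancelled
```

Running asyncio.run(demo()) leaves both frames in graceful.processed and none in cancelled.processed, while has_finished() is true for both toy tasks.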

Frame Management

queue_frame()
async

Queues a single frame to be pushed down the pipeline.

await task.queue_frame(TTSSpeakFrame("Hello!"))

queue_frames()
async

Queues multiple frames to be pushed down the pipeline.

frames = [TTSSpeakFrame("Hello!"), TTSSpeakFrame("How are you?")]

await task.queue_frames(frames)

Event Handlers

PipelineTask provides an event handler that can be registered using the event_handler decorator:

on_idle_timeout

Triggered when none of the frame types listed in idle_timeout_frames has been received for idle_timeout_secs seconds.

@task.event_handler("on_idle_timeout")
async def on_idle_timeout(task):
    print("Pipeline has been idle too long")
    await task.queue_frame(TTSSpeakFrame("Are you still there?"))
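
For readers unfamiliar with decorator-based registration, the following minimal sketch shows one common way such a mechanism can be built. It is an illustration under assumptions, not Pipecat's implementation: ToyEmitter and its emit() method are hypothetical, and only the decorator name event_handler is taken from the text above.

```python
import asyncio
from collections import defaultdict
from typing import Callable, Dict, List


class ToyEmitter:
    """Hypothetical event registry; Pipecat's real implementation may differ."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable]] = defaultdict(list)

    def event_handler(self, name: str):
        # Returns a decorator that records the coroutine function under `name`.
        def decorator(func):
            self._handlers[name].append(func)
            return func

        return decorator

    async def emit(self, name: str) -> None:
        # Each handler receives the emitter itself, like `task` in the example above.
        for handler in self._handlers[name]:
            await handler(self)
```

With this sketch, a coroutine decorated with @emitter.event_handler("on_idle_timeout") is awaited each time emitter.emit("on_idle_timeout") runs, and emitting an event with no registered handlers does nothing.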