StartFrame

Initiates pipeline processing and sets up initial conditions.

Properties

clock
BaseClock
required

Clock instance for pipeline timing

allow_interruptions
bool
default: "false"

Whether to allow user interruptions during processing

enable_metrics
bool
default: "false"

Whether to collect performance metrics

enable_usage_metrics
bool
default: "false"

Whether to collect usage metrics

report_only_initial_ttfb
bool
default: "false"

Whether to report only initial time-to-first-byte metrics

EndFrame

Signals normal pipeline termination.

@dataclass
class EndFrame(ControlFrame):
    """
    Indicates that a pipeline has ended and frame processors and pipelines
    should be shut down. If the transport receives this frame, it will stop
    sending frames to its output channel(s) and close all its threads.
    """
    pass

CancelFrame

Signals immediate pipeline termination.

@dataclass
class CancelFrame(SystemFrame):
    """Indicates that a pipeline needs to stop right away."""
    pass

EndTaskFrame

Requests graceful pipeline closure.

@dataclass
class EndTaskFrame(SystemFrame):
    """
    Used to notify the pipeline task that the pipeline should be
    closed nicely (flushing all the queued frames) by pushing an EndFrame
    downstream.
    """
    pass

CancelTaskFrame

Requests immediate pipeline termination.

@dataclass
class CancelTaskFrame(SystemFrame):
    """
    Used to notify the pipeline task that the pipeline should be
    stopped immediately by pushing a CancelFrame downstream.
    """
    pass

StopTaskFrame

Signals that a pipeline task should be stopped while keeping processors in a running state.

@dataclass
class StopTaskFrame(SystemFrame):
    """
    Indicates that a pipeline task should be stopped but that the pipeline
    processors should be kept in a running state. This is normally queued from
    the pipeline task.
    """
    pass

Usage Examples

Pipeline Initialization

# Start pipeline with metrics enabled
start_frame = StartFrame(
    clock=SystemClock(),
    enable_metrics=True,
    allow_interruptions=True
)

# Push start frame to begin processing
await pipeline.push_frame(start_frame)

Graceful Shutdown

# End pipeline normally
async def shutdown_pipeline():
    await pipeline.push_frame(EndTaskFrame())
    # Wait for pipeline to process remaining frames
    await pipeline.wait_until_done()

Emergency Stop

# Cancel pipeline immediately
async def emergency_stop():
    await pipeline.push_frame(CancelTaskFrame())

Pipeline State Management

# Stop pipeline task while maintaining processor state
async def stop_pipeline_task():
    await pipeline.push_frame(StopTaskFrame())

# Start new pipeline task
async def start_new_task():
    await pipeline.push_frame(StartFrame(clock=pipeline.clock))

Frame Flow

Notes

  • StartFrame must be the first frame in a pipeline
  • EndFrame ensures all queued frames are processed before shutdown
  • CancelFrame immediately stops all processing
  • StopTaskFrame stops the pipeline task while keeping processors running
  • Control frames are processed in order (unlike system frames)
  • Metrics collection affects performance and should be enabled selectively