Overview
PipelineRunner provides high-level orchestration for managing multiple pipeline tasks. It handles task execution, signal handling (SIGINT/SIGTERM), and coordinated shutdown of multiple pipelines.
Class Definition
class PipelineRunner:
    def __init__(
        self,
        *,
        name: str | None = None,
        handle_sigint: bool = True
    ):
        self.id: int = obj_id()
        self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"
        self._tasks = {}
Constructor Parameters
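name (str | None, default: None): Custom name for the runner instance. If omitted, a name is generated from the class name and an instance count.
handle_sigint (bool, default: True): Whether to handle SIGINT/SIGTERM signals.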
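The default-name logic in the constructor can be illustrated with a small sketch. The `obj_id` and `obj_count` functions below are hypothetical stand-ins written for this example (the real helpers are not shown on this page and may behave differently), and `NamedRunner` mirrors only the naming lines of `__init__`.

```python
import itertools
from collections import defaultdict
from typing import Optional

# Hypothetical stand-ins for the obj_id() and obj_count() helpers referenced
# in the class definition; the real helpers may behave differently.
_ids = itertools.count()
_counts = defaultdict(itertools.count)


def obj_id() -> int:
    """Return a process-wide unique integer."""
    return next(_ids)


def obj_count(obj: object) -> int:
    """Return a per-class running count, starting at 0."""
    return next(_counts[obj.__class__.__name__])


class NamedRunner:
    """Illustrative class that mirrors only the naming logic."""

    def __init__(self, *, name: Optional[str] = None):
        self.id: int = obj_id()
        # The f-string is only evaluated when no explicit name is given.
        self.name: str = name or f"{self.__class__.__name__}#{obj_count(self)}"


first = NamedRunner()
second = NamedRunner()
custom = NamedRunner(name="main-runner")
```

Under these assumptions, unnamed instances receive distinct generated names, while an explicit `name` is used unchanged.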
Methods
Task Management
async def run(self, task: PipelineTask):
    """
    Run a pipeline task and track it
    """

async def stop_when_done(self):
    """
    Schedule graceful shutdown of all tasks
    """

async def cancel(self):
    """
    Immediately cancel all running tasks
    """
Signal Handling
def _setup_sigint(self):
    """
    Configure SIGINT/SIGTERM handlers
    """
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(
        signal.SIGINT,
        lambda *args: asyncio.create_task(self._sig_handler())
    )
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda *args: asyncio.create_task(self._sig_handler())
    )
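The same `loop.add_signal_handler` mechanism can be tried in isolation. The following is a minimal standalone sketch, not the runner's implementation: `run_until_signal` and `demo` are names invented for this example, the handler resolves a future rather than scheduling `_sig_handler()`, and it assumes a Unix event loop running in the main thread.

```python
import asyncio
import os
import signal


async def run_until_signal() -> str:
    """Wait until SIGINT or SIGTERM arrives, then return the signal's name."""
    loop = asyncio.get_running_loop()
    received = loop.create_future()

    def on_signal(sig: signal.Signals) -> None:
        # Handlers run inside the event loop, so touching the future is safe.
        if not received.done():
            received.set_result(sig.name)

    for sig in (signal.SIGINT, signal.SIGTERM):
        # add_signal_handler is only available on Unix event loops.
        loop.add_signal_handler(sig, on_signal, sig)

    try:
        return await received
    finally:
        # Restore default handling once the wait is over.
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)


async def demo() -> str:
    # Simulate Ctrl+C by sending SIGINT to this process shortly after startup.
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, os.kill, os.getpid(), signal.SIGINT)
    return await run_until_signal()
```

Calling `asyncio.run(demo())` returns the name of the signal that was received. A real runner would typically start its shutdown path at the point where this sketch resolves the future.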
Usage Examples
Basic Runner
runner = PipelineRunner(name="main-runner")
task = PipelineTask(my_pipeline)
await runner.run(task)
Multiple Tasks
runner = PipelineRunner()
audio_task = PipelineTask(audio_pipeline)
video_task = PipelineTask(video_pipeline)
await asyncio.gather(
    runner.run(audio_task),
    runner.run(video_task)
)
Graceful Shutdown
runner = PipelineRunner()
try:
    await runner.run(task)
    await runner.stop_when_done()
except Exception as e:
    logger.error(f"Pipeline error: {e}")
    await runner.cancel()
Task Flow
Signal Flow
Task Management
Task Tracking
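# Tasks are tracked by name
self._tasks = {}

# Add a task when it starts running
self._tasks[task.name] = task

# Remove the task when it completes
del self._tasks[task.name]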
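One way to tie these three operations together is to register a task before awaiting it and to remove it in a `finally` block, so the entry is dropped even when the task raises. This is a sketch under assumptions: `FakeTask` and `TrackingRunner` are hypothetical stand-ins, and the actual `run` method may be structured differently.

```python
import asyncio


class FakeTask:
    """Hypothetical stand-in for PipelineTask."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self._fail = fail

    async def run(self) -> None:
        await asyncio.sleep(0)  # pretend to do some work
        if self._fail:
            raise RuntimeError(f"{self.name} failed")


class TrackingRunner:
    """Illustrative runner that tracks a task only while it is running."""

    def __init__(self):
        self._tasks = {}

    async def run(self, task: FakeTask) -> None:
        self._tasks[task.name] = task  # register before starting
        try:
            await task.run()
        finally:
            # Remove the entry whether the task finished or raised.
            del self._tasks[task.name]


async def main():
    runner = TrackingRunner()
    error = None
    await runner.run(FakeTask("ok"))
    try:
        await runner.run(FakeTask("broken", fail=True))
    except RuntimeError as exc:
        error = str(exc)
    # Return the captured error and a snapshot of what is still tracked.
    return error, dict(runner._tasks)
```

With this structure the failure still propagates to the caller, but no stale entry is left in `_tasks`.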
Task Coordination
async def stop_when_done(self):
    await asyncio.gather(*[
        t.stop_when_done()
        for t in self._tasks.values()
    ])

async def cancel(self):
    await asyncio.gather(*[
        t.cancel()
        for t in self._tasks.values()
    ])
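The fan-out pattern above can be exercised without a real pipeline. In the sketch below, `FakeTask`, `MiniRunner`, and the `track` method are hypothetical names introduced for illustration; only the two `asyncio.gather` calls follow the code shown above.

```python
import asyncio


class FakeTask:
    """Hypothetical stand-in for PipelineTask; only the methods used here."""

    def __init__(self, name: str):
        self.name = name
        self.state = "running"

    async def stop_when_done(self) -> None:
        await asyncio.sleep(0)  # pretend to drain queued work
        self.state = "stopped"

    async def cancel(self) -> None:
        await asyncio.sleep(0)
        self.state = "cancelled"


class MiniRunner:
    """Illustrative runner that forwards shutdown calls to every tracked task."""

    def __init__(self):
        self._tasks = {}

    def track(self, task: FakeTask) -> None:
        self._tasks[task.name] = task

    async def stop_when_done(self) -> None:
        await asyncio.gather(*[t.stop_when_done() for t in self._tasks.values()])

    async def cancel(self) -> None:
        await asyncio.gather(*[t.cancel() for t in self._tasks.values()])


async def main():
    runner = MiniRunner()
    audio, video = FakeTask("audio"), FakeTask("video")
    runner.track(audio)
    runner.track(video)
    await runner.stop_when_done()  # reaches both tasks concurrently
    return [audio.state, video.state]
```

Because `asyncio.gather` awaits all coroutines together, a slow task does not delay the shutdown request reaching the others.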
Error Handling
The runner provides several layers of error protection:
- Signal Handling
  - Catches SIGINT/SIGTERM
  - Initiates clean shutdown
- Task Management
  - Tracks task state
  - Handles task failures
- Cleanup
  - Ensures all tasks are stopped
  - Releases resources
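On the caller side, failures from concurrently running tasks can be inspected one by one using plain asyncio. The sketch below is independent of the runner's internals: `worker` is a hypothetical coroutine standing in for an awaited `runner.run(task)` call.

```python
import asyncio


async def worker(name: str, fail: bool = False) -> str:
    """Hypothetical stand-in for awaiting runner.run(task)."""
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} done"


async def main():
    # return_exceptions=True returns exceptions as results instead of
    # raising the first one, so every outcome can be inspected.
    results = await asyncio.gather(
        worker("audio"),
        worker("video", fail=True),
        return_exceptions=True,
    )
    return [r if isinstance(r, str) else f"error: {r}" for r in results]
```

This keeps one failing task from hiding the outcome of the others; whether to then cancel the remaining work is a decision for the calling code.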
Notes
- Manages multiple pipeline tasks
- Handles system signals
- Provides graceful shutdown
- Tracks task lifecycle
- Supports concurrent execution
- Thread-safe task management
- Configurable signal handling
- Maintains task isolation