Frame for signaling errors in the pipeline.
def __str__(self):
"""
Returns formatted string with error message and fatal status
"""
Frame for unrecoverable errors that should terminate the pipeline.
@dataclass
class FatalErrorFrame(ErrorFrame):
"""
This is used notify upstream that an unrecoverable error has occurred and
that the bot should exit.
"""
fatal: bool = field(default=True, init=False)
async def handle_service_error(error: Exception):
await pipeline.push_frame(ErrorFrame(
error=f"Service error: {str(error)}",
fatal=False
))
async def handle_critical_error(error: Exception):
await pipeline.push_frame(FatalErrorFrame(
error=f"Critical error: {str(error)}"
))
async def process_frame(self, frame: Frame):
if isinstance(frame, ErrorFrame):
if frame.fatal:
await self.shutdown_pipeline()
else:
await self.attempt_recovery()
pipeline = Pipeline([
input_processor,
error_handler,
recovery_processor,
output_processor
])
class ErrorHandler(Processor):
async def process_frame(self, frame: Frame):
if isinstance(frame, ErrorFrame):
if frame.fatal:
logger.critical(f"Fatal error: {frame.error}")
await self.push_frame(CancelFrame())
else:
logger.error(f"Error: {frame.error}")
await self.handle_error(frame)
async def handle_service_error(self, error: Exception):
if is_temporary_error(error):
await self.push_frame(ErrorFrame(
error="Temporary service disruption",
fatal=False
))
await self.retry_operation()
else:
await self.push_frame(FatalErrorFrame(
error="Service permanently unavailable"
))
class ErrorAggregator(Processor):
def __init__(self):
self.errors = []
async def process_frame(self, frame: Frame):
if isinstance(frame, ErrorFrame):
self.errors.append({
'timestamp': time.time(),
'message': frame.error,
'fatal': frame.fatal
})
await self.update_error_metrics()