Data model and processors for carrying performance and operational metrics data.
from dataclasses import dataclass, field


@dataclass
class MetricsData:
    """A single metric measurement carried inside a MetricsFrame.

    Attributes:
        timestamp: Wall-clock time of the measurement (seconds since epoch).
        category: Broad grouping, e.g. "performance", "latency", "throughput".
        name: Metric name within the category, e.g. "processing_time".
        value: Numeric reading of the metric.
        unit: Unit string for *value*, e.g. "ms", "MB", "frames/s".
        tags: Free-form key/value labels (processor name, model, service, ...).
    """
    timestamp: float
    category: str
    name: str
    value: float
    unit: str
    # default_factory avoids the shared-mutable-default pitfall; omitting
    # tags now yields an empty dict (backward-compatible: all existing
    # call sites pass tags explicitly).
    tags: Dict[str, str] = field(default_factory=dict)
# Example: a single STT-latency measurement wrapped in a MetricsFrame.
_stt_latency = MetricsData(
    timestamp=time.time(),
    category="processing",
    name="latency",
    value=125.5,
    unit="ms",
    tags={"processor": "stt", "model": "whisper"},
)
metrics_frame = MetricsFrame(data=[_stt_latency])
class MetricsCollector(Processor):
    """Processor that times the handling of each frame and emits the
    elapsed time downstream as a performance MetricsFrame."""

    async def process_frame(self, frame: Frame):
        """Process *frame*, then push a "processing_time" metric (ms).

        NOTE(review): the return value of self.process() is discarded,
        matching the original code — confirm whether the processed result
        should also be pushed downstream.
        """
        # perf_counter() is monotonic and high-resolution, so the measured
        # duration is immune to wall-clock adjustments (time.time() is not).
        start = time.perf_counter()
        await self.process(frame)
        elapsed_ms = (time.perf_counter() - start) * 1000
        await self.push_frame(MetricsFrame(data=[
            MetricsData(
                timestamp=time.time(),  # wall-clock stamp for reporting
                category="performance",
                name="processing_time",
                value=elapsed_ms,
                unit="ms",
                tags={
                    "frame_type": frame.__class__.__name__,
                    "processor": self.__class__.__name__,
                },
            )
        ]))
# Metrics stages sit between the pipeline's input and output processors:
# collect -> aggregate -> report.
_stages = [
    input_processor,
    metrics_collector,
    metrics_aggregator,
    metrics_reporter,
    output_processor,
]
pipeline = Pipeline(_stages)
def _single_metric_frame(category, name, value, unit, tags):
    """Build a MetricsFrame holding one freshly-timestamped MetricsData."""
    return MetricsFrame(data=[
        MetricsData(
            timestamp=time.time(),
            category=category,
            name=name,
            value=value,
            unit=unit,
            tags=tags,
        )
    ])


# Example frames for the three metric families produced by the pipeline.
latency_metrics = _single_metric_frame(
    "latency", "ttfb", 75.0, "ms", {"service": "llm"}
)
throughput_metrics = _single_metric_frame(
    "throughput", "frames_processed", 100, "frames/s", {"processor": "audio"}
)
resource_metrics = _single_metric_frame(
    "resources", "memory_usage", 1024.5, "MB", {"component": "pipeline"}
)
class MetricsAggregator(Processor):
    """Buffers incoming MetricsData and periodically flushes an aggregated
    MetricsFrame downstream.

    Fixes two defects in the original: it called self.should_aggregate()
    and self.aggregate_metrics() without defining them (AttributeError on
    the first MetricsFrame), and it never invoked the base-class
    initializer.
    """

    def __init__(self):
        # NOTE(review): assumes Processor.__init__ takes no required
        # arguments — confirm against the base class.
        super().__init__()
        self.metrics_buffer = []        # pending MetricsData awaiting a flush
        self.aggregation_interval = 60  # seconds between flushes
        self._last_flush = time.time()  # wall-clock time of the last flush

    def should_aggregate(self):
        """Return True when there is buffered data and the aggregation
        interval has elapsed since the last flush."""
        if not self.metrics_buffer:
            return False
        return (time.time() - self._last_flush) >= self.aggregation_interval

    def aggregate_metrics(self):
        """Average buffered values grouped by (category, name, unit).

        Returns a list of MetricsData, one per group, stamped with the
        aggregation time.
        """
        groups = {}
        for metric in self.metrics_buffer:
            key = (metric.category, metric.name, metric.unit)
            groups.setdefault(key, []).append(metric)
        aggregated = []
        for (category, name, unit), items in groups.items():
            aggregated.append(MetricsData(
                timestamp=time.time(),
                category=category,
                name=name,
                value=sum(m.value for m in items) / len(items),
                unit=unit,
                # NOTE(review): tags taken from the first sample in the
                # group — confirm the desired merge policy.
                tags=items[0].tags,
            ))
        return aggregated

    async def process_frame(self, frame: Frame):
        """Buffer metrics frames; flush an aggregated frame when due.
        Non-metrics frames are ignored (original behavior preserved)."""
        if isinstance(frame, MetricsFrame):
            self.metrics_buffer.extend(frame.data)
            if self.should_aggregate():
                aggregated = self.aggregate_metrics()
                await self.push_frame(MetricsFrame(data=aggregated))
                self.metrics_buffer.clear()
                self._last_flush = time.time()
class MetricsReporter(Processor):
    """Terminal metrics stage: forwards each buffered measurement to the
    reporting backend via self.report_metric()."""

    async def process_frame(self, frame: Frame):
        """Report every metric carried by *frame*; any frame that is not
        a MetricsFrame is silently ignored."""
        if not isinstance(frame, MetricsFrame):
            return
        for entry in frame.data:
            await self.report_metric(
                name=entry.name,
                value=entry.value,
                tags=entry.tags,
            )