The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline’s core functionality.
Base Observer
All observers must inherit from BaseObserver and can implement these methods:
on_push_frame(data: FramePushed): Called when a frame is pushed from one processor to another
on_process_frame(data: FrameProcessed): Called when a frame is being processed by a processor
on_pipeline_started(): Called after the StartFrame has been processed by all processors in the pipeline
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
class CustomObserver(BaseObserver):
async def on_push_frame(self, data: FramePushed):
# Your frame observation logic here
pass
async def on_process_frame(self, data: FrameProcessed):
# Your frame processing observation logic here
pass
async def on_pipeline_started(self):
# Called when the pipeline has fully started
pass
Lifecycle and Cleanup
The pipeline calls setup() on each observer when it starts and cleanup() when it stops. If your observer spawns background work, use self.create_task() so the task is tracked by the pipeline’s task manager, and override cleanup() to cancel it. Always call super().cleanup(), which waits for any in-flight event handlers to finish.
class CustomObserver(BaseObserver):
def __init__(self):
super().__init__()
self._task = None
async def on_pipeline_started(self):
# Use create_task (not asyncio.create_task) so the task is tracked.
self._task = self.create_task(self._run())
async def _run(self):
# Your long-running background logic here.
pass
async def cleanup(self):
if self._task:
await self.cancel_task(self._task)
await super().cleanup()
If you give your observer a custom __init__, you must call super().__init__(). Skipping it leaves the observer partially initialized and raises errors such as 'CustomObserver' object has no attribute '_name' at runtime.
Available Observers
Pipecat provides several built-in observers:
- LLMLogObserver: Logs LLM activity and responses
- TranscriptionLogObserver: Logs speech-to-text transcription events
- RTVIObserver: Converts internal frames to RTVI protocol messages for server to client messaging
- StartupTimingObserver: Measures processor startup times and transport readiness
- UserBotLatencyObserver: Measures user-to-bot response latency
- TurnTrackingObserver: Tracks conversation turns and events
Using Multiple Observers
You can attach multiple observers to a pipeline task. Each observer will be notified of all frames:
task = PipelineWorker(
pipeline,
params=PipelineParams(
observers=[LLMLogObserver(), TranscriptionLogObserver(), CustomObserver()],
),
)
Example: Debug Observer
Here’s an example observer that logs interruptions and bot speaking events:
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
from pipecat.frames.frames import (
InterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from loguru import logger
class DebugObserver(BaseObserver):
"""Observer to log interruptions and bot speaking events to the console.
Logs all frame instances of:
- InterruptionFrame
- BotStartedSpeakingFrame
- BotStoppedSpeakingFrame
This allows you to see the frame flow from processor to processor through the pipeline for these frames.
Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
"""
async def on_push_frame(self, data: FramePushed):
time_sec = data.timestamp / 1_000_000_000
arrow = "→" if data.direction == FrameDirection.DOWNSTREAM else "←"
if isinstance(data.frame, InterruptionFrame):
logger.info(f"⚡ INTERRUPTION START: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
elif isinstance(data.frame, BotStartedSpeakingFrame):
logger.info(f"🤖 BOT START SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
elif isinstance(data.frame, BotStoppedSpeakingFrame):
logger.info(f"🤖 BOT STOP SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
Common Use Cases
Observers are particularly useful for:
- Debugging frame flow
- Logging specific events
- Monitoring pipeline behavior
- Collecting metrics
- Converting internal frames to external messages