The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline’s core functionality.
Base Observer
All observers must inherit from BaseObserver and implement the on_push_frame method:
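from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class CustomObserver(BaseObserver):
    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        # Called for each frame pushed from src to dst.
        # Your frame observation logic here
        pass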
Available Observers
Pipecat provides several built-in observers:
- LLMLogObserver: Logs LLM activity and responses
- TranscriptionLogObserver: Logs speech-to-text transcription events
- RTVIObserver: Converts internal frames to RTVI protocol messages for server-to-client messaging
Using Multiple Observers
You can attach multiple observers to a pipeline task. Each observer will be notified of all frames:
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        observers=[LLMLogObserver(), TranscriptionLogObserver(), CustomObserver()],
    ),
)
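To make "each observer will be notified of all frames" concrete, here is a minimal standalone sketch of that fan-out. It does not use Pipecat and is not how Pipecat implements it internally: StubFrame, RecordingObserver, notify_observers, and demo are hypothetical names invented for illustration, and the processor names and direction are plain strings.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubFrame:
    """Hypothetical stand-in for a Pipecat frame."""

    name: str


class RecordingObserver:
    """Hypothetical observer that records the name of every frame it sees."""

    def __init__(self) -> None:
        self.seen = []

    async def on_push_frame(self, src, dst, frame, direction, timestamp) -> None:
        self.seen.append(frame.name)


async def notify_observers(observers, src, dst, frame, direction, timestamp) -> None:
    """Deliver one frame push to every attached observer (illustrative fan-out)."""
    await asyncio.gather(
        *(o.on_push_frame(src, dst, frame, direction, timestamp) for o in observers)
    )


async def demo():
    """Push three stub frames and return what each observer recorded."""
    observers = [RecordingObserver(), RecordingObserver()]
    for name in ("start", "text", "end"):
        await notify_observers(
            observers, "stt", "llm", StubFrame(name), "downstream", 0
        )
    return [o.seen for o in observers]
```

Running asyncio.run(demo()) returns one list per observer, and both lists contain every frame name in push order, because no observer filters or consumes frames on behalf of the others.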
Example: Debug Observer
Here’s an example observer that logs interruptions and bot speaking events:
from loguru import logger

from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame,
    StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class DebugObserver(BaseObserver):
    """Observer to log interruptions and bot speaking events to the console.

    Logs all frame instances of:
    - StartInterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This allows you to see the frame flow from processor to processor through the pipeline for these frames.

    Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
    """

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        # Convert to seconds (the divisor implies a nanosecond timestamp)
        time_sec = timestamp / 1_000_000_000
        # The arrow shows whether the frame travels downstream or upstream
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(frame, StartInterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
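The log line built in the example above can be pulled out into a small pure function, which makes the format easy to check without running a pipeline. This is only a sketch: format_event is a hypothetical helper, it takes plain strings and a boolean instead of Pipecat's FrameProcessor and FrameDirection types, and it assumes the timestamp is in nanoseconds, as the division by 1_000_000_000 suggests.

```python
def format_event(
    label: str, src: str, dst: str, downstream: bool, timestamp_ns: int
) -> str:
    """Build a log line in the same shape as the debug observer's output."""
    # Assumption: the timestamp is in nanoseconds, so divide to get seconds.
    time_sec = timestamp_ns / 1_000_000_000
    # Right arrow for downstream frames, left arrow for upstream frames.
    arrow = "→" if downstream else "←"
    return f"{label}: {src} {arrow} {dst} at {time_sec:.2f}s"
```

For example, format_event("BOT START SPEAKING", "tts", "transport", True, 2_500_000_000) yields "BOT START SPEAKING: tts → transport at 2.50s".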
Common Use Cases
Observers are particularly useful for:
- Debugging frame flow
- Logging specific events
- Monitoring pipeline behavior
- Collecting metrics
- Converting internal frames to external messages
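As one illustration of the metrics use case, the sketch below counts frames by class name. It is written without Pipecat so it runs on its own: FrameCountObserver, TextFrameStub, AudioFrameStub, and collect_demo are hypothetical names, and in a real pipeline the observer would presumably inherit from BaseObserver and receive actual frames.

```python
import asyncio
from collections import Counter


class TextFrameStub:
    """Hypothetical stand-in for a text frame."""


class AudioFrameStub:
    """Hypothetical stand-in for an audio frame."""


class FrameCountObserver:
    """Hypothetical observer that tallies frames by their class name."""

    def __init__(self) -> None:
        self.counts = Counter()

    async def on_push_frame(self, src, dst, frame, direction, timestamp) -> None:
        # Only read from the frame; the observer never modifies or forwards it.
        self.counts[type(frame).__name__] += 1


def collect_demo() -> dict:
    """Feed three stub frames to the observer and return the tallies."""
    observer = FrameCountObserver()

    async def run() -> None:
        for frame in (TextFrameStub(), AudioFrameStub(), TextFrameStub()):
            await observer.on_push_frame("src", "dst", frame, "downstream", 0)

    asyncio.run(run())
    return dict(observer.counts)
```

Calling collect_demo() returns a dictionary mapping each stub class name to the number of times it was observed. Keeping the tally inside the observer leaves the pipeline's processors untouched, which matches the non-intrusive monitoring described at the top of this page.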