The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline’s core functionality.
DEPRECATED: The old observer pattern with individual parameters (on_push_frame(src, dst, frame, direction, timestamp)) is deprecated. Use the new pattern with data objects (on_push_frame(data: FramePushed)) instead.
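The difference between the two signatures can be sketched without importing Pipecat. In this minimal sketch, `FramePushedStandIn` is a hypothetical stand-in that bundles the same five values the deprecated signature took as separate arguments (the real `FramePushed` comes from `pipecat.observers.base_observer`). The observer classes and the sample values are illustrative only.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class FramePushedStandIn:
    """Hypothetical stand-in for Pipecat's FramePushed data object."""

    source: Any
    destination: Any
    frame: Any
    direction: str
    timestamp: int  # assumed to be nanoseconds for this sketch


class OldStyleObserver:
    # Deprecated shape: every value arrives as a separate parameter.
    async def on_push_frame(self, src, dst, frame, direction, timestamp):
        return f"{src} -> {dst}: {frame}"


class NewStyleObserver:
    # Current shape: one data object carries the same values.
    async def on_push_frame(self, data: FramePushedStandIn):
        return f"{data.source} -> {data.destination}: {data.frame}"


def describe_both():
    """Call both shapes with the same values and return their results."""
    data = FramePushedStandIn("stt", "llm", "TextFrame", "downstream", 1_500_000_000)
    old = asyncio.run(
        OldStyleObserver().on_push_frame(
            data.source, data.destination, data.frame, data.direction, data.timestamp
        )
    )
    new = asyncio.run(NewStyleObserver().on_push_frame(data))
    return old, new
```

Migrating an existing observer mostly means replacing each bare parameter with the matching attribute on `data`.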
Base Observer
All observers must inherit from BaseObserver and can implement two main methods:
- on_push_frame(data: FramePushed): Called when a frame is pushed from one processor to another
- on_process_frame(data: FrameProcessed): Called when a frame is being processed by a processor
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed

class CustomObserver(BaseObserver):
    async def on_push_frame(self, data: FramePushed):
        # Your frame observation logic here
        pass

    async def on_process_frame(self, data: FrameProcessed):
        # Your frame processing observation logic here
        pass
Available Observers
Pipecat provides several built-in observers:
- LLMLogObserver: Logs LLM activity and responses
- TranscriptionLogObserver: Logs speech-to-text transcription events
- RTVIObserver: Converts internal frames to RTVI protocol messages for server-to-client messaging
Using Multiple Observers
You can attach multiple observers to a pipeline task. Each observer will be notified of all frames:
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        observers=[LLMLogObserver(), TranscriptionLogObserver(), CustomObserver()],
    ),
)
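The fan-out behavior can be pictured with a small self-contained sketch. This is not Pipecat's implementation: `RecordingObserver`, `notify_all`, and the string "frames" are hypothetical stand-ins, used only to show that every attached observer sees every frame.

```python
import asyncio


class RecordingObserver:
    """Hypothetical observer that remembers every frame it is shown."""

    def __init__(self, name):
        self.name = name
        self.seen = []

    async def on_push_frame(self, data):
        self.seen.append(data)


async def notify_all(observers, frames):
    # Stand-in for the pipeline task: offer each frame to each observer.
    for frame in frames:
        await asyncio.gather(*(obs.on_push_frame(frame) for obs in observers))


def run_demo():
    """Attach two observers and return what each one recorded."""
    observers = [RecordingObserver("a"), RecordingObserver("b")]
    asyncio.run(notify_all(observers, ["frame-1", "frame-2"]))
    return [obs.seen for obs in observers]
```

Because every observer receives every frame, each one is responsible for filtering down to the frame types it cares about, as the Debug Observer example does with `isinstance` checks.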
Example: Debug Observer
Here’s an example observer that logs interruptions and bot speaking events:
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
from pipecat.frames.frames import (
    StartInterruptionFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from loguru import logger

class DebugObserver(BaseObserver):
    """Observer to log interruptions and bot speaking events to the console.

    Logs all frame instances of:
    - StartInterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This allows you to see the frame flow from processor to processor through the pipeline for these frames.
    Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
    """

    async def on_push_frame(self, data: FramePushed):
        time_sec = data.timestamp / 1_000_000_000
        arrow = "→" if data.direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(data.frame, StartInterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
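To see what a resulting log line looks like, the formatting logic can be isolated into a small function that runs without Pipecat. In this sketch, `Direction` is a hypothetical stand-in for `FrameDirection`, `format_event` is an illustrative helper, and the timestamp is assumed to be in nanoseconds, as the division in the example above suggests.

```python
from enum import Enum, auto


class Direction(Enum):
    """Hypothetical stand-in for Pipecat's FrameDirection."""

    DOWNSTREAM = auto()
    UPSTREAM = auto()


def format_event(label, source, destination, direction, timestamp_ns):
    # Same conversion as the example above: nanoseconds to seconds.
    time_sec = timestamp_ns / 1_000_000_000
    # The arrow points right for downstream frames and left for upstream ones.
    arrow = "→" if direction == Direction.DOWNSTREAM else "←"
    return f"{label}: {source} {arrow} {destination} at {time_sec:.2f}s"
```

For example, `format_event("BOT START SPEAKING", "tts", "transport", Direction.DOWNSTREAM, 2_500_000_000)` returns `"BOT START SPEAKING: tts → transport at 2.50s"`.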
Common Use Cases
Observers are particularly useful for:
- Debugging frame flow
- Logging specific events
- Monitoring pipeline behavior
- Collecting metrics
- Converting internal frames to external messages
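
As a sketch of the metrics use case, the observer below counts pushed frames by class name. It is written against hypothetical stand-in objects so that it runs without Pipecat installed. In a real pipeline, the class would inherit from `BaseObserver` and receive `FramePushed` objects instead. The names `FrameCountObserver`, `FakeAudioFrame`, `FakeTextFrame`, and `feed` are illustrative only.

```python
import asyncio
from collections import Counter
from types import SimpleNamespace


class FrameCountObserver:
    """Hypothetical metrics observer: tallies pushed frames by type name."""

    def __init__(self):
        self.counts = Counter()

    async def on_push_frame(self, data):
        # Only data.frame is read; the observer never modifies the frame.
        self.counts[type(data.frame).__name__] += 1


class FakeAudioFrame:
    """Stand-in frame type for the demo."""


class FakeTextFrame:
    """Stand-in frame type for the demo."""


async def feed(observer, frames):
    # Wrap each frame in a minimal data object exposing a .frame attribute.
    for frame in frames:
        await observer.on_push_frame(SimpleNamespace(frame=frame))


def count_demo():
    """Push three stand-in frames and return the per-type tallies."""
    observer = FrameCountObserver()
    asyncio.run(feed(observer, [FakeAudioFrame(), FakeAudioFrame(), FakeTextFrame()]))
    return dict(observer.counts)
```

Keeping the tally on the observer instance lets the calling code read `observer.counts` after the pipeline finishes, without touching any processor.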