Skip to main content
The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline’s core functionality.
DEPRECATED: The old observer pattern with individual parameters (on_push_frame(src, dst, frame, direction, timestamp)) is deprecated. Use the new pattern with data objects (on_push_frame(data: FramePushed)) instead.

Base Observer

All observers must inherit from BaseObserver and can implement two main methods:
  • on_push_frame(data: FramePushed): Called when a frame is pushed from one processor to another
  • on_process_frame(data: FrameProcessed): Called when a frame is being processed by a processor
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed

class CustomObserver(BaseObserver):
    async def on_push_frame(self, data: FramePushed):
        # Your frame observation logic here
        pass
    
    async def on_process_frame(self, data: FrameProcessed):
        # Your frame processing observation logic here
        pass

Available Observers

Pipecat provides several built-in observers:
  • LLMLogObserver: Logs LLM activity and responses
  • TranscriptionLogObserver: Logs speech-to-text transcription events
  • RTVIObserver: Converts internal frames to RTVI protocol messages for server to client messaging

Using Multiple Observers

You can attach multiple observers to a pipeline task. Each observer will be notified of all frames:
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        observers=[LLMLogObserver(), TranscriptionLogObserver(), CustomObserver()],
    ),
)

Example: Debug Observer

Here’s an example observer that logs interruptions and bot speaking events:
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
from pipecat.frames.frames import (
    StartInterruptionFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from loguru import logger

class DebugObserver(BaseObserver):
    """Observer to log interruptions and bot speaking events to the console.

    Logs all frame instances of:
    - StartInterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This allows you to see the frame flow from processor to processor through the pipeline for these frames.
    Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
    """

    async def on_push_frame(self, data: FramePushed):
        time_sec = data.timestamp / 1_000_000_000
        arrow = "→" if data.direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(data.frame, StartInterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")

Common Use Cases

Observers are particularly useful for:
  • Debugging frame flow
  • Logging specific events
  • Monitoring pipeline behavior
  • Collecting metrics
  • Converting internal frames to external messages