Observer Pattern

The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline’s core functionality.

Base Observer

All observers must inherit from BaseObserver and implement the on_push_frame method:

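from pipecat.frames.frames import Frame
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class CustomObserver(BaseObserver):
    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        # Called for each frame pushed from src to dst.
        # Your frame observation logic here
        pass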

Using Multiple Observers

You can attach multiple observers to a pipeline task. Each observer will be notified of all frames:

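from pipecat.pipeline.task import PipelineParams, PipelineTask

# `pipeline` is an existing Pipeline instance; LoggingObserver stands in for
# any second BaseObserver subclass of your own.
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        observers=[DebugObserver(), LoggingObserver()],
    ),
)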
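
As a rough mental model of "each observer will be notified of all frames", the sketch below fans a single frame event out to two observers. It is not Pipecat's implementation: RecordingObserver and notify_observers are hypothetical names, and plain strings stand in for processors, frames, and directions so the snippet runs without Pipecat installed.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class RecordingObserver:
    """Hypothetical stand-in observer that records every event it is shown."""

    name: str
    seen: list = field(default_factory=list)

    async def on_push_frame(self, src, dst, frame, direction, timestamp):
        self.seen.append((src, dst, frame, direction, timestamp))


async def notify_observers(observers, src, dst, frame, direction, timestamp):
    """Hypothetical fan-out: pass the same frame event to every observer."""
    await asyncio.gather(
        *(o.on_push_frame(src, dst, frame, direction, timestamp) for o in observers)
    )


def demo():
    observers = [RecordingObserver("debug"), RecordingObserver("logging")]
    # Strings are placeholders for real processors, frames, and directions.
    asyncio.run(
        notify_observers(
            observers, "tts", "transport", "SomeFrame", "downstream", 1_500_000_000
        )
    )
    return observers


if __name__ == "__main__":
    for observer in demo():
        print(observer.name, len(observer.seen))
```

Every observer in the list receives the same arguments, so each one can filter for the frames it cares about without knowing about the others.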

Example: Debug Observer

Here’s an example observer that logs interruptions and bot speaking events:

from loguru import logger

from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame,
    StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

class DebugObserver(BaseObserver):
    """Observer to log interruptions and bot speaking events to the console.

    Logs all frame instances of:
    - StartInterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This lets you follow these frames from processor to processor through the pipeline.
    Log format: [EVENT TYPE]: [source processor] [→ or ←] [destination processor] at [timestamp]s
    """

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        # Convert the nanosecond timestamp to seconds for readable output.
        time_sec = timestamp / 1_000_000_000
        # The arrow shows whether the frame travels downstream or upstream.
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(frame, StartInterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")

Common Use Cases

Observers are particularly useful for:

  • Debugging frame flow
  • Logging specific events
  • Monitoring pipeline behavior
  • Collecting metrics
  • Converting internal frames to external messages
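
For the metrics use case, one minimal sketch is an observer that tallies frames by class name. The names FrameCountObserver, FakeTextFrame, and FakeAudioFrame are hypothetical. In a real pipeline the observer would subclass BaseObserver and receive Pipecat frames; here it is a plain class fed stand-in frames so the snippet runs without Pipecat installed.

```python
import asyncio
from collections import Counter


class FrameCountObserver:
    """Hypothetical metrics observer: counts observed frames by class name.

    In a real pipeline this would inherit from BaseObserver.
    """

    def __init__(self):
        self.counts = Counter()

    async def on_push_frame(self, src, dst, frame, direction, timestamp):
        # Only the frame's type matters for this metric.
        self.counts[type(frame).__name__] += 1


# Stand-in frame types for the demo (not Pipecat classes).
class FakeTextFrame:
    pass


class FakeAudioFrame:
    pass


async def simulate(observer):
    """Feed three fake frame events to the observer."""
    for frame in (FakeTextFrame(), FakeAudioFrame(), FakeTextFrame()):
        await observer.on_push_frame("src", "dst", frame, "downstream", 0)


def run_demo():
    observer = FrameCountObserver()
    asyncio.run(simulate(observer))
    return observer


if __name__ == "__main__":
    print(dict(run_demo().counts))
```

Keeping the tally on the observer instance means the pipeline itself stays unchanged; the counts can be read or exported after the task finishes.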