Understanding and implementing observers in Pipecat
The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline’s core functionality.
Here’s an example observer that logs interruptions and bot speaking events:
```python
from loguru import logger

from pipecat.frames.frames import (
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    Frame,
    StartInterruptionFrame,
)
from pipecat.observers.base_observer import BaseObserver
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class DebugObserver(BaseObserver):
    """Observer to log interruptions and bot speaking events to the console.

    Logs all frame instances of:
    - StartInterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This allows you to see the frame flow from processor to processor
    through the pipeline for these frames.

    Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
    """

    async def on_push_frame(
        self,
        src: FrameProcessor,
        dst: FrameProcessor,
        frame: Frame,
        direction: FrameDirection,
        timestamp: int,
    ):
        # The timestamp is divided by 1e9, i.e. treated as nanoseconds.
        time_sec = timestamp / 1_000_000_000

        # Show which way the frame is travelling through the pipeline.
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(frame, StartInterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
        elif isinstance(frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {src} {arrow} {dst} at {time_sec:.2f}s")
```
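To make the "non-intrusive" idea concrete, here is a minimal, dependency-free sketch of how a pipeline might notify observers each time a frame is pushed. It is not Pipecat's implementation: `ToyFrame`, `ToyProcessor`, `ToyObserver`, and `Direction` are hypothetical stand-ins invented for illustration, and only the `on_push_frame` argument order mirrors the example above. The point it shows is that the observer is called alongside the push and only reads the frame, so the frame reaches the next processor unchanged.

```python
import asyncio
import time
from dataclasses import dataclass
from enum import Enum


class Direction(Enum):
    """Hypothetical stand-in for a frame direction enum."""
    DOWNSTREAM = 1
    UPSTREAM = 2


@dataclass
class ToyFrame:
    """Hypothetical frame carrying only a name."""
    name: str


class ToyObserver:
    """Records every push it is told about; never modifies the frame."""

    def __init__(self):
        self.events = []

    async def on_push_frame(self, src, dst, frame, direction, timestamp):
        arrow = "→" if direction == Direction.DOWNSTREAM else "←"
        self.events.append(f"{frame.name}: {src} {arrow} {dst}")


class ToyProcessor:
    """Hypothetical processor that forwards frames to the next processor."""

    def __init__(self, name, observers):
        self.name = name
        self.observers = observers
        self.next = None
        self.received = []

    def __str__(self):
        return self.name

    async def push_frame(self, frame, direction=Direction.DOWNSTREAM):
        if self.next is None:
            return  # end of the toy pipeline
        timestamp = time.monotonic_ns()
        # Observers are notified first, then the frame is delivered unchanged.
        for observer in self.observers:
            await observer.on_push_frame(self, self.next, frame, direction, timestamp)
        await self.next.process_frame(frame, direction)

    async def process_frame(self, frame, direction):
        self.received.append(frame)
        await self.push_frame(frame, direction)


async def run_demo():
    observer = ToyObserver()
    stt, llm, tts = (ToyProcessor(n, [observer]) for n in ("stt", "llm", "tts"))
    stt.next, llm.next = llm, tts
    frame = ToyFrame("BotStartedSpeaking")
    await stt.push_frame(frame)
    return observer.events, tts.received, frame


if __name__ == "__main__":
    events, received, _ = asyncio.run(run_demo())
    for line in events:
        print(line)
```

Because the observer only appends to its own list, removing it from the `observers` list changes nothing about which frames the processors receive. That is the property that makes observers suitable for logging and debugging.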