Overview
In some scenarios, turn detection happens externally, either through a dedicated processor or an external service. Pipecat provides ExternalUserTurnStrategies, a set of user turn strategies that defers turn handling to these external sources.
External turn management might be needed when:
- Multiple context aggregators: Parallel pipelines with multiple LLMs need a single, shared source of turn events
- External services with turn detection: Services like Deepgram Flux or Speechmatics provide their own turn detection
In both cases, you need to configure your context aggregators with ExternalUserTurnStrategies to defer turn handling to the external source.
External Services
Some speech-to-text services provide built-in turn detection. When using these services, configure your context aggregator with ExternalUserTurnStrategies to let the service handle turn management:
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies
# Configure aggregator to use external turn strategies
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=ExternalUserTurnStrategies()
),
)
UserTurnProcessor
UserTurnProcessor is a frame processor for managing user turn lifecycle when you need a single source of turn events shared across multiple context aggregators. It emits UserStartedSpeakingFrame and UserStoppedSpeakingFrame frames and handles interruptions.
UserTurnProcessor only manages user turn start and end events. It does not handle transcription aggregation; that remains the responsibility of the context aggregators.
Constructor Parameters
user_turn_strategies (UserTurnStrategies, default: UserTurnStrategies())
Configured strategies for starting and stopping user turns. See User Turn Strategies for available options.

Timeout in seconds to automatically stop a user turn if no stop strategy triggers.
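The stop timeout acts as a watchdog: if no stop strategy ends the turn in time, the turn is stopped anyway and the timeout event fires. A minimal sketch of that behavior (an illustration only, not Pipecat's implementation; all names here are hypothetical):

```python
import asyncio


async def run_turn(stop_event: asyncio.Event, timeout: float) -> str:
    # Wait for a stop strategy to signal the end of the turn; if none
    # does within `timeout` seconds, end the turn via the timeout path.
    try:
        await asyncio.wait_for(stop_event.wait(), timeout)
        return "stopped"  # a stop strategy triggered in time
    except asyncio.TimeoutError:
        return "timeout"  # on_user_turn_stop_timeout would fire here


async def main() -> tuple:
    # Turn 1: a stop strategy fires well before the timeout.
    ev = asyncio.Event()
    asyncio.get_running_loop().call_later(0.01, ev.set)
    first = await run_turn(ev, timeout=1.0)
    # Turn 2: no stop strategy ever fires, so the watchdog ends the turn.
    second = await run_turn(asyncio.Event(), timeout=0.05)
    return first, second


results = asyncio.run(main())
```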
Event Handlers
UserTurnProcessor provides event handlers for turn lifecycle events:
@user_turn_processor.event_handler("on_user_turn_started")
async def on_user_turn_started(processor, strategy):
# Called when a user turn starts
pass
@user_turn_processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy):
# Called when a user turn stops
pass
@user_turn_processor.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(processor):
# Called if no stop strategy triggers before timeout
pass
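The decorator-based registration used above can be pictured with a minimal event emitter. This is a sketch of the registration mechanism only, not Pipecat's internals; TurnEventEmitter and the strategy labels are illustrative:

```python
import asyncio
from collections import defaultdict


class TurnEventEmitter:
    """Toy decorator-based event registry, mimicking the
    @processor.event_handler(...) style shown above."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def event_handler(self, event_name: str):
        # Returns a decorator that registers the function for the event.
        def decorator(func):
            self._handlers[event_name].append(func)
            return func
        return decorator

    async def emit(self, event_name: str, *args):
        # Invoke every registered handler, passing the emitter first,
        # just as Pipecat passes the processor to its handlers.
        for handler in self._handlers[event_name]:
            await handler(self, *args)


emitter = TurnEventEmitter()
events = []


@emitter.event_handler("on_user_turn_started")
async def on_started(processor, strategy):
    events.append(("started", strategy))


@emitter.event_handler("on_user_turn_stopped")
async def on_stopped(processor, strategy):
    events.append(("stopped", strategy))


asyncio.run(emitter.emit("on_user_turn_started", "vad"))
asyncio.run(emitter.emit("on_user_turn_stopped", "turn_analyzer"))
```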
Usage with Parallel Pipelines
When using parallel pipelines with multiple context aggregators, place UserTurnProcessor before the parallel pipeline and configure each context aggregator with ExternalUserTurnStrategies:
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies
# Create the external user turn processor with your preferred strategies
user_turn_processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(
stop=[
TurnAnalyzerUserTurnStopStrategy(
turn_analyzer=LocalSmartTurnAnalyzerV3()
)
]
),
)
# Create contexts for each LLM
openai_context = LLMContext(openai_messages)
groq_context = LLMContext(groq_messages)
# Configure aggregators to use external turn strategies
openai_context_aggregator = LLMContextAggregatorPair(
openai_context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=ExternalUserTurnStrategies()
),
)
groq_context_aggregator = LLMContextAggregatorPair(
groq_context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=ExternalUserTurnStrategies()
),
)
# Build the pipeline with UserTurnProcessor before the parallel branches
pipeline = Pipeline(
[
transport.input(),
stt,
user_turn_processor, # Handles turn management for all branches
ParallelPipeline(
[
openai_context_aggregator.user(),
openai_llm,
transport.output(),
openai_context_aggregator.assistant(),
],
[
groq_context_aggregator.user(),
groq_llm,
groq_context_aggregator.assistant(),
],
),
]
)
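The reason a single UserTurnProcessor suffices for both branches can be pictured as a fan-out: one source of turn events feeds every aggregator downstream, so their turn boundaries can never drift apart. A toy sketch of that idea (illustrative only; this is not Pipecat's frame transport):

```python
class ToyAggregator:
    """Toy context aggregator: collects words between turn start and stop."""

    def __init__(self, name):
        self.name = name
        self._buffer = []
        self.turns = []

    def on_frame(self, frame):
        kind, payload = frame
        if kind == "user_started_speaking":
            self._buffer = []  # a new user turn begins
        elif kind == "transcription":
            self._buffer.append(payload)
        elif kind == "user_stopped_speaking":
            self.turns.append(" ".join(self._buffer))  # the turn ends


def fan_out(frames, branches):
    # One turn source pushes identical frames to every parallel branch,
    # so all aggregators agree on where each user turn starts and stops.
    for frame in frames:
        for branch in branches:
            branch.on_frame(frame)


openai_agg, groq_agg = ToyAggregator("openai"), ToyAggregator("groq")
frames = [
    ("user_started_speaking", None),
    ("transcription", "book a"),
    ("transcription", "table"),
    ("user_stopped_speaking", None),
]
fan_out(frames, [openai_agg, groq_agg])
```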