Handling Interruptions#
When moving from text chat to voice chat, one of the biggest differences is interruptions. In a text chat an LLM can be streaming out a response and the user can just go ahead and start typing their next response. That doesn’t work for voice. If the user starts talking, you need to detect that and stop the audio output so the LLM can hear what the user has to say.
You always want to track interruptions accurately in the conversation history. If the LLM had planned to say two giant paragraphs but the user interrupted after the first sentence, it’s important to track what was actually said. Otherwise, the LLM will continue on thinking that it had said the full two paragraphs to the user.
VoiceStream has all the tools you need to handle interruptions cleanly. We will walk through what that looks like here.
The code shown here is from the gpt4_gemini_showdown example in the GitHub repo.
Detecting Speech Events#
The speech step has an option to return a second stream that has all of the SpeechStart and Speech stop events. If you want to detect interruptions, you can use this parameter. Sometimes there will be a false positive, and a SpeechStart will be immediately followed by a SpeechEnd, so we add a step to require the speech to continue on for a bit before signalling an interruption.
stream, speech_start_stream = google_speech_step(
stream,
current_app.speech_async_client,
project=app.config["GCP_PROJECT_ID"],
location=app.config["GCP_SPEECH_LOCATION"],
recognizer=app.config["GCP_BROWSER_SPEECH_RECOGNIZER"],
model="latest_long",
language_codes=["en-US", "es-US"],
audio_format=None,
include_events=True,
)
speech_start_stream = filter_spurious_speech_start_events_step(
speech_start_stream, threshold_secs=1.5
)
Putting the LLM and text to speech in a substream#
The speech_start_stream can now be used to signal when we should cancel the output. To do this, we
use a :func:~voice_stream.cancelable_substream_step.
stream, text_output = cancelable_substream_step(
stream,
speech_start_stream,
create_response_stream,
cancel_messages=[
None,
lambda: array_source([{"output": "..."}, ""]),
],
)
cancelable_substream_step takes two streams as input. The first is the main stream, which in our case
is the stream generated by the speech recognizer. The items from this stream will be processed through the
substream. The second input to cancelable_substream_step is the cancellation stream. When an item comes
in this stream, it indicates that the current substream should be cancelled. In our case, this stream will
indicate that the user has started speaking, and we will want to cancel the substream that has the audio output.
The create_response_substream parameter is a function that takes a source and returns a new stream. It controls
the LLM and Text To Speech.
The cancel_messages parameter controls what is sent downstream after a cancellation. There is one element in the
array for each output of the substream. For our case, there are two outputs, the audio and the text. When
a cancellation occurs we output nothing to the audio stream, and append a “…” to the end of the text stream
to indicate that there was more there but it got cut off.
The Response Substream#
The response substream takes the text output of the speech recognition as input. It outputs audio and text streams. There are few important things going on in this stream
1 def create_response_substream(stream):
2 stream = map_step(stream, lambda x: {"query": x})
3 stream = langchain_load_memory_step(stream, memory)
4 stream = langchain_step(stream, chain, on_completion="")
5 stream = filter_step(stream, lambda x: x != "" and ("history" not in x))
6 stream = map_step(stream, lambda x: x.get("output", None), ignore_none=True)
7 stream = buffer_tts_text_step(stream)
8 stream = google_text_to_speech_step(
9 stream,
10 current_app.text_to_speech_async_client,
11 audio_format=AudioFormat.MP3,
12 )
13 stream, text_output = tts_rate_limit_step(stream, audio_format=AudioFormat.MP3)
14 text_output = map_step(text_output, lambda x: TextOutput(text=x))
15 return stream, text_output
Handling Memory#
Before we send hte data to LangChain, we use a
:func:~voice_stream.integrations.langchain.langchain_load_memory_step
WIth interruptions, you can’t use the regular LangChain RunnableWithMessageHistory
wrapper, because that will put the full LLM response in the conversation history, and
if there is an interruption, you only want a piece of the response in there.
To accomplish this we manually load the history into the LangChain input with the langchain_load_memory_step
stream = langchain_load_memory_step(stream, memory)
Later on, we will see where we update the memory with the output. That happens outside the cancellable part of the stream.
Buffering TTS#
Buffering the LLM output before sending it to TTS allows us to quickly get the first utterance back to the user, while still producing natural sounding TTS. With the buffer step, the TTS is run as soon as the first phrase is output by the LLM. It then continues to generate more TTS output based on phrases. This keeps the TTS going in time with the LLM.
Without buffering, the TTS either gets the whole LLM output at once, which creates a long delay, or gets individual tokens from the LLM, which creates choppy, robotic speech.
stream = buffer_tts_text_step(stream)
Rate-Limiting TTS#
After the TTS is generated, we use a rate limit step to break the full audio and text into small chunks, which we output at the speed at which the audio plays.
This is important because we can’t stop the audio once we send it to the client. The only way to stop the speaker during an interruption is to avoid the client ever having a lot of buffered speech.
The rate-limit step rate limits both the text and the audio. The rate-limited text is used to update the LangChain conversation memory.
stream, text_output = tts_rate_limit_step(stream, audio_format=AudioFormat.MP3)
Handling Output#
In this section we handle the outputs. The stream and text_output streams are
returned from the cancellable_substream_step. These will contain the output audio and
text, and if the speaker interrupts, they will stop.
The audio stream is sent directly to the client.
In this example the text output is split into 2 streams.
The first is formatted and out on a queue to send back to the client. This allows the spoken output to be displayed.
The second stream is used to update the LangChain memory.
The data is formatted into LLM inputs and outputs and then saved to the conversation history with :func:
voice_stream.langchain_save_memory_stepThis stream is then discarded using an
empty_sink
Finally, all the streams are awaited to run the pipeline.
1 audio_output_done = quart_websocket_sink(stream)
2
3 text_output, memory_stream = fork_step(text_output)
4 text_output = filter_step(text_output, lambda x: x != "")
5 text_output = log_step(text_output, "Events")
6 text_output_done = queue_sink(text_output, current_streams[id].outbound)
7
8 memory_stream = collect_dict_step(memory_stream)
9 # Ignore the case where there was an interruption before a full response
10 memory_stream = filter_step(memory_stream, lambda x: "output" in x)
11 memory_stream = map_step(
12 memory_stream,
13 lambda x: {
14 "input": {"query": x["query"]},
15 "output": {"output": x["output"]},
16 },
17 )
18 memory_stream = log_step(memory_stream, "Conversation Memory:")
19 memory_stream = langchain_save_memory_step(memory_stream, memory)
20 memory_done = empty_sink(memory_stream)
21
22 await wait_on_sinks(
23 audio_output_done, text_output_done, memory_done
24 )
25
Review#
Putting this altogether, when a user starts during an existing LLM output:
The speech recognizer detects that the speech has started.
There is a small delay where we confirm that there is not an immediate SpeechEnd event.
Once we’ve determine the SpeechStart was real, it cancels the current substream playing back the last response.
This cancel message immediately stops audio and text from being sent back to the client.
The conversation memory is updated with whatever text had been spoken out of the previous response, followed by ‘…’.
When the recognizer completes the speech recognition, it sends new text to the LLM and a new substream starts.