Walkthrough

In this section, we walk through the code in the QuickStart example to see what’s going on.

The first part of the code is application infrastructure that sets up the UI and serves the page to the browser. It is followed by the VoiceStream data flow.

Application Structure

Imports

The quickstart begins with some standard imports, which we will skip. It then imports several names from the VoiceStream package:

from voice_stream import map_step, log_step, recover_exception_step
from voice_stream.audio import AudioFormat
from voice_stream.integrations.fastapi import (
    fastapi_websocket_bytes_source,
    fastapi_websocket_bytes_sink,
)
from voice_stream.integrations.google import (
    google_speech_v1_step,
    google_text_to_speech_step,
)
from voice_stream.integrations.langchain import langchain_load_memory_step

Here we import the core map_step(), log_step(), and recover_exception_step() steps directly from voice_stream, which is where most of the core steps, sources, and sinks live. The AudioFormat enum comes from voice_stream.audio.

Integrations with third-party components, such as FastAPI, Google Cloud, and LangChain, live in the integrations package and are imported from voice_stream.integrations.<provider>. From there we import fastapi_websocket_bytes_source() and fastapi_websocket_bytes_sink() to receive data from and send data to the browser, along with the Google speech-recognition and text-to-speech steps and the LangChain step.

Browser UI

The next section contains the HTML for the browser UI. Because the UI is not the focus of the quickstart, it’s a minimal page with two buttons that start and stop the audio stream.

html = """
<!DOCTYPE html>
<html>
    <head><title>VoiceStream Quickstart</title></head>
    <body>
        <script 
        src="https://cdn.jsdelivr.net/gh/DaveDeCaprio/voice-stream@main/examples/static/audio_ws.js">
        </script>
        <button onclick="startAudio('audio-player', '/ws/audio')">Start Voice Chat</button>
        <button onclick="stopAudio()">Stop Voice Chat</button>
        <audio id="audio-player"></audio>
    </body>
</html>
"""

startAudio takes the id of the HTML audio element that plays the response, and the path of the WebSocket endpoint for the audio. Here we use /ws/audio, which matches the WebSocket route defined below.

You may notice that the HTML references a JavaScript file called audio_ws.js. This is a small JavaScript library that contains generic functions for sending and receiving audio data to and from WebSockets. It’s included as a separate file to keep the overall code length short, but if you want to understand how that works, it’s covered in the Cookbook - JS for Browser Audio.

Serve the UI

The next lines create the FastAPI server, and provide a route to serve the HTML page we just created.

app = FastAPI()


@app.get("/")
def get():
    return HTMLResponse(html)


VoiceStream Data Flow

Everything above exists only to set up the server and UI. The VoiceBot itself is implemented in the code that follows.

Client setup

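The next section points GOOGLE_APPLICATION_CREDENTIALS at the credentials file google_creds.json and initializes the Google speech and text-to-speech clients that convert audio to and from text. It also builds a basic LangChain runnable (a prompt template, ChatVertexAI, and a string output parser) that calls the Google LLM. These are used in the flow below.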

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "google_creds.json"
speech_async_client = SpeechAsyncClient(
    client_options=ClientOptions(api_endpoint="us-speech.googleapis.com")
)
text_to_speech_async_client = TextToSpeechAsyncClient()
chain = (
    ChatPromptTemplate.from_messages([("human", "{query}")])
    | ChatVertexAI()
    | StrOutputParser()
)


WebSocket Handler

The core of the work happens in the WebSocket handler. This code runs when the browser opens a WebSocket connection to /ws/audio. It takes audio from the WebSocket, converts it to text with Google Speech Recognition, runs that text through the LangChain LLM, converts the reply to speech, and sends the audio back over the WebSocket.

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")
    stream = map_step(stream, lambda x: {"query": x})
    stream = langchain_load_memory_step(stream, chain, on_completion="")
    stream = recover_exception_step(
        stream,
        Exception,
        lambda x: "Google blocked the response.  Ending conversation.",
    )
    stream = google_text_to_speech_step(
        stream, text_to_speech_async_client, audio_format=AudioFormat.MP3
    )
    stream = map_step(stream, lambda x: x.audio)
    await fastapi_websocket_bytes_sink(stream, websocket)
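Every line of the handler follows the same pattern: a step takes an async iterator, wraps it, and returns a new async iterator, and the flow only starts moving when the sink is awaited. The sketch below illustrates that pattern with plain asyncio and hypothetical stand-ins (demo_source, upper_step, collect_sink); it is not VoiceStream’s implementation and needs no FastAPI or Google setup.

```python
import asyncio
from typing import AsyncIterator, List


async def demo_source(items: List[str]) -> AsyncIterator[str]:
    # Hypothetical stand-in for a source such as a WebSocket reader.
    for item in items:
        yield item


async def upper_step(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    # Hypothetical step: consumes one async iterator and yields a transformed one.
    async for item in stream:
        yield item.upper()


async def collect_sink(stream: AsyncIterator[str]) -> List[str]:
    # Hypothetical sink: draining the stream is what drives every upstream step.
    return [item async for item in stream]


async def handler() -> List[str]:
    # Same shape as the WebSocket handler: reassign `stream` at each step.
    stream = demo_source(["hello", "world"])
    stream = upper_step(stream)
    return await collect_sink(stream)


result = asyncio.run(handler())
print(result)  # ['HELLO', 'WORLD']
```

Because async generators are lazy, building the chain does no work by itself; nothing runs until the final await.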

Let’s walk through this code step by step.

Quickstart Code

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)

First we set up the source with fastapi_websocket_bytes_source(). It creates the stream: an AsyncIterator[bytes] that yields the audio data sent by the browser.
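As a rough mental model only, a WebSocket bytes source behaves like an async generator that keeps awaiting the next message and yields it until the connection closes. The sketch below assumes a hypothetical bytes_source() fed by an asyncio.Queue in place of a real FastAPI WebSocket, with None standing in for a closed connection; the real fastapi_websocket_bytes_source() may detect disconnects differently.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Optional


async def bytes_source(
    receive: Callable[[], Awaitable[Optional[bytes]]]
) -> AsyncIterator[bytes]:
    # Hypothetical source: yield each message until `receive` returns None,
    # which stands in for the browser closing the connection.
    while True:
        chunk = await receive()
        if chunk is None:
            return
        yield chunk


async def main() -> List[bytes]:
    queue: "asyncio.Queue[Optional[bytes]]" = asyncio.Queue()
    # Two fake audio chunks, then the simulated "close".
    for message in (b"chunk-1", b"chunk-2", None):
        queue.put_nowait(message)
    stream = bytes_source(queue.get)
    return [chunk async for chunk in stream]


chunks = asyncio.run(main())
print(chunks)  # [b'chunk-1', b'chunk-2']
```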

Speech Recognition

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )

Next, we send that audio data to speech recognition. We use google_speech_v1_step() here because it requires less configuration than the standard google_speech_step(). V1 and V2 refer to versions of the Google Speech API. The V1 step requires the audio format to be set explicitly; here we set it to WEBM_OPUS, the format Chrome uses. The V2 step can detect the audio format automatically and is what you should use in production applications.

The output of this step is an AsyncIterator[str] which contains the recognized speech.

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")

The next step is a log_step(), which writes the recognized speech to the console. This makes it easy to watch and verify that your speech is being understood correctly.
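Conceptually, a logging step is a pass-through: it reports each item and yields it unchanged, so adding or removing it does not alter the data downstream. A minimal sketch, assuming a hypothetical log_passthrough() built on Python’s standard logging module (the real log_step() may format its output differently):

```python
import asyncio
import logging
from typing import AsyncIterator, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("walkthrough")


async def log_passthrough(stream: AsyncIterator[str], name: str) -> AsyncIterator[str]:
    # Hypothetical log step: report each item, then pass it along unchanged.
    async for item in stream:
        logger.info("%s: %s", name, item)
        yield item


async def recognized() -> AsyncIterator[str]:
    # Stand-in for the output of the speech recognition step.
    for text in ("what's the weather", "thanks"):
        yield text


async def main() -> List[str]:
    stream = log_passthrough(recognized(), "Recognized speech")
    return [item async for item in stream]


items = asyncio.run(main())
print(items)  # ["what's the weather", 'thanks']
```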

LangChain LLM

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")
    stream = map_step(stream, lambda x: {"query": x})

The chain we created expects its input as a dictionary, but the stream currently carries strings.
We use a map_step() to wrap each string in the dictionary format the chain expects.
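The idea behind a mapping step is simply to apply a function to each item as it flows past. The sketch below uses a hypothetical map_items() helper rather than VoiceStream’s map_step(), but applies the same lambda x: {"query": x} to turn each recognized string into the dictionary that fills the prompt template’s {query} variable.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List


async def map_items(
    stream: AsyncIterator[Any], fn: Callable[[Any], Any]
) -> AsyncIterator[Any]:
    # Hypothetical map step: apply `fn` to every item, preserving order.
    async for item in stream:
        yield fn(item)


async def recognized() -> AsyncIterator[str]:
    # Stand-in for recognized speech coming out of the log step.
    yield "tell me a joke"
    yield "another one"


async def main() -> List[Dict[str, str]]:
    stream = map_items(recognized(), lambda x: {"query": x})
    return [item async for item in stream]


queries = asyncio.run(main())
print(queries)  # [{'query': 'tell me a joke'}, {'query': 'another one'}]
```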

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")
    stream = map_step(stream, lambda x: {"query": x})
    stream = langchain_load_memory_step(stream, chain, on_completion="")

The langchain_load_memory_step() call runs the chain we created above. This step uses streaming mode by default, so it can emit one token at a time down the stream. ChatVertexAI doesn’t currently support streaming, though, so its output arrives all at once. That creates a longer delay between the end of your speech and the start of the response. In production applications, you will probably want an LLM that supports streaming tokens.

The output of this step is an AsyncIterator[str] with the response from the LLM.
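To make the latency point concrete, the sketch below simulates two hypothetical LLMs with asyncio.sleep: one yields tokens as they are produced, the other emits the whole reply at once. The tokens and delays are invented for illustration; what matters is that the first chunk, roughly the moment text-to-speech can begin, arrives much sooner with streaming.

```python
import asyncio
import time
from typing import AsyncIterator, List, Optional, Tuple

TOKENS = ["Sure,", " here", " is", " a", " joke."]
DELAY = 0.05  # invented per-token generation time, in seconds


async def streaming_llm() -> AsyncIterator[str]:
    # Hypothetical streaming model: emit each token as soon as it is "generated".
    for token in TOKENS:
        await asyncio.sleep(DELAY)
        yield token


async def non_streaming_llm() -> AsyncIterator[str]:
    # Hypothetical non-streaming model: generate everything, then emit one chunk.
    await asyncio.sleep(DELAY * len(TOKENS))
    yield "".join(TOKENS)


async def first_chunk_delay(
    stream: AsyncIterator[str],
) -> Tuple[Optional[float], List[str]]:
    # Measure the time until the first chunk arrives, and collect all chunks.
    start = time.perf_counter()
    first: Optional[float] = None
    chunks: List[str] = []
    async for chunk in stream:
        if first is None:
            first = time.perf_counter() - start
        chunks.append(chunk)
    return first, chunks


streamed_delay, streamed = asyncio.run(first_chunk_delay(streaming_llm()))
batched_delay, batched = asyncio.run(first_chunk_delay(non_streaming_llm()))
print(f"streaming: first chunk after {streamed_delay:.2f}s")
print(f"non-streaming: first chunk after {batched_delay:.2f}s")
```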

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")
    stream = map_step(stream, lambda x: {"query": x})
    stream = langchain_load_memory_step(stream, chain, on_completion="")
    stream = recover_exception_step(
        stream,
        Exception,
        lambda x: "Google blocked the response.  Ending conversation.",
    )

The recover_exception_step() handles the case where the LLM blocks the response and doesn’t return a value. When that happens, an exception is raised, and it propagates down the stream the same way normal data does. Here we recover from that exception by emitting replacement text that tells the user about the error.
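A rough sketch of exception recovery on a plain async generator, assuming a hypothetical recover_with() helper and BlockedResponseError class (neither is VoiceStream’s API, and recover_exception_step() may behave differently). The except clause catches the exception raised while iterating upstream and yields replacement text built from it. A Python async generator cannot resume after it raises, so in this sketch the stream ends after the replacement item.

```python
import asyncio
from typing import AsyncIterator, Callable, List, Type


class BlockedResponseError(Exception):
    """Hypothetical error standing in for an LLM that refuses to answer."""


async def recover_with(
    stream: AsyncIterator[str],
    exc_type: Type[BaseException],
    make_replacement: Callable[[BaseException], str],
) -> AsyncIterator[str]:
    # Pass items through; if upstream raises `exc_type`, emit a replacement and stop.
    try:
        async for item in stream:
            yield item
    except exc_type as exc:
        yield make_replacement(exc)


async def llm_replies() -> AsyncIterator[str]:
    # Stand-in LLM: answers once, then the next response is blocked.
    yield "Hello there!"
    raise BlockedResponseError("response blocked")


async def main() -> List[str]:
    stream = recover_with(
        llm_replies(),
        Exception,
        lambda x: "Google blocked the response.  Ending conversation.",
    )
    return [item async for item in stream]


replies = asyncio.run(main())
print(replies)
```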

Text-to-Speech

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")
    stream = map_step(stream, lambda x: {"query": x})
    stream = langchain_load_memory_step(stream, chain, on_completion="")
    stream = recover_exception_step(
        stream,
        Exception,
        lambda x: "Google blocked the response.  Ending conversation.",
    )
    stream = google_text_to_speech_step(
        stream, text_to_speech_async_client, audio_format=AudioFormat.MP3
    )

We pass the text from the LLM to google_text_to_speech_step(). The output of this step is an AsyncIterator of AudioWithText objects. In this application we only need the audio, so we use a map_step() to extract it, as shown in the next block.

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")
    stream = map_step(stream, lambda x: {"query": x})
    stream = langchain_load_memory_step(stream, chain, on_completion="")
    stream = recover_exception_step(
        stream,
        Exception,
        lambda x: "Google blocked the response.  Ending conversation.",
    )
    stream = google_text_to_speech_step(
        stream, text_to_speech_async_client, audio_format=AudioFormat.MP3
    )
    stream = map_step(stream, lambda x: x.audio)
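
For intuition about the audio-extraction map_step(), the sketch below models the text-to-speech output with a hypothetical AudioWithText dataclass. Only the audio attribute is confirmed by the lambda x: x.audio in the handler; the text field and the rest of this class are assumptions for illustration, not VoiceStream’s definition.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class AudioWithText:
    # Hypothetical stand-in: `audio` matches the attribute the handler reads;
    # `text` is an assumed field holding the words that were spoken.
    audio: bytes
    text: str


async def tts_output() -> AsyncIterator[AudioWithText]:
    # Stand-in for google_text_to_speech_step() output, with fake MP3 payloads.
    yield AudioWithText(audio=b"fake-mp3-1", text="Sure, here is a joke.")
    yield AudioWithText(audio=b"fake-mp3-2", text="Here it is.")


async def audio_only(stream: AsyncIterator[AudioWithText]) -> AsyncIterator[bytes]:
    # Plays the role of map_step(stream, lambda x: x.audio): keep only the bytes.
    async for item in stream:
        yield item.audio


async def main() -> List[bytes]:
    return [chunk async for chunk in audio_only(tts_output())]


audio_chunks = asyncio.run(main())
print(audio_chunks)  # [b'fake-mp3-1', b'fake-mp3-2']
```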

Sink

@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket):
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream = log_step(stream, "Recognized speech")
    stream = map_step(stream, lambda x: {"query": x})
    stream = langchain_load_memory_step(stream, chain, on_completion="")
    stream = recover_exception_step(
        stream,
        Exception,
        lambda x: "Google blocked the response.  Ending conversation.",
    )
    stream = google_text_to_speech_step(
        stream, text_to_speech_async_client, audio_format=AudioFormat.MP3
    )
    stream = map_step(stream, lambda x: x.audio)
    await fastapi_websocket_bytes_sink(stream, websocket)

Finally, the audio data is sent back to the browser for playback using fastapi_websocket_bytes_sink().

Note that the last line awaits the sink. The sink does not complete until the WebSocket is closed, and data keeps flowing through the stream as long as the connection is open. This lets a single flow handle many rounds of recognizing speech and returning responses.
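The sketch below illustrates why awaiting the sink keeps the conversation alive, using hypothetical stand-ins rather than VoiceStream or FastAPI: the sink loops until the upstream iterator is exhausted, and the upstream only ends when the simulated client "closes" by putting None on a queue. Two question-and-answer rounds pass through the same pipeline while the sink is still running.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


async def client_messages(queue: "asyncio.Queue[Optional[str]]") -> AsyncIterator[str]:
    # Hypothetical source: yields messages until None signals the socket closed.
    while True:
        message = await queue.get()
        if message is None:
            return
        yield message


async def answer_step(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    # Hypothetical bot step: one reply per incoming message.
    async for message in stream:
        yield f"reply to {message!r}"


async def list_sink(stream: AsyncIterator[str], sent: List[str]) -> None:
    # Hypothetical sink: "sends" each item and returns only when the stream ends.
    async for item in stream:
        sent.append(item)


async def main() -> Tuple[bool, List[str]]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    sent: List[str] = []
    sink_task = asyncio.create_task(list_sink(answer_step(client_messages(queue)), sent))
    for message in ("hi", "how are you?"):
        await queue.put(message)
        await asyncio.sleep(0.01)  # give the round time to flow through
    still_open = not sink_task.done()  # the sink is still waiting for more input
    await queue.put(None)  # simulate the client closing the connection
    await sink_task
    return still_open, sent


still_open, sent_items = asyncio.run(main())
print(still_open, sent_items)
```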

Where To Go Now

From here, you can go to the Concepts section to learn more about the core streaming abstractions on which VoiceStream is built, or jump right into the Cookbook or GitHub Examples to see how to accomplish specific tasks with VoiceStream.