Browser Text and Audio

The Quickstart gives a minimal introduction to a browser-based audio application. In that application, two-way audio was set up with the browser, and the audio was run through a VoiceStream to create a conversation with a Google LLM.

In this example, we expand on the Quickstart by having the browser UI display the transcript of the conversation on the page. This demonstrates some of the flow controls available within VoiceStream.

[Image: browser_with_text]

The full code for this application is in the examples directory of the VoiceStream repo. A full copy of the source also appears at the bottom of this page.

Design Approach

This example is similar to the Quickstart, except that the browser makes two WebSocket connections to the server: one for the audio and a second for the text transcript. Additionally, the client now generates a unique id for the call so that the server can associate the two websockets with each other.

Within the server, the core audio flow stays the same, but we add two fork_step calls (voice_stream.fork_step) to extract the text: one after the speech recognition and another before the text-to-speech.
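
To make the idea of forking concrete, here is a minimal sketch of how one stream can be duplicated with plain asyncio. It is not VoiceStream's implementation of fork_step; the fork, collect, and demo helpers and the _DONE marker are hypothetical names used only for illustration.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple

_DONE = object()  # hypothetical end-of-stream marker


def fork(source: AsyncIterator[str]) -> Tuple[AsyncIterator[str], AsyncIterator[str]]:
    """Duplicate a stream so that every item appears on both outputs."""
    queues = (asyncio.Queue(), asyncio.Queue())
    pump_task: Optional[asyncio.Future] = None

    async def pump() -> None:
        # Copy each item to both queues, then signal completion on both.
        async for item in source:
            for queue in queues:
                await queue.put(item)
        for queue in queues:
            await queue.put(_DONE)

    async def branch(queue: asyncio.Queue) -> AsyncIterator[str]:
        nonlocal pump_task
        if pump_task is None:  # start pumping when the first branch is read
            pump_task = asyncio.ensure_future(pump())
        while (item := await queue.get()) is not _DONE:
            yield item

    return branch(queues[0]), branch(queues[1])


async def collect(stream: AsyncIterator[str]) -> List[str]:
    return [item async for item in stream]


async def demo() -> List[List[str]]:
    async def recognized_speech() -> AsyncIterator[str]:
        for phrase in ("hello", "world"):
            yield phrase

    main, transcript_copy = fork(recognized_speech())
    shouted = (phrase.upper() async for phrase in main)  # stand-in for later steps
    # Both branches are consumed together, mirroring the gather in the endpoint.
    return await asyncio.gather(collect(shouted), collect(transcript_copy))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The main branch continues through the rest of the pipeline while the copy is free to go elsewhere, which is the role the two forks play in this example.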

The text is converted to TextInput and TextOutput objects that are put on a queue. The text websocket reads from this queue and sends all items to the browser.
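
The browser code on this page reads two fields from each message, event_name and text. The sketch below shows one way such events could be serialized to JSON. The real TextInput and TextOutput classes live in examples.example_events and are not shown on this page, so the class names, field defaults, and event_name values here are assumptions for illustration only.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TextInputSketch:
    """Hypothetical stand-in for TextInput: text recognized from the user."""
    text: str
    event_name: str = "text_input"  # assumed value, not taken from the library


@dataclass
class TextOutputSketch:
    """Hypothetical stand-in for TextOutput: text produced by the LLM."""
    text: str
    event_name: str = "text_output"  # assumed value, not taken from the library


def to_message(event) -> str:
    """Serialize an event to the JSON shape the browser handler parses."""
    return json.dumps(asdict(event))


if __name__ == "__main__":
    print(to_message(TextInputSketch(text="What is the weather?")))
    print(to_message(TextOutputSketch(text="It is sunny.")))
```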

Browser Code

The startAudio and stopAudio functions from the quickstart are still used to handle the audio websocket. We add startChat and stopChat functions to set up the unique id and set up the second websocket.

startChat sets up a websocket handler that adds the text sent from the server to the page.

async function startChat() {
    const uuid = generateUUID();
    await startAudio("audio-player",`/ws/audio?id=${uuid}`);

    var resultDiv = document.getElementById('result');
    resultDiv.innerHTML = "";

    const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    const host = window.location.host; // Includes hostname and port if present
    const wsUrl = `${protocol}//${host}/ws/text?id=${uuid}`;
    // Store the socket in the global textWebsocket so that stopChat can close it.
    textWebsocket = new WebSocket(wsUrl);
    var lastEventName = "";

    // Append each message, starting a new line whenever event_name changes.
    textWebsocket.onmessage = function (event) {
        const data = JSON.parse(event.data);
        if (lastEventName === data.event_name)
            resultDiv.innerHTML += data.text;
        else
            resultDiv.innerHTML += `<br>${data.event_name}: ${data.text}`;
        lastEventName = data.event_name;
    };
}
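
The message handler above groups consecutive messages that share an event_name onto one line and starts a new line when the name changes. The same rule is sketched here in Python so that it can be tried outside the browser. The render_transcript function and the event_name values in the sample data are hypothetical.

```python
from typing import Iterable, Mapping


def render_transcript(messages: Iterable[Mapping[str, str]]) -> str:
    """Build the HTML that the browser handler would accumulate."""
    html = ""
    last_event_name = ""
    for message in messages:
        if message["event_name"] == last_event_name:
            # Same speaker as before: continue the current line.
            html += message["text"]
        else:
            # Speaker changed: start a new labeled line.
            html += f"<br>{message['event_name']}: {message['text']}"
        last_event_name = message["event_name"]
    return html


if __name__ == "__main__":
    sample = [
        {"event_name": "text_input", "text": "Hi"},
        {"event_name": "text_output", "text": "Hello"},
        {"event_name": "text_output", "text": " there"},
    ]
    print(render_transcript(sample))
```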

Text Websocket

The handler for the text websocket is very simple. We first set up a global dictionary called call_queues, keyed by call id, to hold the queues used to send the transcripts.

The handler first looks up the queue for its call id. That queue is created by the audio websocket handler, which is why the browser code awaits startAudio before opening the text websocket.

It then routes everything from the queue directly to the websocket using voice_stream.queue_source and voice_stream.integrations.fastapi.fastapi_websocket_text_sink.

call_queues = {}


@app.websocket("/ws/text")
async def text_websocket_endpoint(websocket: WebSocket, id: str):
    queue = call_queues[id]
    stream = queue_source(queue)
    await fastapi_websocket_text_sink(stream, websocket)


Audio Websocket

Here is the audio websocket endpoint. The differences from the Quickstart are on lines 3, 10, 18, and 25-29, and are described after the listing.

 1  @app.websocket("/ws/audio")
 2  async def audio_websocket_endpoint(websocket: WebSocket, id: str):
 3      call_queues[id] = asyncio.Queue()
 4      stream = fastapi_websocket_bytes_source(websocket)
 5      stream = google_speech_v1_step(
 6          stream,
 7          speech_async_client,
 8          audio_format=AudioFormat.WEBM_OPUS,
 9      )
10      stream, text_input_stream = fork_step(stream)
11      stream = map_step(stream, lambda x: {"query": x})
12      stream = langchain_load_memory_step(stream, chain, on_completion="")
13      stream = recover_exception_step(
14          stream,
15          Exception,
16          lambda x: "Google blocked the response.  Ending conversation.",
17      )
18      stream, text_output_stream = fork_step(stream)
19      stream = google_text_to_speech_step(
20          stream, text_to_speech_async_client, audio_format=AudioFormat.MP3
21      )
22      stream = map_step(stream, lambda x: x.audio)
23      done = fastapi_websocket_bytes_sink(stream, websocket)
24
25      text_input_stream = map_step(text_input_stream, lambda x: TextInput(text=x))
26      text_output_stream = map_step(text_output_stream, lambda x: TextOutput(text=x))
27      output_stream = merge_step(text_input_stream, text_output_stream)
28      queue_done = queue_sink(output_stream, call_queues[id])
29      await asyncio.gather(done, queue_done)
30
31
32  logger.info("INIT")

  • In line 3, we create a queue and assign it to the id passed from the browser. This will be used by the text websocket.

  • In lines 10 and 18, we use voice_stream.fork_step to get a copy of the text input and output to send to the text websocket.

  • In lines 25-28, the input and output streams are converted into objects, merged together, and sent to the queue.

  • In line 29, we await both the original audio stream and the text stream. It is important to always await every stream in a data flow.
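
The merge and the final gather described above can be sketched with plain asyncio as follows. This is a conceptual illustration under stated assumptions, not the code behind merge_step or queue_sink. The merge, sink, and demo helpers, the _DONE marker, and the sample data are all hypothetical.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

_DONE = object()  # hypothetical end-of-stream marker


async def merge(*streams: AsyncIterator[str]) -> AsyncIterator[str]:
    """Interleave several streams into one, ending when all have ended."""
    out: asyncio.Queue = asyncio.Queue()

    async def forward(stream: AsyncIterator[str]) -> None:
        async for item in stream:
            await out.put(item)
        await out.put(_DONE)

    tasks = [asyncio.ensure_future(forward(stream)) for stream in streams]
    try:
        remaining = len(tasks)
        while remaining:
            item = await out.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:  # no-op for tasks that already finished
            task.cancel()


async def sink(stream, collected: List) -> None:
    """Consume a stream to completion, storing every item."""
    async for item in stream:
        collected.append(item)


async def demo() -> Tuple[List[bytes], List[str]]:
    async def audio() -> AsyncIterator[bytes]:
        yield b"chunk-1"
        yield b"chunk-2"

    async def user_text() -> AsyncIterator[str]:
        yield "user: hi"

    async def bot_text() -> AsyncIterator[str]:
        yield "bot: hello"

    audio_out: List[bytes] = []
    transcript: List[str] = []
    # Await every sink together so that no branch is left unconsumed.
    await asyncio.gather(
        sink(audio(), audio_out),
        sink(merge(user_text(), bot_text()), transcript),
    )
    return audio_out, transcript


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The order of merged items depends on task scheduling, so code that consumes a merged stream should not rely on a particular interleaving.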

Full Source Code

"""
Expands on the basic quickstart by sending the input and output text to the
browser over a second websocket so that the transcript can be displayed.
"""

import asyncio
import logging
import os

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from google.api_core.client_options import ClientOptions
from google.cloud.speech_v1 import SpeechAsyncClient
from google.cloud.texttospeech_v1 import TextToSpeechAsyncClient
from langchain_community.chat_models import ChatVertexAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

from examples.example_events import TextInput, TextOutput

# 0 - VoiceStream imports
from voice_stream import (
    map_step,
    recover_exception_step,
    fork_step,
    merge_step,
    queue_sink,
    queue_source,
)
from voice_stream.audio import AudioFormat
from voice_stream.integrations.fastapi import (
    fastapi_websocket_bytes_source,
    fastapi_websocket_bytes_sink,
    fastapi_websocket_text_sink,
)
from voice_stream.integrations.google import (
    google_speech_v1_step,
    google_text_to_speech_step,
)
from voice_stream.integrations.langchain import langchain_load_memory_step

logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s %(name)s %(levelname)s - %(message)s"
)

logger = logging.getLogger(__name__)

# HTML shown by the browser
html = """
<!DOCTYPE html>
<html>
    <head><title>VoiceStream Quickstart</title></head>
    <body>
        <script 
        src="https://cdn.jsdelivr.net/gh/DaveDeCaprio/voice-stream@main/examples/static/audio_ws.js">
        </script>
        <button onclick="startChat()">Start Voice Chat</button>
        <button onclick="stopChat()">Stop Voice Chat</button>
        <audio id="audio-player"></audio>
        <div id="result"></div>
<script>
let textWebsocket;

/* Set up the websockets */
async function startChat() {
    const uuid = generateUUID();
    await startAudio("audio-player",`/ws/audio?id=${uuid}`);

    var resultDiv = document.getElementById('result');
    resultDiv.innerHTML = "";

    const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    const host = window.location.host; // Includes hostname and port if present
    const wsUrl = `${protocol}//${host}/ws/text?id=${uuid}`;
    // Store the socket in the global textWebsocket so that stopChat can close it.
    textWebsocket = new WebSocket(wsUrl);
    var lastEventName = "";

    // Append each message, starting a new line whenever event_name changes.
    textWebsocket.onmessage = function (event) {
        const data = JSON.parse(event.data);
        if (lastEventName === data.event_name)
            resultDiv.innerHTML += data.text;
        else
            resultDiv.innerHTML += `<br>${data.event_name}: ${data.text}`;
        lastEventName = data.event_name;
    };
}

/* Close the websockets */
async function stopChat() {
    await stopAudio();
    if(textWebsocket) {
        textWebsocket.close()
    }
}

function generateUUID() {
    return ([1e7] + -1e3 + -4e3 + -8e3 + -1e11).replace(/[018]/g, c =>
            (c ^ crypto.getRandomValues(new Uint8Array(1))[0] & 15 >> c / 4).toString(16)
    );
}
</script>
        
    </body>
</html>
"""
# End HTML

app = FastAPI()


@app.get("/")
def get():
    return HTMLResponse(html)


os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "google_creds.json"
speech_async_client = SpeechAsyncClient(
    client_options=ClientOptions(api_endpoint="us-speech.googleapis.com")
)
text_to_speech_async_client = TextToSpeechAsyncClient()
chain = (
    ChatPromptTemplate.from_messages([("human", "{query}")])
    | ChatVertexAI()
    | StrOutputParser()
)

# Text Endpoint
call_queues = {}


@app.websocket("/ws/text")
async def text_websocket_endpoint(websocket: WebSocket, id: str):
    queue = call_queues[id]
    stream = queue_source(queue)
    await fastapi_websocket_text_sink(stream, websocket)


# Audio Endpoint
@app.websocket("/ws/audio")
async def audio_websocket_endpoint(websocket: WebSocket, id: str):
    call_queues[id] = asyncio.Queue()
    stream = fastapi_websocket_bytes_source(websocket)
    stream = google_speech_v1_step(
        stream,
        speech_async_client,
        audio_format=AudioFormat.WEBM_OPUS,
    )
    stream, text_input_stream = fork_step(stream)
    stream = map_step(stream, lambda x: {"query": x})
    stream = langchain_load_memory_step(stream, chain, on_completion="")
    stream = recover_exception_step(
        stream,
        Exception,
        lambda x: "Google blocked the response.  Ending conversation.",
    )
    stream, text_output_stream = fork_step(stream)
    stream = google_text_to_speech_step(
        stream, text_to_speech_async_client, audio_format=AudioFormat.MP3
    )
    stream = map_step(stream, lambda x: x.audio)
    done = fastapi_websocket_bytes_sink(stream, websocket)

    text_input_stream = map_step(text_input_stream, lambda x: TextInput(text=x))
    text_output_stream = map_step(text_output_stream, lambda x: TextOutput(text=x))
    output_stream = merge_step(text_input_stream, text_output_stream)
    queue_done = queue_sink(output_stream, call_queues[id])
    await asyncio.gather(done, queue_done)


logger.info("INIT")