langchain

LangChain is a framework for building LLM-based applications. VoiceStream allows any LangChain runnable to power a voice application.

voice_stream.integrations.langchain.cancelable_substream_step(async_iter: AsyncIterator[T], cancel_iter: AsyncIterator[T], substream_func: Callable[[AsyncIterator[T]], AsyncIterator[Output] | Tuple], cancel_messages: List[Callable[[], T] | T | None] | None = None) → AsyncIterator[T] | Tuple

Data flow step that runs a substream for each input, but takes a second iterator which causes the current substream to cancel.

Calls the substream_func to create a new substream for each item from the source iterator. If any item is produced from the cancel_iter during the processing of this substream, the substream is immediately stopped. When a stop occurs, cancel_messages are optionally sent down each output stream.

Parameters:
  • async_iter – The input AsyncIterator which we want to create substreams for.

  • cancel_iter – The cancel stream. If an item appears on this iterator, the processing of the current substream is immediately stopped.

  • substream_func – The function used to generate AsyncIterators for each substream.

  • cancel_messages – An optional list of items to produce in the stream when a substream is cancelled. If present, this must be a list with the same length as the number of outputs returned by substream_func. Each element determines how cancels are signaled down that particular data stream. If an AsyncIterator is passed, that iterator will be put into the stream. If any other object is passed, that object will be sent as the signal. If None is passed, nothing will be sent down the stream.

Returns:

Either a single AsyncIterator or a tuple of multiple AsyncIterators. The length is determined by the number of iterators returned from the substream_func.

Return type:

OptionalMultipleOutputs

Notes

  • If you want to explicitly send None when a cancel occurs, use a voice_stream.none_source().
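The cancel behavior can be illustrated with plain asyncio. The following is a minimal single-output sketch under stated assumptions, not VoiceStream's implementation: `run_cancelable` and `slow_tokens` are hypothetical names, the cancel stream is modeled as an `asyncio.Event`, and the substream function receives the item itself rather than an iterator.

```python
import asyncio
from typing import AsyncIterator, Callable, List


async def slow_tokens(prompt: str) -> AsyncIterator[str]:
    """Hypothetical substream: yields three tokens, one every 100 ms."""
    for i in range(3):
        await asyncio.sleep(0.1)
        yield f"{prompt}-{i}"


async def run_cancelable(
    items: List[str],
    cancel: asyncio.Event,
    substream_func: Callable[[str], AsyncIterator[str]],
    cancel_message: str,
) -> List[str]:
    """Run one substream per item; stop the current one when `cancel` is set."""
    out: List[str] = []
    for item in items:
        substream = substream_func(item)
        while True:
            # Race the next substream item against the cancel signal.
            next_task = asyncio.ensure_future(substream.__anext__())
            cancel_task = asyncio.ensure_future(cancel.wait())
            await asyncio.wait(
                {next_task, cancel_task}, return_when=asyncio.FIRST_COMPLETED
            )
            if cancel_task.done():
                # Stop the substream right away and emit the cancel message.
                next_task.cancel()
                await asyncio.gather(next_task, return_exceptions=True)
                cancel.clear()
                out.append(cancel_message)
                break
            cancel_task.cancel()
            try:
                out.append(next_task.result())
            except StopAsyncIteration:
                break  # the substream finished normally
    return out


async def main() -> List[str]:
    cancel = asyncio.Event()
    # Fire the cancel signal while the first substream is still running.
    asyncio.get_running_loop().call_later(0.25, cancel.set)
    return await run_cancelable(["a", "b"], cancel, slow_tokens, "<cancelled>")


result = asyncio.run(main())
print(result)
```

In this sketch the first substream is cut short and replaced by the cancel message, while the second substream runs to completion because no further cancel arrives.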

voice_stream.integrations.langchain.filter_spurious_speech_start_events_step(async_iter: AsyncIterator[BaseEvent], threshold_secs: float = 1.0)

Data flow step that filters a stream of speech events to remove false positives.

This step filters speech events so that if a SpeechStart is quickly followed by a SpeechEnd, it is considered spurious and ignored. This is useful for avoiding false detections which could create unnecessary interruptions.

Parameters:
  • async_iter (AsyncIterator[voice_stream.events.BaseEvent]) – An asynchronous iterator that yields speech events.

  • threshold_secs (float) – The number of seconds to wait for a SpeechEnd after a SpeechStart is received.

Returns:

A modified event stream that removes the spurious SpeechStart events.

Return type:

AsyncIterator[voice_stream.events.BaseEvent]

Example

>>> stream, speech_events = speech_step(async_iter)
>>> speech_events = filter_spurious_speech_start_events_step(speech_events)

Notes

  • This step delays SpeechStart events as they move down the stream, because each one is held until either a SpeechEnd arrives or threshold_secs elapses.
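The filtering rule can be sketched with a queue and a timeout. This is an illustrative sketch only, not the library's code: events are modeled as plain strings instead of `BaseEvent` objects, and `timed_events` and `filter_spurious` are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

_DONE = object()  # sentinel marking the end of the input stream


async def timed_events(script: List[Tuple[float, str]]) -> AsyncIterator[str]:
    """Hypothetical source: waits `delay` seconds before yielding each event."""
    for delay, event in script:
        await asyncio.sleep(delay)
        yield event


async def filter_spurious(
    events: AsyncIterator[str], threshold_secs: float
) -> AsyncIterator[str]:
    queue: asyncio.Queue = asyncio.Queue()

    async def pump() -> None:
        # Copy the input into a queue so reads can be given a timeout.
        async for event in events:
            queue.put_nowait(event)
        queue.put_nowait(_DONE)

    pump_task = asyncio.ensure_future(pump())
    pending = None
    while True:
        event = pending if pending is not None else await queue.get()
        pending = None
        if event is _DONE:
            break
        if event != "SpeechStart":
            yield event
            continue
        try:
            # Hold the SpeechStart and wait briefly for what follows it.
            following = await asyncio.wait_for(queue.get(), threshold_secs)
        except asyncio.TimeoutError:
            yield event  # nothing ended the speech in time: a genuine start
            continue
        if following == "SpeechEnd":
            continue  # ended too quickly: drop both events
        yield event
        pending = following  # process the following event on the next pass
    await pump_task


async def main() -> List[str]:
    script = [
        (0.0, "SpeechStart"),   # spurious: ends 50 ms later
        (0.05, "SpeechEnd"),
        (0.05, "SpeechStart"),  # genuine: lasts 400 ms
        (0.4, "SpeechEnd"),
    ]
    return [e async for e in filter_spurious(timed_events(script), 0.2)]


kept = asyncio.run(main())
print(kept)
```

The genuine SpeechStart is only released once the 0.2 second threshold has passed, which is the delay mentioned in the note above.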

voice_stream.integrations.langchain.fork_step(async_iter: AsyncIterator[T], pull_from_all: bool = False) → Tuple[AsyncIterator[T], AsyncIterator[T]]

Data flow step that splits a stream into two, creating a ‘fork’ in the stream.

This function takes an async iterator and divides its output into two separate async iterators. If pull_from_all is set to False (default), the consumption rate of items is determined by the rate of consumption of the first returned iterator; the second iterator’s output rate will match the first. If pull_from_all is True, both iterators will consume items independently as quickly as possible.

If pull_from_all is False, exceptions will only propagate to the first iterator. If True, exceptions will propagate to both iterators.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator to be forked.

  • pull_from_all (bool, default False) – If True, both forks will pull from the incoming iterator. If False, the second iterator will only receive data as fast as the first pulls.

Returns:

A tuple of two asynchronous iterators representing the forked output.

Return type:

Tuple[AsyncIterator[T], AsyncIterator[T]]

Examples

>>> source = array_source(range(2))
>>> a, b = fork_step(source)
>>> a = await array_sink(a)
>>> b = await array_sink(b)
>>> assert a == [0, 1]
>>> assert b == [0, 1]

Notes

  • With pull_from_all set to False, the rate of iteration in the first iterator controls the overall pace of item consumption from the original iterator.

  • With pull_from_all set to True, items are consumed from the original iterator as quickly as possible, and both iterators receive items independently.
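One way to picture the default pull_from_all=False behavior is a first iterator that drives the source and copies each item into a queue for the second. The sketch below makes that assumption explicit; `fork` and `numbers` are hypothetical names, and the real step may be implemented differently.

```python
import asyncio
from typing import AsyncIterator, List, Tuple, TypeVar

T = TypeVar("T")
_END = object()  # sentinel telling the second iterator that the source is done


def fork(source: AsyncIterator[T]) -> Tuple[AsyncIterator[T], AsyncIterator[T]]:
    queue: asyncio.Queue = asyncio.Queue()

    async def first() -> AsyncIterator[T]:
        # Only this iterator pulls from the source, so it sets the pace.
        async for item in source:
            queue.put_nowait(item)
            yield item
        queue.put_nowait(_END)

    async def second() -> AsyncIterator[T]:
        # Receives items only after the first iterator has pulled them.
        while True:
            item = await queue.get()
            if item is _END:
                return
            yield item

    return first(), second()


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def main() -> Tuple[List[int], List[int]]:
    a, b = fork(numbers(3))
    a_items = [x async for x in a]
    b_items = [x async for x in b]
    return a_items, b_items


a_items, b_items = asyncio.run(main())
print(a_items, b_items)
```

Because the second iterator never touches the source, draining it before the first would wait forever in this sketch, which mirrors the pacing rule described in the notes.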

voice_stream.integrations.langchain.langchain_load_memory_step(async_iter: AsyncIterator[Dict], memory: BaseMemory) → AsyncIterator[Dict]

Data flow step that adds variables from LangChain memory into a dictionary.

This step inserts variables from the memory into the current flow. It expects a dictionary as input and returns a dictionary as output, and is used to prepare input for a langchain_step.

Parameters:
  • async_iter (AsyncIterator[Dict]) – An asynchronous iterator that provides dictionaries.

  • memory (BaseMemory) – A LangChain BaseMemory that contains the variables to insert.

Returns:

An asynchronous iterator yielding dictionaries with memory variables added.

Return type:

AsyncIterator[Dict]

Notes

  • If the incoming dictionary contains any of the variables that are held in the memory, the incoming values will override the values from the memory.
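The merge and its override rule can be sketched without LangChain installed. In the sketch below, `StubMemory` is a hypothetical stand-in that only mimics BaseMemory's `load_memory_variables` method, and `load_memory` and `dict_source` are illustrative names rather than VoiceStream functions.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class StubMemory:
    """Hypothetical stand-in exposing BaseMemory's load_memory_variables."""

    def __init__(self, variables: Dict[str, Any]) -> None:
        self.variables = variables

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        return dict(self.variables)


async def dict_source(items: List[Dict[str, Any]]) -> AsyncIterator[Dict[str, Any]]:
    for item in items:
        yield item


async def load_memory(
    stream: AsyncIterator[Dict[str, Any]], memory: StubMemory
) -> AsyncIterator[Dict[str, Any]]:
    async for item in stream:
        # Memory variables go in first, so keys already present in the
        # incoming dictionary take precedence.
        yield {**memory.load_memory_variables(item), **item}


async def main() -> List[Dict[str, Any]]:
    memory = StubMemory({"history": "Human: hi", "topic": "weather"})
    stream = dict_source(
        [{"input": "What next?"}, {"input": "Hi", "topic": "sports"}]
    )
    return [item async for item in load_memory(stream, memory)]


merged = asyncio.run(main())
print(merged)
```

The second input carries its own `topic`, so that value survives the merge instead of the one stored in memory.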

voice_stream.integrations.langchain.langchain_save_memory_step(async_iter: AsyncIterator[Dict], memory: BaseMemory, input_key='input', output_key='output') → AsyncIterator[Dict]

Data flow step that saves variables into LangChain memory.

This step saves variables from the current flow into memory. It expects a dictionary as input and returns the same dictionary as output. It has a side effect of updating the memory. Updating memory requires two dictionaries, one for input and one for output. These are found by looking at the input_key and output_key values within the dictionary.

Parameters:
  • async_iter (AsyncIterator[Dict]) – An asynchronous iterator that provides dictionaries.

  • memory (BaseMemory) – A LangChain BaseMemory to update.

  • input_key (str) – Key within the input dictionary whose value will be used as the input parameter to save_context.

  • output_key (str) – Key within the input dictionary whose value will be used as the output parameter to save_context.

Returns:

An asynchronous iterator that is a copy of the input iterator.

Return type:

AsyncIterator[Dict]
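The pass-through with a side effect can be sketched as follows. This assumes the values under input_key and output_key are handed to `save_context` unchanged; `RecordingMemory` is a hypothetical stand-in for a BaseMemory, and `save_memory` and `dict_source` are illustrative names.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Tuple


class RecordingMemory:
    """Hypothetical stand-in that records each save_context call."""

    def __init__(self) -> None:
        self.saved: List[Tuple[Dict[str, Any], Dict[str, Any]]] = []

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> None:
        self.saved.append((inputs, outputs))


async def dict_source(items: List[Dict[str, Any]]) -> AsyncIterator[Dict[str, Any]]:
    for item in items:
        yield item


async def save_memory(
    stream: AsyncIterator[Dict[str, Any]],
    memory: RecordingMemory,
    input_key: str = "input",
    output_key: str = "output",
) -> AsyncIterator[Dict[str, Any]]:
    async for item in stream:
        # Side effect: update the memory, then pass the item through unchanged.
        memory.save_context(item[input_key], item[output_key])
        yield item


async def main() -> Tuple[List[Dict[str, Any]], List[Any]]:
    memory = RecordingMemory()
    items = [{"input": {"query": "Hi"}, "output": {"text": "Hello!"}}]
    passed = [item async for item in save_memory(dict_source(items), memory)]
    return passed, memory.saved


passed, saved = asyncio.run(main())
print(passed, saved)
```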

async voice_stream.integrations.langchain.langchain_step(async_iter: AsyncIterator[Input], chain: Runnable[Input, Output], input_key: str | None = None, config_key: str | None = None, on_completion: Callable[[], T] | T | None = None) → AsyncIterator[Output]

Data flow step that passes each input item to a LangChain runnable and streams the output.

This step is used to call LLMs or run any other LangChain runnable. It receives text items, processes them through a specified ‘chain’, and yields the resulting output tokens asynchronously. If input_key is not specified, the input will be directly passed to the runnable. If input_key is specified, then the items coming from the source iterator must be dictionaries, and the input_key specifies which item should be passed as input to the runnable. The config_key can also be used to pass configuration to the Runnable.

Parameters:
  • async_iter (AsyncIterator[Input]) – An asynchronous iterator that provides the input items: text, or dictionaries when input_key is specified.

  • chain (Runnable[Input, Output]) – A Langchain Runnable that processes each input item.

  • input_key (Optional[str], optional) – If specified, the item from the incoming dictionary that should be used as the input to the Runnable. If empty, then the incoming item will be passed directly.

  • config_key (Optional[str], optional) – If specified, the item from the incoming dictionary that should be used as the config argument to the Runnable. Can only be specified if input_key is specified.

  • on_completion (SourceConvertable, optional) – An optional source to be converted and iterated upon completion of processing each text item. Can be used to signal the end of output if that isn’t clear from the output of the chain itself.

Returns:

An asynchronous iterator yielding the output from the LangChain Runnable.

Return type:

AsyncIterator[Output]
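The interaction between input_key, config_key, and on_completion can be sketched with a stand-in runnable. In this sketch `WordRunnable` is hypothetical and only mimics a Runnable's `astream(input, config=None)` method, `run_chain` is an illustrative name, and on_completion is simplified to a plain value rather than a convertible source.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


class WordRunnable:
    """Hypothetical stand-in for a Runnable: streams the input word by word."""

    async def astream(
        self, input: Any, config: Optional[dict] = None
    ) -> AsyncIterator[str]:
        for word in str(input).split():
            yield word


async def dict_source(items: List[Dict[str, Any]]) -> AsyncIterator[Dict[str, Any]]:
    for item in items:
        yield item


async def run_chain(
    stream: AsyncIterator[Any],
    chain: WordRunnable,
    input_key: Optional[str] = None,
    config_key: Optional[str] = None,
    on_completion: Any = None,
) -> AsyncIterator[Any]:
    async for item in stream:
        if input_key is None:
            # No key given: pass the incoming item straight to the runnable.
            chain_input, config = item, None
        else:
            # Key given: the item is a dictionary holding input and config.
            chain_input = item[input_key]
            config = item[config_key] if config_key is not None else None
        async for token in chain.astream(chain_input, config=config):
            yield token
        if on_completion is not None:
            yield on_completion  # marks the end of this item's output


async def main() -> List[Any]:
    stream = dict_source([{"query": "hello there"}, {"query": "bye"}])
    return [
        token
        async for token in run_chain(
            stream, WordRunnable(), input_key="query", on_completion="<end>"
        )
    ]


tokens = asyncio.run(main())
print(tokens)
```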

async voice_stream.integrations.langchain.map_step(async_iter: AsyncIterator[T], func: Callable[[T], Output], ignore_none: bool | None = False) → AsyncIterator[Output]

Data flow step that transforms items using a mapping function.

This function applies either a synchronous or asynchronous mapping function to each item in the input async iterator. If ignore_none is set to True, any items that are transformed to None are not yielded. This feature allows the function to perform both transformation and filtering in a single step.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator whose items are to be transformed.

  • func (Callable[[T], Output]) – A mapping function to apply to each item. This can be either a synchronous or asynchronous function.

  • ignore_none (Optional[bool], default False) – If True, items that are transformed to None by func are not yielded.

Returns:

An asynchronous iterator yielding transformed items, optionally skipping None values.

Return type:

AsyncIterator[Output]
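The combination of an asynchronous mapping function and ignore_none can be sketched in a few lines. This is an illustration of the described behavior, not the library's code; `map_items`, `number_source`, and `square_if_even` are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, List, Optional, Tuple


async def number_source(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def map_items(
    stream: AsyncIterator[Any],
    func: Callable[[Any], Any],
    ignore_none: bool = False,
) -> AsyncIterator[Any]:
    async for item in stream:
        result = func(item)
        if inspect.isawaitable(result):
            result = await result  # func was asynchronous
        if ignore_none and result is None:
            continue  # filtering: drop items mapped to None
        yield result


async def square_if_even(x: int) -> Optional[int]:
    return x * x if x % 2 == 0 else None


async def main() -> Tuple[List[int], List[int]]:
    doubled = [x async for x in map_items(number_source(4), lambda x: x * 2)]
    squares = [
        x
        async for x in map_items(number_source(6), square_if_even, ignore_none=True)
    ]
    return doubled, squares


doubled, squares = asyncio.run(main())
print(doubled, squares)
```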

Examples

>>> stream = array_source(range(4))
>>> stream = map_step(stream, lambda x: x + 2)
>>> done = await array_sink(stream)
>>> assert done == [2, 3, 4, 5]

voice_stream.integrations.langchain.to_source(x: Callable[[], T] | T | None) → AsyncIterator[T]

Creates a source from a callable or value.

Tries to turn the input value into a VoiceStream source.

  • If it is a callable, assumes that it is a callable that returns an AsyncIterator.

  • If it is an object, returns a single_source containing the object.

  • If it is None, returns an empty_source.

To create a source that explicitly yields None, pass lambda: none_source(). The callable takes no arguments, matching the Callable[[], T] type in the signature.
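The three conversion rules can be sketched as a small dispatch function. The helpers below are local stand-ins that borrow the source names used in this section; they are assumptions for illustration, not imports from VoiceStream, and the callable is assumed to take no arguments as in the `Callable[[], T]` signature.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple


async def empty_source() -> AsyncIterator[Any]:
    """Stand-in source that yields nothing."""
    return
    yield  # unreachable; makes this function an async generator


async def single_source(item: Any) -> AsyncIterator[Any]:
    """Stand-in source that yields one item."""
    yield item


async def none_source() -> AsyncIterator[None]:
    """Stand-in source that yields None exactly once."""
    yield None


def to_source_sketch(x: Any) -> AsyncIterator[Any]:
    if callable(x):
        return x()  # assumed to return an AsyncIterator
    if x is None:
        return empty_source()
    return single_source(x)


async def drain(source: AsyncIterator[Any]) -> List[Any]:
    return [item async for item in source]


async def main() -> Tuple[List[Any], List[Any], List[Any]]:
    return (
        await drain(to_source_sketch("done")),
        await drain(to_source_sketch(None)),
        await drain(to_source_sketch(lambda: none_source())),
    )


from_value, from_none, from_callable = asyncio.run(main())
print(from_value, from_none, from_callable)
```

Passing None produces an empty stream, which is why an explicit None item has to be requested through a callable.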

voice_stream.integrations.langchain.tts_with_buffer_and_rate_limit_step(async_iter: AsyncIterator[str], tts_step: Callable[[AsyncIterator[str]], AsyncIterator[AudioWithText]], buffer_seconds: float = 0.5)

Data flow step that wraps a text to speech (TTS) step with buffering before the TTS and rate limiting on the output.

Creates a data stream to handle a typical TTS flow. It buffers input tokens so that output is produced as soon as possible while still keeping phrases together, which gives natural-sounding TTS. It applies rate limiting to both the audio and text output.

Parameters:
  • async_iter (AsyncIterator[str]) – An asynchronous iterator yielding text strings to be converted to speech.

  • tts_step (TextToSpeechStep) – A function that takes an async iterator of text strings and returns an async iterator of AudioWithText items.

  • buffer_seconds (float, default 0.5) – The duration (in seconds) of audio to buffer for rate limiting.

Returns:

A tuple containing rate-limited iterators over the audio and text streams.

Return type:

Tuple[AsyncIterator[bytes], AsyncIterator[str]]
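The idea of keeping phrases together before TTS can be sketched as a buffer that flushes on punctuation. This is only an assumption about what phrase buffering might look like; the delimiter set and the names `buffer_phrases` and `token_source` are hypothetical, and rate limiting is not shown.

```python
import asyncio
from typing import AsyncIterator, List


async def token_source(tokens: List[str]) -> AsyncIterator[str]:
    for token in tokens:
        yield token


async def buffer_phrases(
    tokens: AsyncIterator[str], delimiters: str = ".,!?;:"
) -> AsyncIterator[str]:
    buffer = ""
    async for token in tokens:
        buffer += token
        # Flush as soon as the buffered text ends at a phrase boundary.
        if buffer.rstrip().endswith(tuple(delimiters)):
            yield buffer
            buffer = ""
    if buffer:
        yield buffer  # flush whatever remains when the input ends


async def main() -> List[str]:
    tokens = ["Hello", ",", " how", " are", " you", "?", " Fine"]
    return [phrase async for phrase in buffer_phrases(token_source(tokens))]


phrases = asyncio.run(main())
print(phrases)
```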

Examples

>>> stream = array_source(["Hello", "World"])
>>> # tts_step must be a callable that takes the text stream, so wrap the call.
>>> tts_step = lambda text_stream: google_text_to_speech_step(
...     text_stream, text_to_speech_async_client, audio_format=AudioFormat.MP3
... )
>>> audio_stream, text_stream = tts_with_buffer_and_rate_limit_step(stream, tts_step)
>>> audio_done = empty_sink(audio_stream)
>>> text_done = array_sink(text_stream)
>>> # Drain both outputs concurrently; gather must be awaited.
>>> audio_out, text_out = await asyncio.gather(audio_done, text_done)
>>> assert text_out == [TimedText("Hello", 0.2), TimedText("World", 0.2)]