core steps#

voice_stream.async_init_step(async_iter: AsyncIterator[T], f: Callable[[AsyncIterator[T]], Coroutine], num_outputs: int = 1) AsyncIterator[Output]#

Data flow step that asynchronously initializes a step.

This step takes an AsyncIterator and a lambda that takes the iterator and returns an Awaitable. It eagerly pulls the first item from the iterator and, in a separate task, calls the lambda and awaits the result. This is useful when you have a step that requires an async call to initialize (which we generally avoid). It’s also useful when the initialization depends on a value produced earlier in the data flow.

Parameters:
  • async_iter (AsyncIterator[T]) – An input asynchronous iterator.

  • f (Callable[[AsyncIterator[T]], Coroutine]) – A coroutine function that takes the input async iterator and returns another async iterator.

  • num_outputs (int, optional) – The number of output async iterators to expect from f. Defaults to 1. Use this if the step you are asynchronously initializing returns more than one iterator, such as fork_step(). num_outputs can be 0 when the asynchronous initialization is for a sink.

Returns:

If num_outputs is 1, returns a single AsyncIterator; otherwise, returns a tuple of AsyncIterators whose length is num_outputs.

Return type:

AsyncIterator[Output] or Tuple[AsyncIterator[Output], …]

Examples

# Copy a file, generating a name based on the first line of the file.
>>> stream = text_file_source("example.txt")
>>> stream, name = extract_value_step(stream, lambda x: x)
>>> done = await async_init_step(stream, lambda x: text_file_sink(x, resolve_awaitable_or_obj(name)), num_outputs=0)

voice_stream.audio_rate_limit_step(async_iter: AsyncIterator[bytes], audio_format: AudioFormat | Awaitable[AudioFormat], buffer_seconds: float)#

Data flow step that rate-limits the audio data coming in.

This step takes in audio data and produces the same audio data with delays introduced so that the downstream iterator only gets buffer_seconds worth of audio at once. Rate-limiting provides the ability to stop the audio stream due to an interruption or other event.

Parameters:
  • async_iter (AsyncIterator[bytes]) – An asynchronous iterator returning bytes of audio data.

  • audio_format (AwaitableOrObj[voice_stream.audio.AudioFormat]) – The format of the audio data. Can be an Awaitable if the format isn’t known when the step is created.

  • buffer_seconds (float) – The amount of audio to pass to the downstream iterator.

Returns:

The same audio bytes that came in, but rate-limited so that the downstream consumer only gets buffer_seconds worth of audio.

Return type:

AsyncIterator[bytes]

Raises:

AudioFormatError – If the audio format is not supported.

Notes

  • This function will break up long chunks of data in a format-specific way to perform the rate-limiting.

voice_stream.buffer_tts_text_step(async_iter: AsyncIterator[str]) AsyncIterator[str]#

Data flow step that buffers text for input to Text-to-Speech (TTS).

When performing realtime TTS off of a token stream, there is a tradeoff. You want to produce audio as soon as possible after receiving the first token, but passing longer utterances to a TTS engine produces more natural sounding speech. This step buffers tokens to achieve a good balance by:

  1. Waiting for punctuation: Waits for some form of punctuation in the token stream to indicate the end of a phrase and then passes the full phrase.

  2. Buffering tokens between pulls: Eagerly pulls tokens from the incoming iterator and buffers them, so that any new tokens that arrive while the TTS system is generating the current utterance are grouped together.

Parameters:

async_iter (AsyncIterator[str]) – An asynchronous iterator that provides a stream of text.

Returns:

The buffered TTS text.

Return type:

AsyncIterator[str]

Example

>>> stream = array_source(["Hello", " world!", " How", " are", " you", " today", ""])
>>> stream = buffer_tts_text_step(stream)
>>> out = await array_sink(stream)
>>> assert out == ["Hello world!", " How are you today"]

Note

  • An incoming empty string indicates an end of response and will cause the buffer to be flushed.
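The punctuation rule (item 1 above) and the empty-string flush can be sketched without the library. This is a minimal illustration under stated assumptions, not the voice_stream implementation: `buffer_until_punctuation`, `from_list`, `collect`, and `PHRASE_ENDINGS` are hypothetical names, the set of phrase-ending characters is a guess, and the eager buffering between pulls (item 2) is left out.

```python
import asyncio

# Assumption: which characters end a phrase; the library's actual set may differ.
PHRASE_ENDINGS = (".", "!", "?", ",", ";", ":")


async def from_list(items):
    """Hypothetical stand-in for a source step: yield items from a list."""
    for item in items:
        yield item


async def buffer_until_punctuation(tokens):
    """Group tokens into phrases, emitting at punctuation or on an empty token."""
    buffer = ""
    async for token in tokens:
        if token == "":
            # An empty string marks the end of a response: flush what we have.
            if buffer:
                yield buffer
            buffer = ""
            continue
        buffer += token
        if buffer.rstrip().endswith(PHRASE_ENDINGS):
            yield buffer
            buffer = ""
    if buffer:
        # Flush trailing text when the input ends without an empty token.
        yield buffer


async def collect(stream):
    """Hypothetical stand-in for a sink step: gather all items into a list."""
    return [item async for item in stream]


tokens = ["Hello", " world!", " How", " are", " you", " today", ""]
phrases = asyncio.run(collect(buffer_until_punctuation(from_list(tokens))))
print(phrases)  # ['Hello world!', ' How are you today']
```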

voice_stream.byte_buffer_step(async_iter: AsyncIterator[bytes]) AsyncIterator[bytes]#

Data flow step that buffers and aggregates byte sequences to match producer and consumer rates.

This function acts as a buffer specifically for byte streams. It pulls bytes from the incoming iterator as fast as they come in, and then outputs aggregated byte buffers only as fast as the output iterator can pull them.

Parameters:

async_iter (AsyncIterator[bytes]) – An asynchronous iterator yielding byte sequences.

Returns:

An asynchronous iterator yielding concatenated byte sequences.

Return type:

AsyncIterator[bytes]

Examples

>>> async def rate_limit_sink(ai):
...     out = []
...     async for item in ai:
...         await asyncio.sleep(0.5)
...         out.append(item)
...     return out
>>> stream = array_source([b'a',b'b',b'c',b'd'])
>>> stream = byte_buffer_step(stream)
>>> done = await rate_limit_sink(stream)
>>> assert done == [b'abcd']
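One way to get this behavior is a background task that fills a queue while the consumer drains everything available on each pull. The sketch below is an assumption about the technique, not the library's code; `byte_buffer_sketch`, `demo`, and `_END` are hypothetical names.

```python
import asyncio

_END = object()  # sentinel marking the end of the input


async def byte_buffer_sketch(source):
    """Pull from source eagerly; yield everything buffered since the last pull."""
    queue = asyncio.Queue()

    async def pump():
        # Runs in the background; the unbounded queue never blocks the producer.
        async for chunk in source:
            queue.put_nowait(chunk)
        queue.put_nowait(_END)

    task = asyncio.create_task(pump())
    try:
        finished = False
        while not finished:
            item = await queue.get()  # wait for at least one chunk (or the end)
            parts = []
            while True:
                if item is _END:
                    finished = True
                    break
                parts.append(item)
                if queue.empty():
                    break
                item = queue.get_nowait()  # drain whatever else has arrived
            if parts:
                yield b"".join(parts)
    finally:
        task.cancel()


async def demo():
    async def fast_source():
        for chunk in [b"a", b"b", b"c", b"d"]:
            yield chunk

    out = []
    async for item in byte_buffer_sketch(fast_source()):
        await asyncio.sleep(0.01)  # a deliberately slow consumer
        out.append(item)
    return out


result = asyncio.run(demo())
print(result)  # [b'abcd']
```

Because the source here never waits, all four chunks are queued before the consumer's first pull completes, so they arrive as one concatenated buffer.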
voice_stream.cancelable_substream_step(async_iter: AsyncIterator[T], cancel_iter: AsyncIterator[T], substream_func: Callable[[AsyncIterator[T]], AsyncIterator[Output] | Tuple], cancel_messages: List[Callable[[], T] | T | None] | None = None) AsyncIterator[T] | Tuple#

Data flow step that runs a substream for each input, but takes a second iterator which causes the current substream to cancel.

Calls the substream_func to create a new substream for each item from the source iterator. If any item is produced from the cancel_iter during the processing of this substream, the substream is immediately stopped. When a stop occurs, cancel_messages are optionally sent down each output stream.

Parameters:
  • async_iter – The input AsyncIterator which we want to create substreams for.

  • cancel_iter – The cancel stream. If an item appears on this iterator, the processing of the current substream is immediately stopped.

  • substream_func – The function used to generate AsyncIterators for each substream.

  • cancel_messages – An optional list of items to produce in the stream when a substream is cancelled. If present, this must be a list with the same length as the number of outputs returned by substream_func. Each element determines how cancels are signaled down that particular data stream. If an AsyncIterator is passed, that iterator will be put into the stream. If any other object is passed, that signal object will be sent. If None is passed, then nothing will be sent down the stream.

Returns:

Either a single AsyncIterator or a tuple of multiple AsyncIterators. The length is determined by the number of iterators returned from the substream_func.

Return type:

OptionalMultipleOutputs

Notes

  • If you want to explicitly send None when a cancel occurs, use none_source().

async voice_stream.chunk_bytes_step(async_iter: AsyncIterator[bytes], chunk_size: int | Awaitable[int]) AsyncIterator[bytes]#

Data flow step that breaks long byte sequences from an asynchronous iterator into fixed-size chunks.

This function takes an asynchronous iterator yielding byte sequences and a chunk size, then yields these byte sequences in chunks of the specified size. The chunk size can be provided either as a direct integer value or as an awaitable object that resolves to an integer. This is useful for processing large byte sequences in manageable sizes.

Parameters:
  • async_iter (AsyncIterator[bytes]) – An asynchronous iterator that yields byte sequences.

  • chunk_size (AwaitableOrObj[int]) – The size of each chunk as an integer, or an awaitable object that yields an integer.

Returns:

An asynchronous iterator yielding byte chunks of the specified size.

Return type:

AsyncIterator[bytes]

Examples

>>> stream = array_source([b'HelloWorld!', b'PythonAsyncIO'])
>>> stream = chunk_bytes_step(stream, chunk_size=5)
>>> done = await array_sink(stream)
>>> assert done == [b'Hello', b'World', b'!', b'Pytho', b'nAsyn', b'cIO']
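The chunking itself is plain slicing, and the "integer or awaitable" chunk size can be resolved once up front. A minimal sketch, assuming each input item is chunked independently (as the example above suggests); `chunk_bytes_sketch` and `demo` are hypothetical names, not library functions.

```python
import asyncio
import inspect


async def chunk_bytes_sketch(source, chunk_size):
    """Yield fixed-size slices of each incoming bytes object."""
    if inspect.isawaitable(chunk_size):
        # Resolve a deferred chunk size once, before the first item is processed.
        chunk_size = await chunk_size
    async for data in source:
        for start in range(0, len(data), chunk_size):
            yield data[start:start + chunk_size]


async def demo():
    async def source():
        yield b"HelloWorld!"
        yield b"PythonAsyncIO"

    # A Future stands in for a chunk size that only becomes known later.
    size = asyncio.get_running_loop().create_future()
    size.set_result(5)
    return [chunk async for chunk in chunk_bytes_sketch(source(), size)]


chunks = asyncio.run(demo())
print(chunks)
```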
async voice_stream.collect_dict_step(async_iter: AsyncIterator[dict]) AsyncIterator[dict]#

Data flow step that appends values to a dictionary until an empty value is passed.

This step combines dictionary values coming out of an iterator. It adds new keys to the output dictionary and appends values if possible (if not, it replaces them). It emits the current dictionary when an empty input is pulled from the incoming iterator. Useful for combining incremental results streamed from LangChain.

Parameters:

async_iter (AsyncIterator[dict]) – An asynchronous iterator where each item is a dictionary of partial results, or an empty value to signal that the combined dictionary should be emitted.

Returns:

An asynchronous iterator yielding combined dictionaries.

Return type:

AsyncIterator[dict]

Examples

>>> stream = array_source([{'text':'Hello'}, {'text':' World!', 'confidence':0.75}, None])
>>> stream = collect_dict_step(stream)
>>> done = await array_sink(stream)
>>> assert done == [{'text':"Hello World!", 'confidence': 0.75}]
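The "append if possible, otherwise replace" rule could look roughly like the following. This is a sketch under assumptions, not the library's logic: it treats only strings and lists as appendable, and `merge_update`, `collect_dict_sketch`, and `demo` are hypothetical names.

```python
import asyncio


def merge_update(acc, update):
    """Merge update into acc: concatenate strings and lists, replace anything else."""
    for key, value in update.items():
        old = acc.get(key)
        if isinstance(old, str) and isinstance(value, str):
            acc[key] = old + value
        elif isinstance(old, list) and isinstance(value, list):
            acc[key] = old + value
        else:
            acc[key] = value


async def collect_dict_sketch(source):
    acc = {}
    async for item in source:
        if not item:
            # Empty input (None or {}): emit the combined dictionary and start over.
            if acc:
                yield acc
            acc = {}
        else:
            merge_update(acc, item)


async def demo():
    async def source():
        yield {"text": "Hello"}
        yield {"text": " World!", "confidence": 0.75}
        yield None

    return [d async for d in collect_dict_sketch(source())]


combined = asyncio.run(demo())
print(combined)  # [{'text': 'Hello World!', 'confidence': 0.75}]
```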
async voice_stream.concat_step(*async_iters: List[AsyncIterator[T]]) AsyncIterator[T]#

Data flow step that combines multiple streams by concatenating them.

This function takes a list of asynchronous iterators and concatenates their elements into a single async iterator. It iterates over each input iterator in the order they are provided, yielding all items from each before moving on to the next.

Parameters:

async_iters (List[AsyncIterator[T]]) – A list of asynchronous iterators to be concatenated.

Returns:

A single asynchronous iterator yielding items from all provided iterators in sequence.

Return type:

AsyncIterator[T]

Examples

>>> stream1 = array_source([1,2])
>>> stream2 = array_source([3,4])
>>> stream = concat_step(stream1, stream2)
>>> done = await array_sink(stream)
>>> assert done == [1,2,3,4]
async voice_stream.count_step(async_iter: AsyncIterator[T], name: str) AsyncIterator[T]#

Data flow step that counts the number of items.

This function takes an async iterator and counts the number of items that pass through it. After the iteration completes, it logs the total count of items. This is useful for monitoring or debugging the flow of data through async streams.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator whose items are to be counted.

  • name (str) – A name to use in the log message for identifying the count source.

Returns:

An asynchronous iterator that counts each item and then yields it.

Return type:

AsyncIterator[T]

Examples

>>> stream = array_source([1,2,3])
>>> stream = count_step(stream, "Value")
>>> await empty_sink(stream)
Value count: 3
voice_stream.extract_value_step(async_iter: AsyncIterator[T], value: Callable[[T], Any], condition: Callable[[T], bool] | None = None) Tuple[AsyncIterator[T], Future[Any]]#

Extracts a value from an async iterator based on a condition, returning the iterator and a future.

This function processes an asynchronous iterator, applying a given condition to each item. The first time an item meets the condition, a specified value extraction function is applied to it, and the result is set in an asyncio.Future. The function returns an async iterator and the future containing the extracted value. The output iterator produces all the same elements that go into it. If the condition is not provided, the value function is applied to the first item.

Parameters:
  • async_iter (AsyncIterator[T]) – An input asynchronous iterator.

  • value (Callable[[T], Any]) – A function that extracts a value from an item in the iterator.

  • condition (Optional[Callable[[T], bool]], optional) – A function that returns True if the value should be extracted from the item. If None, the value is extracted from the first item.

Returns:

The modified async iterator and a future containing the extracted value.

Return type:

Tuple[AsyncIterator[T], asyncio.Future[Any]]

Examples

# Copy a file, generating a name based on the first line of the file.
>>> stream = text_file_source("example.txt")
>>> stream, name = extract_value_step(stream, lambda x: x)
>>> done = await async_init_step(stream, lambda x: text_file_sink(x, resolve_awaitable_or_obj(name)), num_outputs=0)
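The pass-through-plus-Future pattern described above can be sketched with the standard library alone. This is an illustration, not the library's implementation; `extract_value_sketch` and `demo` are hypothetical names.

```python
import asyncio


def extract_value_sketch(source, value, condition=None):
    """Return a pass-through iterator and a Future for the first matching item."""
    future = asyncio.get_running_loop().create_future()

    async def passthrough():
        async for item in source:
            if not future.done() and (condition is None or condition(item)):
                future.set_result(value(item))
            yield item  # every item is forwarded unchanged

    return passthrough(), future


async def demo():
    async def source():
        for line in ["title", "body 1", "body 2"]:
            yield line

    stream, first_line = extract_value_sketch(source(), lambda s: s.upper())
    lines = [line async for line in stream]
    return lines, await first_line


lines, name = asyncio.run(demo())
print(lines, name)
```

In this sketch the Future resolves only once something pulls from the returned iterator, so awaiting it before the stream is consumed would wait indefinitely.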

voice_stream.filter_spurious_speech_start_events_step(async_iter: AsyncIterator[BaseEvent], threshold_secs: float = 1.0)#

Data flow step that filters a stream of speech events to remove false positives.

This step filters speech events so that if a SpeechStart is quickly followed by a SpeechEnd, it is considered spurious and ignored. This is useful for avoiding false detections which could create unnecessary interruptions.

Parameters:
  • async_iter (AsyncIterator[voice_stream.events.BaseEvent]) – An asynchronous iterator that yields speech events.

  • threshold_secs (float) – The number of seconds to wait for a SpeechEnd after a SpeechStart is received.

Returns:

A modified event stream that removes the spurious SpeechStart events.

Return type:

AsyncIterator[voice_stream.events.BaseEvent]

Example

>>> stream, speech_events = speech_step(async_iter)
>>> speech_events = filter_spurious_speech_start_events_step(speech_events)

Notes

  • Using this step does cause a delay in the movement of SpeechStart events down the stream.

async voice_stream.filter_step(async_iter: AsyncIterator[T], condition: Callable[[T], bool]) AsyncIterator[T]#

Data flow step that filters items based on a specified condition.

This function wraps an async iterator and yields only those items that satisfy a given condition. It is analogous to the built-in filter function but for asynchronous iterators. Each item from the input async iterator is passed to the condition function, and only items for which this function returns True are yielded.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator whose items are to be filtered.

  • condition (Callable[[T], bool]) – A function that evaluates each item in the iterator. If this function returns True, the item is yielded; otherwise, it is skipped.

Returns:

An asynchronous iterator yielding only the items that satisfy the condition.

Return type:

AsyncIterator[T]

Examples

>>> stream = array_source(range(4))
>>> stream = filter_step(stream, lambda x: x % 2 == 0)
>>> done = await array_sink(stream)
>>> assert done == [0, 2]
async voice_stream.flatten_step(async_iter: AsyncIterator[Iterable[T] | AsyncIterator[T]]) AsyncIterator[T]#

Data flow step that flattens an iterator of iterators into a single iterator.

This function takes an asynchronous iterator where each item is itself an async iterator or a regular iterable (like a list or tuple) and flattens it. This means it iterates over each element of these iterables in order, yielding them individually. It’s useful for converting a stream of iterables into a flat stream of elements.

Parameters:

async_iter (AsyncIterator[Iterable[T] | AsyncIterator[T]]) – An asynchronous iterator where each item is an iterable or an async iterator of elements.

Returns:

An asynchronous iterator yielding individual elements from each iterable in the input async iterator.

Return type:

AsyncIterator[T]

Examples

>>> stream = array_source([[1,2], [3,4]])
>>> stream = flatten_step(stream)
>>> done = await array_sink(stream)
>>> assert done == [1, 2, 3, 4]
voice_stream.fork_step(async_iter: AsyncIterator[T], pull_from_all: bool = False) Tuple[AsyncIterator[T], AsyncIterator[T]]#

Data flow step that splits a stream into two, creating a ‘fork’ in the stream.

This function takes an async iterator and divides its output into two separate async iterators. If pull_from_all is set to False (default), the consumption rate of items is determined by the rate of consumption of the first returned iterator; the second iterator’s output rate will match the first. If pull_from_all is True, both iterators will consume items independently as quickly as possible.

If pull_from_all is False, exceptions will only propagate to the first iterator. If True, exceptions will propagate to both iterators.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator to be forked.

  • pull_from_all (bool, default False) – If True, both forks will pull from the incoming iterator. If False, the second iterator will only receive data as fast as the first pulls.

Returns:

A tuple of two asynchronous iterators representing the forked output.

Return type:

Tuple[AsyncIterator[T], AsyncIterator[T]]

Examples

>>> source = array_source(range(2))
>>> a, b = fork_step(source)
>>> a = await array_sink(a)
>>> b = await array_sink(b)
>>> assert a == [0, 1]
>>> assert b == [0, 1]

Notes

  • With pull_from_all set to False, the rate of iteration in the first iterator controls the overall pace of item consumption from the original iterator.

  • With pull_from_all set to True, items are consumed from the original iterator as quickly as possible, and both iterators receive items independently.
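The default mode, where the first output drives consumption, can be approximated with a queue that receives a copy of every item the first output pulls. This is a sketch under that assumption, not the library's implementation: `fork_sketch`, `demo`, and `_END` are hypothetical names, and exception propagation is omitted.

```python
import asyncio

_END = object()  # sentinel marking the end of the input


def fork_sketch(source):
    """Split one stream in two; the first output drives consumption."""
    queue = asyncio.Queue()

    async def first():
        async for item in source:
            queue.put_nowait(item)  # leave a copy for the second output
            yield item
        queue.put_nowait(_END)

    async def second():
        # Receives items only as fast as the first output pulls them.
        while (item := await queue.get()) is not _END:
            yield item

    return first(), second()


async def demo():
    async def source():
        for i in range(2):
            yield i

    a, b = fork_sketch(source())
    a_items = [x async for x in a]
    b_items = [x async for x in b]
    return a_items, b_items


a_items, b_items = asyncio.run(demo())
print(a_items, b_items)  # [0, 1] [0, 1]
```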

voice_stream.interruptable_substream_step(async_iter: AsyncIterator[T], substream_func: Callable[[AsyncIterator[T]], Callable[[AsyncIterator[T]], AsyncIterator[Output] | Tuple]], cancel_messages: List[Callable[[], T] | T | None] | None = None) AsyncIterator[T] | Tuple#

Data flow step that creates a substream which will get interrupted if a new value comes in.

For each input, creates a new substream and runs it. If a new input is available before the substream completes, this cancels the existing substream and starts a new one. This is similar to cancelable_substream_step() except that it uses the same iterator for input values and to trigger cancellation.

Parameters:
  • async_iter – The input AsyncIterator which we want to create substreams for.

  • substream_func – The function used to generate AsyncIterators for each substream.

  • cancel_messages – An optional list of items to produce in the stream when a substream is cancelled. If present, this must be a list with the same length as the number of outputs returned by substream_func. Each element determines how cancels are signaled down that particular data stream. If an AsyncIterator is passed, that iterator will be put into the stream. If any other object is passed, that signal object will be sent. If None is passed, then nothing will be sent down the stream.

Returns:

Either a single AsyncIterator or a tuple of multiple AsyncIterators. The length is determined by the number of iterators returned from the substream_func.

Return type:

OptionalMultipleOutputs

async voice_stream.log_step(async_iter: AsyncIterator[T], name: str, formatter: Callable[[T], Any] = <function <lambda>>) AsyncIterator[T]#

Data flow step that logs each item using the Python logger.

This function takes an async iterator and logs each item that passes through it. The logging includes a specified name and uses an optional formatter function to convert items to a log-friendly format. This is useful for debugging or monitoring the contents of a data flow.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator whose items are to be logged.

  • name (str) – A name to prepend to each logged message for identification.

  • formatter (Callable[[T], Any], optional) – A function to format each item before logging. Defaults to a function that returns the item unchanged.

Returns:

An asynchronous iterator that logs each item and then yields it.

Return type:

AsyncIterator[T]

Examples

>>> stream = array_source([1,2,3])
>>> stream = log_step(stream, "Value squared", lambda x: x*x)
>>> await empty_sink(stream)
Value squared: 1
Value squared: 4
Value squared: 9
async voice_stream.map_step(async_iter: AsyncIterator[T], func: Callable[[T], Output], ignore_none: bool | None = False) AsyncIterator[Output]#

Data flow step that transforms items using a mapping function.

This function applies either a synchronous or asynchronous mapping function to each item in the input async iterator. If ignore_none is set to True, any items that are transformed to None are not yielded. This feature allows the function to perform both transformation and filtering in a single step.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator whose items are to be transformed.

  • func (Callable[[T], Output]) – A mapping function to apply to each item. This can be either a synchronous or asynchronous function.

  • ignore_none (Optional[bool], default False) – If True, items that are transformed to None by func are not yielded.

Returns:

An asynchronous iterator yielding transformed items, optionally skipping None values.

Return type:

AsyncIterator[Output]

Examples

>>> stream = array_source(range(4))
>>> stream = map_step(stream, lambda x: x + 2)
>>> done = await array_sink(stream)
>>> assert done == [2,3,4,5]
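Accepting both synchronous and asynchronous mapping functions usually comes down to awaiting the result only when it is awaitable. A minimal sketch of that idea, including the ignore_none filtering; `map_sketch` and `demo` are hypothetical names, not the library's code.

```python
import asyncio
import inspect


async def map_sketch(source, func, ignore_none=False):
    async for item in source:
        result = func(item)
        if inspect.isawaitable(result):
            result = await result  # func was a coroutine function
        if ignore_none and result is None:
            continue  # behave like a filter for None results
        yield result


async def demo():
    async def source():
        for i in range(4):
            yield i

    async def slow_double(x):
        await asyncio.sleep(0)
        return x * 2

    doubled = [x async for x in map_sketch(source(), slow_double)]
    evens = [
        x
        async for x in map_sketch(
            source(), lambda x: x if x % 2 == 0 else None, ignore_none=True
        )
    ]
    return doubled, evens


doubled, evens = asyncio.run(demo())
print(doubled, evens)  # [0, 2, 4, 6] [0, 2]
```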
voice_stream.map_str_to_json_step(async_iter: AsyncIterator[str]) AsyncIterator[dict]#

Data flow step that parses JSON strings into dictionaries.

Each string element from the async iterator is parsed as JSON and transformed into a dictionary.

Parameters:

async_iter (AsyncIterator[str]) – An asynchronous iterator yielding strings, each expected to be a valid JSON text.

Returns:

An asynchronous iterator yielding dictionaries resulting from JSON parsing of each string element.

Return type:

AsyncIterator[dict]

Examples

>>> stream = array_source([
...     '{"name": "Alice", "age": 30}',
...     '{"name": "Bob", "age": 25}',
... ])
>>> stream = map_str_to_json_step(stream)
>>> stream = map_step(stream, lambda x: x['age'])
>>> await array_sink(stream)
[30, 25]
async voice_stream.merge_as_dict_step(dict_iters: dict[str, AsyncIterator[Any]]) AsyncIterator[dict]#

Data flow step that merges multiple streams into a single one that yields dictionaries.

Each iterator in the input dictionary is associated with a key. The function concurrently consumes items from all iterators and yields dictionaries. Each dictionary contains the most recent item from each iterator, keyed by the respective iterator’s key. The output frequency is driven by the first iterator specified in the input dictionary.

Parameters:

dict_iters (dict[str, AsyncIterator[Any]]) – A dictionary mapping keys to asynchronous iterators. The first iterator in the dictionary drives the output frequency.

Returns:

An asynchronous iterator yielding dictionaries. Each dictionary combines the most recent items from each input iterator under their corresponding keys.

Return type:

AsyncIterator[dict]

Raises:

ValueError – If dict_iters is empty.

Examples

>>> async def async_iter1():
...     for i in [1,2,3]:
...         await asyncio.sleep(0.3)
...         yield i
>>> async def async_iter2():
...     for i in [4,5,6]:
...         await asyncio.sleep(0.5)
...         yield i
>>> stream1 = async_iter1()
>>> stream2 = async_iter2()
>>> stream = merge_as_dict_step({'a':stream1, 'b':stream2})
>>> done = await array_sink(stream)
>>> assert done == [
...     {'a':1},
...     {'a':2, 'b':4},
...     {'a':3, 'b':4},
... ]
async voice_stream.merge_step(*async_iters: list[AsyncIterator[T]]) AsyncIterator[T]#

Data flow step that merges multiple streams into one, consuming from all iterators concurrently.

Unlike concatenation which consumes one iterator at a time, merge_step interleaves items from all provided iterators as they become available. The merged iterator continues until all input iterators are exhausted. If an exception occurs in any of the iterators, all others are canceled.

Parameters:

async_iters (list[AsyncIterator[T]]) – A list of asynchronous iterators to be merged.

Returns:

An asynchronous iterator yielding items from all input iterators in the order they become available.

Return type:

AsyncIterator[T]

Examples

>>> async def async_iter1():
...     for i in [1,2,3]:
...         await asyncio.sleep(0.3)
...         yield i
>>> async def async_iter2():
...     for i in [4,5,6]:
...         await asyncio.sleep(0.5)
...         yield i
>>> stream1 = async_iter1()
>>> stream2 = async_iter2()
>>> stream = merge_step(stream1, stream2)
>>> done = await array_sink(stream)
>>> assert done == [1,4,2,3,5,6]
# Output order is not guaranteed.
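Concurrent consumption is commonly built from one background task per input feeding a shared queue. The sketch below shows that pattern only; it is not the library's implementation, it omits the exception propagation described above, and `merge_sketch`, `demo`, and `_END` are hypothetical names.

```python
import asyncio

_END = object()  # each input posts this once when it is exhausted


async def merge_sketch(*sources):
    queue = asyncio.Queue()

    async def pump(source):
        try:
            async for item in source:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_END)  # always report completion

    tasks = [asyncio.create_task(pump(s)) for s in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _END:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()


async def demo():
    async def ticker(values, delay):
        for v in values:
            await asyncio.sleep(delay)
            yield v

    merged = merge_sketch(ticker([1, 2, 3], 0.03), ticker([4, 5, 6], 0.05))
    return [x async for x in merged]


merged = asyncio.run(demo())
print(merged)
```

The interleaving depends on timing, so only the relative order within each input is dependable.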
async voice_stream.min_tokens_step(async_iter: AsyncIterator[str], min_tokens: int) AsyncIterator[str]#

Data flow step that aggregates string inputs to ensure a minimum number of tokens.

The step assumes each input item is a single token, and concatenates the tokens until either an empty string is received or the number of concatenated tokens is at least min_tokens.

Parameters:
  • async_iter (AsyncIterator[str]) – An asynchronous iterator over string tokens. An empty string signals that the buffered tokens should be flushed.

  • min_tokens (int) – The minimum number of tokens to buffer before outputting the concatenated value.

Notes

  • If an empty token is received, the step flushes the buffered tokens (emits them and resets the token list).
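A rough sketch of the buffering rule follows. Several details are assumptions rather than documented behavior: that a flush emits the buffered text, that the empty token itself is not forwarded, and that leftover tokens are emitted when the input ends. `min_tokens_sketch` and `demo` are hypothetical names.

```python
import asyncio


async def min_tokens_sketch(tokens, min_tokens):
    pending = []
    async for token in tokens:
        if token == "":
            # Empty token: flush whatever is buffered, even if below the minimum.
            if pending:
                yield "".join(pending)
            pending = []
            continue
        pending.append(token)
        if len(pending) >= min_tokens:
            yield "".join(pending)
            pending = []
    if pending:
        # Assumption: emit leftovers when the input ends without an empty token.
        yield "".join(pending)


async def demo():
    async def source():
        for token in ["a", "b", "c", "d", "e", ""]:
            yield token

    return [text async for text in min_tokens_sketch(source(), 2)]


grouped = asyncio.run(demo())
print(grouped)  # ['ab', 'cd', 'e']
```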

voice_stream.partition_step(async_iter: AsyncIterator[T], condition: Callable[[T], bool]) Tuple[AsyncIterator[T], AsyncIterator[T]]#

Data flow step that partitions items into two output streams based on a specified condition.

This function divides the items from the input async iterator into two separate iterators: one for items where the condition evaluates to True, and another for items where the condition is False. If an exception occurs during iteration, it is propagated to both output iterators.

Parameters:
  • async_iter (AsyncIterator[T]) – An asynchronous iterator whose items are to be partitioned.

  • condition (Callable[[T], bool]) – A function that evaluates each item in the iterator. Returns True if the item should go to the first iterator, and False for the second.

Returns:

A tuple of two asynchronous iterators: the first yields items where the condition is True, and the second yields items where the condition is False.

Return type:

Tuple[AsyncIterator[T], AsyncIterator[T]]

Examples

>>> stream = array_source(range(4))
>>> true_stream, false_stream = partition_step(stream, lambda x: x % 2 == 0)
>>> true_done = array_sink(true_stream)
>>> false_done = array_sink(false_stream)
>>> true_ret, false_ret = await asyncio.gather(true_done, false_done)
>>> assert true_ret == [0,2]
>>> assert false_ret == [1,3]
async voice_stream.raw_audio_rate_limit_step(async_iter: AsyncIterator[bytes], bytes_per_second: int | Awaitable[int], buffer_seconds: float | Awaitable[float]) AsyncIterator[bytes]#

Data flow step that performs rate-limiting on chunks of audio data coming in.

This step rate limits input objects based on a given sample rate. Generally, using audio_rate_limit_step() is preferred to using this step, as that step handles the details of different audio formats. This step does not break up long chunks or handle differing formats. It takes in bytes objects, determines the length of the audio based on bytes_per_second, and outputs the identical chunks it received, with a delay.

Parameters:
  • async_iter (AsyncIterator[bytes]) – The input audio data

  • bytes_per_second (AwaitableOrObj[int]) – The playback rate of the audio in bytes per second. This is used to compute the duration of the audio based on the length of the byte array.

  • buffer_seconds (AwaitableOrObj[float]) – The number of seconds of audio left to play before the next chunk is sent.

Note

Usually, you will want to put a step that caps the chunk size, such as chunk_bytes_step(), before this one.
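The timing arithmetic can be worked through on a virtual clock. This is one plausible reading of the description, not the library's algorithm: each chunk's duration is its length divided by bytes_per_second, and a chunk is released once at most buffer_seconds of already-sent audio remains unplayed. `release_times` is a hypothetical helper.

```python
def release_times(chunk_sizes, bytes_per_second, buffer_seconds):
    """Return the time (seconds from start) at which each chunk would be sent."""
    times = []
    now = 0.0        # virtual clock
    audio_end = 0.0  # when the audio sent so far finishes playing
    for size in chunk_sizes:
        # Hold the chunk until only buffer_seconds of sent audio remain unplayed.
        release = max(now, audio_end - buffer_seconds)
        times.append(release)
        now = release
        # This chunk plays after the previous audio ends (or right away if idle).
        audio_end = max(audio_end, now) + size / bytes_per_second
    return times


# Three chunks of one second each at 16000 bytes per second.
schedule = release_times([16000, 16000, 16000], bytes_per_second=16000, buffer_seconds=0.5)
print(schedule)  # [0.0, 0.5, 1.5]
```

A streaming version would sleep for the gap between consecutive release times instead of recording them. Because a chunk is never split here, a very long chunk delays everything behind it, which is why capping the chunk size first helps.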

async voice_stream.recover_exception_step(async_iter: AsyncIterator[T], exception_type: Type[BaseException], exception_handler: Callable[[BaseException], Any]) AsyncIterator[T]#

Data flow step that recovers from an exception in a previous step.

This function takes an async iterator and a specified exception type. If an exception with the specified type occurs during iteration, it is caught and passed to the provided exception handler function. The result depends on what is returned by the exception handler:

  • If it returns an AsyncIterator, that will be yielded and this step will end when the iterator ends.

  • If it returns None, this step will immediately end.

  • If it returns anything else, that object will be yielded and this step will end.

Parameters:
  • async_iter (AsyncIterator[T]) – The asynchronous iterator to be wrapped.

  • exception_type (Type[BaseException]) – The type of exception to be caught and handled.

  • exception_handler (Callable[[BaseException], Any]) – A function to handle the caught exception. Returns an optional value that will be converted to an AsyncIterator.

Returns:

An asynchronous iterator that handles exceptions with the specified type.

Return type:

AsyncIterator[T]

Examples

>>> async def gen():
...    yield 1
...    yield 2
...    raise ValueError("No more")
>>> stream = gen()
>>> stream = recover_exception_step(stream, ValueError, lambda x: x.args[0])
>>> done = await array_sink(stream)
>>> assert done == [1,2,"No more"]

Notes

  • Once the source iterator throws the exception, it will not be accessed again.
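The three handler outcomes can be illustrated with a small async generator. This is a sketch of the documented behavior under the assumption that the handler is a plain synchronous function; `recover_sketch` and `demo` are hypothetical names, not the library's code.

```python
import asyncio
from collections.abc import AsyncIterator


async def recover_sketch(source, exception_type, handler):
    try:
        async for item in source:
            yield item
    except exception_type as exc:
        replacement = handler(exc)
        if replacement is None:
            return  # end the stream quietly
        if isinstance(replacement, AsyncIterator):
            async for item in replacement:  # continue with the fallback stream
                yield item
        else:
            yield replacement  # emit a single fallback value


async def demo():
    async def failing():
        yield 1
        yield 2
        raise ValueError("No more")

    async def fallback():
        yield "a"
        yield "b"

    single = [x async for x in recover_sketch(failing(), ValueError, lambda e: e.args[0])]
    stream = [x async for x in recover_sketch(failing(), ValueError, lambda e: fallback())]
    silent = [x async for x in recover_sketch(failing(), ValueError, lambda e: None)]
    return single, stream, silent


single, stream, silent = asyncio.run(demo())
print(single, stream, silent)
```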

voice_stream.speech_with_start_detection_step(async_iter: AsyncIterator[bytes], speech_step: Callable[[AsyncIterator[bytes]], Tuple[AsyncIterator[str], AsyncIterator[BaseEvent]]])#

Data flow step to perform speech recognition on a stream of audio and produce a robust start detection event.

Takes a normal speech recognition step and filters the speech events to remove false SpeechStart events.

Parameters:
  • async_iter (AsyncIterator[bytes]) – An asynchronous iterator that yields bytes of audio data.

  • speech_step (SpeechStep) – A function that takes an async iterator as input and returns a tuple of a stream and speech events.

Returns:

A tuple of the recognized text stream and the filtered speech events.

Return type:

tuple

Example

>>> def speech_step(audio_iter):
...    return google_speech_v1_step(
...       audio_iter,
...       speech_async_client,
...       audio_format=AudioFormat.WEBM_OPUS,
...       include_events=True,
...    )
>>> stream = fastapi_websocket_bytes_source(websocket)
>>> stream, speech_events = speech_with_start_detection_step(stream, speech_step)
>>> stream = merge_step(stream, speech_events)
>>> await array_sink(stream)
[SpeechStart(time_since_start=1.2), SpeechEnd(time_since_start=2.5), "Hello, how are you?"]
voice_stream.str_buffer_step(async_iter: AsyncIterator[str]) AsyncIterator[str]#

Data flow step that buffers and aggregates text strings to match producer and consumer rates.

This function acts as a buffer specifically for text strings. It pulls text from the incoming iterator as fast as it comes in, and then outputs concatenated text only as fast as the output iterator can pull.

Parameters:

async_iter (AsyncIterator[str]) – An asynchronous iterator yielding text.

Returns:

An asynchronous iterator yielding concatenated text.

Return type:

AsyncIterator[str]

Examples

>>> async def rate_limit_sink(ai):
...     out = []
...     async for item in ai:
...         await asyncio.sleep(0.5)
...         out.append(item)
...     return out
>>> stream = array_source(['a','b','c','d'])
>>> stream = str_buffer_step(stream)
>>> done = await rate_limit_sink(stream)
>>> assert done == ['abcd']
async voice_stream.substream_on_dict_key_step(async_iter: AsyncIterator[dict], key: str, substream_func: Callable[[AsyncIterator[Any]], AsyncIterator[Any]]) AsyncIterator[dict]#

Data flow step that updates a value in a dictionary with the result of a substream.

This step takes in a dictionary and produces a new dictionary that has one key modified. The modified value comes from running a substream on the existing value.

Parameters:
  • async_iter (AsyncIterator[dict]) – An asynchronous iterator that yields dictionaries.

  • key (str) – The key in the dictionary on which to perform the substreaming.

  • substream_func (callable) – A function that takes an AsyncIterator and returns a stream based off that iterator.

Returns:

An asynchronous iterator that yields the modified dictionaries.

Return type:

AsyncIterator[dict]

Example

>>> def substream(async_iter):
...     return map_step(async_iter, lambda x: x+2)
>>> stream = array_source([{'a': 1, 'b': 2}])
>>> stream = substream_on_dict_key_step(stream, "b", substream)
>>> out = await array_sink(stream)
>>> assert out == [{'a': 1, 'b': 4}]
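
As a rough standalone illustration of the idea, the sketch below runs a substream on one key of each dictionary and yields a copy with that key replaced. The names substream_on_key, single_value, add_two and demo_dict_key are hypothetical. The sketch also assumes that one dictionary is emitted per value the substream produces, which the documentation above does not specify.

```python
import asyncio
from typing import Any, AsyncIterator


async def single_value(value: Any) -> AsyncIterator[Any]:
    # Wrap one value in an async iterator so a substream can consume it.
    yield value


async def substream_on_key(source, key, substream_func):
    """Hypothetical sketch; not voice_stream's own code."""
    async for item in source:
        # Assumption: emit one dictionary per value produced by the substream.
        async for new_value in substream_func(single_value(item[key])):
            # Copy the dict so the incoming item is left untouched.
            yield {**item, key: new_value}


async def add_two(values):
    # Example substream: add 2 to every value it receives.
    async for value in values:
        yield value + 2


async def demo_dict_key():
    async def source():
        yield {"a": 1, "b": 2}

    return [item async for item in substream_on_key(source(), "b", add_two)]
```

Running asyncio.run(demo_dict_key()) returns [{'a': 1, 'b': 4}]: only the value under "b" passes through the substream.
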
async voice_stream.substream_step(async_iter: AsyncIterator[T], substream_func: Callable[[AsyncIterator[T]], AsyncIterator[Output]]) AsyncIterator[Output]#

Data flow step that runs a new stream on each item.

A substream is useful when you want to group steps together for error handling or flow control. This step calls the substream_func to create a new substream for each item from the source iterator. The output of this step is the output of the substreams. Each instance of the substream only gets one input value.

Parameters:
  • async_iter (AsyncIterator[T]) – An asynchronous iterator.

  • substream_func (callable) – A function that takes an AsyncIterator and creates a stream from it.

Returns:

An asynchronous iterator over the values produced by the substreams.

Return type:

AsyncIterator[Output]

Examples

>>> instance_count = 0
>>> def substream(async_iter):
...     global instance_count
...     instance_count += 1
...     return map_step(async_iter, lambda x: x + instance_count)
>>> stream = array_source([1,1,1])
>>> stream = substream_step(stream, substream)
>>> out = await array_sink(stream)
>>> assert out == [2, 3, 4]
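
To show why grouping steps into a substream helps with error handling, here is a minimal standalone sketch that does not use voice_stream. The names per_item_substream, one_item, safe_reciprocal and demo_substream are hypothetical, and the library's implementation may differ. Because every item gets a fresh substream, a failure handled inside one substream affects only that item and the outer stream keeps going.

```python
import asyncio
from typing import Any, AsyncIterator


async def one_item(value: Any) -> AsyncIterator[Any]:
    # Each substream instance sees exactly one input value.
    yield value


async def per_item_substream(source, substream_func):
    """Hypothetical sketch; not voice_stream's own code."""
    async for item in source:
        # Build a brand-new substream for this item and forward its output.
        async for out in substream_func(one_item(item)):
            yield out


async def safe_reciprocal(values):
    # Example substream: an error here is contained to the current item.
    try:
        async for value in values:
            yield 1 / value
    except ZeroDivisionError:
        yield float("inf")


async def demo_substream():
    async def source():
        for value in [1, 0, 4]:
            yield value

    return [out async for out in per_item_substream(source(), safe_reciprocal)]
```

Running asyncio.run(demo_substream()) returns [1.0, inf, 0.25]. Applied directly to the whole stream, safe_reciprocal would stop after the failing item and never reach 4.
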

async voice_stream.timed_text_rate_limit_step(async_iter: AsyncIterator[TimedText], buffer_seconds: float = 0.5) AsyncIterator[str]#

Data flow step that takes TimedText objects and yields their text tokens based on the indicated timing.

This step is usually used in combination with a rate limiting step. The rate at which the text should be outputted is produced in a previous step and passed in TimedText objects. This step then produces the raw text with the appropriate delays.

Parameters:
  • async_iter (AsyncIterator[TimedText]) – An asynchronous iterator over TimedText objects. Each TimedText object contains text and a duration in seconds.

  • buffer_seconds (float, optional) – A buffer time in seconds to control the rate of text token output. The buffer is designed so that the downstream iterator will always have at least buffer_seconds of text to display.

Returns:

An asynchronous iterator yielding text tokens.

Return type:

AsyncIterator[str]

Notes

  • The step will impose delays to slow down the rate at which objects pass through the stream, but because streams are pull-based, the downstream iterator must be ready to consume the objects that fast. This step will never go faster than the downstream iterator allows.
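
The pacing logic can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: TimedTextSketch is a hypothetical stand-in for TimedText with assumed field names, and paced_text and demo_paced are hypothetical names. The sketch tracks the point in time up to which already-emitted text is "covered" and only releases the next token once the downstream has less than buffer_seconds of text left.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class TimedTextSketch:
    # Hypothetical stand-in for TimedText; the field names are assumptions.
    text: str
    duration_seconds: float


async def paced_text(
    source: AsyncIterator[TimedTextSketch], buffer_seconds: float = 0.5
) -> AsyncIterator[str]:
    """Hypothetical sketch; not voice_stream's own code."""
    loop = asyncio.get_running_loop()
    covered_until = loop.time()  # time up to which emitted text lasts
    async for item in source:
        # Hold the token back until less than buffer_seconds of text remains.
        wait = covered_until - buffer_seconds - loop.time()
        if wait > 0:
            await asyncio.sleep(wait)
        yield item.text
        # A slow consumer may already be past covered_until, hence the max().
        covered_until = max(covered_until, loop.time()) + item.duration_seconds


async def demo_paced():
    async def source():
        for text in ["a", "b", "c"]:
            yield TimedTextSketch(text, 0.05)

    loop = asyncio.get_running_loop()
    start = loop.time()
    out = [text async for text in paced_text(source(), buffer_seconds=0.02)]
    return out, loop.time() - start
```

In demo_paced the first token is released immediately and the later ones are delayed, so the three tokens arrive in order over a fraction of a second instead of all at once.
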

voice_stream.tts_rate_limit_step(async_iter: AsyncIterator[AudioWithText], audio_format: AudioFormat | Awaitable[AudioFormat], buffer_seconds: float = 0.5, include_text_output: bool = True) AsyncIterator[bytes] | Tuple[AsyncIterator[bytes], AsyncIterator[str]]#

Applies rate limiting to an audio stream from a TTS output, optionally including synchronized text output.

This function processes an asynchronous iterator of TTS audio output, each paired with text. It applies rate limiting to ensure that audio is emitted at a realistic rate based on its duration. Additionally, it can output timed text that is synchronized with the audio output. The text timing is adjusted based on the audio format and the buffer duration. Rate-limiting allows the audio to be cancelled if an interruption occurs.

Parameters:
  • async_iter (AsyncIterator[voice_stream.AudioWithText]) – An asynchronous iterator yielding audio data paired with text.

  • audio_format (AwaitableOrObj[voice_stream.audio.AudioFormat]) – The format of the audio stream, which can be an awaitable or a direct object.

  • buffer_seconds (float, default 0.5) – The duration in seconds to buffer for rate limiting.

  • include_text_output (bool, default True) – If True, includes a text output iterator that yields text synchronized with the audio rate.

Returns:

Depending on include_text_output, either an async iterator of audio bytes or a tuple of async iterators for audio bytes and synchronized text.

Return type:

Union[AsyncIterator[bytes], Tuple[AsyncIterator[bytes], AsyncIterator[str]]]

voice_stream.tts_with_buffer_and_rate_limit_step(async_iter: AsyncIterator[str], tts_step: Callable[[AsyncIterator[str]], AsyncIterator[AudioWithText]], buffer_seconds: float = 0.5)#

Data flow step that wraps a text to speech (TTS) step with buffering before the TTS and rate limiting on the output.

Creates a data stream to handle a typical TTS flow. It buffers input tokens so that output starts as soon as possible while still keeping phrases together, which produces natural-sounding TTS. It applies rate limiting to both the audio and text output.

Parameters:
  • async_iter (AsyncIterator[str]) – An asynchronous iterator yielding text strings to be converted to speech.

  • tts_step (TextToSpeechStep) – A function that takes an async iterator of text strings and returns an async iterator of audio paired with text (AudioWithText).

  • buffer_seconds (float, default 0.5) – The duration (in seconds) of audio to buffer for rate limiting.

Returns:

A tuple containing rate-limited iterators over the audio and text streams.

Return type:

Tuple[AsyncIterator[bytes], AsyncIterator[str]]

Examples

>>> tts_step = lambda s: google_text_to_speech_step(
...     s, text_to_speech_async_client, audio_format=AudioFormat.MP3
... )
>>> stream = array_source(["Hello", "World"])
>>> audio_stream, text_stream = tts_with_buffer_and_rate_limit_step(stream, tts_step)
>>> audio_done = empty_sink(audio_stream)
>>> text_done = array_sink(text_stream)
>>> audio_out, text_out = await asyncio.gather(audio_done, text_done)
>>> assert text_out == [TimedText("Hello", 0.2), TimedText("World", 0.2)]
async voice_stream.wait_for_punctuation_step(async_iter: AsyncIterator[str]) AsyncIterator[str]#

Data flow step that buffers incoming text tokens into phrases based on punctuation.

Parameters:

async_iter (AsyncIterator[str]) – An asynchronous iterator yielding text strings to be converted to speech.

Returns:

The input tokens, concatenated into phrases based on punctuation.

Return type:

AsyncIterator[str]
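
One way to picture this step is the standalone sketch below. It is an assumption-laden illustration, not the library's implementation: the names phrases_by_punctuation and demo_phrases are hypothetical, and the set of punctuation characters treated as phrase boundaries is a guess. Tokens accumulate in a buffer, everything up to the last punctuation mark is emitted as one phrase, and any remainder is flushed when the input ends.

```python
import asyncio
import re
from typing import AsyncIterator

# Assumption: these characters mark phrase boundaries; the library may use others.
PHRASE_END = re.compile(r"[.!?,;:]")


async def phrases_by_punctuation(source: AsyncIterator[str]) -> AsyncIterator[str]:
    """Hypothetical sketch; not voice_stream's own code."""
    buffer = ""
    async for token in source:
        buffer += token
        # Find the last punctuation mark currently in the buffer, if any.
        last_end = None
        for match in PHRASE_END.finditer(buffer):
            last_end = match.end()
        if last_end is not None:
            yield buffer[:last_end]
            buffer = buffer[last_end:]
    if buffer:
        # Flush text that never reached a punctuation mark.
        yield buffer


async def demo_phrases():
    async def source():
        for token in ["Hello", ", how", " are", " you?", " Fine"]:
            yield token

    return [phrase async for phrase in phrases_by_punctuation(source())]
```

Running asyncio.run(demo_phrases()) returns ['Hello,', ' how are you?', ' Fine'].
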