core sinks#
- async voice_stream.array_sink(async_iter: AsyncIterator[T]) list[T]#
Data flow sink that collects items into a list.
This function asynchronously iterates over all items from the provided iterator and collects them into a list. This is useful for when you want to convert an asynchronous stream of data into a synchronous data structure like a list.
- Parameters:
async_iter (AsyncIterator[T]) – An asynchronous iterator from which items will be consumed. The generic type T can represent any type of item that the iterator yields.
- Returns:
A list containing all the items yielded by the asynchronous iterator.
- Return type:
list[T]
Examples
>>> stream = array_source([1,2,3]) >>> stream = log_step(stream, "Value") >>> out = await array_sink(stream) >>> assert out == [1,2,3]
Notes
This sink is ideal for scenarios where the entirety of an asynchronous data stream needs to be collected and processed at once.
The sink does end up holding all the collected items in memory, which can cause memory issues with very large streams.
- async voice_stream.binary_file_sink(async_iter: AsyncIterator[bytes], filename: str | Awaitable[str]) None#
Data flow sink that writes chunks of bytes to a binary file.
This function takes each block of bytes yielded by the provided async iterator and appends it to the specified binary file. The filename can be either a string or an awaitable object that resolves to a string.
- Parameters:
async_iter (AsyncIterator[bytes]) – An asynchronous iterator yielding blocks of bytes to be written to the file.
filename (AwaitableOrObj[str]) – The path to the binary file where the data will be appended. This can be a string or an awaitable object that yields a string.
- Returns:
This function does not return a value but completes when all blocks of bytes have been appended to the file.
- Return type:
None
- Raises:
IOError – If an I/O error occurs while opening or writing to the file.
Examples
>>> stream = binary_file_source("example.bin") >>> stream = log_step(stream, "Bytes read", lambda x: len(x)) >>> done = await binary_file_sink("copy.bin") Bytes read 4096 Bytes read 4096 Bytes read 1024 # Example output for a 9216-byte file
Notes
If an awaitable is passed, it is not waited on until the first item has been retrieved from the iterator.
- async voice_stream.empty_sink(async_iter: AsyncIterator[T]) None#
Data flow sink that performs no action on the received items.
This function asynchronously iterates over all items from the provided iterator, effectively ‘emptying’ it. Each item is retrieved but not used. This is useful in cases where the useful work in a data flow is done as a side effect of a previous iterator. Since data flows are “pull-based”, each branch must end in some kind of sink.
- Parameters:
async_iter (AsyncIterator[T]) – An asynchronous iterator whose items will be consumed. The type T is generic and can represent any type of item that the iterator yields.
- Returns:
This function does not return any value.
- Return type:
None
Examples
>>> # If your only goal is to log values and not do anything with them >>> # The data flow still needs to have a sink to run correctly. >>> >>> stream = array_source([1,2,3]) >>> stream = log_step(stream, "Value") >>> await empty_sink(stream)
- async voice_stream.queue_sink(async_iter: ~typing.AsyncIterator[~voice_stream.types.T], queue: ~asyncio.queues.Queue | ~typing.Awaitable[~asyncio.queues.Queue] | None = None, end_of_stream=<voice_stream.types._EndOfStreamType object>) Queue#
Data flow sink that writes items to a queue.
This function takes an asynchronous iterator and writes each element it yields into a queue. If a queue is provided, it is used; otherwise, a new asyncio.Queue is created. Upon completion or exception it signals the end of the stream.
- Parameters:
async_iter (AsyncIterator[T]) – An asynchronous iterator from which elements are read.
queue (Optional[AwaitableOrObj[asyncio.Queue]], optional) – An optional asyncio.Queue or an awaitable resulting in an asyncio.Queue. If not provided, a new
voice_stream:QueueWithExceptionis created.end_of_stream (The item to enqueue to indicate the stream has ended. Defaults to EndOfStreamMarker) –
- Returns:
The queue to which the elements from async_iter are written.
- Return type:
asyncio.Queue
Examples
>>> stream = array_source([1,2]) >>> queue = await queue_sink() >>> assert len(queue) == 2 >>> assert queue.get() == 1 >>> assert queue.get() == 2 >>> assert queue.empty()
Notes
If the Queue has a set_exception method, that will be called when an exception is thrown by the iterator. The
voice_stream:QueueWithExceptionhas this method, and uses it to throw the exception on the next call to queue.get.
- async voice_stream.text_file_sink(async_iter: AsyncIterator[Any], filename: str | Awaitable[str]) None#
Data flow sink that writes each element to a file, each as a new line.
This function takes each element from the provided async iterator and writes it to the specified text file. Each element is converted to a string and written as a separate line. The filename can be either a string or an awaitable object that resolves to a string.
- Parameters:
async_iter (AsyncIterator[Any]) – An asynchronous iterator whose elements are to be written to the file.
filename (AwaitableOrObj[str]) – The path to the file where the data will be written. This can be a string or an awaitable object that yields a string. The object will be awaited only after the first element is received from the iterator.
- Returns:
This function does not return a value but completes when all elements have been written to the file.
- Return type:
None
Examples
>>> stream = text_file_source("example.txt") >>> done = await text_file_sink(stream, "copy.txt")