twilio#
Twilio provides an API for making and receiving telephone calls. You can use Twilio along with VoiceStream to automate calls with an LLM.
- class voice_stream.integrations.twilio.TwilioInputFlow(audio: AsyncIterator[bytes], events: AsyncIterator[BaseEvent], all_twilio_messages: AsyncIterator[dict] | None, call_sid_f: Future[str], stream_sid_f: Future[str], inbound_queue_f: Future[Queue[BaseEvent]], outbound_queue_f: Future[Queue[BaseEvent]], current_calls: Dict[str, Any])#
Bases: object
Experimental
- all_twilio_messages: AsyncIterator[dict] | None#
- audio: AsyncIterator[bytes]#
- call_sid_f: Future[str]#
- classmethod create(source: AsyncIterator[dict], close_func: Callable[[], None], current_calls: Dict[str, Any], expose_all_messages: bool = False) TwilioInputFlow#
Creates a basic flow to receive audio and call event data from Twilio.
expose_all_messages – If True, all incoming Twilio messages, including raw audio, are sent through the ‘all_twilio_messages’ iterator. Otherwise this iterator is empty.
- current_calls: Dict[str, Any]#
- stream_sid_f: Future[str]#
- class voice_stream.integrations.twilio.TwilioSequenceError#
Bases: ValueError
Indicates a problem in the sequence numbers for messages received from Twilio.
- async voice_stream.integrations.twilio.audio_bytes_to_twilio_media_step(async_iter: AsyncIterator[bytes], stream_sid: str | Awaitable[str]) AsyncIterator[dict]#
Data flow step that formats audio bytes into outgoing Twilio media stream messages.
To play audio over a Twilio media stream, messages must be formatted as JSON with the audio bytes base64-encoded. This step converts raw audio data into the correct format for Twilio.
- Parameters:
async_iter (AsyncIterator[bytes]) – An asynchronous iterator over audio bytes
stream_sid (AwaitableOrObj[str]) – The Twilio stream id for the outgoing audio. This is passed with the CallStart message when the call is initiated.
- Returns:
An asynchronous iterator yielding outgoing Twilio media stream messages.
- Return type:
AsyncIterator[dict]
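The formatting described above can be sketched without VoiceStream. This is a minimal stand-in, not the library's implementation: the helper name to_twilio_media_message and the stream id "MZ123" are hypothetical, and the message layout (event, streamSid, media.payload) is assumed from Twilio's documented outbound media message.

```python
import base64
import json


def to_twilio_media_message(audio: bytes, stream_sid: str) -> dict:
    """Hypothetical helper: wrap raw audio bytes in a Twilio 'media' message."""
    return {
        "event": "media",
        "streamSid": stream_sid,
        # The audio travels as base64 text because the message is sent as JSON.
        "media": {"payload": base64.b64encode(audio).decode("ascii")},
    }


# "MZ123" is a placeholder stream id, not a real one.
message = to_twilio_media_message(b"\x00\x01\x02", "MZ123")
print(json.dumps(message))
```

The resulting dictionary can be serialized with json.dumps before it is written to the websocket.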
- voice_stream.integrations.twilio.extract_value_step(async_iter: AsyncIterator[T], value: Callable[[T], Any], condition: Callable[[T], bool] | None = None) Tuple[AsyncIterator[T], Future[Any]]#
Extracts a value from an async iterator based on a condition, returning the iterator and a future.
This function processes an asynchronous iterator, applying a given condition to each item. The first time an item meets the condition, a specified value extraction function is applied to it, and the result is set in an asyncio.Future. The function returns an async iterator and the future containing the extracted value. The output iterator produces all the same elements that go into it. If the condition is not provided, the value function is applied to the first item.
- Parameters:
async_iter (AsyncIterator[T]) – An input asynchronous iterator.
value (Callable[[T], Any]) – A function that extracts a value from an item in the iterator.
condition (Optional[Callable[[T], bool]], optional) – A function that returns True if the value should be extracted from the item. If None, the value is extracted from the first item.
- Returns:
The modified async iterator and a future containing the extracted value.
- Return type:
Tuple[AsyncIterator[T], asyncio.Future[Any]]
Examples
# Copy a file, generating a name based on the first line of the file.
>>> stream = text_file_source("example.txt")
>>> stream, name = extract_value_step(stream, lambda x: x)
>>> done = await async_init_step(stream, lambda x: text_file_sink(x, resolve_awaitable_or_obj(name)), num_outputs=0)
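The pass-through-plus-future behavior can be illustrated with plain asyncio. The sketch below is an assumption about how such a step might work, not the VoiceStream source; extract_value_sketch and items are hypothetical names.

```python
import asyncio


async def items(values):
    """Hypothetical helper: turn a list into an async iterator."""
    for v in values:
        yield v


def extract_value_sketch(async_iter, value, condition=None):
    """Pass every item through; resolve a future from the first matching item."""
    fut = asyncio.get_running_loop().create_future()

    async def passthrough():
        async for item in async_iter:
            # Only the first item that satisfies the condition sets the future.
            if not fut.done() and (condition is None or condition(item)):
                fut.set_result(value(item))
            yield item

    return passthrough(), fut


async def demo():
    stream, first_even = extract_value_sketch(
        items([1, 3, 4, 6]), lambda x: x * 10, lambda x: x % 2 == 0
    )
    seen = [item async for item in stream]
    return seen, await first_even


print(asyncio.run(demo()))
```

In this sketch the future is only resolved while the output iterator is being consumed, so awaiting the future without draining the stream would wait forever.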
- async voice_stream.integrations.twilio.filter_step(async_iter: AsyncIterator[T], condition: Callable[[T], bool]) AsyncIterator[T]#
Data flow step that filters items based on a specified condition.
This function wraps an async iterator and yields only those items that satisfy a given condition. It is analogous to the built-in filter function but for asynchronous iterators. Each item from the input async iterator is passed to the condition function, and only items for which this function returns True are yielded.
- Parameters:
async_iter (AsyncIterator[T]) – The asynchronous iterator whose items are to be filtered.
condition (Callable[[T], bool]) – A function that evaluates each item in the iterator. If this function returns True, the item is yielded; otherwise, it is skipped.
- Returns:
An asynchronous iterator yielding only the items that satisfy the condition.
- Return type:
AsyncIterator[T]
Examples
>>> stream = array_source(range(4))
>>> stream = filter_step(stream, lambda x: x % 2 == 0)
>>> done = await array_sink(stream)
>>> # Output: 0, 2
- voice_stream.integrations.twilio.fork_step(async_iter: AsyncIterator[T], pull_from_all: bool = False) Tuple[AsyncIterator[T], AsyncIterator[T]]#
Data flow step that splits a stream into two, creating a ‘fork’ in the stream.
This function takes an async iterator and divides its output into two separate async iterators. If pull_from_all is set to False (default), the consumption rate of items is determined by the rate of consumption of the first returned iterator; the second iterator’s output rate will match the first. If pull_from_all is True, both iterators will consume items independently as quickly as possible.
If pull_from_all is False, exceptions will only propagate to the first iterator. If True, exceptions will propagate to both iterators.
- Parameters:
async_iter (AsyncIterator[T]) – The asynchronous iterator to be forked.
pull_from_all (bool, default False) – If True, both forks pull from the incoming iterator. If False, the second iterator only receives data as fast as the first pulls.
- Returns:
A tuple of two asynchronous iterators representing the forked output.
- Return type:
Tuple[AsyncIterator[T], AsyncIterator[T]]
Examples
>>> source = array_source(range(2))
>>> a, b = fork_step(source)
>>> a = await array_sink(a)
>>> b = await array_sink(b)
>>> assert a == [0, 1]
>>> assert b == [0, 1]
Notes
With pull_from_all set to False, the rate of iteration in the first iterator controls the overall pace of item consumption from the original iterator.
With pull_from_all set to True, items are consumed from the original iterator as quickly as possible, and both iterators receive items independently.
- voice_stream.integrations.twilio.map_future(f: Future[T], func: Callable[[T], Output]) Future[Output]#
Returns a future that will be resolved with the result of applying func to the result of f.
- Parameters:
f (Future[T]) – The input future.
func (Callable[[T], Output]) – A function applied to the result of f once it resolves.
- Returns:
A future that resolves with func applied to the result of f.
- Return type:
Future[Output]
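Mapping over a future can be sketched with a done-callback. This is an illustrative stand-in under the assumption that asyncio futures are used; map_future_sketch is a hypothetical name and may differ from what VoiceStream does internally.

```python
import asyncio


def map_future_sketch(f, func):
    """Return a new future resolved with func(result of f)."""
    out = f.get_loop().create_future()

    def _done(src):
        # Mirror cancellation and errors of the source future onto the output.
        if src.cancelled():
            out.cancel()
        elif src.exception() is not None:
            out.set_exception(src.exception())
        else:
            try:
                out.set_result(func(src.result()))
            except Exception as exc:  # func itself failed
                out.set_exception(exc)

    f.add_done_callback(_done)
    return out


async def demo():
    source = asyncio.get_running_loop().create_future()
    mapped = map_future_sketch(source, lambda s: s.upper())
    source.set_result("call-sid")  # placeholder value
    return await mapped


print(asyncio.run(demo()))
```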
- async voice_stream.integrations.twilio.map_step(async_iter: AsyncIterator[T], func: Callable[[T], Output], ignore_none: bool | None = False) AsyncIterator[Output]#
Data flow step that transforms items using a mapping function.
This function applies either a synchronous or asynchronous mapping function to each item in the input async iterator. If ignore_none is set to True, any items that are transformed to None are not yielded. This feature allows the function to perform both transformation and filtering in a single step.
- Parameters:
async_iter (AsyncIterator[T]) – The asynchronous iterator whose items are to be transformed.
func (Callable[[T], Output]) – A mapping function to apply to each item. This can be either a synchronous or asynchronous function.
ignore_none (Optional[bool], default False) – If True, items that are transformed to None by func are not yielded.
- Returns:
An asynchronous iterator yielding transformed items, optionally skipping None values.
- Return type:
AsyncIterator[Output]
Examples
>>> stream = array_source(range(4))
>>> stream = map_step(stream, lambda x: x + 2)
>>> done = await array_sink(stream)
>>> assert done == [2,3,4,5]
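The combination of an asynchronous mapping function with ignore_none can be sketched as follows. The names map_sketch, items, and half_if_even are hypothetical, and the sketch only assumes the behavior described above rather than reproducing the library code.

```python
import asyncio
import inspect


async def items(values):
    """Hypothetical helper: turn an iterable into an async iterator."""
    for v in values:
        yield v


async def map_sketch(async_iter, func, ignore_none=False):
    async for item in async_iter:
        result = func(item)
        # Support both plain functions and coroutine functions.
        if inspect.isawaitable(result):
            result = await result
        if result is None and ignore_none:
            continue  # transform and filter in one step
        yield result


async def half_if_even(x):
    await asyncio.sleep(0)
    return x // 2 if x % 2 == 0 else None


async def demo():
    stream = map_sketch(items(range(6)), half_if_even, ignore_none=True)
    return [y async for y in stream]


print(asyncio.run(demo()))
```

Note that only None is dropped; a falsy result such as 0 is still yielded.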
- voice_stream.integrations.twilio.map_str_to_json_step(async_iter: AsyncIterator[str]) AsyncIterator[dict]#
Data flow step that parses JSON strings into dictionaries.
Each string element from the async iterator is parsed as JSON and transformed into a dictionary.
- Parameters:
async_iter (AsyncIterator[str]) – An asynchronous iterator yielding strings, each expected to be a valid JSON text.
- Returns:
An asynchronous iterator yielding dictionaries resulting from JSON parsing of each string element.
- Return type:
AsyncIterator[dict]
Examples
>>> stream = array_source([
...     '{"name": "Alice", "age": 30}',
...     '{"name": "Bob", "age": 25}',
... ])
>>> stream = map_str_to_json_step(stream)
>>> stream = map_step(stream, lambda x: x['age'])
>>> await array_sink(stream)
[30, 25]
- async voice_stream.integrations.twilio.merge_step(*async_iters: list[AsyncIterator[T]]) AsyncIterator[T]#
Data flow step that merges multiple streams into one, consuming from all iterators concurrently.
Unlike concatenation which consumes one iterator at a time, merge_step interleaves items from all provided iterators as they become available. The merged iterator continues until all input iterators are exhausted. If an exception occurs in any of the iterators, all others are canceled.
- Parameters:
async_iters (list[AsyncIterator[T]]) – A list of asynchronous iterators to be merged.
- Returns:
An asynchronous iterator yielding items from all input iterators in the order they become available.
- Return type:
AsyncIterator[T]
Examples
>>> async def async_iter1():
...     for i in [1,2,3]:
...         await asyncio.sleep(0.3)
...         yield i
>>> async def async_iter2():
...     for i in [4,5,6]:
...         await asyncio.sleep(0.5)
...         yield i
>>> stream1 = async_iter1()
>>> stream2 = async_iter2()
>>> stream = merge_step(stream1, stream2)
>>> done = await array_sink(stream)
>>> assert done == [1,4,2,3,5,6] # Output order is not guaranteed.
- voice_stream.integrations.twilio.partition_step(async_iter: AsyncIterator[T], condition: Callable[[T], bool]) Tuple[AsyncIterator[T], AsyncIterator[T]]#
Data flow step that partitions items into two output streams based on a specified condition.
This function divides the items from the input async iterator into two separate iterators: one for items where the condition evaluates to True, and another for items where the condition is False. If an exception occurs during iteration, it is propagated to both output iterators.
- Parameters:
async_iter (AsyncIterator[T]) – An asynchronous iterator whose items are to be partitioned.
condition (Callable[[T], bool]) – A function that evaluates each item in the iterator. Returns True if the item should go to the first iterator, and False for the second.
- Returns:
A tuple of two asynchronous iterators: the first yields items where the condition is True, and the second yields items where the condition is False.
- Return type:
Tuple[AsyncIterator[T], AsyncIterator[T]]
Examples
>>> stream = array_source(range(4))
>>> true_stream, false_stream = partition_step(stream, lambda x: x % 2 == 0)
>>> true_done = array_sink(true_stream)
>>> false_done = array_sink(false_stream)
>>> true_ret, false_ret = await asyncio.gather(true_done, false_done)
>>> assert true_ret == [0,2]
>>> assert false_ret == [1,3]
- voice_stream.integrations.twilio.queue_source(queue: Queue[T] | Awaitable[Queue[T]] | None = None) AsyncIterator[T] | QueueAsyncIterator#
Data flow source that yields items from an asyncio.Queue.
This function returns an asynchronous iterator that consumes items from an asyncio.Queue. The iteration continues until an EndOfStreamMarker is encountered in the queue, signaling the end of iteration.
- Parameters:
queue (AwaitableOrObj[asyncio.Queue[T]], optional) – An instance of asyncio.Queue from which the items will be consumed. This can be the queue itself or an Awaitable that returns the queue, which is useful if the queue has not been created yet. If not provided, the returned iterator is a QueueAsyncIterator, which allows items to be added to the queue via its put method.
- Returns:
An asynchronous iterator over the items in the queue.
- Return type:
AsyncIterator[T]
Examples
>>> queue = asyncio.Queue()
>>> await queue.put(1)
>>> await queue.put(2)
>>> await queue.put(EndOfStreamMarker)
>>> stream = queue_source(queue)
>>> done = await array_sink(stream)
>>> assert done == [1,2]
Notes
The function expects that the queue will be closed by putting EndOfStreamMarker into it.
If an Awaitable[Queue] is passed, this function will return immediately and the queue will be awaited when the iterator is started.
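The sentinel-terminated queue pattern can be sketched with asyncio alone. END is a hypothetical sentinel standing in for EndOfStreamMarker, and queue_source_sketch is an illustrative name, not the library function.

```python
import asyncio

END = object()  # hypothetical sentinel standing in for EndOfStreamMarker


async def queue_source_sketch(queue):
    """Yield items from the queue until the sentinel is seen."""
    while True:
        item = await queue.get()
        if item is END:
            return
        yield item


async def demo():
    queue = asyncio.Queue()
    for item in (1, 2, END):
        await queue.put(item)  # Queue.put is a coroutine and must be awaited
    return [x async for x in queue_source_sketch(queue)]


print(asyncio.run(demo()))
```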
- async voice_stream.integrations.twilio.resolve_awaitable_or_obj(obj: T | Awaitable[T]) T#
Returns obj directly if it is a plain object, or its awaited result if it is an awaitable (such as a future).
- async voice_stream.integrations.twilio.twilio_check_sequence_step(async_iter: AsyncIterator[dict]) AsyncIterator[dict]#
Data flow step that verifies that no Twilio websocket messages were lost.
This step checks each message in the provided asynchronous iterator for a sequence number and validates that the messages are in the correct sequence. If a message is found to be out of sequence or missing a sequence number, a TwilioSequenceError is raised.
- Parameters:
async_iter (AsyncIterator[dict]) – An asynchronous iterator over Twilio websocket messages, where each message is expected to be a dictionary containing a ‘sequenceNumber’ key.
- Yields:
AsyncIterator[dict] – An asynchronous iterator yielding the same Twilio websocket messages after verifying their sequence.
- Raises:
TwilioSequenceError – If a message is missing a sequence number or if the sequence of messages is found to be out of order.
Notes
The function starts by expecting a sequence number of 1 and increments this expectation with each message.
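The counting logic in the note above can be sketched as follows. SequenceError is a hypothetical stand-in for TwilioSequenceError, and the sketch assumes sequence numbers may arrive as strings or integers, which int() handles either way.

```python
import asyncio


class SequenceError(ValueError):
    """Hypothetical stand-in for TwilioSequenceError."""


async def items(values):
    """Hypothetical helper: turn a list into an async iterator."""
    for v in values:
        yield v


async def check_sequence_sketch(async_iter):
    expected = 1  # the first message is expected to carry sequence number 1
    async for message in async_iter:
        raw = message.get("sequenceNumber")
        if raw is None:
            raise SequenceError(f"Missing sequence number: {message}")
        if int(raw) != expected:
            raise SequenceError(f"Expected {expected}, got {raw}")
        expected += 1
        yield message


async def demo():
    good = [{"sequenceNumber": "1"}, {"sequenceNumber": "2"}]
    passed = [m async for m in check_sequence_sketch(items(good))]
    gap = [{"sequenceNumber": "1"}, {"sequenceNumber": "3"}]
    try:
        [m async for m in check_sequence_sketch(items(gap))]
        failed = False
    except SequenceError:
        failed = True
    return len(passed), failed


print(asyncio.run(demo()))
```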
- async voice_stream.integrations.twilio.twilio_close_on_stop_step(async_iter: AsyncIterator[dict], close_func: Callable[[], None | Coroutine[Any, Any, None]]) AsyncIterator[dict]#
Data flow step that calls a close function when a stop message is received.
Receives Twilio websocket messages and checks for a ‘stop’ event. Upon receiving a ‘stop’ message, it calls the close function (typically to close the websocket) and continues to yield the remaining messages.
- Parameters:
async_iter (AsyncIterator[dict]) – An asynchronous iterator over Twilio websocket messages.
close_func (Callable[[], None]) – An asynchronous callable that is executed when a ‘stop’ event is received. It is awaited when the ‘stop’ event occurs and is intended to close the websocket or perform other cleanup actions.
- Returns:
An asynchronous iterator yielding Twilio websocket messages.
- Return type:
AsyncIterator[dict]
Notes
The function does not stop yielding messages after the ‘stop’ event; it continues to yield any remaining messages in the iterator after calling the close function.
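A minimal sketch of this behavior, assuming each message carries an ‘event’ key and that the ‘stop’ message itself is also yielded (the source does not say either way). The names close_on_stop_sketch and items are hypothetical.

```python
import asyncio
import inspect


async def items(values):
    """Hypothetical helper: turn a list into an async iterator."""
    for v in values:
        yield v


async def close_on_stop_sketch(async_iter, close_func):
    async for message in async_iter:
        if message.get("event") == "stop":
            result = close_func()
            # Accept both plain and coroutine close functions.
            if inspect.isawaitable(result):
                await result
        yield message  # keep yielding, including anything after 'stop'


async def demo():
    closed = []

    async def close():
        closed.append(True)

    msgs = [{"event": "media"}, {"event": "stop"}, {"event": "mark"}]
    out = [m["event"] async for m in close_on_stop_sketch(items(msgs), close)]
    return out, closed


print(asyncio.run(demo()))
```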
- async voice_stream.integrations.twilio.twilio_format_events_step(async_iter: AsyncIterator[Dict]) AsyncIterator[BaseEvent]#
Data flow step that formats Twilio event messages into VoiceStream call status objects.
This function converts Twilio event messages into the VoiceStream generic call event classes. These include CallStarted, CallEnded, and AnsweringMachineDetection.
- Parameters:
async_iter (AsyncIterator[Dict]) – An asynchronous iterator over Twilio event messages.
- Yields:
AsyncIterator[BaseEvent] – An asynchronous iterator yielding BaseEvent objects that represent formatted Twilio events.
Notes
Twilio messages that don’t correspond to events are ignored.
For ‘start’ events, it creates a CallStarted event object with the call and stream IDs.
For ‘stop’ events, it creates a CallEnded event object.
- async voice_stream.integrations.twilio.twilio_media_to_audio_bytes_step(async_iter: AsyncIterator[dict]) AsyncIterator[bytes]#
Data flow step that extracts the audio bytes from Twilio media stream messages.
Twilio media stream messages come in as JSON, which are usually converted to Python dictionaries. Each dictionary contains a ‘media’ key with a ‘payload’ sub-key. The function decodes the base64-encoded payload into bytes representing audio data and yields these bytes.
- Parameters:
async_iter (AsyncIterator[dict]) – An asynchronous iterator over Twilio media stream messages.
- Yields:
AsyncIterator[bytes] – An asynchronous iterator yielding audio data in bytes.
- Raises:
ValueError – If a message in the stream does not contain a ‘media’ key or if the ‘media’ key does not have a ‘payload’ sub-key.
Notes
The function is specifically designed to handle media streams from Twilio, converting the audio into bytes objects for further processing.
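The decoding step can be sketched with the standard library. This is an illustration of the described behavior, not the library source; media_to_bytes_sketch and items are hypothetical names.

```python
import asyncio
import base64


async def items(values):
    """Hypothetical helper: turn a list into an async iterator."""
    for v in values:
        yield v


async def media_to_bytes_sketch(async_iter):
    async for message in async_iter:
        try:
            payload = message["media"]["payload"]
        except KeyError as exc:
            # Mirror the documented ValueError for malformed messages.
            raise ValueError(f"Not a media message: missing {exc}") from exc
        yield base64.b64decode(payload)


async def demo():
    encoded = base64.b64encode(b"\xff\x7f").decode("ascii")
    msg = {"event": "media", "media": {"payload": encoded}}
    return [chunk async for chunk in media_to_bytes_sketch(items([msg]))]


print(asyncio.run(demo()))
```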
- voice_stream.integrations.twilio.twilio_split_media_step(async_iter: AsyncIterator[dict]) Tuple[AsyncIterator[dict], AsyncIterator[dict]]#
Data flow step that splits a Twilio stream into two separate streams, one for media messages and another for control messages.
- Parameters:
async_iter (AsyncIterator[dict]) – An asynchronous iterator over Twilio stream events. Each event is represented as a dictionary.
- Returns:
A tuple of two asynchronous iterators. The first iterator yields dictionaries of media stream events, and the second yields dictionaries of control messages.
- Return type:
Tuple[AsyncIterator[dict], AsyncIterator[dict]]
Notes
The function relies on the presence of an “event” key in each dictionary to determine the type of the message.
This function is useful for handling different types of events in a Twilio stream separately, especially in applications that need to process media and control messages differently.
Examples
>>> media_stream, control_messages = twilio_split_media_step(async_iter)
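The split condition can be shown on a plain list. This sketch only illustrates the predicate on the “event” key, assuming media messages are those whose event is "media"; the real step operates on asynchronous streams, and is_media is a hypothetical name.

```python
def is_media(message: dict) -> bool:
    """Hypothetical predicate: True for media messages, False for control messages."""
    return message.get("event") == "media"


messages = [
    {"event": "start"},
    {"event": "media"},
    {"event": "media"},
    {"event": "stop"},
]

media = [m for m in messages if is_media(m)]
control = [m for m in messages if not is_media(m)]
print(len(media), [m["event"] for m in control])
```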