iced_futures/
stream.rs

1//! Create asynchronous streams of data.
2use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
4
5use std::future::Future;
6
7/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
8/// to the [`mpsc::Sender`] provided to the closure.
9///
10/// This is a more ergonomic [`stream::unfold`], which allows you to go
11/// from the "world of futures" to the "world of streams" by simply looping
12/// and publishing to an async channel from inside a [`Future`].
13pub fn channel<T, F>(
14    size: usize,
15    f: impl FnOnce(mpsc::Sender<T>) -> F,
16) -> impl Stream<Item = T>
17where
18    F: Future<Output = ()>,
19{
20    let (sender, receiver) = mpsc::channel(size);
21
22    let runner = stream::once(f(sender)).filter_map(|_| async { None });
23
24    stream::select(receiver, runner)
25}
26
27/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
28/// that can fail to the [`mpsc::Sender`] provided to the closure.
29pub fn try_channel<T, E, F>(
30    size: usize,
31    f: impl FnOnce(mpsc::Sender<T>) -> F,
32) -> impl Stream<Item = Result<T, E>>
33where
34    F: Future<Output = Result<(), E>>,
35{
36    let (sender, receiver) = mpsc::channel(size);
37
38    let runner = stream::once(f(sender)).filter_map(|result| async {
39        match result {
40            Ok(()) => None,
41            Err(error) => Some(Err(error)),
42        }
43    });
44
45    stream::select(receiver.map(Ok), runner)
46}