1//! Create asynchronous streams of data.
2use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
45use std::future::Future;
67/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
8/// to the [`mpsc::Sender`] provided to the closure.
9///
10/// This is a more ergonomic [`stream::unfold`], which allows you to go
11/// from the "world of futures" to the "world of streams" by simply looping
12/// and publishing to an async channel from inside a [`Future`].
13pub fn channel<T, F>(
14 size: usize,
15 f: impl FnOnce(mpsc::Sender<T>) -> F,
16) -> impl Stream<Item = T>
17where
18F: Future<Output = ()>,
19{
20let (sender, receiver) = mpsc::channel(size);
2122let runner = stream::once(f(sender)).filter_map(|_| async { None });
2324 stream::select(receiver, runner)
25}
2627/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
28/// that can fail to the [`mpsc::Sender`] provided to the closure.
29pub fn try_channel<T, E, F>(
30 size: usize,
31 f: impl FnOnce(mpsc::Sender<T>) -> F,
32) -> impl Stream<Item = Result<T, E>>
33where
34F: Future<Output = Result<(), E>>,
35{
36let (sender, receiver) = mpsc::channel(size);
3738let runner = stream::once(f(sender)).filter_map(|result| async {
39match result {
40Ok(()) => None,
41Err(error) => Some(Err(error)),
42 }
43 });
4445 stream::select(receiver.map(Ok), runner)
46}