1use futures::channel::mpsc;
3use futures::stream::{self, Stream, StreamExt};
4
5use std::future::Future;
6
7pub fn channel<T, F>(
14 size: usize,
15 f: impl FnOnce(mpsc::Sender<T>) -> F,
16) -> impl Stream<Item = T>
17where
18 F: Future<Output = ()>,
19{
20 let (sender, receiver) = mpsc::channel(size);
21
22 let runner = stream::once(f(sender)).filter_map(|_| async { None });
23
24 stream::select(receiver, runner)
25}
26
27pub fn try_channel<T, E, F>(
30 size: usize,
31 f: impl FnOnce(mpsc::Sender<T>) -> F,
32) -> impl Stream<Item = Result<T, E>>
33where
34 F: Future<Output = Result<(), E>>,
35{
36 let (sender, receiver) = mpsc::channel(size);
37
38 let runner = stream::once(f(sender)).filter_map(|result| async {
39 match result {
40 Ok(()) => None,
41 Err(error) => Some(Err(error)),
42 }
43 });
44
45 stream::select(receiver.map(Ok), runner)
46}