iced_runtime/
task.rs

1//! Create runtime tasks.
2use crate::core::widget;
3use crate::futures::futures::channel::mpsc;
4use crate::futures::futures::channel::oneshot;
5use crate::futures::futures::future::{self, FutureExt};
6use crate::futures::futures::never::Never;
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{boxed_stream, BoxStream, MaybeSend};
9use crate::Action;
10
11use std::future::Future;
12
13/// A set of concurrent actions to be performed by the iced runtime.
14///
15/// A [`Task`] _may_ produce a bunch of values of type `T`.
16#[allow(missing_debug_implementations)]
17#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
18pub struct Task<T>(Option<BoxStream<Action<T>>>);
19
20impl<T> Task<T> {
21    /// Creates a [`Task`] that does nothing.
22    pub fn none() -> Self {
23        Self(None)
24    }
25
26    /// Creates a new [`Task`] that instantly produces the given value.
27    pub fn done(value: T) -> Self
28    where
29        T: MaybeSend + 'static,
30    {
31        Self::future(future::ready(value))
32    }
33
34    /// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
35    /// output with the given closure.
36    pub fn perform<A>(
37        future: impl Future<Output = A> + MaybeSend + 'static,
38        f: impl Fn(A) -> T + MaybeSend + 'static,
39    ) -> Self
40    where
41        T: MaybeSend + 'static,
42        A: MaybeSend + 'static,
43    {
44        Self::future(future.map(f))
45    }
46
47    /// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
48    /// item with the given closure.
49    pub fn run<A>(
50        stream: impl Stream<Item = A> + MaybeSend + 'static,
51        f: impl Fn(A) -> T + MaybeSend + 'static,
52    ) -> Self
53    where
54        T: 'static,
55    {
56        Self::stream(stream.map(f))
57    }
58
59    /// Combines the given tasks and produces a single [`Task`] that will run all of them
60    /// in parallel.
61    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
62    where
63        T: 'static,
64    {
65        Self(Some(boxed_stream(stream::select_all(
66            tasks.into_iter().filter_map(|task| task.0),
67        ))))
68    }
69
70    /// Maps the output of a [`Task`] with the given closure.
71    pub fn map<O>(
72        self,
73        mut f: impl FnMut(T) -> O + MaybeSend + 'static,
74    ) -> Task<O>
75    where
76        T: MaybeSend + 'static,
77        O: MaybeSend + 'static,
78    {
79        self.then(move |output| Task::done(f(output)))
80    }
81
82    /// Performs a new [`Task`] for every output of the current [`Task`] using the
83    /// given closure.
84    ///
85    /// This is the monadic interface of [`Task`]—analogous to [`Future`] and
86    /// [`Stream`].
87    pub fn then<O>(
88        self,
89        mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
90    ) -> Task<O>
91    where
92        T: MaybeSend + 'static,
93        O: MaybeSend + 'static,
94    {
95        Task(match self.0 {
96            None => None,
97            Some(stream) => {
98                Some(boxed_stream(stream.flat_map(move |action| {
99                    match action.output() {
100                        Ok(output) => f(output)
101                            .0
102                            .unwrap_or_else(|| boxed_stream(stream::empty())),
103                        Err(action) => {
104                            boxed_stream(stream::once(async move { action }))
105                        }
106                    }
107                })))
108            }
109        })
110    }
111
112    /// Chains a new [`Task`] to be performed once the current one finishes completely.
113    pub fn chain(self, task: Self) -> Self
114    where
115        T: 'static,
116    {
117        match self.0 {
118            None => task,
119            Some(first) => match task.0 {
120                None => Task(Some(first)),
121                Some(second) => Task(Some(boxed_stream(first.chain(second)))),
122            },
123        }
124    }
125
126    /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
127    pub fn collect(self) -> Task<Vec<T>>
128    where
129        T: MaybeSend + 'static,
130    {
131        match self.0 {
132            None => Task::done(Vec::new()),
133            Some(stream) => Task(Some(boxed_stream(
134                stream::unfold(
135                    (stream, Some(Vec::new())),
136                    move |(mut stream, outputs)| async move {
137                        let mut outputs = outputs?;
138
139                        let Some(action) = stream.next().await else {
140                            return Some((
141                                Some(Action::Output(outputs)),
142                                (stream, None),
143                            ));
144                        };
145
146                        match action.output() {
147                            Ok(output) => {
148                                outputs.push(output);
149
150                                Some((None, (stream, Some(outputs))))
151                            }
152                            Err(action) => {
153                                Some((Some(action), (stream, Some(outputs))))
154                            }
155                        }
156                    },
157                )
158                .filter_map(future::ready),
159            ))),
160        }
161    }
162
163    /// Creates a new [`Task`] that discards the result of the current one.
164    ///
165    /// Useful if you only care about the side effects of a [`Task`].
166    pub fn discard<O>(self) -> Task<O>
167    where
168        T: MaybeSend + 'static,
169        O: MaybeSend + 'static,
170    {
171        self.then(|_| Task::none())
172    }
173
174    /// Creates a new [`Task`] that can be aborted with the returned [`Handle`].
175    pub fn abortable(self) -> (Self, Handle)
176    where
177        T: 'static,
178    {
179        match self.0 {
180            Some(stream) => {
181                let (stream, handle) = stream::abortable(stream);
182
183                (
184                    Self(Some(boxed_stream(stream))),
185                    Handle {
186                        raw: Some(handle),
187                        abort_on_drop: false,
188                    },
189                )
190            }
191            None => (
192                Self(None),
193                Handle {
194                    raw: None,
195                    abort_on_drop: false,
196                },
197            ),
198        }
199    }
200
201    /// Creates a new [`Task`] that runs the given [`Future`] and produces
202    /// its output.
203    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
204    where
205        T: 'static,
206    {
207        Self::stream(stream::once(future))
208    }
209
210    /// Creates a new [`Task`] that runs the given [`Stream`] and produces
211    /// each of its items.
212    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
213    where
214        T: 'static,
215    {
216        Self(Some(boxed_stream(stream.map(Action::Output))))
217    }
218}
219
220/// A handle to a [`Task`] that can be used for aborting it.
221#[derive(Debug, Clone)]
222pub struct Handle {
223    raw: Option<stream::AbortHandle>,
224    abort_on_drop: bool,
225}
226
227impl Handle {
228    /// Aborts the [`Task`] of this [`Handle`].
229    pub fn abort(&self) {
230        if let Some(handle) = &self.raw {
231            handle.abort();
232        }
233    }
234
235    /// Returns a new [`Handle`] that will call [`Handle::abort`] whenever
236    /// it is dropped.
237    ///
238    /// This can be really useful if you do not want to worry about calling
239    /// [`Handle::abort`] yourself.
240    pub fn abort_on_drop(mut self) -> Self {
241        Self {
242            raw: self.raw.take(),
243            abort_on_drop: true,
244        }
245    }
246
247    /// Returns `true` if the [`Task`] of this [`Handle`] has been aborted.
248    pub fn is_aborted(&self) -> bool {
249        if let Some(handle) = &self.raw {
250            handle.is_aborted()
251        } else {
252            true
253        }
254    }
255}
256
257impl Drop for Handle {
258    fn drop(&mut self) {
259        if self.abort_on_drop {
260            self.abort();
261        }
262    }
263}
264
265impl<T> Task<Option<T>> {
266    /// Executes a new [`Task`] after this one, only when it produces `Some` value.
267    ///
268    /// The value is provided to the closure to create the subsequent [`Task`].
269    pub fn and_then<A>(
270        self,
271        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
272    ) -> Task<A>
273    where
274        T: MaybeSend + 'static,
275        A: MaybeSend + 'static,
276    {
277        self.then(move |option| option.map_or_else(Task::none, &f))
278    }
279}
280
281impl<T, E> Task<Result<T, E>> {
282    /// Executes a new [`Task`] after this one, only when it succeeds with an `Ok` value.
283    ///
284    /// The success value is provided to the closure to create the subsequent [`Task`].
285    pub fn and_then<A>(
286        self,
287        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
288    ) -> Task<A>
289    where
290        T: MaybeSend + 'static,
291        E: MaybeSend + 'static,
292        A: MaybeSend + 'static,
293    {
294        self.then(move |option| option.map_or_else(|_| Task::none(), &f))
295    }
296}
297
298impl<T> From<()> for Task<T> {
299    fn from(_value: ()) -> Self {
300        Self::none()
301    }
302}
303
304/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
305/// its output.
306pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
307where
308    T: Send + 'static,
309{
310    channel(move |sender| {
311        let operation =
312            widget::operation::map(Box::new(operation), move |value| {
313                let _ = sender.clone().try_send(value);
314            });
315
316        Action::Widget(Box::new(operation))
317    })
318}
319
320/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
321/// produces the value fed to the [`oneshot::Sender`].
322pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
323where
324    T: MaybeSend + 'static,
325{
326    let (sender, receiver) = oneshot::channel();
327
328    let action = f(sender);
329
330    Task(Some(boxed_stream(
331        stream::once(async move { action }).chain(
332            receiver.into_stream().filter_map(|result| async move {
333                Some(Action::Output(result.ok()?))
334            }),
335        ),
336    )))
337}
338
339/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
340/// produces the values fed to the [`mpsc::Sender`].
341pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
342where
343    T: MaybeSend + 'static,
344{
345    let (sender, receiver) = mpsc::channel(1);
346
347    let action = f(sender);
348
349    Task(Some(boxed_stream(
350        stream::once(async move { action })
351            .chain(receiver.map(|result| Action::Output(result))),
352    )))
353}
354
355/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
356pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
357    let action = action.into();
358
359    Task(Some(boxed_stream(stream::once(async move {
360        action.output().expect_err("no output")
361    }))))
362}
363
364/// Returns the underlying [`Stream`] of the [`Task`].
365pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
366    task.0
367}