1use crate::core::widget;
3use crate::futures::futures::channel::mpsc;
4use crate::futures::futures::channel::oneshot;
5use crate::futures::futures::future::{self, FutureExt};
6use crate::futures::futures::never::Never;
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{boxed_stream, BoxStream, MaybeSend};
9use crate::Action;
10
11use std::future::Future;
12
/// A unit of work handed to the runtime, represented as an optional boxed
/// stream of [`Action`]s parameterized by the output type `T`.
///
/// The wrapped `Option` is `None` for a task that does nothing (see
/// [`Task::none`]); otherwise it holds the stream of actions to run.
// NOTE(review): the lint is silenced instead of implementing `Debug`;
// presumably because the boxed stream cannot be formatted — confirm.
#[allow(missing_debug_implementations)]
#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
pub struct Task<T>(Option<BoxStream<Action<T>>>);
19
20impl<T> Task<T> {
21    pub fn none() -> Self {
23        Self(None)
24    }
25
26    pub fn done(value: T) -> Self
28    where
29        T: MaybeSend + 'static,
30    {
31        Self::future(future::ready(value))
32    }
33
34    pub fn perform<A>(
37        future: impl Future<Output = A> + MaybeSend + 'static,
38        f: impl Fn(A) -> T + MaybeSend + 'static,
39    ) -> Self
40    where
41        T: MaybeSend + 'static,
42        A: MaybeSend + 'static,
43    {
44        Self::future(future.map(f))
45    }
46
47    pub fn run<A>(
50        stream: impl Stream<Item = A> + MaybeSend + 'static,
51        f: impl Fn(A) -> T + MaybeSend + 'static,
52    ) -> Self
53    where
54        T: 'static,
55    {
56        Self::stream(stream.map(f))
57    }
58
59    pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
62    where
63        T: 'static,
64    {
65        Self(Some(boxed_stream(stream::select_all(
66            tasks.into_iter().filter_map(|task| task.0),
67        ))))
68    }
69
70    pub fn map<O>(
72        self,
73        mut f: impl FnMut(T) -> O + MaybeSend + 'static,
74    ) -> Task<O>
75    where
76        T: MaybeSend + 'static,
77        O: MaybeSend + 'static,
78    {
79        self.then(move |output| Task::done(f(output)))
80    }
81
82    pub fn then<O>(
88        self,
89        mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
90    ) -> Task<O>
91    where
92        T: MaybeSend + 'static,
93        O: MaybeSend + 'static,
94    {
95        Task(match self.0 {
96            None => None,
97            Some(stream) => {
98                Some(boxed_stream(stream.flat_map(move |action| {
99                    match action.output() {
100                        Ok(output) => f(output)
101                            .0
102                            .unwrap_or_else(|| boxed_stream(stream::empty())),
103                        Err(action) => {
104                            boxed_stream(stream::once(async move { action }))
105                        }
106                    }
107                })))
108            }
109        })
110    }
111
112    pub fn chain(self, task: Self) -> Self
114    where
115        T: 'static,
116    {
117        match self.0 {
118            None => task,
119            Some(first) => match task.0 {
120                None => Task(Some(first)),
121                Some(second) => Task(Some(boxed_stream(first.chain(second)))),
122            },
123        }
124    }
125
126    pub fn collect(self) -> Task<Vec<T>>
128    where
129        T: MaybeSend + 'static,
130    {
131        match self.0 {
132            None => Task::done(Vec::new()),
133            Some(stream) => Task(Some(boxed_stream(
134                stream::unfold(
135                    (stream, Some(Vec::new())),
136                    move |(mut stream, outputs)| async move {
137                        let mut outputs = outputs?;
138
139                        let Some(action) = stream.next().await else {
140                            return Some((
141                                Some(Action::Output(outputs)),
142                                (stream, None),
143                            ));
144                        };
145
146                        match action.output() {
147                            Ok(output) => {
148                                outputs.push(output);
149
150                                Some((None, (stream, Some(outputs))))
151                            }
152                            Err(action) => {
153                                Some((Some(action), (stream, Some(outputs))))
154                            }
155                        }
156                    },
157                )
158                .filter_map(future::ready),
159            ))),
160        }
161    }
162
163    pub fn discard<O>(self) -> Task<O>
167    where
168        T: MaybeSend + 'static,
169        O: MaybeSend + 'static,
170    {
171        self.then(|_| Task::none())
172    }
173
174    pub fn abortable(self) -> (Self, Handle)
176    where
177        T: 'static,
178    {
179        match self.0 {
180            Some(stream) => {
181                let (stream, handle) = stream::abortable(stream);
182
183                (
184                    Self(Some(boxed_stream(stream))),
185                    Handle {
186                        raw: Some(handle),
187                        abort_on_drop: false,
188                    },
189                )
190            }
191            None => (
192                Self(None),
193                Handle {
194                    raw: None,
195                    abort_on_drop: false,
196                },
197            ),
198        }
199    }
200
201    pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
204    where
205        T: 'static,
206    {
207        Self::stream(stream::once(future))
208    }
209
210    pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
213    where
214        T: 'static,
215    {
216        Self(Some(boxed_stream(stream.map(Action::Output))))
217    }
218}
219
/// A handle for aborting the stream of a [`Task`] that was made abortable
/// through [`Task::abortable`].
///
/// NOTE(review): `Clone` is derived and copies `abort_on_drop`, while the
/// `Drop` impl aborts whenever that flag is set. Dropping any clone of such
/// a handle therefore aborts the task — confirm this is intended.
#[derive(Debug, Clone)]
pub struct Handle {
    /// Abort handle of the wrapped stream; `None` when the task had no stream.
    raw: Option<stream::AbortHandle>,
    /// Whether dropping this handle calls [`Handle::abort`].
    abort_on_drop: bool,
}
226
227impl Handle {
228    pub fn abort(&self) {
230        if let Some(handle) = &self.raw {
231            handle.abort();
232        }
233    }
234
235    pub fn abort_on_drop(mut self) -> Self {
241        Self {
242            raw: self.raw.take(),
243            abort_on_drop: true,
244        }
245    }
246
247    pub fn is_aborted(&self) -> bool {
249        if let Some(handle) = &self.raw {
250            handle.is_aborted()
251        } else {
252            true
253        }
254    }
255}
256
257impl Drop for Handle {
258    fn drop(&mut self) {
259        if self.abort_on_drop {
260            self.abort();
261        }
262    }
263}
264
265impl<T> Task<Option<T>> {
266    pub fn and_then<A>(
270        self,
271        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
272    ) -> Task<A>
273    where
274        T: MaybeSend + 'static,
275        A: MaybeSend + 'static,
276    {
277        self.then(move |option| option.map_or_else(Task::none, &f))
278    }
279}
280
281impl<T, E> Task<Result<T, E>> {
282    pub fn and_then<A>(
286        self,
287        f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
288    ) -> Task<A>
289    where
290        T: MaybeSend + 'static,
291        E: MaybeSend + 'static,
292        A: MaybeSend + 'static,
293    {
294        self.then(move |option| option.map_or_else(|_| Task::none(), &f))
295    }
296}
297
298impl<T> From<()> for Task<T> {
299    fn from(_value: ()) -> Self {
300        Self::none()
301    }
302}
303
304pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
307where
308    T: Send + 'static,
309{
310    channel(move |sender| {
311        let operation =
312            widget::operation::map(Box::new(operation), move |value| {
313                let _ = sender.clone().try_send(value);
314            });
315
316        Action::Widget(Box::new(operation))
317    })
318}
319
320pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
323where
324    T: MaybeSend + 'static,
325{
326    let (sender, receiver) = oneshot::channel();
327
328    let action = f(sender);
329
330    Task(Some(boxed_stream(
331        stream::once(async move { action }).chain(
332            receiver.into_stream().filter_map(|result| async move {
333                Some(Action::Output(result.ok()?))
334            }),
335        ),
336    )))
337}
338
339pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
342where
343    T: MaybeSend + 'static,
344{
345    let (sender, receiver) = mpsc::channel(1);
346
347    let action = f(sender);
348
349    Task(Some(boxed_stream(
350        stream::once(async move { action })
351            .chain(receiver.map(|result| Action::Output(result))),
352    )))
353}
354
355pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
357    let action = action.into();
358
359    Task(Some(boxed_stream(stream::once(async move {
360        action.output().expect_err("no output")
361    }))))
362}
363
364pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
366    task.0
367}