1use crate::core::widget;
3use crate::futures::futures::channel::mpsc;
4use crate::futures::futures::channel::oneshot;
5use crate::futures::futures::future::{self, FutureExt};
6use crate::futures::futures::never::Never;
7use crate::futures::futures::stream::{self, Stream, StreamExt};
8use crate::futures::{boxed_stream, BoxStream, MaybeSend};
9use crate::Action;
10
11use std::future::Future;
12
/// An optional boxed stream of [`Action`]s meant to be handed to the runtime.
///
/// The wrapped value is `None` for a task that does nothing (see
/// [`Task::none`]); otherwise it is a boxed stream whose items are
/// `Action<T>` values.
// No `Debug` implementation is provided for this type, so the lint is
// silenced here (presumably because the boxed stream is not `Debug` — confirm).
#[allow(missing_debug_implementations)]
#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
pub struct Task<T>(Option<BoxStream<Action<T>>>);
19
20impl<T> Task<T> {
21 pub fn none() -> Self {
23 Self(None)
24 }
25
26 pub fn done(value: T) -> Self
28 where
29 T: MaybeSend + 'static,
30 {
31 Self::future(future::ready(value))
32 }
33
34 pub fn perform<A>(
37 future: impl Future<Output = A> + MaybeSend + 'static,
38 f: impl Fn(A) -> T + MaybeSend + 'static,
39 ) -> Self
40 where
41 T: MaybeSend + 'static,
42 A: MaybeSend + 'static,
43 {
44 Self::future(future.map(f))
45 }
46
47 pub fn run<A>(
50 stream: impl Stream<Item = A> + MaybeSend + 'static,
51 f: impl Fn(A) -> T + MaybeSend + 'static,
52 ) -> Self
53 where
54 T: 'static,
55 {
56 Self::stream(stream.map(f))
57 }
58
59 pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
62 where
63 T: 'static,
64 {
65 Self(Some(boxed_stream(stream::select_all(
66 tasks.into_iter().filter_map(|task| task.0),
67 ))))
68 }
69
70 pub fn map<O>(
72 self,
73 mut f: impl FnMut(T) -> O + MaybeSend + 'static,
74 ) -> Task<O>
75 where
76 T: MaybeSend + 'static,
77 O: MaybeSend + 'static,
78 {
79 self.then(move |output| Task::done(f(output)))
80 }
81
82 pub fn then<O>(
88 self,
89 mut f: impl FnMut(T) -> Task<O> + MaybeSend + 'static,
90 ) -> Task<O>
91 where
92 T: MaybeSend + 'static,
93 O: MaybeSend + 'static,
94 {
95 Task(match self.0 {
96 None => None,
97 Some(stream) => {
98 Some(boxed_stream(stream.flat_map(move |action| {
99 match action.output() {
100 Ok(output) => f(output)
101 .0
102 .unwrap_or_else(|| boxed_stream(stream::empty())),
103 Err(action) => {
104 boxed_stream(stream::once(async move { action }))
105 }
106 }
107 })))
108 }
109 })
110 }
111
112 pub fn chain(self, task: Self) -> Self
114 where
115 T: 'static,
116 {
117 match self.0 {
118 None => task,
119 Some(first) => match task.0 {
120 None => Task(Some(first)),
121 Some(second) => Task(Some(boxed_stream(first.chain(second)))),
122 },
123 }
124 }
125
126 pub fn collect(self) -> Task<Vec<T>>
128 where
129 T: MaybeSend + 'static,
130 {
131 match self.0 {
132 None => Task::done(Vec::new()),
133 Some(stream) => Task(Some(boxed_stream(
134 stream::unfold(
135 (stream, Some(Vec::new())),
136 move |(mut stream, outputs)| async move {
137 let mut outputs = outputs?;
138
139 let Some(action) = stream.next().await else {
140 return Some((
141 Some(Action::Output(outputs)),
142 (stream, None),
143 ));
144 };
145
146 match action.output() {
147 Ok(output) => {
148 outputs.push(output);
149
150 Some((None, (stream, Some(outputs))))
151 }
152 Err(action) => {
153 Some((Some(action), (stream, Some(outputs))))
154 }
155 }
156 },
157 )
158 .filter_map(future::ready),
159 ))),
160 }
161 }
162
163 pub fn discard<O>(self) -> Task<O>
167 where
168 T: MaybeSend + 'static,
169 O: MaybeSend + 'static,
170 {
171 self.then(|_| Task::none())
172 }
173
174 pub fn abortable(self) -> (Self, Handle)
176 where
177 T: 'static,
178 {
179 match self.0 {
180 Some(stream) => {
181 let (stream, handle) = stream::abortable(stream);
182
183 (
184 Self(Some(boxed_stream(stream))),
185 Handle {
186 raw: Some(handle),
187 abort_on_drop: false,
188 },
189 )
190 }
191 None => (
192 Self(None),
193 Handle {
194 raw: None,
195 abort_on_drop: false,
196 },
197 ),
198 }
199 }
200
201 pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
204 where
205 T: 'static,
206 {
207 Self::stream(stream::once(future))
208 }
209
210 pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
213 where
214 T: 'static,
215 {
216 Self(Some(boxed_stream(stream.map(Action::Output))))
217 }
218}
219
/// A handle that can abort the stream of a [`Task`]; it is returned by
/// [`Task::abortable`].
///
/// NOTE(review): `Clone` is derived, so a clone copies `abort_on_drop`
/// together with the abort handle. Every clone that carries the flag calls
/// [`Handle::abort`] when it is dropped — confirm this is the intended
/// behavior for cloned handles.
#[derive(Debug, Clone)]
pub struct Handle {
    /// The underlying abort handle; `None` when the task had no stream.
    raw: Option<stream::AbortHandle>,
    /// When `true`, dropping this handle calls [`Handle::abort`].
    abort_on_drop: bool,
}
226
227impl Handle {
228 pub fn abort(&self) {
230 if let Some(handle) = &self.raw {
231 handle.abort();
232 }
233 }
234
235 pub fn abort_on_drop(mut self) -> Self {
241 Self {
242 raw: self.raw.take(),
243 abort_on_drop: true,
244 }
245 }
246
247 pub fn is_aborted(&self) -> bool {
249 if let Some(handle) = &self.raw {
250 handle.is_aborted()
251 } else {
252 true
253 }
254 }
255}
256
257impl Drop for Handle {
258 fn drop(&mut self) {
259 if self.abort_on_drop {
260 self.abort();
261 }
262 }
263}
264
265impl<T> Task<Option<T>> {
266 pub fn and_then<A>(
270 self,
271 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
272 ) -> Task<A>
273 where
274 T: MaybeSend + 'static,
275 A: MaybeSend + 'static,
276 {
277 self.then(move |option| option.map_or_else(Task::none, &f))
278 }
279}
280
281impl<T, E> Task<Result<T, E>> {
282 pub fn and_then<A>(
286 self,
287 f: impl Fn(T) -> Task<A> + MaybeSend + 'static,
288 ) -> Task<A>
289 where
290 T: MaybeSend + 'static,
291 E: MaybeSend + 'static,
292 A: MaybeSend + 'static,
293 {
294 self.then(move |option| option.map_or_else(|_| Task::none(), &f))
295 }
296}
297
298impl<T> From<()> for Task<T> {
299 fn from(_value: ()) -> Self {
300 Self::none()
301 }
302}
303
304pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
307where
308 T: Send + 'static,
309{
310 channel(move |sender| {
311 let operation =
312 widget::operation::map(Box::new(operation), move |value| {
313 let _ = sender.clone().try_send(value);
314 });
315
316 Action::Widget(Box::new(operation))
317 })
318}
319
320pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
323where
324 T: MaybeSend + 'static,
325{
326 let (sender, receiver) = oneshot::channel();
327
328 let action = f(sender);
329
330 Task(Some(boxed_stream(
331 stream::once(async move { action }).chain(
332 receiver.into_stream().filter_map(|result| async move {
333 Some(Action::Output(result.ok()?))
334 }),
335 ),
336 )))
337}
338
339pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
342where
343 T: MaybeSend + 'static,
344{
345 let (sender, receiver) = mpsc::channel(1);
346
347 let action = f(sender);
348
349 Task(Some(boxed_stream(
350 stream::once(async move { action })
351 .chain(receiver.map(|result| Action::Output(result))),
352 )))
353}
354
355pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
357 let action = action.into();
358
359 Task(Some(boxed_stream(stream::once(async move {
360 action.output().expect_err("no output")
361 }))))
362}
363
364pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
366 task.0
367}