iced_futures/
subscription.rs

1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::window;
8use crate::futures::Stream;
9use crate::{BoxStream, MaybeSend};
10
11use std::any::TypeId;
12use std::hash::Hash;
13
14/// A subscription event.
15#[derive(Debug, Clone, PartialEq)]
16pub enum Event {
17    /// A user interacted with a user interface in a window.
18    Interaction {
19        /// The window holding the interface of the interaction.
20        window: window::Id,
21        /// The [`Event`] describing the interaction.
22        ///
23        /// [`Event`]: event::Event
24        event: event::Event,
25
26        /// The [`event::Status`] of the interaction.
27        status: event::Status,
28    },
29}
30
31/// A stream of runtime events.
32///
33/// It is the input of a [`Subscription`].
34pub type EventStream = BoxStream<Event>;
35
36/// The hasher used for identifying subscriptions.
37pub type Hasher = rustc_hash::FxHasher;
38
39/// A request to listen to external events.
40///
41/// Besides performing async actions on demand with `Task`, most
42/// applications also need to listen to external events passively.
43///
44/// A [`Subscription`] is normally provided to some runtime, like a `Task`,
45/// and it will generate events as long as the user keeps requesting it.
46///
47/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
48/// connection, keyboard presses, mouse events, time ticks, etc.
49///
50/// # The Lifetime of a [`Subscription`]
51/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
52/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
53/// in the `subscription` function of an `application` or a `daemon`.
54///
55/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
56/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
57/// [`Stream`] and executing it in an async runtime.
58///
59/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
60/// to build a certain [`Stream`] together with some way to _identify_ it.
61///
62/// Identification is important because when a specific [`Subscription`] stops being returned to the
63/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
64/// [`Subscription`] to keep track of it.
65///
66/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
67/// and whenever necessary.
68///
69/// ```
70/// # mod iced {
71/// #     pub mod time {
72/// #         pub use iced_futures::backend::default::time::every;
73/// #         pub use std::time::{Duration, Instant};
74/// #     }
75/// #
76/// #     pub use iced_futures::Subscription;
77/// # }
78/// use iced::time::{self, Duration, Instant};
79/// use iced::Subscription;
80///
81/// struct State {
82///     timer_enabled: bool,
83/// }
84///
85/// fn subscription(state: &State) -> Subscription<Instant> {
86///     if state.timer_enabled {
87///         time::every(Duration::from_secs(1))
88///     } else {
89///         Subscription::none()
90///     }
91/// }
92/// ```
93///
94/// [`Future`]: std::future::Future
95#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
96pub struct Subscription<T> {
97    recipes: Vec<Box<dyn Recipe<Output = T>>>,
98}
99
100impl<T> Subscription<T> {
101    /// Returns an empty [`Subscription`] that will not produce any output.
102    pub fn none() -> Self {
103        Self {
104            recipes: Vec::new(),
105        }
106    }
107
108    /// Returns a [`Subscription`] that will call the given function to create and
109    /// asynchronously run the given [`Stream`].
110    ///
111    /// # Creating an asynchronous worker with bidirectional communication
112    /// You can leverage this helper to create a [`Subscription`] that spawns
113    /// an asynchronous worker in the background and establish a channel of
114    /// communication with an `iced` application.
115    ///
116    /// You can achieve this by creating an `mpsc` channel inside the closure
117    /// and returning the `Sender` as a `Message` for the `Application`:
118    ///
119    /// ```
120    /// # mod iced {
121    /// #     pub use iced_futures::Subscription;   
122    /// #     pub use iced_futures::futures;
123    /// #     pub use iced_futures::stream;
124    /// # }
125    /// use iced::futures::channel::mpsc;
126    /// use iced::futures::sink::SinkExt;
127    /// use iced::futures::Stream;
128    /// use iced::stream;
129    /// use iced::Subscription;
130    ///
131    /// pub enum Event {
132    ///     Ready(mpsc::Sender<Input>),
133    ///     WorkFinished,
134    ///     // ...
135    /// }
136    ///
137    /// enum Input {
138    ///     DoSomeWork,
139    ///     // ...
140    /// }
141    ///
142    /// fn some_worker() -> impl Stream<Item = Event> {
143    ///     stream::channel(100, |mut output| async move {
144    ///         // Create channel
145    ///         let (sender, mut receiver) = mpsc::channel(100);
146    ///
147    ///         // Send the sender back to the application
148    ///         output.send(Event::Ready(sender)).await;
149    ///
150    ///         loop {
151    ///             use iced_futures::futures::StreamExt;
152    ///
153    ///             // Read next input sent from `Application`
154    ///             let input = receiver.select_next_some().await;
155    ///
156    ///             match input {
157    ///                 Input::DoSomeWork => {
158    ///                     // Do some async work...
159    ///
160    ///                     // Finally, we can optionally produce a message to tell the
161    ///                     // `Application` the work is done
162    ///                     output.send(Event::WorkFinished).await;
163    ///                 }
164    ///             }
165    ///         }
166    ///     })
167    /// }
168    ///
169    /// fn subscription() -> Subscription<Event> {
170    ///     Subscription::run(some_worker)
171    /// }
172    /// ```
173    ///
174    /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
175    /// connection open.
176    ///
177    /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.13/examples/websocket
178    pub fn run<S>(builder: fn() -> S) -> Self
179    where
180        S: Stream<Item = T> + MaybeSend + 'static,
181        T: 'static,
182    {
183        from_recipe(Runner {
184            id: builder,
185            spawn: move |_| builder(),
186        })
187    }
188
189    /// Returns a [`Subscription`] that will create and asynchronously run the
190    /// given [`Stream`].
191    ///
192    /// The `id` will be used to uniquely identify the [`Subscription`].
193    pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T>
194    where
195        I: Hash + 'static,
196        S: Stream<Item = T> + MaybeSend + 'static,
197        T: 'static,
198    {
199        from_recipe(Runner {
200            id,
201            spawn: move |_| stream,
202        })
203    }
204
205    /// Batches all the provided subscriptions and returns the resulting
206    /// [`Subscription`].
207    pub fn batch(
208        subscriptions: impl IntoIterator<Item = Subscription<T>>,
209    ) -> Self {
210        Self {
211            recipes: subscriptions
212                .into_iter()
213                .flat_map(|subscription| subscription.recipes)
214                .collect(),
215        }
216    }
217
218    /// Adds a value to the [`Subscription`] context.
219    ///
220    /// The value will be part of the identity of a [`Subscription`].
221    pub fn with<A>(mut self, value: A) -> Subscription<(A, T)>
222    where
223        T: 'static,
224        A: std::hash::Hash + Clone + Send + Sync + 'static,
225    {
226        Subscription {
227            recipes: self
228                .recipes
229                .drain(..)
230                .map(|recipe| {
231                    Box::new(With::new(recipe, value.clone()))
232                        as Box<dyn Recipe<Output = (A, T)>>
233                })
234                .collect(),
235        }
236    }
237
238    /// Transforms the [`Subscription`] output with the given function.
239    ///
240    /// # Panics
241    /// The closure provided must be a non-capturing closure. The method
242    /// will panic in debug mode otherwise.
243    pub fn map<F, A>(mut self, f: F) -> Subscription<A>
244    where
245        T: 'static,
246        F: Fn(T) -> A + MaybeSend + Clone + 'static,
247        A: 'static,
248    {
249        debug_assert!(
250            std::mem::size_of::<F>() == 0,
251            "the closure {} provided in `Subscription::map` is capturing",
252            std::any::type_name::<F>(),
253        );
254
255        Subscription {
256            recipes: self
257                .recipes
258                .drain(..)
259                .map(move |recipe| {
260                    Box::new(Map::new(recipe, f.clone()))
261                        as Box<dyn Recipe<Output = A>>
262                })
263                .collect(),
264        }
265    }
266}
267
268/// Creates a [`Subscription`] from a [`Recipe`] describing it.
269pub fn from_recipe<T>(
270    recipe: impl Recipe<Output = T> + 'static,
271) -> Subscription<T> {
272    Subscription {
273        recipes: vec![Box::new(recipe)],
274    }
275}
276
277/// Returns the different recipes of the [`Subscription`].
278pub fn into_recipes<T>(
279    subscription: Subscription<T>,
280) -> Vec<Box<dyn Recipe<Output = T>>> {
281    subscription.recipes
282}
283
284impl<T> std::fmt::Debug for Subscription<T> {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        f.debug_struct("Subscription").finish()
287    }
288}
289
290/// The description of a [`Subscription`].
291///
292/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
293/// by runtimes to run and identify subscriptions. You can use it to create your
294/// own!
295///
296/// # Examples
297/// The repository has a couple of [examples] that use a custom [`Recipe`]:
298///
299/// - [`download_progress`], a basic application that asynchronously downloads
300///   a dummy file of 100 MB and tracks the download progress.
301/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how
302///   to listen to time.
303///
304/// [examples]: https://github.com/iced-rs/iced/tree/0.13/examples
305/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.13/examples/download_progress
306/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.13/examples/stopwatch
307pub trait Recipe {
308    /// The events that will be produced by a [`Subscription`] with this
309    /// [`Recipe`].
310    type Output;
311
312    /// Hashes the [`Recipe`].
313    ///
314    /// This is used by runtimes to uniquely identify a [`Subscription`].
315    fn hash(&self, state: &mut Hasher);
316
317    /// Executes the [`Recipe`] and produces the stream of events of its
318    /// [`Subscription`].
319    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
320}
321
322struct Map<A, B, F>
323where
324    F: Fn(A) -> B + 'static,
325{
326    id: TypeId,
327    recipe: Box<dyn Recipe<Output = A>>,
328    mapper: F,
329}
330
331impl<A, B, F> Map<A, B, F>
332where
333    F: Fn(A) -> B + 'static,
334{
335    fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
336        Map {
337            id: TypeId::of::<F>(),
338            recipe,
339            mapper,
340        }
341    }
342}
343
344impl<A, B, F> Recipe for Map<A, B, F>
345where
346    A: 'static,
347    B: 'static,
348    F: Fn(A) -> B + 'static + MaybeSend,
349{
350    type Output = B;
351
352    fn hash(&self, state: &mut Hasher) {
353        self.id.hash(state);
354        self.recipe.hash(state);
355    }
356
357    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
358        use futures::StreamExt;
359
360        let mapper = self.mapper;
361
362        Box::pin(self.recipe.stream(input).map(mapper))
363    }
364}
365
366struct With<A, B> {
367    recipe: Box<dyn Recipe<Output = A>>,
368    value: B,
369}
370
371impl<A, B> With<A, B> {
372    fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self {
373        With { recipe, value }
374    }
375}
376
377impl<A, B> Recipe for With<A, B>
378where
379    A: 'static,
380    B: 'static + std::hash::Hash + Clone + Send + Sync,
381{
382    type Output = (B, A);
383
384    fn hash(&self, state: &mut Hasher) {
385        std::any::TypeId::of::<B>().hash(state);
386        self.value.hash(state);
387        self.recipe.hash(state);
388    }
389
390    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
391        use futures::StreamExt;
392
393        let value = self.value;
394
395        Box::pin(
396            self.recipe
397                .stream(input)
398                .map(move |element| (value.clone(), element)),
399        )
400    }
401}
402
403pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
404where
405    I: Hash + 'static,
406    F: Fn(Event) -> Option<T> + MaybeSend + 'static,
407    T: 'static + MaybeSend,
408{
409    from_recipe(Runner {
410        id,
411        spawn: |events| {
412            use futures::future;
413            use futures::stream::StreamExt;
414
415            events.filter_map(move |event| future::ready(f(event)))
416        },
417    })
418}
419
420struct Runner<I, F, S, T>
421where
422    F: FnOnce(EventStream) -> S,
423    S: Stream<Item = T>,
424{
425    id: I,
426    spawn: F,
427}
428
429impl<I, F, S, T> Recipe for Runner<I, F, S, T>
430where
431    I: Hash + 'static,
432    F: FnOnce(EventStream) -> S,
433    S: Stream<Item = T> + MaybeSend + 'static,
434{
435    type Output = T;
436
437    fn hash(&self, state: &mut Hasher) {
438        std::any::TypeId::of::<I>().hash(state);
439        self.id.hash(state);
440    }
441
442    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
443        crate::boxed_stream((self.spawn)(input))
444    }
445}