iced_futures/subscription.rs
1//! Listen to external events in your application.
2mod tracker;
3
4pub use tracker::Tracker;
5
6use crate::core::event;
7use crate::core::window;
8use crate::futures::Stream;
9use crate::{BoxStream, MaybeSend};
10
11use std::any::TypeId;
12use std::hash::Hash;
13
/// A subscription event.
///
/// Values of this type are the items carried by an [`EventStream`].
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// A user interacted with a user interface in a window.
    Interaction {
        /// The window holding the interface of the interaction.
        window: window::Id,
        /// The [`Event`] describing the interaction.
        ///
        /// [`Event`]: event::Event
        event: event::Event,

        /// The [`event::Status`] of the interaction.
        status: event::Status,
    },
}
30
/// A stream of runtime events.
///
/// It is the input of a [`Subscription`]: every [`Recipe`] receives one when
/// [`Recipe::stream`] is called.
pub type EventStream = BoxStream<Event>;
35
/// The hasher used for identifying subscriptions.
///
/// This is the hasher state handed to [`Recipe::hash`].
pub type Hasher = rustc_hash::FxHasher;
38
/// A request to listen to external events.
///
/// Besides performing async actions on demand with `Task`, most
/// applications also need to listen to external events passively.
///
/// Much like a `Task`, a [`Subscription`] is normally provided to some
/// runtime, and it will generate events as long as the user keeps requesting it.
///
/// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
/// connection, keyboard presses, mouse events, time ticks, etc.
///
/// # The Lifetime of a [`Subscription`]
/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
/// in the `subscription` function of an `application` or a `daemon`.
///
/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
/// start running it asynchronously. Running a [`Subscription`] consists of building its underlying
/// [`Stream`] and executing it in an async runtime.
///
/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
/// to build a certain [`Stream`] together with some way to _identify_ it.
///
/// Identification is important because when a specific [`Subscription`] stops being returned to the
/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
/// [`Subscription`] to keep track of it.
///
/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
/// and whenever necessary.
///
/// ```
/// # mod iced {
/// #     pub mod time {
/// #         pub use iced_futures::backend::default::time::every;
/// #         pub use std::time::{Duration, Instant};
/// #     }
/// #
/// #     pub use iced_futures::Subscription;
/// # }
/// use iced::time::{self, Duration, Instant};
/// use iced::Subscription;
///
/// struct State {
///     timer_enabled: bool,
/// }
///
/// fn subscription(state: &State) -> Subscription<Instant> {
///     if state.timer_enabled {
///         time::every(Duration::from_secs(1))
///     } else {
///         Subscription::none()
///     }
/// }
/// ```
///
/// [`Future`]: std::future::Future
#[must_use = "`Subscription` must be returned to the runtime to take effect; normally in your `subscription` function."]
pub struct Subscription<T> {
    // The recipes that make up this subscription; empty for `Subscription::none`,
    // and the concatenation of all inputs for `Subscription::batch`.
    recipes: Vec<Box<dyn Recipe<Output = T>>>,
}
99
100impl<T> Subscription<T> {
101 /// Returns an empty [`Subscription`] that will not produce any output.
102 pub fn none() -> Self {
103 Self {
104 recipes: Vec::new(),
105 }
106 }
107
108 /// Returns a [`Subscription`] that will call the given function to create and
109 /// asynchronously run the given [`Stream`].
110 ///
111 /// # Creating an asynchronous worker with bidirectional communication
112 /// You can leverage this helper to create a [`Subscription`] that spawns
113 /// an asynchronous worker in the background and establish a channel of
114 /// communication with an `iced` application.
115 ///
116 /// You can achieve this by creating an `mpsc` channel inside the closure
117 /// and returning the `Sender` as a `Message` for the `Application`:
118 ///
119 /// ```
120 /// # mod iced {
121 /// # pub use iced_futures::Subscription;
122 /// # pub use iced_futures::futures;
123 /// # pub use iced_futures::stream;
124 /// # }
125 /// use iced::futures::channel::mpsc;
126 /// use iced::futures::sink::SinkExt;
127 /// use iced::futures::Stream;
128 /// use iced::stream;
129 /// use iced::Subscription;
130 ///
131 /// pub enum Event {
132 /// Ready(mpsc::Sender<Input>),
133 /// WorkFinished,
134 /// // ...
135 /// }
136 ///
137 /// enum Input {
138 /// DoSomeWork,
139 /// // ...
140 /// }
141 ///
142 /// fn some_worker() -> impl Stream<Item = Event> {
143 /// stream::channel(100, |mut output| async move {
144 /// // Create channel
145 /// let (sender, mut receiver) = mpsc::channel(100);
146 ///
147 /// // Send the sender back to the application
148 /// output.send(Event::Ready(sender)).await;
149 ///
150 /// loop {
151 /// use iced_futures::futures::StreamExt;
152 ///
153 /// // Read next input sent from `Application`
154 /// let input = receiver.select_next_some().await;
155 ///
156 /// match input {
157 /// Input::DoSomeWork => {
158 /// // Do some async work...
159 ///
160 /// // Finally, we can optionally produce a message to tell the
161 /// // `Application` the work is done
162 /// output.send(Event::WorkFinished).await;
163 /// }
164 /// }
165 /// }
166 /// })
167 /// }
168 ///
169 /// fn subscription() -> Subscription<Event> {
170 /// Subscription::run(some_worker)
171 /// }
172 /// ```
173 ///
174 /// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
175 /// connection open.
176 ///
177 /// [`websocket`]: https://github.com/iced-rs/iced/tree/0.13/examples/websocket
178 pub fn run<S>(builder: fn() -> S) -> Self
179 where
180 S: Stream<Item = T> + MaybeSend + 'static,
181 T: 'static,
182 {
183 from_recipe(Runner {
184 id: builder,
185 spawn: move |_| builder(),
186 })
187 }
188
189 /// Returns a [`Subscription`] that will create and asynchronously run the
190 /// given [`Stream`].
191 ///
192 /// The `id` will be used to uniquely identify the [`Subscription`].
193 pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T>
194 where
195 I: Hash + 'static,
196 S: Stream<Item = T> + MaybeSend + 'static,
197 T: 'static,
198 {
199 from_recipe(Runner {
200 id,
201 spawn: move |_| stream,
202 })
203 }
204
205 /// Batches all the provided subscriptions and returns the resulting
206 /// [`Subscription`].
207 pub fn batch(
208 subscriptions: impl IntoIterator<Item = Subscription<T>>,
209 ) -> Self {
210 Self {
211 recipes: subscriptions
212 .into_iter()
213 .flat_map(|subscription| subscription.recipes)
214 .collect(),
215 }
216 }
217
218 /// Adds a value to the [`Subscription`] context.
219 ///
220 /// The value will be part of the identity of a [`Subscription`].
221 pub fn with<A>(mut self, value: A) -> Subscription<(A, T)>
222 where
223 T: 'static,
224 A: std::hash::Hash + Clone + Send + Sync + 'static,
225 {
226 Subscription {
227 recipes: self
228 .recipes
229 .drain(..)
230 .map(|recipe| {
231 Box::new(With::new(recipe, value.clone()))
232 as Box<dyn Recipe<Output = (A, T)>>
233 })
234 .collect(),
235 }
236 }
237
238 /// Transforms the [`Subscription`] output with the given function.
239 ///
240 /// # Panics
241 /// The closure provided must be a non-capturing closure. The method
242 /// will panic in debug mode otherwise.
243 pub fn map<F, A>(mut self, f: F) -> Subscription<A>
244 where
245 T: 'static,
246 F: Fn(T) -> A + MaybeSend + Clone + 'static,
247 A: 'static,
248 {
249 debug_assert!(
250 std::mem::size_of::<F>() == 0,
251 "the closure {} provided in `Subscription::map` is capturing",
252 std::any::type_name::<F>(),
253 );
254
255 Subscription {
256 recipes: self
257 .recipes
258 .drain(..)
259 .map(move |recipe| {
260 Box::new(Map::new(recipe, f.clone()))
261 as Box<dyn Recipe<Output = A>>
262 })
263 .collect(),
264 }
265 }
266}
267
268/// Creates a [`Subscription`] from a [`Recipe`] describing it.
269pub fn from_recipe<T>(
270 recipe: impl Recipe<Output = T> + 'static,
271) -> Subscription<T> {
272 Subscription {
273 recipes: vec![Box::new(recipe)],
274 }
275}
276
277/// Returns the different recipes of the [`Subscription`].
278pub fn into_recipes<T>(
279 subscription: Subscription<T>,
280) -> Vec<Box<dyn Recipe<Output = T>>> {
281 subscription.recipes
282}
283
284impl<T> std::fmt::Debug for Subscription<T> {
285 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286 f.debug_struct("Subscription").finish()
287 }
288}
289
/// The description of a [`Subscription`].
///
/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
/// by runtimes to run and identify subscriptions. You can use it to create your
/// own!
///
/// # Examples
/// The repository has a couple of [examples] that use a custom [`Recipe`]:
///
/// - [`download_progress`], a basic application that asynchronously downloads
///   a dummy file of 100 MB and tracks the download progress.
/// - [`stopwatch`], a watch with start/stop and reset buttons showcasing how
///   to listen to time.
///
/// [examples]: https://github.com/iced-rs/iced/tree/0.13/examples
/// [`download_progress`]: https://github.com/iced-rs/iced/tree/0.13/examples/download_progress
/// [`stopwatch`]: https://github.com/iced-rs/iced/tree/0.13/examples/stopwatch
pub trait Recipe {
    /// The events that will be produced by a [`Subscription`] with this
    /// [`Recipe`].
    type Output;

    /// Hashes the [`Recipe`].
    ///
    /// This is used by runtimes to uniquely identify a [`Subscription`].
    /// Everything that should distinguish one [`Subscription`] from another
    /// must be fed into `state`.
    fn hash(&self, state: &mut Hasher);

    /// Executes the [`Recipe`] and produces the stream of events of its
    /// [`Subscription`].
    ///
    /// The [`Recipe`] is consumed. `input` is the stream of runtime events,
    /// which the [`Recipe`] is free to use or ignore.
    fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output>;
}
321
/// A [`Recipe`] that applies a mapper function to every output of another
/// [`Recipe`].
struct Map<A, B, F>
where
    F: Fn(A) -> B + 'static,
{
    /// The [`TypeId`] of the mapper type `F`; hashed as part of the identity.
    id: TypeId,
    /// The wrapped recipe whose output is transformed.
    recipe: Box<dyn Recipe<Output = A>>,
    /// The function applied to each item of the wrapped recipe's stream.
    mapper: F,
}
330
331impl<A, B, F> Map<A, B, F>
332where
333 F: Fn(A) -> B + 'static,
334{
335 fn new(recipe: Box<dyn Recipe<Output = A>>, mapper: F) -> Self {
336 Map {
337 id: TypeId::of::<F>(),
338 recipe,
339 mapper,
340 }
341 }
342}
343
344impl<A, B, F> Recipe for Map<A, B, F>
345where
346 A: 'static,
347 B: 'static,
348 F: Fn(A) -> B + 'static + MaybeSend,
349{
350 type Output = B;
351
352 fn hash(&self, state: &mut Hasher) {
353 self.id.hash(state);
354 self.recipe.hash(state);
355 }
356
357 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
358 use futures::StreamExt;
359
360 let mapper = self.mapper;
361
362 Box::pin(self.recipe.stream(input).map(mapper))
363 }
364}
365
/// A [`Recipe`] that pairs every output of another [`Recipe`] with a clone
/// of a fixed value.
struct With<A, B> {
    /// The wrapped recipe.
    recipe: Box<dyn Recipe<Output = A>>,
    /// The value hashed into the identity and attached to each output.
    value: B,
}
370
371impl<A, B> With<A, B> {
372 fn new(recipe: Box<dyn Recipe<Output = A>>, value: B) -> Self {
373 With { recipe, value }
374 }
375}
376
377impl<A, B> Recipe for With<A, B>
378where
379 A: 'static,
380 B: 'static + std::hash::Hash + Clone + Send + Sync,
381{
382 type Output = (B, A);
383
384 fn hash(&self, state: &mut Hasher) {
385 std::any::TypeId::of::<B>().hash(state);
386 self.value.hash(state);
387 self.recipe.hash(state);
388 }
389
390 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
391 use futures::StreamExt;
392
393 let value = self.value;
394
395 Box::pin(
396 self.recipe
397 .stream(input)
398 .map(move |element| (value.clone(), element)),
399 )
400 }
401}
402
403pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
404where
405 I: Hash + 'static,
406 F: Fn(Event) -> Option<T> + MaybeSend + 'static,
407 T: 'static + MaybeSend,
408{
409 from_recipe(Runner {
410 id,
411 spawn: |events| {
412 use futures::future;
413 use futures::stream::StreamExt;
414
415 events.filter_map(move |event| future::ready(f(event)))
416 },
417 })
418}
419
/// A [`Recipe`] made of an identifier and a function that builds the stream
/// to run from the runtime's [`EventStream`].
struct Runner<I, F, S, T>
where
    F: FnOnce(EventStream) -> S,
    S: Stream<Item = T>,
{
    /// The value hashed (together with its type) to identify the recipe.
    id: I,
    /// Called once with the runtime's event stream to produce the stream to run.
    spawn: F,
}
428
429impl<I, F, S, T> Recipe for Runner<I, F, S, T>
430where
431 I: Hash + 'static,
432 F: FnOnce(EventStream) -> S,
433 S: Stream<Item = T> + MaybeSend + 'static,
434{
435 type Output = T;
436
437 fn hash(&self, state: &mut Hasher) {
438 std::any::TypeId::of::<I>().hash(state);
439 self.id.hash(state);
440 }
441
442 fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
443 crate::boxed_stream((self.spawn)(input))
444 }
445}