cosmic_config/
subscription.rs

1use iced_futures::futures::{SinkExt, Stream};
2use iced_futures::{futures::channel::mpsc, stream};
3use notify::RecommendedWatcher;
4use std::{borrow::Cow, hash::Hash};
5
6use crate::{Config, CosmicConfigEntry};
7
/// State machine driven by `start_listening` for a single watched config.
pub enum ConfigState<T> {
    /// Nothing has been opened yet. Carries the config id, the config
    /// version, and a flag selecting `Config::new_state` (`true`) over
    /// `Config::new` (`false`).
    Init(Cow<'static, str>, u64, bool),
    /// The config is open and being watched. Carries the most recently
    /// loaded value, the file watcher, the receiver of changed-key batches
    /// fed by the watcher callback, and the `Config` handle itself.
    Waiting(T, RecommendedWatcher, mpsc::Receiver<Vec<String>>, Config),
    /// Opening or watching the config failed, or the key channel closed.
    /// `start_listening` never leaves this state.
    Failed,
}
13
/// Outcome of a config subscription step.
///
/// NOTE(review): not referenced by the functions visible in this file;
/// presumably kept for external callers — confirm before removing.
pub enum ConfigUpdate<T> {
    /// A new config value together with its changed keys and errors.
    Update(crate::Update<T>),
    /// The subscription could not produce an update.
    Failed,
}
18
19#[cold]
20pub fn config_subscription<
21    I: 'static + Copy + Send + Sync + Hash,
22    T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
23>(
24    id: I,
25    config_id: Cow<'static, str>,
26    config_version: u64,
27) -> iced_futures::Subscription<crate::Update<T>> {
28    iced_futures::Subscription::run_with_id(id, watcher_stream(config_id, config_version, false))
29}
30
31#[cold]
32pub fn config_state_subscription<
33    I: 'static + Copy + Send + Sync + Hash,
34    T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
35>(
36    id: I,
37    config_id: Cow<'static, str>,
38    config_version: u64,
39) -> iced_futures::Subscription<crate::Update<T>> {
40    iced_futures::Subscription::run_with_id(id, watcher_stream(config_id, config_version, true))
41}
42
43fn watcher_stream<T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry>(
44    config_id: Cow<'static, str>,
45    config_version: u64,
46    is_state: bool,
47) -> impl Stream<Item = crate::Update<T>> {
48    stream::channel(100, move |mut output| {
49        let config_id = config_id.clone();
50        async move {
51            let config_id = config_id.clone();
52            let mut state = ConfigState::Init(config_id, config_version, is_state);
53
54            loop {
55                state = start_listening::<T>(state, &mut output).await;
56            }
57        }
58    })
59}
60
61async fn start_listening<T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry>(
62    state: ConfigState<T>,
63    output: &mut mpsc::Sender<crate::Update<T>>,
64) -> ConfigState<T> {
65    use iced_futures::futures::{future::pending, StreamExt};
66
67    match state {
68        ConfigState::Init(config_id, version, is_state) => {
69            let (tx, rx) = mpsc::channel(100);
70            let Ok(config) = (if is_state {
71                Config::new_state(&config_id, version)
72            } else {
73                Config::new(&config_id, version)
74            }) else {
75                return ConfigState::Failed;
76            };
77            let Ok(watcher) = config.watch(move |_helper, keys| {
78                let mut tx = tx.clone();
79                let _ = tx.try_send(keys.to_vec());
80            }) else {
81                return ConfigState::Failed;
82            };
83
84            match T::get_entry(&config) {
85                Ok(t) => {
86                    let update = crate::Update {
87                        errors: Vec::new(),
88                        keys: Vec::new(),
89                        config: t.clone(),
90                    };
91                    _ = output.send(update).await;
92                    ConfigState::Waiting(t, watcher, rx, config)
93                }
94                Err((errors, t)) => {
95                    let update = crate::Update {
96                        errors: errors,
97                        keys: Vec::new(),
98                        config: t.clone(),
99                    };
100                    _ = output.send(update).await;
101                    ConfigState::Waiting(t, watcher, rx, config)
102                }
103            }
104        }
105        ConfigState::Waiting(mut conf_data, watcher, mut rx, config) => match rx.next().await {
106            Some(keys) => {
107                let (errors, changed) = conf_data.update_keys(&config, &keys);
108
109                if !changed.is_empty() {
110                    _ = output
111                        .send(crate::Update {
112                            errors,
113                            keys: changed,
114                            config: conf_data.clone(),
115                        })
116                        .await;
117                }
118                ConfigState::Waiting(conf_data, watcher, rx, config)
119            }
120            None => ConfigState::Failed,
121        },
122        ConfigState::Failed => pending().await,
123    }
124}