cosmic_config/
subscription.rs
1use iced_futures::futures::{SinkExt, Stream};
2use iced_futures::{futures::channel::mpsc, stream};
3use notify::RecommendedWatcher;
4use std::{borrow::Cow, hash::Hash};
5
6use crate::{Config, CosmicConfigEntry};
7
8pub enum ConfigState<T> {
9 Init(Cow<'static, str>, u64, bool),
10 Waiting(T, RecommendedWatcher, mpsc::Receiver<Vec<String>>, Config),
11 Failed,
12}
13
14pub enum ConfigUpdate<T> {
15 Update(crate::Update<T>),
16 Failed,
17}
18
19#[cold]
20pub fn config_subscription<
21 I: 'static + Copy + Send + Sync + Hash,
22 T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
23>(
24 id: I,
25 config_id: Cow<'static, str>,
26 config_version: u64,
27) -> iced_futures::Subscription<crate::Update<T>> {
28 iced_futures::Subscription::run_with_id(id, watcher_stream(config_id, config_version, false))
29}
30
31#[cold]
32pub fn config_state_subscription<
33 I: 'static + Copy + Send + Sync + Hash,
34 T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
35>(
36 id: I,
37 config_id: Cow<'static, str>,
38 config_version: u64,
39) -> iced_futures::Subscription<crate::Update<T>> {
40 iced_futures::Subscription::run_with_id(id, watcher_stream(config_id, config_version, true))
41}
42
43fn watcher_stream<T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry>(
44 config_id: Cow<'static, str>,
45 config_version: u64,
46 is_state: bool,
47) -> impl Stream<Item = crate::Update<T>> {
48 stream::channel(100, move |mut output| {
49 let config_id = config_id.clone();
50 async move {
51 let config_id = config_id.clone();
52 let mut state = ConfigState::Init(config_id, config_version, is_state);
53
54 loop {
55 state = start_listening::<T>(state, &mut output).await;
56 }
57 }
58 })
59}
60
61async fn start_listening<T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry>(
62 state: ConfigState<T>,
63 output: &mut mpsc::Sender<crate::Update<T>>,
64) -> ConfigState<T> {
65 use iced_futures::futures::{future::pending, StreamExt};
66
67 match state {
68 ConfigState::Init(config_id, version, is_state) => {
69 let (tx, rx) = mpsc::channel(100);
70 let Ok(config) = (if is_state {
71 Config::new_state(&config_id, version)
72 } else {
73 Config::new(&config_id, version)
74 }) else {
75 return ConfigState::Failed;
76 };
77 let Ok(watcher) = config.watch(move |_helper, keys| {
78 let mut tx = tx.clone();
79 let _ = tx.try_send(keys.to_vec());
80 }) else {
81 return ConfigState::Failed;
82 };
83
84 match T::get_entry(&config) {
85 Ok(t) => {
86 let update = crate::Update {
87 errors: Vec::new(),
88 keys: Vec::new(),
89 config: t.clone(),
90 };
91 _ = output.send(update).await;
92 ConfigState::Waiting(t, watcher, rx, config)
93 }
94 Err((errors, t)) => {
95 let update = crate::Update {
96 errors: errors,
97 keys: Vec::new(),
98 config: t.clone(),
99 };
100 _ = output.send(update).await;
101 ConfigState::Waiting(t, watcher, rx, config)
102 }
103 }
104 }
105 ConfigState::Waiting(mut conf_data, watcher, mut rx, config) => match rx.next().await {
106 Some(keys) => {
107 let (errors, changed) = conf_data.update_keys(&config, &keys);
108
109 if !changed.is_empty() {
110 _ = output
111 .send(crate::Update {
112 errors,
113 keys: changed,
114 config: conf_data.clone(),
115 })
116 .await;
117 }
118 ConfigState::Waiting(conf_data, watcher, rx, config)
119 }
120 None => ConfigState::Failed,
121 },
122 ConfigState::Failed => pending().await,
123 }
124}