cosmic_config/
dbus.rs

1use std::ops::Deref;
2
3use crate::{CosmicConfigEntry, Update};
4use cosmic_settings_daemon::{Changed, ConfigProxy, CosmicSettingsDaemonProxy};
5use futures_util::SinkExt;
6use iced_futures::{
7    Subscription,
8    futures::{self, Stream, StreamExt, future::pending},
9    stream,
10};
11
12pub async fn settings_daemon_proxy() -> zbus::Result<CosmicSettingsDaemonProxy<'static>> {
13    let conn = zbus::Connection::session().await?;
14    CosmicSettingsDaemonProxy::new(&conn).await
15}
16
17#[derive(Debug)]
18pub struct Watcher {
19    proxy: ConfigProxy<'static>,
20}
21
22impl Deref for Watcher {
23    type Target = ConfigProxy<'static>;
24    #[inline]
25    fn deref(&self) -> &Self::Target {
26        &self.proxy
27    }
28}
29
30impl Watcher {
31    pub async fn new_config(
32        settings_daemon_proxy: &CosmicSettingsDaemonProxy<'static>,
33        id: &str,
34        version: u64,
35    ) -> zbus::Result<Self> {
36        let (path, name) = settings_daemon_proxy.watch_config(id, version).await?;
37        ConfigProxy::builder(settings_daemon_proxy.inner().connection())
38            .path(path)?
39            .destination(name)?
40            .build()
41            .await
42            .map(|proxy| Self { proxy })
43    }
44
45    pub async fn new_state(
46        settings_daemon_proxy: &CosmicSettingsDaemonProxy<'static>,
47        id: &str,
48        version: u64,
49    ) -> zbus::Result<Self> {
50        let (path, name) = settings_daemon_proxy.watch_state(id, version).await?;
51        ConfigProxy::builder(settings_daemon_proxy.inner().connection())
52            .path(path)?
53            .destination(name)?
54            .build()
55            .await
56            .map(|proxy| Self { proxy })
57    }
58}
59
60#[allow(clippy::too_many_lines)]
61pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
62    settings_daemon: CosmicSettingsDaemonProxy<'static>,
63    config_id: &'static str,
64    is_state: bool,
65) -> iced_futures::Subscription<Update<T>> {
66    let id = std::any::TypeId::of::<T>();
67    Subscription::run_with_id(
68        (id, config_id),
69        watcher_stream(settings_daemon, config_id, is_state),
70    )
71}
72
73fn watcher_stream<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
74    settings_daemon: CosmicSettingsDaemonProxy<'static>,
75    config_id: &'static str,
76    is_state: bool,
77) -> impl Stream<Item = Update<T>> {
78    enum Change {
79        Changes(Changed),
80        OwnerChanged(bool),
81    }
82    stream::channel(5, move |mut tx| async move {
83        let version = T::VERSION;
84
85        let Ok(cosmic_config) = (if is_state {
86            crate::Config::new_state(config_id, version)
87        } else {
88            crate::Config::new(config_id, version)
89        }) else {
90            pending::<()>().await;
91            unreachable!();
92        };
93
94        let mut attempts = 0;
95
96        loop {
97            let watcher = if is_state {
98                Watcher::new_state(&settings_daemon, config_id, version).await
99            } else {
100                Watcher::new_config(&settings_daemon, config_id, version).await
101            };
102            let Ok(watcher) = watcher else {
103                tracing::error!("Failed to create watcher for {config_id}");
104
105                #[cfg(feature = "tokio")]
106                ::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
107                #[cfg(feature = "async-std")]
108                async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
109                #[cfg(not(any(feature = "tokio", feature = "async-std")))]
110                {
111                    pending::<()>().await;
112                    unreachable!();
113                }
114                attempts += 1;
115                // The settings daemon has exited
116                continue;
117            };
118            let Ok(changes) = watcher.receive_changed().await else {
119                tracing::error!("Failed to listen for changes for {config_id}");
120
121                #[cfg(feature = "tokio")]
122                ::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
123                #[cfg(feature = "async-std")]
124                async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
125                #[cfg(not(any(feature = "tokio", feature = "async-std")))]
126                {
127                    pending::<()>().await;
128                    unreachable!();
129                }
130                attempts += 1;
131                // The settings daemon has exited
132                continue;
133            };
134
135            let mut changes = changes.map(Change::Changes).fuse();
136
137            let Ok(owner_changed) = watcher.inner().receive_owner_changed().await else {
138                tracing::error!("Failed to listen for owner changes for {config_id}");
139                #[cfg(feature = "tokio")]
140                ::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
141                #[cfg(feature = "async-std")]
142                async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
143                #[cfg(not(any(feature = "tokio", feature = "async-std")))]
144                {
145                    pending::<()>().await;
146                    unreachable!();
147                }
148                attempts += 1;
149                // The settings daemon has exited
150                continue;
151            };
152            let mut owner_changed = owner_changed
153                .map(|c| Change::OwnerChanged(c.is_some()))
154                .fuse();
155
156            // update now, just in case we missed changes while setting up stream
157            let mut config = match T::get_entry(&cosmic_config) {
158                Ok(config) => config,
159                Err((errors, default)) => {
160                    for why in &errors {
161                        if why.is_err() {
162                            if let crate::Error::GetKey(_, err) = &why {
163                                if err.kind() == std::io::ErrorKind::NotFound {
164                                    // No system default config installed; don't error
165                                    continue;
166                                }
167                            }
168                            tracing::error!("error getting config: {config_id} {why}");
169                        }
170                    }
171                    default
172                }
173            };
174
175            if let Err(err) = tx
176                .send(Update {
177                    errors: Vec::new(),
178                    keys: Vec::new(),
179                    config: config.clone(),
180                })
181                .await
182            {
183                tracing::error!("Failed to send config: {err}");
184            }
185
186            loop {
187                let change: Changed = futures::select! {
188                    c = changes.next() => {
189                        let Some(Change::Changes(c)) = c else {
190                            break;
191                        };
192                        c
193                    }
194                    c = owner_changed.next() => {
195                        let Some(Change::OwnerChanged(cont)) = c else {
196                            break;
197                        };
198                        if cont {
199                            continue;
200                        } else {
201                            // The settings daemon has exited
202                            break;
203                        }
204                    },
205                };
206
207                // Reset the attempts counter if we received a change
208                attempts = 0;
209                let Ok(args) = change.args() else {
210                    // The settings daemon has exited
211                    break;
212                };
213                let (errors, keys) = config.update_keys(&cosmic_config, &[args.key]);
214                if !keys.is_empty() {
215                    if let Err(err) = tx
216                        .send(Update {
217                            errors,
218                            keys,
219                            config: config.clone(),
220                        })
221                        .await
222                    {
223                        tracing::error!("Failed to send config update: {err}");
224                    }
225                }
226            }
227        }
228    })
229}