1use std::ops::Deref;
2
3use crate::{CosmicConfigEntry, Update};
4use cosmic_settings_daemon::{Changed, ConfigProxy, CosmicSettingsDaemonProxy};
5use futures_util::SinkExt;
6use iced_futures::{
7 Subscription,
8 futures::{self, Stream, StreamExt, future::pending},
9 stream,
10};
11
12pub async fn settings_daemon_proxy() -> zbus::Result<CosmicSettingsDaemonProxy<'static>> {
13 let conn = zbus::Connection::session().await?;
14 CosmicSettingsDaemonProxy::new(&conn).await
15}
16
17#[derive(Debug)]
18pub struct Watcher {
19 proxy: ConfigProxy<'static>,
20}
21
22impl Deref for Watcher {
23 type Target = ConfigProxy<'static>;
24 #[inline]
25 fn deref(&self) -> &Self::Target {
26 &self.proxy
27 }
28}
29
30impl Watcher {
31 pub async fn new_config(
32 settings_daemon_proxy: &CosmicSettingsDaemonProxy<'static>,
33 id: &str,
34 version: u64,
35 ) -> zbus::Result<Self> {
36 let (path, name) = settings_daemon_proxy.watch_config(id, version).await?;
37 ConfigProxy::builder(settings_daemon_proxy.inner().connection())
38 .path(path)?
39 .destination(name)?
40 .build()
41 .await
42 .map(|proxy| Self { proxy })
43 }
44
45 pub async fn new_state(
46 settings_daemon_proxy: &CosmicSettingsDaemonProxy<'static>,
47 id: &str,
48 version: u64,
49 ) -> zbus::Result<Self> {
50 let (path, name) = settings_daemon_proxy.watch_state(id, version).await?;
51 ConfigProxy::builder(settings_daemon_proxy.inner().connection())
52 .path(path)?
53 .destination(name)?
54 .build()
55 .await
56 .map(|proxy| Self { proxy })
57 }
58}
59
60#[allow(clippy::too_many_lines)]
61pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
62 settings_daemon: CosmicSettingsDaemonProxy<'static>,
63 config_id: &'static str,
64 is_state: bool,
65) -> iced_futures::Subscription<Update<T>> {
66 let id = std::any::TypeId::of::<T>();
67 Subscription::run_with_id(
68 (id, config_id),
69 watcher_stream(settings_daemon, config_id, is_state),
70 )
71}
72
73fn watcher_stream<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
74 settings_daemon: CosmicSettingsDaemonProxy<'static>,
75 config_id: &'static str,
76 is_state: bool,
77) -> impl Stream<Item = Update<T>> {
78 enum Change {
79 Changes(Changed),
80 OwnerChanged(bool),
81 }
82 stream::channel(5, move |mut tx| async move {
83 let version = T::VERSION;
84
85 let Ok(cosmic_config) = (if is_state {
86 crate::Config::new_state(config_id, version)
87 } else {
88 crate::Config::new(config_id, version)
89 }) else {
90 pending::<()>().await;
91 unreachable!();
92 };
93
94 let mut attempts = 0;
95
96 loop {
97 let watcher = if is_state {
98 Watcher::new_state(&settings_daemon, config_id, version).await
99 } else {
100 Watcher::new_config(&settings_daemon, config_id, version).await
101 };
102 let Ok(watcher) = watcher else {
103 tracing::error!("Failed to create watcher for {config_id}");
104
105 #[cfg(feature = "tokio")]
106 ::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
107 #[cfg(feature = "async-std")]
108 async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
109 #[cfg(not(any(feature = "tokio", feature = "async-std")))]
110 {
111 pending::<()>().await;
112 unreachable!();
113 }
114 attempts += 1;
115 continue;
117 };
118 let Ok(changes) = watcher.receive_changed().await else {
119 tracing::error!("Failed to listen for changes for {config_id}");
120
121 #[cfg(feature = "tokio")]
122 ::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
123 #[cfg(feature = "async-std")]
124 async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
125 #[cfg(not(any(feature = "tokio", feature = "async-std")))]
126 {
127 pending::<()>().await;
128 unreachable!();
129 }
130 attempts += 1;
131 continue;
133 };
134
135 let mut changes = changes.map(Change::Changes).fuse();
136
137 let Ok(owner_changed) = watcher.inner().receive_owner_changed().await else {
138 tracing::error!("Failed to listen for owner changes for {config_id}");
139 #[cfg(feature = "tokio")]
140 ::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
141 #[cfg(feature = "async-std")]
142 async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
143 #[cfg(not(any(feature = "tokio", feature = "async-std")))]
144 {
145 pending::<()>().await;
146 unreachable!();
147 }
148 attempts += 1;
149 continue;
151 };
152 let mut owner_changed = owner_changed
153 .map(|c| Change::OwnerChanged(c.is_some()))
154 .fuse();
155
156 let mut config = match T::get_entry(&cosmic_config) {
158 Ok(config) => config,
159 Err((errors, default)) => {
160 for why in &errors {
161 if why.is_err() {
162 if let crate::Error::GetKey(_, err) = &why {
163 if err.kind() == std::io::ErrorKind::NotFound {
164 continue;
166 }
167 }
168 tracing::error!("error getting config: {config_id} {why}");
169 }
170 }
171 default
172 }
173 };
174
175 if let Err(err) = tx
176 .send(Update {
177 errors: Vec::new(),
178 keys: Vec::new(),
179 config: config.clone(),
180 })
181 .await
182 {
183 tracing::error!("Failed to send config: {err}");
184 }
185
186 loop {
187 let change: Changed = futures::select! {
188 c = changes.next() => {
189 let Some(Change::Changes(c)) = c else {
190 break;
191 };
192 c
193 }
194 c = owner_changed.next() => {
195 let Some(Change::OwnerChanged(cont)) = c else {
196 break;
197 };
198 if cont {
199 continue;
200 } else {
201 break;
203 }
204 },
205 };
206
207 attempts = 0;
209 let Ok(args) = change.args() else {
210 break;
212 };
213 let (errors, keys) = config.update_keys(&cosmic_config, &[args.key]);
214 if !keys.is_empty() {
215 if let Err(err) = tx
216 .send(Update {
217 errors,
218 keys,
219 config: config.clone(),
220 })
221 .await
222 {
223 tracing::error!("Failed to send config update: {err}");
224 }
225 }
226 }
227 }
228 })
229}