1use enumflags2::{bitflags, BitFlags};
4use event_listener::{Event, EventListener};
5use futures_core::{ready, stream};
6use ordered_stream::{join as join_streams, FromFuture, Join, Map, OrderedStream, PollResult};
7use std::{
8 collections::{HashMap, HashSet},
9 fmt,
10 future::Future,
11 ops::Deref,
12 pin::Pin,
13 sync::{Arc, OnceLock, RwLock, RwLockReadGuard},
14 task::{Context, Poll},
15};
16use tracing::{debug, info_span, instrument, trace, Instrument};
17
18use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
19use zvariant::{ObjectPath, OwnedValue, Str, Value};
20
21use crate::{
22 fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
23 message::{Flags, Message, Sequence, Type},
24 AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task,
25};
26
27mod builder;
28pub use builder::{Builder, CacheProperties};
29
30mod defaults;
31pub use defaults::Defaults;
32
33#[derive(Clone, Debug)]
71pub struct Proxy<'a> {
72 pub(crate) inner: Arc<ProxyInner<'a>>,
73}
74
75pub(crate) struct ProxyInnerStatic {
78 pub(crate) conn: Connection,
79 dest_owner_change_match_rule: OnceLock<OwnedMatchRule>,
80}
81
82impl fmt::Debug for ProxyInnerStatic {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("ProxyInnerStatic")
85 .field(
86 "dest_owner_change_match_rule",
87 &self.dest_owner_change_match_rule,
88 )
89 .finish_non_exhaustive()
90 }
91}
92
93#[derive(Debug)]
94pub(crate) struct ProxyInner<'a> {
95 inner_without_borrows: ProxyInnerStatic,
96 pub(crate) destination: BusName<'a>,
97 pub(crate) path: ObjectPath<'a>,
98 pub(crate) interface: InterfaceName<'a>,
99
100 property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>,
102 uncached_properties: HashSet<Str<'a>>,
105}
106
107impl Drop for ProxyInnerStatic {
108 fn drop(&mut self) {
109 if let Some(rule) = self.dest_owner_change_match_rule.take() {
110 self.conn.queue_remove_match(rule);
111 }
112 }
113}
114
115pub struct PropertyChanged<'a, T> {
119 name: &'a str,
120 properties: Arc<PropertiesCache>,
121 proxy: Proxy<'a>,
122 phantom: std::marker::PhantomData<T>,
123}
124
125impl<T> PropertyChanged<'_, T> {
126 pub fn name(&self) -> &str {
128 self.name
129 }
130
131 pub async fn get_raw(&self) -> Result<impl Deref<Target = Value<'static>> + '_> {
137 struct Wrapper<'w> {
138 name: &'w str,
139 values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
140 }
141
142 impl Deref for Wrapper<'_> {
143 type Target = Value<'static>;
144
145 fn deref(&self) -> &Self::Target {
146 self.values
147 .get(self.name)
148 .expect("PropertyStream with no corresponding property")
149 .value
150 .as_ref()
151 .expect("PropertyStream with no corresponding property")
152 }
153 }
154
155 {
156 let values = self.properties.values.read().expect("lock poisoned");
157 if values
158 .get(self.name)
159 .expect("PropertyStream with no corresponding property")
160 .value
161 .is_some()
162 {
163 return Ok(Wrapper {
164 name: self.name,
165 values,
166 });
167 }
168 }
169
170 let properties_proxy = self.proxy.properties_proxy();
172 let value = properties_proxy
173 .get(self.proxy.inner.interface.clone(), self.name)
174 .await
175 .map_err(crate::Error::from)?;
176
177 {
179 let mut values = self.properties.values.write().expect("lock poisoned");
180
181 values
182 .get_mut(self.name)
183 .expect("PropertyStream with no corresponding property")
184 .value = Some(value);
185 }
186
187 Ok(Wrapper {
188 name: self.name,
189 values: self.properties.values.read().expect("lock poisoned"),
190 })
191 }
192}
193
194impl<T> PropertyChanged<'_, T>
195where
196 T: TryFrom<zvariant::OwnedValue>,
197 T::Error: Into<crate::Error>,
198{
199 pub async fn get(&self) -> Result<T> {
205 self.get_raw()
206 .await
207 .and_then(|v| T::try_from(OwnedValue::try_from(&*v)?).map_err(Into::into))
208 }
209}
210
211#[derive(Debug)]
215pub struct PropertyStream<'a, T> {
216 name: &'a str,
217 proxy: Proxy<'a>,
218 changed_listener: EventListener,
219 phantom: std::marker::PhantomData<T>,
220}
221
222impl<'a, T> stream::Stream for PropertyStream<'a, T>
223where
224 T: Unpin,
225{
226 type Item = PropertyChanged<'a, T>;
227
228 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
229 let m = self.get_mut();
230 let properties = match m.proxy.get_property_cache() {
231 Some(properties) => properties.clone(),
232 None => return Poll::Ready(None),
234 };
235 ready!(Pin::new(&mut m.changed_listener).poll(cx));
236
237 m.changed_listener = properties
238 .values
239 .read()
240 .expect("lock poisoned")
241 .get(m.name)
242 .expect("PropertyStream with no corresponding property")
243 .event
244 .listen();
245
246 Poll::Ready(Some(PropertyChanged {
247 name: m.name,
248 properties,
249 proxy: m.proxy.clone(),
250 phantom: std::marker::PhantomData,
251 }))
252 }
253}
254
255#[derive(Debug)]
256pub(crate) struct PropertiesCache {
257 values: RwLock<HashMap<String, PropertyValue>>,
258 caching_result: RwLock<CachingResult>,
259}
260
261#[derive(Debug)]
262enum CachingResult {
263 Caching { ready: Event },
264 Cached { result: Result<()> },
265}
266
267impl PropertiesCache {
268 #[instrument(skip_all)]
269 fn new(
270 proxy: PropertiesProxy<'static>,
271 interface: InterfaceName<'static>,
272 executor: &Executor<'_>,
273 uncached_properties: HashSet<zvariant::Str<'static>>,
274 ) -> (Arc<Self>, Task<()>) {
275 let cache = Arc::new(PropertiesCache {
276 values: Default::default(),
277 caching_result: RwLock::new(CachingResult::Caching {
278 ready: Event::new(),
279 }),
280 });
281
282 let cache_clone = cache.clone();
283 let task_name = format!("{interface} proxy caching");
284 let proxy_caching = async move {
285 let result = cache_clone
286 .init(proxy, interface, uncached_properties)
287 .await;
288 let (prop_changes, interface, uncached_properties) = {
289 let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
290 let ready = match &*caching_result {
291 CachingResult::Caching { ready } => ready,
292 _ => unreachable!(),
295 };
296 match result {
297 Ok((prop_changes, interface, uncached_properties)) => {
298 ready.notify(usize::MAX);
299 *caching_result = CachingResult::Cached { result: Ok(()) };
300
301 (prop_changes, interface, uncached_properties)
302 }
303 Err(e) => {
304 ready.notify(usize::MAX);
305 *caching_result = CachingResult::Cached { result: Err(e) };
306
307 return;
308 }
309 }
310 };
311
312 if let Err(e) = cache_clone
313 .keep_updated(prop_changes, interface, uncached_properties)
314 .await
315 {
316 debug!("Error keeping properties cache updated: {e}");
317 }
318 }
319 .instrument(info_span!("{}", task_name));
320 let task = executor.spawn(proxy_caching, &task_name);
321
322 (cache, task)
323 }
324
325 async fn init(
327 &self,
328 proxy: PropertiesProxy<'static>,
329 interface: InterfaceName<'static>,
330 uncached_properties: HashSet<zvariant::Str<'static>>,
331 ) -> Result<(
332 PropertiesChangedStream,
333 InterfaceName<'static>,
334 HashSet<zvariant::Str<'static>>,
335 )> {
336 use ordered_stream::OrderedStreamExt;
337
338 let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);
339
340 let get_all = proxy
341 .inner()
342 .connection()
343 .call_method_raw(
344 Some(proxy.inner().destination()),
345 proxy.inner().path(),
346 Some(proxy.inner().interface()),
347 "GetAll",
348 BitFlags::empty(),
349 &interface,
350 )
351 .await
352 .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
353
354 let mut join = join_streams(prop_changes, get_all);
355
356 loop {
357 match join.next().await {
358 Some(Either::Left(_update)) => {
359 }
361 Some(Either::Right(populate)) => {
362 populate?.body().deserialize().map(|values| {
363 self.update_cache(&uncached_properties, &values, &[], &interface);
364 })?;
365 break;
366 }
367 None => break,
368 }
369 }
370 if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
371 if let Ok(args) = update.args() {
374 if args.interface_name == interface {
375 self.update_cache(
376 &uncached_properties,
377 &args.changed_properties,
378 &args.invalidated_properties,
379 &interface,
380 );
381 }
382 }
383 }
384 let prop_changes = join.into_inner().0.into_inner();
388
389 Ok((prop_changes, interface, uncached_properties))
390 }
391
392 #[instrument(skip_all)]
394 async fn keep_updated(
395 &self,
396 mut prop_changes: PropertiesChangedStream,
397 interface: InterfaceName<'static>,
398 uncached_properties: HashSet<zvariant::Str<'static>>,
399 ) -> Result<()> {
400 use futures_lite::StreamExt;
401
402 trace!("Listening for property changes on {interface}...");
403 while let Some(update) = prop_changes.next().await {
404 if let Ok(args) = update.args() {
405 if args.interface_name == interface {
406 self.update_cache(
407 &uncached_properties,
408 &args.changed_properties,
409 &args.invalidated_properties,
410 &interface,
411 );
412 }
413 }
414 }
415
416 Ok(())
417 }
418
419 fn update_cache(
420 &self,
421 uncached_properties: &HashSet<Str<'_>>,
422 changed: &HashMap<&str, Value<'_>>,
423 invalidated: &[&str],
424 interface: &InterfaceName<'_>,
425 ) {
426 let mut values = self.values.write().expect("lock poisoned");
427
428 for inval in invalidated {
429 if uncached_properties.contains(&Str::from(*inval)) {
430 debug!(
431 "Ignoring invalidation of uncached property `{}.{}`",
432 interface, inval
433 );
434 continue;
435 }
436 trace!("Property `{interface}.{inval}` invalidated");
437
438 if let Some(entry) = values.get_mut(*inval) {
439 entry.value = None;
440 entry.event.notify(usize::MAX);
441 }
442 }
443
444 for (property_name, value) in changed {
445 if uncached_properties.contains(&Str::from(*property_name)) {
446 debug!(
447 "Ignoring update of uncached property `{}.{}`",
448 interface, property_name
449 );
450 continue;
451 }
452 trace!("Property `{interface}.{property_name}` updated");
453
454 let entry = values.entry(property_name.to_string()).or_default();
455
456 let value = match OwnedValue::try_from(value) {
457 Ok(value) => value,
458 Err(e) => {
459 debug!(
460 "Failed to convert property `{interface}.{property_name}` to OwnedValue: {e}"
461 );
462 continue;
463 }
464 };
465 entry.value = Some(value);
466 entry.event.notify(usize::MAX);
467 }
468 }
469
470 pub(crate) async fn ready(&self) -> Result<()> {
472 let listener = match &*self.caching_result.read().expect("lock poisoned") {
473 CachingResult::Caching { ready } => ready.listen(),
474 CachingResult::Cached { result } => return result.clone(),
475 };
476 listener.await;
477
478 match &*self.caching_result.read().expect("lock poisoned") {
480 CachingResult::Caching { .. } => unreachable!(),
483 CachingResult::Cached { result } => result.clone(),
484 }
485 }
486}
487
488impl<'a> ProxyInner<'a> {
489 pub(crate) fn new(
490 conn: Connection,
491 destination: BusName<'a>,
492 path: ObjectPath<'a>,
493 interface: InterfaceName<'a>,
494 cache: CacheProperties,
495 uncached_properties: HashSet<Str<'a>>,
496 ) -> Self {
497 let property_cache = match cache {
498 CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()),
499 CacheProperties::No => None,
500 };
501 Self {
502 inner_without_borrows: ProxyInnerStatic {
503 conn,
504 dest_owner_change_match_rule: OnceLock::new(),
505 },
506 destination,
507 path,
508 interface,
509 property_cache,
510 uncached_properties,
511 }
512 }
513
514 pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
518 if !self.inner_without_borrows.conn.is_bus() {
519 return Ok(());
521 }
522
523 let well_known_name = match &self.destination {
524 BusName::WellKnown(well_known_name) => well_known_name,
525 BusName::Unique(_) => return Ok(()),
526 };
527
528 if self
529 .inner_without_borrows
530 .dest_owner_change_match_rule
531 .get()
532 .is_some()
533 {
534 return Ok(());
536 }
537
538 let conn = &self.inner_without_borrows.conn;
539 let signal_rule: OwnedMatchRule = MatchRule::builder()
540 .msg_type(Type::Signal)
541 .sender("org.freedesktop.DBus")?
542 .path("/org/freedesktop/DBus")?
543 .interface("org.freedesktop.DBus")?
544 .member("NameOwnerChanged")?
545 .add_arg(well_known_name.as_str())?
546 .build()
547 .to_owned()
548 .into();
549
550 conn.add_match(
551 signal_rule.clone(),
552 Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
553 )
554 .await?;
555
556 if self
557 .inner_without_borrows
558 .dest_owner_change_match_rule
559 .set(signal_rule.clone())
560 .is_err()
561 {
562 conn.remove_match(signal_rule).await?;
564 }
565
566 Ok(())
567 }
568}
569
/// Queue limit passed along when subscribing to `NameOwnerChanged` signals.
const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
571
572impl<'a> Proxy<'a> {
573 pub async fn new<D, P, I>(
575 conn: &Connection,
576 destination: D,
577 path: P,
578 interface: I,
579 ) -> Result<Proxy<'a>>
580 where
581 D: TryInto<BusName<'a>>,
582 P: TryInto<ObjectPath<'a>>,
583 I: TryInto<InterfaceName<'a>>,
584 D::Error: Into<Error>,
585 P::Error: Into<Error>,
586 I::Error: Into<Error>,
587 {
588 Builder::new(conn)
589 .destination(destination)?
590 .path(path)?
591 .interface(interface)?
592 .build()
593 .await
594 }
595
596 pub async fn new_owned<D, P, I>(
599 conn: Connection,
600 destination: D,
601 path: P,
602 interface: I,
603 ) -> Result<Proxy<'a>>
604 where
605 D: TryInto<BusName<'static>>,
606 P: TryInto<ObjectPath<'static>>,
607 I: TryInto<InterfaceName<'static>>,
608 D::Error: Into<Error>,
609 P::Error: Into<Error>,
610 I::Error: Into<Error>,
611 {
612 Builder::new(&conn)
613 .destination(destination)?
614 .path(path)?
615 .interface(interface)?
616 .build()
617 .await
618 }
619
620 pub fn connection(&self) -> &Connection {
622 &self.inner.inner_without_borrows.conn
623 }
624
625 pub fn destination(&self) -> &BusName<'a> {
627 &self.inner.destination
628 }
629
630 pub fn path(&self) -> &ObjectPath<'a> {
632 &self.inner.path
633 }
634
635 pub fn interface(&self) -> &InterfaceName<'a> {
637 &self.inner.interface
638 }
639
640 pub async fn introspect(&self) -> fdo::Result<String> {
645 let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
646 .destination(&self.inner.destination)?
647 .path(&self.inner.path)?
648 .build()
649 .await?;
650
651 proxy.introspect().await
652 }
653
654 fn properties_proxy(&self) -> PropertiesProxy<'_> {
655 PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
656 .destination(self.inner.destination.as_ref())
658 .unwrap()
659 .path(self.inner.path.as_ref())
661 .unwrap()
662 .cache_properties(CacheProperties::No)
664 .build_internal()
665 .unwrap()
666 .into()
667 }
668
669 fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
670 PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
671 .destination(self.inner.destination.to_owned())
673 .unwrap()
674 .path(self.inner.path.to_owned())
676 .unwrap()
677 .cache_properties(CacheProperties::No)
679 .build_internal()
680 .unwrap()
681 .into()
682 }
683
684 pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
689 let cache = match &self.inner.property_cache {
690 Some(cache) => cache,
691 None => return None,
692 };
693 let (cache, _) = &cache.get_or_init(|| {
694 let proxy = self.owned_properties_proxy();
695 let interface = self.interface().to_owned();
696 let uncached_properties: HashSet<zvariant::Str<'static>> = self
697 .inner
698 .uncached_properties
699 .iter()
700 .map(|s| s.to_owned())
701 .collect();
702 let executor = self.connection().executor();
703
704 PropertiesCache::new(proxy, interface, executor, uncached_properties)
705 });
706
707 Some(cache)
708 }
709
710 pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
717 where
718 T: TryFrom<OwnedValue>,
719 T::Error: Into<Error>,
720 {
721 self.cached_property_raw(property_name)
722 .as_deref()
723 .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into))
724 .transpose()
725 }
726
727 pub fn cached_property_raw<'p>(
732 &'p self,
733 property_name: &'p str,
734 ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
735 if let Some(values) = self
736 .inner
737 .property_cache
738 .as_ref()
739 .and_then(OnceLock::get)
740 .map(|c| c.0.values.read().expect("lock poisoned"))
741 {
742 values
744 .get(property_name)
745 .and_then(|e| e.value.as_ref())?;
747
748 struct Wrapper<'a> {
749 values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
750 property_name: &'a str,
751 }
752
753 impl Deref for Wrapper<'_> {
754 type Target = Value<'static>;
755
756 fn deref(&self) -> &Self::Target {
757 self.values
758 .get(self.property_name)
759 .and_then(|e| e.value.as_ref())
760 .map(|v| v.deref())
761 .expect("inexistent property")
762 }
763 }
764
765 Some(Wrapper {
766 values,
767 property_name,
768 })
769 } else {
770 None
771 }
772 }
773
774 async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
775 Ok(self
776 .properties_proxy()
777 .get(self.inner.interface.as_ref(), property_name)
778 .await?)
779 }
780
781 pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
786 where
787 T: TryFrom<OwnedValue>,
788 T::Error: Into<Error>,
789 {
790 if let Some(cache) = self.get_property_cache() {
791 cache.ready().await?;
792 }
793 if let Some(value) = self.cached_property(property_name)? {
794 return Ok(value);
795 }
796
797 let value = self.get_proxy_property(property_name).await?;
798 value.try_into().map_err(Into::into)
799 }
800
801 pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()>
805 where
806 T: 't + Into<Value<'t>>,
807 {
808 self.properties_proxy()
809 .set(self.inner.interface.as_ref(), property_name, value.into())
810 .await
811 }
812
813 pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message>
821 where
822 M: TryInto<MemberName<'m>>,
823 M::Error: Into<Error>,
824 B: serde::ser::Serialize + zvariant::DynamicType,
825 {
826 self.inner
827 .inner_without_borrows
828 .conn
829 .call_method(
830 Some(&self.inner.destination),
831 self.inner.path.as_str(),
832 Some(&self.inner.interface),
833 method_name,
834 body,
835 )
836 .await
837 }
838
839 pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
845 where
846 M: TryInto<MemberName<'m>>,
847 M::Error: Into<Error>,
848 B: serde::ser::Serialize + zvariant::DynamicType,
849 R: for<'d> zvariant::DynamicDeserialize<'d>,
850 {
851 let reply = self.call_method(method_name, body).await?;
852
853 reply.body().deserialize()
854 }
855
856 pub async fn call_with_flags<'m, M, B, R>(
866 &self,
867 method_name: M,
868 flags: BitFlags<MethodFlags>,
869 body: &B,
870 ) -> Result<Option<R>>
871 where
872 M: TryInto<MemberName<'m>>,
873 M::Error: Into<Error>,
874 B: serde::ser::Serialize + zvariant::DynamicType,
875 R: for<'d> zvariant::DynamicDeserialize<'d>,
876 {
877 let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>();
878 match self
879 .inner
880 .inner_without_borrows
881 .conn
882 .call_method_raw(
883 Some(self.destination()),
884 self.path(),
885 Some(self.interface()),
886 method_name,
887 flags,
888 body,
889 )
890 .await?
891 {
892 Some(reply) => reply.await?.body().deserialize().map(Some),
893 None => Ok(None),
894 }
895 }
896
897 pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
901 where
902 M: TryInto<MemberName<'m>>,
903 M::Error: Into<Error>,
904 B: serde::ser::Serialize + zvariant::DynamicType,
905 {
906 self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
907 .await?;
908 Ok(())
909 }
910
911 pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
919 where
920 M: TryInto<MemberName<'m>>,
921 M::Error: Into<Error>,
922 {
923 self.receive_signal_with_args(signal_name, &[]).await
924 }
925
926 pub async fn receive_signal_with_args<'m, M>(
935 &self,
936 signal_name: M,
937 args: &[(u8, &str)],
938 ) -> Result<SignalStream<'m>>
939 where
940 M: TryInto<MemberName<'m>>,
941 M::Error: Into<Error>,
942 {
943 let signal_name = signal_name.try_into().map_err(Into::into)?;
944 self.receive_signals(Some(signal_name), args).await
945 }
946
947 async fn receive_signals<'m>(
948 &self,
949 signal_name: Option<MemberName<'m>>,
950 args: &[(u8, &str)],
951 ) -> Result<SignalStream<'m>> {
952 self.inner.subscribe_dest_owner_change().await?;
953
954 SignalStream::new(self.clone(), signal_name, args).await
955 }
956
957 pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
959 self.receive_signals(None, &[]).await
960 }
961
962 pub async fn receive_property_changed<'name: 'a, T>(
970 &self,
971 name: &'name str,
972 ) -> PropertyStream<'a, T> {
973 let properties = self.get_property_cache();
974 let changed_listener = if let Some(properties) = &properties {
975 let mut values = properties.values.write().expect("lock poisoned");
976 let entry = values
977 .entry(name.to_string())
978 .or_insert_with(PropertyValue::default);
979 let listener = entry.event.listen();
980 if entry.value.is_some() {
981 entry.event.notify(1);
982 }
983 listener
984 } else {
985 Event::new().listen()
986 };
987
988 PropertyStream {
989 name,
990 proxy: self.clone(),
991 changed_listener,
992 phantom: std::marker::PhantomData,
993 }
994 }
995
996 pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'a>> {
1008 use ordered_stream::OrderedStreamExt;
1009 let dbus_proxy = fdo::DBusProxy::builder(self.connection())
1010 .cache_properties(CacheProperties::No)
1011 .build()
1012 .await?;
1013 Ok(OwnerChangedStream {
1014 stream: dbus_proxy
1015 .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
1016 .await?
1017 .map(Box::new(move |signal| {
1018 let args = signal.args().unwrap();
1019 let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());
1020
1021 new_owner
1022 })),
1023 name: self.destination().clone(),
1024 })
1025 }
1026}
1027
1028#[derive(Debug, Default)]
1029struct PropertyValue {
1030 value: Option<OwnedValue>,
1031 event: Event,
1032}
1033
1034#[bitflags]
1036#[repr(u8)]
1037#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1038pub enum MethodFlags {
1039 NoReplyExpected = 0x1,
1048
1049 NoAutoStart = 0x2,
1057
1058 AllowInteractiveAuth = 0x4,
1064}
1065
1066impl From<MethodFlags> for Flags {
1067 fn from(method_flag: MethodFlags) -> Self {
1068 match method_flag {
1069 MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1070 MethodFlags::NoAutoStart => Self::NoAutoStart,
1071 MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1072 }
1073 }
1074}
1075
1076type OwnerChangedStreamMap = Map<
1077 fdo::NameOwnerChangedStream,
1078 Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
1079>;
1080
1081pub struct OwnerChangedStream<'a> {
1085 stream: OwnerChangedStreamMap,
1086 name: BusName<'a>,
1087}
1088
1089impl<'a> OwnerChangedStream<'a> {
1090 pub fn name(&self) -> &BusName<'a> {
1092 &self.name
1093 }
1094}
1095
1096impl stream::Stream for OwnerChangedStream<'_> {
1097 type Item = Option<UniqueName<'static>>;
1098
1099 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1100 OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1101 }
1102}
1103
1104impl OrderedStream for OwnerChangedStream<'_> {
1105 type Data = Option<UniqueName<'static>>;
1106 type Ordering = Sequence;
1107
1108 fn poll_next_before(
1109 self: Pin<&mut Self>,
1110 cx: &mut Context<'_>,
1111 before: Option<&Self::Ordering>,
1112 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1113 Pin::new(&mut self.get_mut().stream).poll_next_before(cx, before)
1114 }
1115}
1116
1117#[derive(Debug)]
1124pub struct SignalStream<'a> {
1125 stream: Join<MessageStream, Option<MessageStream>>,
1126 src_unique_name: Option<UniqueName<'static>>,
1127 signal_name: Option<MemberName<'a>>,
1128}
1129
1130impl<'a> SignalStream<'a> {
1131 pub fn name(&self) -> Option<&MemberName<'a>> {
1133 self.signal_name.as_ref()
1134 }
1135
1136 async fn new(
1137 proxy: Proxy<'_>,
1138 signal_name: Option<MemberName<'a>>,
1139 args: &[(u8, &str)],
1140 ) -> Result<SignalStream<'a>> {
1141 let mut rule_builder = MatchRule::builder()
1142 .msg_type(Type::Signal)
1143 .sender(proxy.destination())?
1144 .path(proxy.path())?
1145 .interface(proxy.interface())?;
1146 if let Some(name) = &signal_name {
1147 rule_builder = rule_builder.member(name)?;
1148 }
1149 for (i, arg) in args {
1150 rule_builder = rule_builder.arg(*i, *arg)?;
1151 }
1152 let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
1153 let conn = proxy.connection();
1154
1155 let (src_unique_name, stream) = match proxy.destination().to_owned() {
1156 BusName::Unique(name) => (
1157 Some(name),
1158 join_streams(
1159 MessageStream::for_match_rule(signal_rule, conn, None).await?,
1160 None,
1161 ),
1162 ),
1163 BusName::WellKnown(name) => {
1164 use ordered_stream::OrderedStreamExt;
1165
1166 let name_owner_changed_rule = MatchRule::builder()
1167 .msg_type(Type::Signal)
1168 .sender("org.freedesktop.DBus")?
1169 .path("/org/freedesktop/DBus")?
1170 .interface("org.freedesktop.DBus")?
1171 .member("NameOwnerChanged")?
1172 .add_arg(name.as_str())?
1173 .build();
1174 let name_owner_changed_stream = MessageStream::for_match_rule(
1175 name_owner_changed_rule,
1176 conn,
1177 Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
1178 )
1179 .await?
1180 .map(Either::Left);
1181
1182 let get_name_owner = conn
1183 .call_method_raw(
1184 Some("org.freedesktop.DBus"),
1185 "/org/freedesktop/DBus",
1186 Some("org.freedesktop.DBus"),
1187 "GetNameOwner",
1188 BitFlags::empty(),
1189 &name,
1190 )
1191 .await
1192 .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
1193
1194 let mut join = join_streams(name_owner_changed_stream, get_name_owner);
1195
1196 let mut src_unique_name = loop {
1197 match join.next().await {
1198 Some(Either::Left(Ok(msg))) => {
1199 let signal = NameOwnerChanged::from_message(msg)
1200 .expect("`NameOwnerChanged` signal stream got wrong message");
1201 {
1202 break signal
1203 .args()
1204 .expect("`NameOwnerChanged` signal has no args")
1207 .new_owner()
1208 .as_ref()
1209 .map(UniqueName::to_owned);
1210 }
1211 }
1212 Some(Either::Left(Err(_))) => (),
1213 Some(Either::Right(Ok(response))) => {
1214 break Some(response.body().deserialize::<UniqueName<'_>>()?.to_owned())
1215 }
1216 Some(Either::Right(Err(e))) => {
1217 debug!("Failed to get owner of {name}: {e}");
1219
1220 break None;
1221 }
1222 None => {
1223 return Err(Error::InputOutput(
1224 std::io::Error::new(
1225 std::io::ErrorKind::BrokenPipe,
1226 "connection closed",
1227 )
1228 .into(),
1229 ))
1230 }
1231 }
1232 };
1233
1234 let (stream, _, queued) = join.into_inner();
1236 if let Some(msg) = queued.and_then(|e| match e.0 {
1237 Either::Left(Ok(msg)) => Some(msg),
1238 Either::Left(Err(_)) | Either::Right(_) => None,
1239 }) {
1240 if let Some(signal) = NameOwnerChanged::from_message(msg) {
1241 if let Ok(args) = signal.args() {
1242 match (args.name(), args.new_owner().deref()) {
1243 (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
1244 src_unique_name = Some(new_owner.to_owned());
1245 }
1246 _ => (),
1247 }
1248 }
1249 }
1250 }
1251 let name_owner_changed_stream = stream.into_inner();
1252
1253 let stream = join_streams(
1254 MessageStream::for_match_rule(signal_rule, conn, None).await?,
1255 Some(name_owner_changed_stream),
1256 );
1257
1258 (src_unique_name, stream)
1259 }
1260 };
1261
1262 Ok(SignalStream {
1263 stream,
1264 src_unique_name,
1265 signal_name,
1266 })
1267 }
1268
1269 fn filter(&mut self, msg: &Message) -> Result<bool> {
1270 let header = msg.header();
1271 let sender = header.sender();
1272 if sender == self.src_unique_name.as_ref() {
1273 return Ok(true);
1274 }
1275
1276 if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
1278 let args = signal.args()?;
1279 self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
1280 }
1281
1282 Ok(false)
1283 }
1284}
1285
1286impl stream::Stream for SignalStream<'_> {
1287 type Item = Message;
1288
1289 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1290 OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1291 }
1292}
1293
1294impl OrderedStream for SignalStream<'_> {
1295 type Data = Message;
1296 type Ordering = Sequence;
1297
1298 fn poll_next_before(
1299 self: Pin<&mut Self>,
1300 cx: &mut Context<'_>,
1301 before: Option<&Self::Ordering>,
1302 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1303 let this = self.get_mut();
1304 loop {
1305 match ready!(OrderedStream::poll_next_before(
1306 Pin::new(&mut this.stream),
1307 cx,
1308 before
1309 )) {
1310 PollResult::Item { data, ordering } => {
1311 if let Ok(msg) = data {
1312 if let Ok(true) = this.filter(&msg) {
1313 return Poll::Ready(PollResult::Item {
1314 data: msg,
1315 ordering,
1316 });
1317 }
1318 }
1319 }
1320 PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1321 PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1322 }
1323 }
1324 }
1325}
1326
1327impl stream::FusedStream for SignalStream<'_> {
1328 fn is_terminated(&self) -> bool {
1329 ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
1330 }
1331}
1332
1333#[async_trait::async_trait]
1334impl AsyncDrop for SignalStream<'_> {
1335 async fn async_drop(self) {
1336 let (signals, names, _buffered) = self.stream.into_inner();
1337 signals.async_drop().await;
1338 if let Some(names) = names {
1339 names.async_drop().await;
1340 }
1341 }
1342}
1343
#[cfg(feature = "blocking-api")]
impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
    /// Unwraps the blocking proxy into the async proxy it wraps.
    fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
        let async_proxy = proxy.into_inner();
        async_proxy
    }
}
1350
1351pub trait ProxyImpl<'c>
1354where
1355 Self: Sized,
1356{
1357 fn builder(conn: &Connection) -> Builder<'c, Self>;
1359
1360 fn into_inner(self) -> Proxy<'c>;
1362
1363 fn inner(&self) -> &Proxy<'c>;
1365}
1366
/// A value that is one of two alternatives; used to tag the items of two joined streams.
enum Either<L, R> {
    /// Item from the first (left) source.
    Left(L),
    /// Item from the second (right) source.
    Right(R),
}
1371
1372#[cfg(test)]
1373mod tests {
1374 use super::*;
1375 use crate::{connection, interface, object_server::SignalEmitter, proxy, utils::block_on};
1376 use futures_util::StreamExt;
1377 use ntest::timeout;
1378 use test_log::test;
1379
/// Runs [`test_signal`] to completion, with a 15 second timeout.
#[test]
#[timeout(15000)]
fn signal() {
    let outcome = block_on(test_signal());
    outcome.unwrap();
}
1385
1386 async fn test_signal() -> Result<()> {
1387 let conn = Connection::session().await?;
1390 let dest_conn = Connection::session().await?;
1391 let unique_name = dest_conn.unique_name().unwrap().clone();
1392
1393 let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
1394 let proxy: Proxy<'_> = Builder::new(&conn)
1395 .destination(well_known)?
1396 .path("/does/not/matter")?
1397 .interface("does.not.matter")?
1398 .build()
1399 .await?;
1400 let mut owner_changed_stream = proxy.receive_owner_changed().await?;
1401
1402 let proxy = fdo::DBusProxy::new(&dest_conn).await?;
1403 let mut name_acquired_stream = proxy
1404 .inner()
1405 .receive_signal_with_args("NameAcquired", &[(0, well_known)])
1406 .await?;
1407
1408 let prop_stream = proxy
1409 .inner()
1410 .receive_property_changed("SomeProp")
1411 .await
1412 .filter_map(|changed| async move {
1413 let v: Option<u32> = changed.get().await.ok();
1414 dbg!(v)
1415 });
1416 drop(proxy);
1417 drop(prop_stream);
1418
1419 dest_conn.request_name(well_known).await?;
1420
1421 let (new_owner, acquired_signal) =
1422 futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);
1423
1424 assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);
1425
1426 let acquired_signal = acquired_signal.unwrap();
1427 assert_eq!(
1428 acquired_signal.body().deserialize::<&str>().unwrap(),
1429 well_known
1430 );
1431
1432 let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
1433 let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;
1434
1435 drop(dest_conn);
1436 name_acquired_stream.async_drop().await;
1437
1438 let new_owner = owner_changed_stream.next().await;
1440 assert!(new_owner.unwrap().is_none());
1441
1442 let new_unique_owner = unique_name_changed_stream.next().await;
1443 assert!(new_unique_owner.unwrap().is_none());
1444
1445 Ok(())
1446 }
1447
/// Runs [`test_signal_stream_deadlock`] to completion, with a 15 second timeout.
#[test]
#[timeout(15000)]
fn signal_stream_deadlock() {
    let outcome = block_on(test_signal_stream_deadlock());
    outcome.unwrap();
}
1453
1454 async fn test_signal_stream_deadlock() -> Result<()> {
1462 #[proxy(
1463 gen_blocking = false,
1464 default_path = "/org/zbus/Test",
1465 default_service = "org.zbus.Test.MR501",
1466 interface = "org.zbus.Test"
1467 )]
1468 trait Test {
1469 #[zbus(signal)]
1470 fn my_signal(&self, msg: &str) -> Result<()>;
1471 }
1472
1473 struct TestIface;
1474
1475 #[interface(name = "org.zbus.Test")]
1476 impl TestIface {
1477 #[zbus(signal)]
1478 async fn my_signal(context: &SignalEmitter<'_>, msg: &'static str) -> Result<()>;
1479 }
1480
1481 let test_iface = TestIface;
1482 let server_conn = connection::Builder::session()?
1483 .name("org.zbus.Test.MR501")?
1484 .serve_at("/org/zbus/Test", test_iface)?
1485 .build()
1486 .await?;
1487
1488 let client_conn = connection::Builder::session()?
1489 .max_queued(1)
1490 .build()
1491 .await?;
1492
1493 let test_proxy = TestProxy::new(&client_conn).await?;
1494 let test_prop_proxy = PropertiesProxy::builder(&client_conn)
1495 .destination("org.zbus.Test.MR501")?
1496 .path("/org/zbus/Test")?
1497 .build()
1498 .await?;
1499
1500 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1501
1502 let handle = {
1503 let tx = tx.clone();
1504 let conn = server_conn.clone();
1505 let server_fut = async move {
1506 use std::time::Duration;
1507
1508 #[cfg(not(feature = "tokio"))]
1509 use async_io::Timer;
1510
1511 #[cfg(feature = "tokio")]
1512 use tokio::time::sleep;
1513
1514 let iface_ref = conn
1515 .object_server()
1516 .interface::<_, TestIface>("/org/zbus/Test")
1517 .await
1518 .unwrap();
1519
1520 let context = iface_ref.signal_emitter();
1521 while !tx.is_closed() {
1522 for _ in 0..10 {
1523 TestIface::my_signal(context, "This is a test")
1524 .await
1525 .unwrap();
1526 }
1527
1528 #[cfg(not(feature = "tokio"))]
1529 Timer::after(Duration::from_millis(5)).await;
1530
1531 #[cfg(feature = "tokio")]
1532 sleep(Duration::from_millis(5)).await;
1533 }
1534 };
1535 server_conn.executor().spawn(server_fut, "server_task")
1536 };
1537
1538 let signal_fut = async {
1539 let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();
1540
1541 tx.send(()).await.unwrap();
1542
1543 while let Some(_signal) = signal_stream.next().await {}
1544 };
1545
1546 let prop_fut = async move {
1547 rx.recv().await.unwrap();
1548 let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
1549 };
1550
1551 futures_util::pin_mut!(signal_fut);
1552 futures_util::pin_mut!(prop_fut);
1553
1554 futures_util::future::select(signal_fut, prop_fut).await;
1555
1556 handle.await;
1557
1558 Ok(())
1559 }
1560}