zbus/
proxy.rs

1use enumflags2::{bitflags, BitFlags};
2use event_listener::{Event, EventListener};
3use futures_core::{ready, stream};
4use futures_util::{future::Either, stream::Map};
5use once_cell::sync::OnceCell;
6use ordered_stream::{join as join_streams, FromFuture, Join, OrderedStream, PollResult};
7use static_assertions::assert_impl_all;
8use std::{
9    collections::{HashMap, HashSet},
10    convert::{TryFrom, TryInto},
11    future::Future,
12    ops::Deref,
13    pin::Pin,
14    sync::{Arc, RwLock, RwLockReadGuard},
15    task::{Context, Poll},
16};
17use tracing::{debug, info_span, instrument, trace, Instrument};
18
19use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
20use zvariant::{ObjectPath, OwnedValue, Str, Value};
21
22use crate::{
23    fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
24    AsyncDrop, CacheProperties, Connection, Error, Executor, MatchRule, Message, MessageFlags,
25    MessageSequence, MessageStream, MessageType, OwnedMatchRule, ProxyBuilder, Result, Task,
26};
27
28/// A client-side interface proxy.
29///
30/// A `Proxy` is a helper to interact with an interface on a remote object.
31///
32/// # Example
33///
34/// ```
35/// use std::result::Result;
36/// use std::error::Error;
37/// use zbus::{Connection, Proxy};
38///
39/// #[tokio::main]
40/// async fn main() -> Result<(), Box<dyn Error>> {
41///     let connection = Connection::session().await?;
42///     let p = Proxy::new(
43///         &connection,
44///         "org.freedesktop.DBus",
45///         "/org/freedesktop/DBus",
46///         "org.freedesktop.DBus",
47///     ).await?;
48///     // owned return value
49///     let _id: String = p.call("GetId", &()).await?;
50///     // borrowed return value
51///     let _id: &str = p.call_method("GetId", &()).await?.body()?;
52///
53///     Ok(())
54/// }
55/// ```
56///
57/// # Note
58///
59/// It is recommended to use the [`dbus_proxy`] macro, which provides a more convenient and
60/// type-safe *façade* `Proxy` derived from a Rust trait.
61///
62/// [`futures` crate]: https://crates.io/crates/futures
63/// [`dbus_proxy`]: attr.dbus_proxy.html
64#[derive(Clone, Debug)]
65pub struct Proxy<'a> {
66    pub(crate) inner: Arc<ProxyInner<'a>>,
67}
68
69assert_impl_all!(Proxy<'_>: Send, Sync, Unpin);
70
71/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
72/// (and possibly other crates).
73#[derive(derivative::Derivative)]
74#[derivative(Debug)]
75pub(crate) struct ProxyInnerStatic {
76    #[derivative(Debug = "ignore")]
77    pub(crate) conn: Connection,
78    dest_owner_change_match_rule: OnceCell<OwnedMatchRule>,
79}
80
81#[derive(Debug)]
82pub(crate) struct ProxyInner<'a> {
83    inner_without_borrows: ProxyInnerStatic,
84    pub(crate) destination: BusName<'a>,
85    pub(crate) path: ObjectPath<'a>,
86    pub(crate) interface: InterfaceName<'a>,
87
88    /// Cache of property values.
89    property_cache: Option<OnceCell<(Arc<PropertiesCache>, Task<()>)>>,
90    /// Set of properties which do not get cached, by name.
91    /// This overrides proxy-level caching behavior.
92    uncached_properties: HashSet<Str<'a>>,
93}
94
95impl Drop for ProxyInnerStatic {
96    fn drop(&mut self) {
97        if let Some(rule) = self.dest_owner_change_match_rule.take() {
98            self.conn.queue_remove_match(rule);
99        }
100    }
101}
102
103/// A property changed event.
104///
105/// The property changed event generated by [`PropertyStream`].
106pub struct PropertyChanged<'a, T> {
107    name: &'a str,
108    properties: Arc<PropertiesCache>,
109    proxy: Proxy<'a>,
110    phantom: std::marker::PhantomData<T>,
111}
112
113impl<'a, T> PropertyChanged<'a, T> {
114    // The name of the property that changed.
115    pub fn name(&self) -> &str {
116        self.name
117    }
118
119    // Get the raw value of the property that changed.
120    //
121    // If the notification signal contained the new value, it has been cached already and this call
122    // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch
123    // and cache the new value.
124    pub async fn get_raw<'p>(&'p self) -> Result<impl Deref<Target = Value<'static>> + 'p> {
125        struct Wrapper<'w> {
126            name: &'w str,
127            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
128        }
129
130        impl<'w> Deref for Wrapper<'w> {
131            type Target = Value<'static>;
132
133            fn deref(&self) -> &Self::Target {
134                self.values
135                    .get(self.name)
136                    .expect("PropertyStream with no corresponding property")
137                    .value
138                    .as_ref()
139                    .expect("PropertyStream with no corresponding property")
140            }
141        }
142
143        {
144            let values = self.properties.values.read().expect("lock poisoned");
145            if values
146                .get(self.name)
147                .expect("PropertyStream with no corresponding property")
148                .value
149                .is_some()
150            {
151                return Ok(Wrapper {
152                    name: self.name,
153                    values,
154                });
155            }
156        }
157
158        // The property was invalidated, so we need to fetch the new value.
159        let properties_proxy = self.proxy.properties_proxy();
160        let value = properties_proxy
161            .get(self.proxy.inner.interface.clone(), self.name)
162            .await
163            .map_err(crate::Error::from)?;
164
165        // Save the new value
166        {
167            let mut values = self.properties.values.write().expect("lock poisoned");
168
169            values
170                .get_mut(self.name)
171                .expect("PropertyStream with no corresponding property")
172                .value = Some(value);
173        }
174
175        Ok(Wrapper {
176            name: self.name,
177            values: self.properties.values.read().expect("lock poisoned"),
178        })
179    }
180}
181
182impl<T> PropertyChanged<'_, T>
183where
184    T: TryFrom<zvariant::OwnedValue>,
185    T::Error: Into<crate::Error>,
186{
187    // Get the value of the property that changed.
188    //
189    // If the notification signal contained the new value, it has been cached already and this call
190    // will return that value. Otherwise (i-e invalidated property), a D-Bus call is made to fetch
191    // and cache the new value.
192    pub async fn get(&self) -> Result<T> {
193        self.get_raw()
194            .await
195            .and_then(|v| T::try_from(OwnedValue::from(&*v)).map_err(Into::into))
196    }
197}
198
199/// A [`stream::Stream`] implementation that yields property change notifications.
200///
201/// Use [`Proxy::receive_property_changed`] to create an instance of this type.
202#[derive(derivative::Derivative)]
203#[derivative(Debug)]
204pub struct PropertyStream<'a, T> {
205    name: &'a str,
206    proxy: Proxy<'a>,
207    changed_listener: EventListener,
208    phantom: std::marker::PhantomData<T>,
209}
210
211impl<'a, T> stream::Stream for PropertyStream<'a, T>
212where
213    T: Unpin,
214{
215    type Item = PropertyChanged<'a, T>;
216
217    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
218        let m = self.get_mut();
219        let properties = match m.proxy.get_property_cache() {
220            Some(properties) => properties.clone(),
221            // With no cache, we will get no updates; return immediately
222            None => return Poll::Ready(None),
223        };
224        ready!(Pin::new(&mut m.changed_listener).poll(cx));
225
226        m.changed_listener = properties
227            .values
228            .read()
229            .expect("lock poisoned")
230            .get(m.name)
231            .expect("PropertyStream with no corresponding property")
232            .event
233            .listen();
234
235        Poll::Ready(Some(PropertyChanged {
236            name: m.name,
237            properties,
238            proxy: m.proxy.clone(),
239            phantom: std::marker::PhantomData,
240        }))
241    }
242}
243
244#[derive(Debug)]
245pub(crate) struct PropertiesCache {
246    values: RwLock<HashMap<String, PropertyValue>>,
247    caching_result: RwLock<CachingResult>,
248}
249
250#[derive(Debug)]
251enum CachingResult {
252    Caching { ready: Event },
253    Cached { result: Result<()> },
254}
255
256impl PropertiesCache {
257    #[instrument(skip_all)]
258    fn new(
259        proxy: PropertiesProxy<'static>,
260        interface: InterfaceName<'static>,
261        executor: &Executor<'_>,
262        uncached_properties: HashSet<zvariant::Str<'static>>,
263    ) -> (Arc<Self>, Task<()>) {
264        let cache = Arc::new(PropertiesCache {
265            values: Default::default(),
266            caching_result: RwLock::new(CachingResult::Caching {
267                ready: Event::new(),
268            }),
269        });
270
271        let cache_clone = cache.clone();
272        let task_name = format!("{interface} proxy caching");
273        let proxy_caching = async move {
274            let result = cache_clone
275                .init(proxy, interface, uncached_properties)
276                .await;
277            let (prop_changes, interface, uncached_properties) = {
278                let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
279                let ready = match &*caching_result {
280                    CachingResult::Caching { ready } => ready,
281                    // SAFETY: This is the only part of the code that changes this state and it's
282                    // only run once.
283                    _ => unreachable!(),
284                };
285                match result {
286                    Ok((prop_changes, interface, uncached_properties)) => {
287                        ready.notify(usize::MAX);
288                        *caching_result = CachingResult::Cached { result: Ok(()) };
289
290                        (prop_changes, interface, uncached_properties)
291                    }
292                    Err(e) => {
293                        ready.notify(usize::MAX);
294                        *caching_result = CachingResult::Cached { result: Err(e) };
295
296                        return;
297                    }
298                }
299            };
300
301            if let Err(e) = cache_clone
302                .keep_updated(prop_changes, interface, uncached_properties)
303                .await
304            {
305                debug!("Error keeping properties cache updated: {e}");
306            }
307        }
308        .instrument(info_span!("{}", task_name));
309        let task = executor.spawn(proxy_caching, &task_name);
310
311        (cache, task)
312    }
313
314    // new() runs this in a task it spawns for initialization of properties cache.
315    async fn init(
316        &self,
317        proxy: PropertiesProxy<'static>,
318        interface: InterfaceName<'static>,
319        uncached_properties: HashSet<zvariant::Str<'static>>,
320    ) -> Result<(
321        PropertiesChangedStream<'static>,
322        InterfaceName<'static>,
323        HashSet<zvariant::Str<'static>>,
324    )> {
325        use ordered_stream::OrderedStreamExt;
326
327        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);
328
329        let get_all = proxy
330            .connection()
331            .call_method_raw(
332                Some(proxy.destination()),
333                proxy.path(),
334                Some(proxy.interface()),
335                "GetAll",
336                BitFlags::empty(),
337                &interface,
338            )
339            .await
340            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
341
342        let mut join = join_streams(prop_changes, get_all);
343
344        loop {
345            match join.next().await {
346                Some(Either::Left(_update)) => {
347                    // discard updates prior to the initial population
348                }
349                Some(Either::Right(populate)) => {
350                    populate?.body().map(|values| {
351                        self.update_cache(&uncached_properties, &values, Vec::new(), &interface);
352                    })?;
353                    break;
354                }
355                None => break,
356            }
357        }
358        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
359            // if an update was buffered, then it happened after the get_all returned and needs to
360            // be applied before we discard the join
361            if let Ok(args) = update.args() {
362                if args.interface_name == interface {
363                    self.update_cache(
364                        &uncached_properties,
365                        &args.changed_properties,
366                        args.invalidated_properties,
367                        &interface,
368                    );
369                }
370            }
371        }
372        // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
373        // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
374        // of directly to the stream.
375        let prop_changes = join.into_inner().0.into_inner();
376
377        Ok((prop_changes, interface, uncached_properties))
378    }
379
380    // new() runs this in a task it spawns for keeping the cache in sync.
381    #[instrument(skip_all)]
382    async fn keep_updated(
383        &self,
384        mut prop_changes: PropertiesChangedStream<'static>,
385        interface: InterfaceName<'static>,
386        uncached_properties: HashSet<zvariant::Str<'static>>,
387    ) -> Result<()> {
388        use futures_util::StreamExt;
389
390        trace!("Listening for property changes on {interface}...");
391        while let Some(update) = prop_changes.next().await {
392            if let Ok(args) = update.args() {
393                if args.interface_name == interface {
394                    self.update_cache(
395                        &uncached_properties,
396                        &args.changed_properties,
397                        args.invalidated_properties,
398                        &interface,
399                    );
400                }
401            }
402        }
403
404        Ok(())
405    }
406
407    fn update_cache(
408        &self,
409        uncached_properties: &HashSet<Str<'_>>,
410        changed: &HashMap<&str, Value<'_>>,
411        invalidated: Vec<&str>,
412        interface: &InterfaceName<'_>,
413    ) {
414        let mut values = self.values.write().expect("lock poisoned");
415
416        for inval in invalidated {
417            if uncached_properties.contains(&Str::from(inval)) {
418                debug!(
419                    "Ignoring invalidation of uncached property `{}.{}`",
420                    interface, inval
421                );
422                continue;
423            }
424            trace!("Property `{interface}.{inval}` invalidated");
425
426            if let Some(entry) = values.get_mut(inval) {
427                entry.value = None;
428                entry.event.notify(usize::MAX);
429            }
430        }
431
432        for (property_name, value) in changed {
433            if uncached_properties.contains(&Str::from(*property_name)) {
434                debug!(
435                    "Ignoring update of uncached property `{}.{}`",
436                    interface, property_name
437                );
438                continue;
439            }
440            trace!("Property `{interface}.{property_name}` updated");
441
442            let entry = values.entry(property_name.to_string()).or_default();
443
444            entry.value = Some(OwnedValue::from(value));
445            entry.event.notify(usize::MAX);
446        }
447    }
448
449    /// Wait for the cache to be populated and return any error encountered during population
450    pub(crate) async fn ready(&self) -> Result<()> {
451        let listener = match &*self.caching_result.read().expect("lock poisoned") {
452            CachingResult::Caching { ready } => ready.listen(),
453            CachingResult::Cached { result } => return result.clone(),
454        };
455        listener.await;
456
457        // It must be ready now.
458        match &*self.caching_result.read().expect("lock poisoned") {
459            // SAFETY: We were just notified that state has changed to `Cached` and we never go back
460            // to `Caching` once in `Cached`.
461            CachingResult::Caching { .. } => unreachable!(),
462            CachingResult::Cached { result } => result.clone(),
463        }
464    }
465}
466
467impl<'a> ProxyInner<'a> {
468    pub(crate) fn new(
469        conn: Connection,
470        destination: BusName<'a>,
471        path: ObjectPath<'a>,
472        interface: InterfaceName<'a>,
473        cache: CacheProperties,
474        uncached_properties: HashSet<Str<'a>>,
475    ) -> Self {
476        let property_cache = match cache {
477            CacheProperties::Yes | CacheProperties::Lazily => Some(OnceCell::new()),
478            CacheProperties::No => None,
479        };
480        Self {
481            inner_without_borrows: ProxyInnerStatic {
482                conn,
483                dest_owner_change_match_rule: OnceCell::new(),
484            },
485            destination,
486            path,
487            interface,
488            property_cache,
489            uncached_properties,
490        }
491    }
492
493    /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination.
494    ///
495    /// If the destination is a unique name, we will not subscribe to the signal.
496    pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
497        if !self.inner_without_borrows.conn.is_bus() {
498            // Names don't mean much outside the bus context.
499            return Ok(());
500        }
501
502        let well_known_name = match &self.destination {
503            BusName::WellKnown(well_known_name) => well_known_name,
504            BusName::Unique(_) => return Ok(()),
505        };
506
507        if self
508            .inner_without_borrows
509            .dest_owner_change_match_rule
510            .get()
511            .is_some()
512        {
513            // Already watching over the bus for any name updates so nothing to do here.
514            return Ok(());
515        }
516
517        let conn = &self.inner_without_borrows.conn;
518        let signal_rule: OwnedMatchRule = MatchRule::builder()
519            .msg_type(MessageType::Signal)
520            .sender("org.freedesktop.DBus")?
521            .path("/org/freedesktop/DBus")?
522            .interface("org.freedesktop.DBus")?
523            .member("NameOwnerChanged")?
524            .add_arg(well_known_name.as_str())?
525            .build()
526            .to_owned()
527            .into();
528
529        conn.add_match(
530            signal_rule.clone(),
531            Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
532        )
533        .await?;
534
535        if self
536            .inner_without_borrows
537            .dest_owner_change_match_rule
538            .set(signal_rule.clone())
539            .is_err()
540        {
541            // we raced another destination_unique_name call and added it twice
542            conn.remove_match(signal_rule).await?;
543        }
544
545        Ok(())
546    }
547}
548
549const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
550
551impl<'a> Proxy<'a> {
552    /// Create a new `Proxy` for the given destination/path/interface.
553    pub async fn new<D, P, I>(
554        conn: &Connection,
555        destination: D,
556        path: P,
557        interface: I,
558    ) -> Result<Proxy<'a>>
559    where
560        D: TryInto<BusName<'a>>,
561        P: TryInto<ObjectPath<'a>>,
562        I: TryInto<InterfaceName<'a>>,
563        D::Error: Into<Error>,
564        P::Error: Into<Error>,
565        I::Error: Into<Error>,
566    {
567        ProxyBuilder::new_bare(conn)
568            .destination(destination)?
569            .path(path)?
570            .interface(interface)?
571            .build()
572            .await
573    }
574
575    /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all
576    /// passed arguments.
577    pub async fn new_owned<D, P, I>(
578        conn: Connection,
579        destination: D,
580        path: P,
581        interface: I,
582    ) -> Result<Proxy<'a>>
583    where
584        D: TryInto<BusName<'static>>,
585        P: TryInto<ObjectPath<'static>>,
586        I: TryInto<InterfaceName<'static>>,
587        D::Error: Into<Error>,
588        P::Error: Into<Error>,
589        I::Error: Into<Error>,
590    {
591        ProxyBuilder::new_bare(&conn)
592            .destination(destination)?
593            .path(path)?
594            .interface(interface)?
595            .build()
596            .await
597    }
598
599    /// Get a reference to the associated connection.
600    pub fn connection(&self) -> &Connection {
601        &self.inner.inner_without_borrows.conn
602    }
603
604    /// Get a reference to the destination service name.
605    pub fn destination(&self) -> &BusName<'_> {
606        &self.inner.destination
607    }
608
609    /// Get a reference to the object path.
610    pub fn path(&self) -> &ObjectPath<'_> {
611        &self.inner.path
612    }
613
614    /// Get a reference to the interface.
615    pub fn interface(&self) -> &InterfaceName<'_> {
616        &self.inner.interface
617    }
618
619    /// Introspect the associated object, and return the XML description.
620    ///
621    /// See the [xml](xml/index.html) or [quick_xml](quick_xml/index.html) module for parsing the
622    /// result.
623    pub async fn introspect(&self) -> fdo::Result<String> {
624        let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
625            .destination(&self.inner.destination)?
626            .path(&self.inner.path)?
627            .build()
628            .await?;
629
630        proxy.introspect().await
631    }
632
633    fn properties_proxy(&self) -> PropertiesProxy<'_> {
634        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
635            // Safe because already checked earlier
636            .destination(self.inner.destination.as_ref())
637            .unwrap()
638            // Safe because already checked earlier
639            .path(self.inner.path.as_ref())
640            .unwrap()
641            // does not have properties
642            .cache_properties(CacheProperties::No)
643            .build_internal()
644            .unwrap()
645            .into()
646    }
647
648    fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
649        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
650            // Safe because already checked earlier
651            .destination(self.inner.destination.to_owned())
652            .unwrap()
653            // Safe because already checked earlier
654            .path(self.inner.path.to_owned())
655            .unwrap()
656            // does not have properties
657            .cache_properties(CacheProperties::No)
658            .build_internal()
659            .unwrap()
660            .into()
661    }
662
663    /// Get the cache, starting it in the background if needed.
664    ///
665    /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors
666    /// encountered in the population.
667    pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
668        let cache = match &self.inner.property_cache {
669            Some(cache) => cache,
670            None => return None,
671        };
672        let (cache, _) = &cache.get_or_init(|| {
673            let proxy = self.owned_properties_proxy();
674            let interface = self.interface().to_owned();
675            let uncached_properties: HashSet<zvariant::Str<'static>> = self
676                .inner
677                .uncached_properties
678                .iter()
679                .map(|s| s.to_owned())
680                .collect();
681            let executor = self.connection().executor();
682
683            PropertiesCache::new(proxy, interface, executor, uncached_properties)
684        });
685
686        Some(cache)
687    }
688
689    /// Get the cached value of the property `property_name`.
690    ///
691    /// This returns `None` if the property is not in the cache.  This could be because the cache
692    /// was invalidated by an update, because caching was disabled for this property or proxy, or
693    /// because the cache has not yet been populated.  Use `get_property` to fetch the value from
694    /// the peer.
695    pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
696    where
697        T: TryFrom<OwnedValue>,
698        T::Error: Into<Error>,
699    {
700        self.cached_property_raw(property_name)
701            .as_deref()
702            .map(|v| T::try_from(OwnedValue::from(v)))
703            .transpose()
704            .map_err(Into::into)
705    }
706
707    /// Get the cached value of the property `property_name`.
708    ///
709    /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This
710    /// is useful if you want to avoid allocations and cloning.
711    pub fn cached_property_raw<'p>(
712        &'p self,
713        property_name: &'p str,
714    ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
715        if let Some(values) = self
716            .inner
717            .property_cache
718            .as_ref()
719            .and_then(OnceCell::get)
720            .map(|c| c.0.values.read().expect("lock poisoned"))
721        {
722            // ensure that the property is in the cache.
723            values
724                .get(property_name)
725                // if the property value has not yet been cached, this will return None.
726                .and_then(|e| e.value.as_ref())?;
727
728            struct Wrapper<'a> {
729                values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
730                property_name: &'a str,
731            }
732
733            impl Deref for Wrapper<'_> {
734                type Target = Value<'static>;
735
736                fn deref(&self) -> &Self::Target {
737                    self.values
738                        .get(self.property_name)
739                        .and_then(|e| e.value.as_ref())
740                        .map(|v| v.deref())
741                        .expect("inexistent property")
742                }
743            }
744
745            Some(Wrapper {
746                values,
747                property_name,
748            })
749        } else {
750            None
751        }
752    }
753
754    async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
755        Ok(self
756            .properties_proxy()
757            .get(self.inner.interface.as_ref(), property_name)
758            .await?)
759    }
760
761    /// Get the property `property_name`.
762    ///
763    /// Get the property value from the cache (if caching is enabled) or call the
764    /// `Get` method of the `org.freedesktop.DBus.Properties` interface.
765    pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
766    where
767        T: TryFrom<OwnedValue>,
768        T::Error: Into<Error>,
769    {
770        if let Some(cache) = self.get_property_cache() {
771            cache.ready().await?;
772        }
773        if let Some(value) = self.cached_property(property_name)? {
774            return Ok(value);
775        }
776
777        let value = self.get_proxy_property(property_name).await?;
778        value.try_into().map_err(Into::into)
779    }
780
781    /// Set the property `property_name`.
782    ///
783    /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface.
784    pub async fn set_property<'t, T: 't>(&self, property_name: &str, value: T) -> fdo::Result<()>
785    where
786        T: Into<Value<'t>>,
787    {
788        self.properties_proxy()
789            .set(self.inner.interface.as_ref(), property_name, &value.into())
790            .await
791    }
792
793    /// Call a method and return the reply.
794    ///
795    /// Typically, you would want to use [`call`] method instead. Use this method if you need to
796    /// deserialize the reply message manually (this way, you can avoid the memory
797    /// allocation/copying, by deserializing the reply to an unowned type).
798    ///
799    /// [`call`]: struct.Proxy.html#method.call
800    pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Arc<Message>>
801    where
802        M: TryInto<MemberName<'m>>,
803        M::Error: Into<Error>,
804        B: serde::ser::Serialize + zvariant::DynamicType,
805    {
806        self.inner
807            .inner_without_borrows
808            .conn
809            .call_method(
810                Some(&self.inner.destination),
811                self.inner.path.as_str(),
812                Some(&self.inner.interface),
813                method_name,
814                body,
815            )
816            .await
817    }
818
819    /// Call a method and return the reply body.
820    ///
821    /// Use [`call_method`] instead if you need to deserialize the reply manually/separately.
822    ///
823    /// [`call_method`]: struct.Proxy.html#method.call_method
824    pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
825    where
826        M: TryInto<MemberName<'m>>,
827        M::Error: Into<Error>,
828        B: serde::ser::Serialize + zvariant::DynamicType,
829        R: serde::de::DeserializeOwned + zvariant::Type,
830    {
831        let reply = self.call_method(method_name, body).await?;
832
833        reply.body()
834    }
835
836    /// Call a method and return the reply body, optionally supplying a set of
837    /// method flags to control the way the method call message is sent and handled.
838    ///
839    /// Use [`call`] instead if you do not need any special handling via additional flags.
840    /// If the `NoReplyExpected` flag is passed , this will return None immediately
841    /// after sending the message, similar to [`call_noreply`]
842    ///
843    /// [`call`]: struct.Proxy.html#method.call
844    /// [`call_noreply`]: struct.Proxy.html#method.call_noreply
845    pub async fn call_with_flags<'m, M, B, R>(
846        &self,
847        method_name: M,
848        flags: BitFlags<MethodFlags>,
849        body: &B,
850    ) -> Result<Option<R>>
851    where
852        M: TryInto<MemberName<'m>>,
853        M::Error: Into<Error>,
854        B: serde::ser::Serialize + zvariant::DynamicType,
855        R: serde::de::DeserializeOwned + zvariant::Type,
856    {
857        let flags = flags
858            .iter()
859            .map(MessageFlags::from)
860            .collect::<BitFlags<_>>();
861        match self
862            .inner
863            .inner_without_borrows
864            .conn
865            .call_method_raw(
866                Some(self.destination()),
867                self.path(),
868                Some(self.interface()),
869                method_name,
870                flags,
871                body,
872            )
873            .await?
874        {
875            Some(reply) => reply.await?.body().map(Some),
876            None => Ok(None),
877        }
878    }
879
880    /// Call a method without expecting a reply
881    ///
882    /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply.
883    pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
884    where
885        M: TryInto<MemberName<'m>>,
886        M::Error: Into<Error>,
887        B: serde::ser::Serialize + zvariant::DynamicType,
888    {
889        self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
890            .await?;
891        Ok(())
892    }
893
894    /// Create a stream for signal named `signal_name`.
895    pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
896    where
897        M: TryInto<MemberName<'m>>,
898        M::Error: Into<Error>,
899    {
900        self.receive_signal_with_args(signal_name, &[]).await
901    }
902
903    /// Same as [`Proxy::receive_signal`] but with a filter.
904    ///
905    /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid
906    /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use
907    /// this method where possible. Note that this filtering is limited to arguments of string
908    /// types.
909    ///
910    /// The arguments are passed as a tuples of argument index and expected value.
911    pub async fn receive_signal_with_args<'m, M>(
912        &self,
913        signal_name: M,
914        args: &[(u8, &str)],
915    ) -> Result<SignalStream<'m>>
916    where
917        M: TryInto<MemberName<'m>>,
918        M::Error: Into<Error>,
919    {
920        let signal_name = signal_name.try_into().map_err(Into::into)?;
921        self.receive_signals(Some(signal_name), args).await
922    }
923
924    async fn receive_signals<'m>(
925        &self,
926        signal_name: Option<MemberName<'m>>,
927        args: &[(u8, &str)],
928    ) -> Result<SignalStream<'m>> {
929        self.inner.subscribe_dest_owner_change().await?;
930
931        SignalStream::new(self.clone(), signal_name, args).await
932    }
933
934    /// Create a stream for all signals emitted by this service.
935    pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
936        self.receive_signals(None, &[]).await
937    }
938
939    /// Get a stream to receive property changed events.
940    ///
941    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
942    /// will only receive the last update.
943    ///
944    /// If caching is not enabled on this proxy, the resulting stream will not return any events.
945    pub async fn receive_property_changed<'name: 'a, T>(
946        &self,
947        name: &'name str,
948    ) -> PropertyStream<'a, T> {
949        let properties = self.get_property_cache();
950        let changed_listener = if let Some(properties) = &properties {
951            let mut values = properties.values.write().expect("lock poisoned");
952            let entry = values
953                .entry(name.to_string())
954                .or_insert_with(PropertyValue::default);
955            entry.event.listen()
956        } else {
957            Event::new().listen()
958        };
959
960        PropertyStream {
961            name,
962            proxy: self.clone(),
963            changed_listener,
964            phantom: std::marker::PhantomData,
965        }
966    }
967
968    /// Get a stream to receive destination owner changed events.
969    ///
970    /// If the proxy destination is a unique name, the stream will be notified of the peer
971    /// disconnection from the bus (with a `None` value).
972    ///
973    /// If the proxy destination is a well-known name, the stream will be notified whenever the name
974    /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the
975    /// name is released (with a `None` value).
976    ///
977    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
978    /// will only receive the last update.
979    pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'_>> {
980        use futures_util::StreamExt;
981        let dbus_proxy = fdo::DBusProxy::builder(self.connection())
982            .cache_properties(CacheProperties::No)
983            .build()
984            .await?;
985        Ok(OwnerChangedStream {
986            stream: dbus_proxy
987                .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
988                .await?
989                .map(Box::new(move |signal| {
990                    let args = signal.args().unwrap();
991                    let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());
992
993                    new_owner
994                })),
995            name: self.destination().clone(),
996        })
997    }
998}
999
1000#[derive(Debug, Default)]
1001struct PropertyValue {
1002    value: Option<OwnedValue>,
1003    event: Event,
1004}
1005
1006/// Flags to use with [`Proxy::call_with_flags`].
1007#[bitflags]
1008#[repr(u8)]
1009#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1010pub enum MethodFlags {
1011    /// No response is expected from this method call, regardless of whether the
1012    /// signature for the interface method indicates a reply type. When passed,
1013    /// `call_with_flags` will return `Ok(None)` immediately after successfully
1014    /// sending the method call.
1015    ///
1016    /// Errors encountered while *making* the call will still be returned as
1017    /// an `Err` variant, but any errors that are triggered by the receiver's
1018    /// handling of the call will not be delivered.
1019    NoReplyExpected = 0x1,
1020
1021    /// When set on a call whose destination is a message bus, this flag will instruct
1022    /// the bus not to [launch][al] a service to handle the call if no application
1023    /// on the bus owns the requested name.
1024    ///
1025    /// This flag is ignored when using a peer-to-peer connection.
1026    ///
1027    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
1028    NoAutoStart = 0x2,
1029
1030    /// Indicates to the receiver that this client is prepared to wait for interactive
1031    /// authorization, which might take a considerable time to complete. For example, the receiver
1032    /// may query the user for confirmation via [polkit] or a similar framework.
1033    ///
1034    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
1035    AllowInteractiveAuth = 0x4,
1036}
1037
1038assert_impl_all!(MethodFlags: Send, Sync, Unpin);
1039
1040impl From<MethodFlags> for MessageFlags {
1041    fn from(method_flag: MethodFlags) -> Self {
1042        match method_flag {
1043            MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1044            MethodFlags::NoAutoStart => Self::NoAutoStart,
1045            MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1046        }
1047    }
1048}
1049
1050type OwnerChangedStreamMap<'a> = Map<
1051    fdo::NameOwnerChangedStream<'a>,
1052    Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
1053>;
1054
1055/// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes.
1056///
1057/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
1058pub struct OwnerChangedStream<'a> {
1059    stream: OwnerChangedStreamMap<'a>,
1060    name: BusName<'a>,
1061}
1062
1063assert_impl_all!(OwnerChangedStream<'_>: Send, Sync, Unpin);
1064
1065impl OwnerChangedStream<'_> {
1066    /// The bus name being tracked.
1067    pub fn name(&self) -> &BusName<'_> {
1068        &self.name
1069    }
1070}
1071
1072impl<'a> stream::Stream for OwnerChangedStream<'a> {
1073    type Item = Option<UniqueName<'static>>;
1074
1075    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1076        use futures_util::StreamExt;
1077        self.get_mut().stream.poll_next_unpin(cx)
1078    }
1079}
1080
1081/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
1082///
1083/// Use [`Proxy::receive_signal`] to create an instance of this type.
1084///
1085/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
1086/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
1087#[derive(Debug)]
1088pub struct SignalStream<'a> {
1089    stream: Join<MessageStream, Option<MessageStream>>,
1090    src_unique_name: Option<UniqueName<'static>>,
1091    signal_name: Option<MemberName<'a>>,
1092}
1093
1094impl<'a> SignalStream<'a> {
1095    /// The signal name.
1096    pub fn name(&self) -> Option<&MemberName<'a>> {
1097        self.signal_name.as_ref()
1098    }
1099
1100    async fn new(
1101        proxy: Proxy<'_>,
1102        signal_name: Option<MemberName<'a>>,
1103        args: &[(u8, &str)],
1104    ) -> Result<SignalStream<'a>> {
1105        let mut rule_builder = MatchRule::builder()
1106            .msg_type(MessageType::Signal)
1107            .sender(proxy.destination())?
1108            .path(proxy.path())?
1109            .interface(proxy.interface())?;
1110        if let Some(name) = &signal_name {
1111            rule_builder = rule_builder.member(name)?;
1112        }
1113        for (i, arg) in args {
1114            rule_builder = rule_builder.arg(*i, *arg)?;
1115        }
1116        let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
1117        let conn = proxy.connection();
1118
1119        let (src_unique_name, stream) = match proxy.destination().to_owned() {
1120            BusName::Unique(name) => (
1121                Some(name),
1122                join_streams(
1123                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
1124                    None,
1125                ),
1126            ),
1127            BusName::WellKnown(name) => {
1128                use ordered_stream::OrderedStreamExt;
1129
1130                let name_owner_changed_rule = MatchRule::builder()
1131                    .msg_type(MessageType::Signal)
1132                    .sender("org.freedesktop.DBus")?
1133                    .path("/org/freedesktop/DBus")?
1134                    .interface("org.freedesktop.DBus")?
1135                    .member("NameOwnerChanged")?
1136                    .add_arg(name.as_str())?
1137                    .build();
1138                let name_owner_changed_stream = MessageStream::for_match_rule(
1139                    name_owner_changed_rule,
1140                    conn,
1141                    Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
1142                )
1143                .await?
1144                .map(Either::Left);
1145
1146                let get_name_owner = conn
1147                    .call_method_raw(
1148                        Some("org.freedesktop.DBus"),
1149                        "/org/freedesktop/DBus",
1150                        Some("org.freedesktop.DBus"),
1151                        "GetNameOwner",
1152                        BitFlags::empty(),
1153                        &name,
1154                    )
1155                    .await
1156                    .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
1157
1158                let mut join = join_streams(name_owner_changed_stream, get_name_owner);
1159
1160                let mut src_unique_name = loop {
1161                    match join.next().await {
1162                        Some(Either::Left(Ok(msg))) => {
1163                            let signal = NameOwnerChanged::from_message(msg)
1164                                .expect("`NameOwnerChanged` signal stream got wrong message");
1165                            {
1166                                break signal
1167                                    .args()
1168                                    // SAFETY: The filtering code couldn't have let this through if
1169                                    // args were not in order.
1170                                    .expect("`NameOwnerChanged` signal has no args")
1171                                    .new_owner()
1172                                    .as_ref()
1173                                    .map(UniqueName::to_owned);
1174                            }
1175                        }
1176                        Some(Either::Left(Err(_))) => (),
1177                        Some(Either::Right(Ok(response))) => {
1178                            break Some(response.body::<UniqueName<'_>>()?.to_owned())
1179                        }
1180                        Some(Either::Right(Err(e))) => {
1181                            // Probably the name is not owned. Not a problem but let's still log it.
1182                            debug!("Failed to get owner of {name}: {e}");
1183
1184                            break None;
1185                        }
1186                        None => {
1187                            return Err(Error::InputOutput(
1188                                std::io::Error::new(
1189                                    std::io::ErrorKind::BrokenPipe,
1190                                    "connection closed",
1191                                )
1192                                .into(),
1193                            ))
1194                        }
1195                    }
1196                };
1197
1198                // Let's take into account any buffered NameOwnerChanged signal.
1199                let (stream, _, queued) = join.into_inner();
1200                if let Some(msg) = queued.and_then(|e| match e.0 {
1201                    Either::Left(Ok(msg)) => Some(msg),
1202                    Either::Left(Err(_)) | Either::Right(_) => None,
1203                }) {
1204                    if let Some(signal) = NameOwnerChanged::from_message(msg) {
1205                        if let Ok(args) = signal.args() {
1206                            match (args.name(), args.new_owner().deref()) {
1207                                (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
1208                                    src_unique_name = Some(new_owner.to_owned());
1209                                }
1210                                _ => (),
1211                            }
1212                        }
1213                    }
1214                }
1215                let name_owner_changed_stream = stream.into_inner();
1216
1217                let stream = join_streams(
1218                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
1219                    Some(name_owner_changed_stream),
1220                );
1221
1222                (src_unique_name, stream)
1223            }
1224        };
1225
1226        Ok(SignalStream {
1227            stream,
1228            src_unique_name,
1229            signal_name,
1230        })
1231    }
1232
1233    fn filter(&mut self, msg: &Arc<Message>) -> Result<bool> {
1234        let header = msg.header()?;
1235        let sender = header.sender()?;
1236        if sender == self.src_unique_name.as_ref() {
1237            return Ok(true);
1238        }
1239
1240        // The src_unique_name must be maintained in lock-step with the applied filter
1241        if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
1242            let args = signal.args()?;
1243            self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
1244        }
1245
1246        Ok(false)
1247    }
1248}
1249
1250assert_impl_all!(SignalStream<'_>: Send, Sync, Unpin);
1251
1252impl<'a> stream::Stream for SignalStream<'a> {
1253    type Item = Arc<Message>;
1254
1255    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1256        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1257    }
1258}
1259
1260impl<'a> OrderedStream for SignalStream<'a> {
1261    type Data = Arc<Message>;
1262    type Ordering = MessageSequence;
1263
1264    fn poll_next_before(
1265        self: Pin<&mut Self>,
1266        cx: &mut Context<'_>,
1267        before: Option<&Self::Ordering>,
1268    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1269        let this = self.get_mut();
1270        loop {
1271            match ready!(OrderedStream::poll_next_before(
1272                Pin::new(&mut this.stream),
1273                cx,
1274                before
1275            )) {
1276                PollResult::Item { data, ordering } => {
1277                    if let Ok(msg) = data {
1278                        if let Ok(true) = this.filter(&msg) {
1279                            return Poll::Ready(PollResult::Item {
1280                                data: msg,
1281                                ordering,
1282                            });
1283                        }
1284                    }
1285                }
1286                PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1287                PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1288            }
1289        }
1290    }
1291}
1292
1293impl<'a> stream::FusedStream for SignalStream<'a> {
1294    fn is_terminated(&self) -> bool {
1295        ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
1296    }
1297}
1298
1299#[async_trait::async_trait]
1300impl AsyncDrop for SignalStream<'_> {
1301    async fn async_drop(self) {
1302        let (signals, names, _buffered) = self.stream.into_inner();
1303        signals.async_drop().await;
1304        if let Some(names) = names {
1305            names.async_drop().await;
1306        }
1307    }
1308}
1309
1310impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
1311    fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
1312        proxy.into_inner()
1313    }
1314}
1315
1316#[cfg(test)]
1317mod tests {
1318    use super::*;
1319    use crate::{dbus_interface, dbus_proxy, utils::block_on, ConnectionBuilder, SignalContext};
1320    use futures_util::StreamExt;
1321    use ntest::timeout;
1322    use test_log::test;
1323
1324    #[test]
1325    #[timeout(15000)]
1326    fn signal() {
1327        block_on(test_signal()).unwrap();
1328    }
1329
1330    async fn test_signal() -> Result<()> {
1331        // Register a well-known name with the session bus and ensure we get the appropriate
1332        // signals called for that.
1333        let conn = Connection::session().await?;
1334        let dest_conn = Connection::session().await?;
1335        let unique_name = dest_conn.unique_name().unwrap().clone();
1336
1337        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
1338        let proxy: Proxy<'_> = ProxyBuilder::new_bare(&conn)
1339            .destination(well_known)?
1340            .path("/does/not/matter")?
1341            .interface("does.not.matter")?
1342            .build()
1343            .await?;
1344        let mut owner_changed_stream = proxy.receive_owner_changed().await?;
1345
1346        let proxy = fdo::DBusProxy::new(&dest_conn).await?;
1347        let mut name_acquired_stream = proxy
1348            .receive_signal_with_args("NameAcquired", &[(0, well_known)])
1349            .await?;
1350
1351        let prop_stream =
1352            proxy
1353                .receive_property_changed("SomeProp")
1354                .await
1355                .filter_map(|changed| async move {
1356                    let v: Option<u32> = changed.get().await.ok();
1357                    dbg!(v)
1358                });
1359        drop(proxy);
1360        drop(prop_stream);
1361
1362        dest_conn.request_name(well_known).await?;
1363
1364        let (new_owner, acquired_signal) =
1365            futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);
1366
1367        assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);
1368
1369        let acquired_signal = acquired_signal.unwrap();
1370        assert_eq!(acquired_signal.body::<&str>().unwrap(), well_known);
1371
1372        let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
1373        let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;
1374
1375        drop(dest_conn);
1376        name_acquired_stream.async_drop().await;
1377
1378        // There shouldn't be an owner anymore.
1379        let new_owner = owner_changed_stream.next().await;
1380        assert!(new_owner.unwrap().is_none());
1381
1382        let new_unique_owner = unique_name_changed_stream.next().await;
1383        assert!(new_unique_owner.unwrap().is_none());
1384
1385        Ok(())
1386    }
1387
1388    #[test]
1389    #[timeout(15000)]
1390    fn signal_stream_deadlock() {
1391        block_on(test_signal_stream_deadlock()).unwrap();
1392    }
1393
1394    /// Tests deadlocking in signal reception when the message queue is full.
1395    ///
1396    /// Creates a connection with a small message queue, and a service that
1397    /// emits signals at a high rate. First a listener is created that listens
1398    /// for that signal which should fill the small queue. Then another signal
1399    /// signal listener is created against another signal. Previously, this second
1400    /// call to add the match rule never resolved and resulted in a deadlock.
1401    async fn test_signal_stream_deadlock() -> Result<()> {
1402        #[dbus_proxy(
1403            gen_blocking = false,
1404            default_path = "/org/zbus/Test",
1405            default_service = "org.zbus.Test.MR501",
1406            interface = "org.zbus.Test"
1407        )]
1408        trait Test {
1409            #[dbus_proxy(signal)]
1410            fn my_signal(&self, msg: &str) -> Result<()>;
1411        }
1412
1413        struct TestIface;
1414
1415        #[dbus_interface(name = "org.zbus.Test")]
1416        impl TestIface {
1417            #[dbus_interface(signal)]
1418            async fn my_signal(context: &SignalContext<'_>, msg: &'static str) -> Result<()>;
1419        }
1420
1421        let test_iface = TestIface;
1422        let server_conn = ConnectionBuilder::session()?
1423            .name("org.zbus.Test.MR501")?
1424            .serve_at("/org/zbus/Test", test_iface)?
1425            .build()
1426            .await?;
1427
1428        let client_conn = ConnectionBuilder::session()?.max_queued(1).build().await?;
1429
1430        let test_proxy = TestProxy::new(&client_conn).await?;
1431        let test_prop_proxy = PropertiesProxy::builder(&client_conn)
1432            .destination("org.zbus.Test.MR501")?
1433            .path("/org/zbus/Test")?
1434            .build()
1435            .await?;
1436
1437        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1438
1439        let handle = {
1440            let tx = tx.clone();
1441            let conn = server_conn.clone();
1442            let server_fut = async move {
1443                use std::time::Duration;
1444
1445                #[cfg(not(feature = "tokio"))]
1446                use async_io::Timer;
1447
1448                #[cfg(feature = "tokio")]
1449                use tokio::time::sleep;
1450
1451                let iface_ref = conn
1452                    .object_server()
1453                    .interface::<_, TestIface>("/org/zbus/Test")
1454                    .await
1455                    .unwrap();
1456
1457                let context = iface_ref.signal_context();
1458                while !tx.is_closed() {
1459                    for _ in 0..10 {
1460                        TestIface::my_signal(context, "This is a test")
1461                            .await
1462                            .unwrap();
1463                    }
1464
1465                    #[cfg(not(feature = "tokio"))]
1466                    Timer::after(Duration::from_millis(5)).await;
1467
1468                    #[cfg(feature = "tokio")]
1469                    sleep(Duration::from_millis(5)).await;
1470                }
1471            };
1472            server_conn.executor().spawn(server_fut, "server_task")
1473        };
1474
1475        let signal_fut = async {
1476            let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();
1477
1478            tx.send(()).await.unwrap();
1479
1480            while let Some(_signal) = signal_stream.next().await {}
1481        };
1482
1483        let prop_fut = async move {
1484            rx.recv().await.unwrap();
1485            let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
1486        };
1487
1488        futures_util::pin_mut!(signal_fut);
1489        futures_util::pin_mut!(prop_fut);
1490
1491        futures_util::future::select(signal_fut, prop_fut).await;
1492
1493        handle.await;
1494
1495        Ok(())
1496    }
1497}