zbus/
lib.rs

1#![deny(rust_2018_idioms)]
2#![doc(
3    html_logo_url = "https://storage.googleapis.com/fdo-gitlab-uploads/project/avatar/3213/zbus-logomark.png"
4)]
5#![doc = include_str!("../README.md")]
6#![doc(test(attr(
7    warn(unused),
8    deny(warnings),
9    // W/o this, we seem to get some bogus warning about `extern crate zbus`.
10    allow(unused_extern_crates),
11)))]
12
13#[cfg(doctest)]
14mod doctests {
15    // Book markdown checks
16    doc_comment::doctest!("../../book/src/client.md");
17    doc_comment::doctest!("../../book/src/concepts.md");
18    doc_comment::doctest!("../../book/src/connection.md");
19    doc_comment::doctest!("../../book/src/contributors.md");
20    doc_comment::doctest!("../../book/src/introduction.md");
21    doc_comment::doctest!("../../book/src/server.md");
22    doc_comment::doctest!("../../book/src/blocking.md");
23    doc_comment::doctest!("../../book/src/faq.md");
24}
25
26#[cfg(all(not(feature = "async-io"), not(feature = "tokio")))]
27mod error_message {
28    #[cfg(windows)]
29    compile_error!("Either \"async-io\" (default) or \"tokio\" must be enabled. On Windows \"async-io\" is (currently) required for UNIX socket support");
30
31    #[cfg(not(windows))]
32    compile_error!("Either \"async-io\" (default) or \"tokio\" must be enabled.");
33}
34
35#[cfg(windows)]
36mod win32;
37
38mod dbus_error;
39pub use dbus_error::*;
40
41mod error;
42pub use error::*;
43
44mod address;
45pub use address::*;
46
47mod guid;
48pub use guid::*;
49
50mod message;
51pub use message::*;
52
53mod message_builder;
54pub use message_builder::*;
55
56mod message_header;
57pub use message_header::*;
58
59mod message_field;
60pub use message_field::*;
61
62mod message_fields;
63pub use message_fields::*;
64
65mod handshake;
66pub use handshake::AuthMechanism;
67pub(crate) use handshake::*;
68
69mod connection;
70pub use connection::*;
71mod connection_builder;
72pub use connection_builder::*;
73mod message_stream;
74pub use message_stream::*;
75mod object_server;
76pub use object_server::*;
77mod proxy;
78pub use proxy::*;
79mod proxy_builder;
80pub use proxy_builder::*;
81mod signal_context;
82pub use signal_context::*;
83mod interface;
84pub use interface::*;
85mod abstractions;
86pub use abstractions::*;
87mod match_rule;
88pub use match_rule::*;
89mod match_rule_builder;
90pub use match_rule_builder::*;
91mod socket_reader;
92
93mod utils;
94pub use utils::*;
95
96#[macro_use]
97pub mod fdo;
98
99mod raw;
100pub use raw::Socket;
101
102pub mod blocking;
103
104#[cfg(feature = "xml")]
105pub mod xml;
106
107#[cfg(feature = "quick-xml")]
108pub mod quick_xml;
109
110pub use zbus_macros::{dbus_interface, dbus_proxy, DBusError};
111
112// Required for the macros to function within this crate.
113extern crate self as zbus;
114
115// Macro support module, not part of the public API.
116#[doc(hidden)]
117pub mod export {
118    pub use async_trait;
119    pub use futures_core;
120    pub use futures_util;
121    pub use ordered_stream;
122    pub use serde;
123    pub use static_assertions;
124}
125
126pub use zbus_names as names;
127pub use zvariant;
128
129#[cfg(unix)]
130use zvariant::OwnedFd;
131
132#[cfg(test)]
133mod tests {
134    use std::{
135        collections::HashMap,
136        convert::{TryFrom, TryInto},
137        sync::{mpsc::channel, Arc, Condvar, Mutex},
138    };
139    #[cfg(unix)]
140    use std::{
141        fs::File,
142        os::unix::io::{AsRawFd, FromRawFd},
143    };
144
145    use crate::utils::block_on;
146    use enumflags2::BitFlags;
147    use ntest::timeout;
148    use test_log::test;
149    use tracing::{debug, instrument, trace};
150
151    use zbus_names::UniqueName;
152    #[cfg(unix)]
153    use zvariant::Fd;
154    use zvariant::{OwnedObjectPath, OwnedValue, Type};
155
156    use crate::{
157        blocking::{self, MessageIterator},
158        fdo::{RequestNameFlags, RequestNameReply},
159        Connection, Message, MessageFlags, Result, SignalContext,
160    };
161
162    fn is_gdbus_test() -> bool {
163        std::env::var_os("ZBUS_GDBUS_TEST").is_some()
164    }
165
166    #[test]
167    fn msg() {
168        let mut m = Message::method(
169            None::<()>,
170            Some("org.freedesktop.DBus"),
171            "/org/freedesktop/DBus",
172            Some("org.freedesktop.DBus.Peer"),
173            "GetMachineId",
174            &(),
175        )
176        .unwrap();
177        assert_eq!(m.path().unwrap(), "/org/freedesktop/DBus");
178        assert_eq!(m.interface().unwrap(), "org.freedesktop.DBus.Peer");
179        assert_eq!(m.member().unwrap(), "GetMachineId");
180        m.modify_primary_header(|primary| {
181            primary.set_flags(BitFlags::from(MessageFlags::NoAutoStart));
182            primary.serial_num_or_init(|| 11);
183
184            Ok(())
185        })
186        .unwrap();
187        let primary = m.primary_header();
188        assert!(*primary.serial_num().unwrap() == 11);
189        assert!(primary.flags() == MessageFlags::NoAutoStart);
190    }
191
192    #[test]
193    #[timeout(15000)]
194    #[instrument]
195    fn basic_connection() {
196        let connection = blocking::Connection::session()
197            .map_err(|e| {
198                debug!("error: {}", e);
199
200                e
201            })
202            .unwrap();
203        // Hello method is already called during connection creation so subsequent calls are
204        // expected to fail but only with a D-Bus error.
205        match connection.call_method(
206            Some("org.freedesktop.DBus"),
207            "/org/freedesktop/DBus",
208            Some("org.freedesktop.DBus"),
209            "Hello",
210            &(),
211        ) {
212            Err(crate::Error::MethodError(_, _, _)) => (),
213            Err(e) => panic!("{}", e),
214
215            // GDBus allows the method to be called multiple times
216            Ok(_) if is_gdbus_test() => (),
217
218            _ => panic!(),
219        };
220    }
221
222    #[test]
223    #[timeout(15000)]
224    fn basic_connection_async() {
225        block_on(test_basic_connection()).unwrap();
226    }
227
228    async fn test_basic_connection() -> Result<()> {
229        let connection = Connection::session().await?;
230
231        match connection
232            .call_method(
233                Some("org.freedesktop.DBus"),
234                "/org/freedesktop/DBus",
235                Some("org.freedesktop.DBus"),
236                "Hello",
237                &(),
238            )
239            .await
240        {
241            Err(crate::Error::MethodError(_, _, _)) => (),
242            Err(e) => panic!("{}", e),
243
244            // GDBus allows the method to be called multiple times
245            Ok(_) if is_gdbus_test() => (),
246
247            _ => panic!(),
248        };
249
250        Ok(())
251    }
252
253    #[cfg(all(unix, not(target_os = "macos")))]
254    #[test]
255    #[timeout(15000)]
256    fn fdpass_systemd() {
257        let connection = blocking::Connection::system().unwrap();
258
259        let reply = connection
260            .call_method(
261                Some("org.freedesktop.systemd1"),
262                "/org/freedesktop/systemd1",
263                Some("org.freedesktop.systemd1.Manager"),
264                "DumpByFileDescriptor",
265                &(),
266            )
267            .unwrap();
268
269        assert!(reply
270            .body_signature()
271            .map(|s| s == <Fd>::signature())
272            .unwrap());
273
274        let fd: Fd = reply.body().unwrap();
275        let _fds = reply.take_fds();
276        assert!(fd.as_raw_fd() >= 0);
277        let f = unsafe { File::from_raw_fd(fd.as_raw_fd()) };
278        f.metadata().unwrap();
279    }
280
281    #[test]
282    #[instrument]
283    #[timeout(15000)]
284    fn freedesktop_api() {
285        let connection = blocking::Connection::session()
286            .map_err(|e| {
287                debug!("error: {}", e);
288
289                e
290            })
291            .unwrap();
292
293        let reply = connection
294            .call_method(
295                Some("org.freedesktop.DBus"),
296                "/org/freedesktop/DBus",
297                Some("org.freedesktop.DBus"),
298                "RequestName",
299                &(
300                    "org.freedesktop.zbus.sync",
301                    BitFlags::from(RequestNameFlags::ReplaceExisting),
302                ),
303            )
304            .unwrap();
305
306        assert!(reply.body_signature().map(|s| s == "u").unwrap());
307        let reply: RequestNameReply = reply.body().unwrap();
308        assert_eq!(reply, RequestNameReply::PrimaryOwner);
309
310        let reply = connection
311            .call_method(
312                Some("org.freedesktop.DBus"),
313                "/org/freedesktop/DBus",
314                Some("org.freedesktop.DBus"),
315                "GetId",
316                &(),
317            )
318            .unwrap();
319
320        assert!(reply
321            .body_signature()
322            .map(|s| s == <&str>::signature())
323            .unwrap());
324        let id: &str = reply.body().unwrap();
325        debug!("Unique ID of the bus: {}", id);
326
327        let reply = connection
328            .call_method(
329                Some("org.freedesktop.DBus"),
330                "/org/freedesktop/DBus",
331                Some("org.freedesktop.DBus"),
332                "NameHasOwner",
333                &"org.freedesktop.zbus.sync",
334            )
335            .unwrap();
336
337        assert!(reply
338            .body_signature()
339            .map(|s| s == bool::signature())
340            .unwrap());
341        assert!(reply.body::<bool>().unwrap());
342
343        let reply = connection
344            .call_method(
345                Some("org.freedesktop.DBus"),
346                "/org/freedesktop/DBus",
347                Some("org.freedesktop.DBus"),
348                "GetNameOwner",
349                &"org.freedesktop.zbus.sync",
350            )
351            .unwrap();
352
353        assert!(reply
354            .body_signature()
355            .map(|s| s == <&str>::signature())
356            .unwrap());
357        assert_eq!(
358            reply.body::<UniqueName<'_>>().unwrap(),
359            *connection.unique_name().unwrap(),
360        );
361
362        // GDBus doesn't provide this method
363        if is_gdbus_test() {
364            return;
365        }
366
367        let reply = connection
368            .call_method(
369                Some("org.freedesktop.DBus"),
370                "/org/freedesktop/DBus",
371                Some("org.freedesktop.DBus"),
372                "GetConnectionCredentials",
373                &"org.freedesktop.DBus",
374            )
375            .unwrap();
376
377        assert!(reply.body_signature().map(|s| s == "a{sv}").unwrap());
378        let hashmap: HashMap<&str, OwnedValue> = reply.body().unwrap();
379
380        let pid: u32 = (&hashmap["ProcessID"]).try_into().unwrap();
381        debug!("DBus bus PID: {}", pid);
382
383        #[cfg(unix)]
384        {
385            let uid: u32 = (&hashmap["UnixUserID"]).try_into().unwrap();
386            debug!("DBus bus UID: {}", uid);
387        }
388    }
389
390    #[test]
391    #[timeout(15000)]
392    fn freedesktop_api_async() {
393        block_on(test_freedesktop_api()).unwrap();
394    }
395
396    #[instrument]
397    async fn test_freedesktop_api() -> Result<()> {
398        let connection = Connection::session().await?;
399
400        let reply = connection
401            .call_method(
402                Some("org.freedesktop.DBus"),
403                "/org/freedesktop/DBus",
404                Some("org.freedesktop.DBus"),
405                "RequestName",
406                &(
407                    "org.freedesktop.zbus.async",
408                    BitFlags::from(RequestNameFlags::ReplaceExisting),
409                ),
410            )
411            .await
412            .unwrap();
413
414        assert!(reply.body_signature().map(|s| s == "u").unwrap());
415        let reply: RequestNameReply = reply.body().unwrap();
416        assert_eq!(reply, RequestNameReply::PrimaryOwner);
417
418        let reply = connection
419            .call_method(
420                Some("org.freedesktop.DBus"),
421                "/org/freedesktop/DBus",
422                Some("org.freedesktop.DBus"),
423                "GetId",
424                &(),
425            )
426            .await
427            .unwrap();
428
429        assert!(reply
430            .body_signature()
431            .map(|s| s == <&str>::signature())
432            .unwrap());
433        let id: &str = reply.body().unwrap();
434        debug!("Unique ID of the bus: {}", id);
435
436        let reply = connection
437            .call_method(
438                Some("org.freedesktop.DBus"),
439                "/org/freedesktop/DBus",
440                Some("org.freedesktop.DBus"),
441                "NameHasOwner",
442                &"org.freedesktop.zbus.async",
443            )
444            .await
445            .unwrap();
446
447        assert!(reply
448            .body_signature()
449            .map(|s| s == bool::signature())
450            .unwrap());
451        assert!(reply.body::<bool>().unwrap());
452
453        let reply = connection
454            .call_method(
455                Some("org.freedesktop.DBus"),
456                "/org/freedesktop/DBus",
457                Some("org.freedesktop.DBus"),
458                "GetNameOwner",
459                &"org.freedesktop.zbus.async",
460            )
461            .await
462            .unwrap();
463
464        assert!(reply
465            .body_signature()
466            .map(|s| s == <&str>::signature())
467            .unwrap());
468        assert_eq!(
469            reply.body::<UniqueName<'_>>().unwrap(),
470            *connection.unique_name().unwrap(),
471        );
472
473        // GDBus doesn't provide this method
474        if is_gdbus_test() {
475            return Ok(());
476        }
477
478        let reply = connection
479            .call_method(
480                Some("org.freedesktop.DBus"),
481                "/org/freedesktop/DBus",
482                Some("org.freedesktop.DBus"),
483                "GetConnectionCredentials",
484                &"org.freedesktop.DBus",
485            )
486            .await
487            .unwrap();
488
489        assert!(reply.body_signature().map(|s| s == "a{sv}").unwrap());
490        let hashmap: HashMap<&str, OwnedValue> = reply.body().unwrap();
491
492        let pid: u32 = (&hashmap["ProcessID"]).try_into().unwrap();
493        debug!("DBus bus PID: {}", pid);
494
495        #[cfg(unix)]
496        {
497            let uid: u32 = (&hashmap["UnixUserID"]).try_into().unwrap();
498            debug!("DBus bus UID: {}", uid);
499        }
500
501        Ok(())
502    }
503
504    #[test]
505    #[timeout(15000)]
506    fn issue_68() {
507        // Tests the fix for https://github.com/dbus2/zbus/issues/68
508        //
509        // While this is not an exact reproduction of the issue 68, the underlying problem it
510        // produces is exactly the same: `Connection::call_method` dropping all incoming messages
511        // while waiting for the reply to the method call.
512        let conn = blocking::Connection::session().unwrap();
513        let stream = MessageIterator::from(&conn);
514
515        // Send a message as client before service starts to process messages
516        let client_conn = blocking::Connection::session().unwrap();
517        let destination = conn.unique_name().map(UniqueName::<'_>::from);
518        let msg = Message::method(
519            None::<()>,
520            destination,
521            "/org/freedesktop/Issue68",
522            Some("org.freedesktop.Issue68"),
523            "Ping",
524            &(),
525        )
526        .unwrap();
527        let serial = client_conn.send_message(msg).unwrap();
528
529        crate::blocking::fdo::DBusProxy::new(&conn)
530            .unwrap()
531            .get_id()
532            .unwrap();
533
534        for m in stream {
535            let msg = m.unwrap();
536
537            if *msg.primary_header().serial_num().unwrap() == serial {
538                break;
539            }
540        }
541    }
542
543    #[test]
544    #[timeout(15000)]
545    fn issue104() {
546        // Tests the fix for https://github.com/dbus2/zbus/issues/104
547        //
548        // The issue is caused by `dbus_proxy` macro adding `()` around the return value of methods
549        // with multiple out arguments, ending up with double parenthesis around the signature of
550        // the return type and zbus only removing the outer `()` only and then it not matching the
551        // signature we receive on the reply message.
552        use zvariant::{ObjectPath, Value};
553
554        struct Secret;
555        #[super::dbus_interface(name = "org.freedesktop.Secret.Service")]
556        impl Secret {
557            fn open_session(
558                &self,
559                _algorithm: &str,
560                input: Value<'_>,
561            ) -> zbus::fdo::Result<(OwnedValue, OwnedObjectPath)> {
562                Ok((
563                    OwnedValue::from(input),
564                    ObjectPath::try_from("/org/freedesktop/secrets/Blah")
565                        .unwrap()
566                        .into(),
567                ))
568            }
569        }
570
571        let secret = Secret;
572        let conn = blocking::ConnectionBuilder::session()
573            .unwrap()
574            .serve_at("/org/freedesktop/secrets", secret)
575            .unwrap()
576            .build()
577            .unwrap();
578        let service_name = conn.unique_name().unwrap().clone();
579
580        {
581            let conn = blocking::Connection::session().unwrap();
582            #[super::dbus_proxy(
583                interface = "org.freedesktop.Secret.Service",
584                assume_defaults = true,
585                gen_async = false
586            )]
587            trait Secret {
588                fn open_session(
589                    &self,
590                    algorithm: &str,
591                    input: &zvariant::Value<'_>,
592                ) -> zbus::Result<(OwnedValue, OwnedObjectPath)>;
593            }
594
595            let proxy = SecretProxy::builder(&conn)
596                .destination(UniqueName::from(service_name))
597                .unwrap()
598                .path("/org/freedesktop/secrets")
599                .unwrap()
600                .build()
601                .unwrap();
602
603            trace!("Calling open_session");
604            proxy.open_session("plain", &Value::from("")).unwrap();
605            trace!("Called open_session");
606        };
607    }
608
609    // This one we just want to see if it builds, no need to run it. For details see:
610    //
611    // https://github.com/dbus2/zbus/issues/121
612    #[test]
613    #[ignore]
614    fn issue_121() {
615        use crate::dbus_proxy;
616
617        #[dbus_proxy(interface = "org.freedesktop.IBus", assume_defaults = true)]
618        trait IBus {
619            /// CurrentInputContext property
620            #[dbus_proxy(property)]
621            fn current_input_context(&self) -> zbus::Result<OwnedObjectPath>;
622
623            /// Engines property
624            #[dbus_proxy(property)]
625            fn engines(&self) -> zbus::Result<Vec<zvariant::OwnedValue>>;
626        }
627    }
628
629    #[test]
630    #[timeout(15000)]
631    fn issue_122() {
632        let conn = blocking::Connection::session().unwrap();
633        let stream = MessageIterator::from(&conn);
634
635        #[allow(clippy::mutex_atomic)]
636        let pair = Arc::new((Mutex::new(false), Condvar::new()));
637        let pair2 = Arc::clone(&pair);
638
639        let child = std::thread::spawn(move || {
640            {
641                let (lock, cvar) = &*pair2;
642                let mut started = lock.lock().unwrap();
643                *started = true;
644                cvar.notify_one();
645            }
646
647            for m in stream {
648                let msg = m.unwrap();
649                let hdr = msg.header().unwrap();
650
651                if hdr.member().unwrap().map(|m| m.as_str()) == Some("ZBusIssue122") {
652                    break;
653                }
654            }
655        });
656
657        // Wait for the receiving thread to start up.
658        let (lock, cvar) = &*pair;
659        let mut started = lock.lock().unwrap();
660        while !*started {
661            started = cvar.wait(started).unwrap();
662        }
663        // Still give it some milliseconds to ensure it's already blocking on receive_message call
664        // when we send a message.
665        std::thread::sleep(std::time::Duration::from_millis(100));
666
667        let destination = conn.unique_name().map(UniqueName::<'_>::from);
668        let msg = Message::method(
669            None::<()>,
670            destination,
671            "/does/not/matter",
672            None::<()>,
673            "ZBusIssue122",
674            &(),
675        )
676        .unwrap();
677        conn.send_message(msg).unwrap();
678
679        child.join().unwrap();
680    }
681
682    #[test]
683    #[ignore]
684    fn issue_81() {
685        use zbus::dbus_proxy;
686        use zvariant::{OwnedValue, Type};
687
688        #[derive(
689            Debug, PartialEq, Eq, Clone, Type, OwnedValue, serde::Serialize, serde::Deserialize,
690        )]
691        pub struct DbusPath {
692            id: String,
693            path: OwnedObjectPath,
694        }
695
696        #[dbus_proxy(assume_defaults = true)]
697        trait Session {
698            #[dbus_proxy(property)]
699            fn sessions_tuple(&self) -> zbus::Result<(String, String)>;
700
701            #[dbus_proxy(property)]
702            fn sessions_struct(&self) -> zbus::Result<DbusPath>;
703        }
704    }
705
706    #[test]
707    #[timeout(15000)]
708    fn issue173() {
709        // Tests the fix for https://github.com/dbus2/zbus/issues/173
710        //
711        // The issue is caused by proxy not keeping track of its destination's owner changes
712        // (service restart) and failing to receive signals as a result.
713        let (tx, rx) = channel();
714        let child = std::thread::spawn(move || {
715            let conn = blocking::Connection::session().unwrap();
716            #[super::dbus_proxy(
717                interface = "org.freedesktop.zbus.ComeAndGo",
718                default_service = "org.freedesktop.zbus.ComeAndGo",
719                default_path = "/org/freedesktop/zbus/ComeAndGo"
720            )]
721            trait ComeAndGo {
722                #[dbus_proxy(signal)]
723                fn the_signal(&self) -> zbus::Result<()>;
724            }
725
726            let proxy = ComeAndGoProxyBlocking::new(&conn).unwrap();
727            let signals = proxy.receive_the_signal().unwrap();
728            tx.send(()).unwrap();
729
730            // We receive two signals, each time from different unique names. W/o the fix for
731            // issue#173, the second iteration hangs.
732            for _ in signals.take(2) {
733                tx.send(()).unwrap();
734            }
735        });
736
737        struct ComeAndGo;
738        #[super::dbus_interface(name = "org.freedesktop.zbus.ComeAndGo")]
739        impl ComeAndGo {
740            #[dbus_interface(signal)]
741            async fn the_signal(signal_ctxt: &SignalContext<'_>) -> zbus::Result<()>;
742        }
743
744        rx.recv().unwrap();
745        for _ in 0..2 {
746            let conn = blocking::ConnectionBuilder::session()
747                .unwrap()
748                .serve_at("/org/freedesktop/zbus/ComeAndGo", ComeAndGo)
749                .unwrap()
750                .name("org.freedesktop.zbus.ComeAndGo")
751                .unwrap()
752                .build()
753                .unwrap();
754
755            let iface_ref = conn
756                .object_server()
757                .interface::<_, ComeAndGo>("/org/freedesktop/zbus/ComeAndGo")
758                .unwrap();
759            block_on(ComeAndGo::the_signal(iface_ref.signal_context())).unwrap();
760
761            rx.recv().unwrap();
762
763            // Now we release the name ownership to use a different connection (i-e new unique
764            // name).
765            conn.release_name("org.freedesktop.zbus.ComeAndGo").unwrap();
766        }
767
768        child.join().unwrap();
769    }
770
771    #[test]
772    #[timeout(15000)]
773    fn uncached_property() {
774        block_on(test_uncached_property()).unwrap();
775    }
776
777    async fn test_uncached_property() -> Result<()> {
778        // A dummy boolean test service. It starts as `false` and can be
779        // flipped to `true`. Two properties can access the inner value, with
780        // and without caching.
781        #[derive(Default)]
782        struct ServiceUncachedPropertyTest(bool);
783        #[crate::dbus_interface(name = "org.freedesktop.zbus.UncachedPropertyTest")]
784        impl ServiceUncachedPropertyTest {
785            #[dbus_interface(property)]
786            fn cached_prop(&self) -> bool {
787                self.0
788            }
789            #[dbus_interface(property)]
790            fn uncached_prop(&self) -> bool {
791                self.0
792            }
793            async fn set_inner_to_true(&mut self) -> zbus::fdo::Result<()> {
794                self.0 = true;
795                Ok(())
796            }
797        }
798
799        #[crate::dbus_proxy(
800            interface = "org.freedesktop.zbus.UncachedPropertyTest",
801            default_service = "org.freedesktop.zbus.UncachedPropertyTest",
802            default_path = "/org/freedesktop/zbus/UncachedPropertyTest"
803        )]
804        trait UncachedPropertyTest {
805            #[dbus_proxy(property)]
806            fn cached_prop(&self) -> zbus::Result<bool>;
807
808            #[dbus_proxy(property(emits_changed_signal = "false"))]
809            fn uncached_prop(&self) -> zbus::Result<bool>;
810
811            fn set_inner_to_true(&self) -> zbus::Result<()>;
812        }
813
814        let service = crate::ConnectionBuilder::session()
815            .unwrap()
816            .serve_at(
817                "/org/freedesktop/zbus/UncachedPropertyTest",
818                ServiceUncachedPropertyTest(false),
819            )
820            .unwrap()
821            .build()
822            .await
823            .unwrap();
824
825        let dest = service.unique_name().unwrap();
826
827        let client_conn = crate::Connection::session().await.unwrap();
828        let client = UncachedPropertyTestProxy::builder(&client_conn)
829            .destination(dest)
830            .unwrap()
831            .build()
832            .await
833            .unwrap();
834
835        // Query properties; this populates the cache too.
836        assert!(!client.cached_prop().await.unwrap());
837        assert!(!client.uncached_prop().await.unwrap());
838
839        // Flip the inner value so we can observe the different semantics of
840        // the two properties.
841        client.set_inner_to_true().await.unwrap();
842
843        // Query properties again; the first one should incur a stale read from
844        // cache, while the second one should be able to read the live/updated
845        // value.
846        assert!(!client.cached_prop().await.unwrap());
847        assert!(client.uncached_prop().await.unwrap());
848
849        Ok(())
850    }
851
852    #[test]
853    #[timeout(15000)]
854    fn issue_260() {
855        // Low-level server example in the book doesn't work. The reason was that
856        // `Connection::request_name` implicitly created the associated `ObjectServer` to avoid
857        // #68. This meant that the `ObjectServer` ended up replying to the incoming method call
858        // with an error, before the service code could do so.
859        block_on(async {
860            let connection = Connection::session().await?;
861
862            connection.request_name("org.zbus.Issue260").await?;
863
864            futures_util::try_join!(
865                issue_260_service(&connection),
866                issue_260_client(&connection),
867            )?;
868
869            Ok::<(), zbus::Error>(())
870        })
871        .unwrap();
872    }
873
874    async fn issue_260_service(connection: &Connection) -> Result<()> {
875        use futures_util::stream::TryStreamExt;
876
877        let mut stream = zbus::MessageStream::from(connection);
878        while let Some(msg) = stream.try_next().await? {
879            let msg_header = msg.header()?;
880
881            match msg_header.message_type()? {
882                zbus::MessageType::MethodCall => {
883                    connection.reply(&msg, &()).await?;
884
885                    break;
886                }
887                _ => continue,
888            }
889        }
890
891        Ok(())
892    }
893
894    async fn issue_260_client(connection: &Connection) -> Result<()> {
895        zbus::Proxy::new(
896            connection,
897            "org.zbus.Issue260",
898            "/org/zbus/Issue260",
899            "org.zbus.Issue260",
900        )
901        .await?
902        .call("Whatever", &())
903        .await?;
904        Ok(())
905    }
906
907    #[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
908    // Issue specific to tokio runtime.
909    #[cfg(all(unix, feature = "tokio"))]
910    #[instrument]
911    async fn issue_279() {
912        // On failure to read from the socket, we were closing the error channel from the sender
913        // side and since the underlying tokio API doesn't provide a `close` method on the sender,
914        // the async-channel abstraction was achieving this through calling `close` on receiver,
915        // which is behind an async mutex and we end up with a deadlock.
916        use crate::{ConnectionBuilder, MessageStream};
917        use futures_util::{stream::TryStreamExt, try_join};
918        use tokio::net::UnixStream;
919
920        let guid = crate::Guid::generate();
921        let (p0, p1) = UnixStream::pair().unwrap();
922
923        let server = ConnectionBuilder::unix_stream(p0)
924            .server(&guid)
925            .p2p()
926            .build();
927        let client = ConnectionBuilder::unix_stream(p1).p2p().build();
928        let (client, server) = try_join!(client, server).unwrap();
929        let mut stream = MessageStream::from(client);
930        let next_msg_fut = stream.try_next();
931
932        drop(server);
933
934        assert!(matches!(next_msg_fut.await, Err(_)));
935    }
936
937    #[test(tokio::test(flavor = "multi_thread"))]
938    // Issue specific to tokio runtime.
939    #[cfg(all(unix, feature = "tokio"))]
940    #[instrument]
941    async fn issue_310() {
942        // The issue was we were deadlocking on fetching the new property value after invalidation.
943        // This turned out to be caused by us trying to grab a read lock on resource while holding
944        // a write lock. Thanks to connman for being weird and invalidating the property just before
945        // updating it, so this issue could be exposed.
946        use futures_util::StreamExt;
947        use zbus::ConnectionBuilder;
948
949        struct Station(u64);
950
951        #[zbus::dbus_interface(name = "net.connman.iwd.Station")]
952        impl Station {
953            #[dbus_interface(property)]
954            fn connected_network(&self) -> OwnedObjectPath {
955                format!("/net/connman/iwd/0/33/Network/{}", self.0)
956                    .try_into()
957                    .unwrap()
958            }
959        }
960
961        #[zbus::dbus_proxy(
962            interface = "net.connman.iwd.Station",
963            default_service = "net.connman.iwd"
964        )]
965        trait Station {
966            #[dbus_proxy(property)]
967            fn connected_network(&self) -> zbus::Result<OwnedObjectPath>;
968        }
969        let connection = ConnectionBuilder::session()
970            .unwrap()
971            .serve_at("/net/connman/iwd/0/33", Station(0))
972            .unwrap()
973            .name("net.connman.iwd")
974            .unwrap()
975            .build()
976            .await
977            .unwrap();
978        let event = Arc::new(event_listener::Event::new());
979        let conn_clone = connection.clone();
980        let event_clone = event.clone();
981        tokio::spawn(async move {
982            for _ in 0..10 {
983                let listener = event_clone.listen();
984                let iface_ref = conn_clone
985                    .object_server()
986                    .interface::<_, Station>("/net/connman/iwd/0/33")
987                    .await
988                    .unwrap();
989
990                {
991                    let iface = iface_ref.get().await;
992                    iface
993                        .connected_network_invalidate(iface_ref.signal_context())
994                        .await
995                        .unwrap();
996                    iface
997                        .connected_network_changed(iface_ref.signal_context())
998                        .await
999                        .unwrap();
1000                }
1001                listener.await;
1002                iface_ref.get_mut().await.0 += 1;
1003            }
1004        });
1005
1006        let station = StationProxy::builder(&connection)
1007            .path("/net/connman/iwd/0/33")
1008            .unwrap()
1009            .build()
1010            .await
1011            .unwrap();
1012
1013        let mut changes = station.receive_connected_network_changed().await;
1014
1015        let mut last_received = 0;
1016        while last_received < 9 {
1017            let change = changes.next().await.unwrap();
1018            let path = change.get().await.unwrap();
1019            let received: u64 = path
1020                .split('/')
1021                .last()
1022                .unwrap()
1023                .parse()
1024                .expect("invalid path");
1025            assert!(received >= last_received);
1026            last_received = received;
1027            event.notify(1);
1028        }
1029    }
1030}