zbus/fdo/
mod.rs

1mod error;
2pub use error::{Error, Result};
3
4pub(crate) mod dbus;
5pub use dbus::{
6    ConnectionCredentials, DBusProxy, NameAcquired, NameAcquiredArgs, NameAcquiredStream, NameLost,
7    NameLostArgs, NameLostStream, NameOwnerChanged, NameOwnerChangedArgs, NameOwnerChangedStream,
8    ReleaseNameReply, RequestNameFlags, RequestNameReply, StartServiceReply,
9};
10
11pub(crate) mod introspectable;
12pub(crate) use introspectable::Introspectable;
13pub use introspectable::IntrospectableProxy;
14
15pub(crate) mod monitoring;
16pub use monitoring::MonitoringProxy;
17
18pub(crate) mod object_manager;
19pub use object_manager::{
20    InterfacesAdded, InterfacesAddedArgs, InterfacesAddedStream, InterfacesRemoved,
21    InterfacesRemovedArgs, InterfacesRemovedStream, ManagedObjects, ObjectManager,
22    ObjectManagerProxy,
23};
24
25pub(crate) mod peer;
26pub(crate) use peer::Peer;
27pub use peer::PeerProxy;
28
29pub(crate) mod properties;
30pub use properties::{
31    Properties, PropertiesChanged, PropertiesChangedArgs, PropertiesChangedStream, PropertiesProxy,
32};
33
34pub(crate) mod stats;
35pub use stats::StatsProxy;
36
37#[cfg(test)]
38mod tests {
39    use crate::{fdo, interface, message::Message, DBusError, Error};
40    use futures_util::StreamExt;
41    use ntest::timeout;
42    use test_log::test;
43    use tokio::runtime;
44    use zbus_names::WellKnownName;
45
46    #[test]
47    fn error_from_zerror() {
48        let m = Message::method_call("/", "foo")
49            .unwrap()
50            .destination(":1.2")
51            .unwrap()
52            .build(&())
53            .unwrap();
54        let m = Message::error(&m.header(), "org.freedesktop.DBus.Error.TimedOut")
55            .unwrap()
56            .build(&("so long"))
57            .unwrap();
58        let e: Error = m.into();
59        let e: fdo::Error = e.into();
60        assert_eq!(e, fdo::Error::TimedOut("so long".to_string()),);
61        assert_eq!(e.name(), "org.freedesktop.DBus.Error.TimedOut");
62        assert_eq!(e.description(), Some("so long"));
63    }
64
65    #[test]
66    #[timeout(15000)]
67    fn signal() {
68        // Multi-threaded scheduler.
69        runtime::Runtime::new().unwrap().block_on(test_signal());
70
71        // single-threaded scheduler.
72        runtime::Builder::new_current_thread()
73            .enable_io()
74            .build()
75            .unwrap()
76            .block_on(test_signal());
77    }
78
79    async fn test_signal() {
80        let conn = crate::Connection::session().await.unwrap();
81        let proxy = fdo::DBusProxy::new(&conn).await.unwrap();
82
83        // Register a well-known name with the session bus and ensure we get the appropriate
84        // signals called for that.
85        let well_known = "org.freedesktop.zbus.FdoSignalStreamTest";
86        let unique_name = conn.unique_name().unwrap();
87        let owner_change_stream = proxy
88            .receive_name_owner_changed_with_args(&[(0, well_known), (2, unique_name.as_str())])
89            .await
90            .unwrap();
91
92        let name_acquired_stream = proxy
93            .receive_name_acquired_with_args(&[(0, well_known)])
94            .await
95            .unwrap();
96        let mut stream = owner_change_stream.zip(name_acquired_stream);
97
98        let well_known: WellKnownName<'static> = well_known.try_into().unwrap();
99        proxy
100            .request_name(
101                well_known.as_ref(),
102                fdo::RequestNameFlags::ReplaceExisting.into(),
103            )
104            .await
105            .unwrap();
106
107        let (name_owner_changed, name_acquired) = stream.next().await.unwrap();
108        assert_eq!(name_owner_changed.args().unwrap().name(), &well_known);
109        assert_eq!(
110            *name_owner_changed
111                .args()
112                .unwrap()
113                .new_owner()
114                .as_ref()
115                .unwrap(),
116            *unique_name
117        );
118        assert_eq!(name_acquired.args().unwrap().name(), &well_known);
119
120        let result = proxy.release_name(well_known.as_ref()).await.unwrap();
121        assert_eq!(result, fdo::ReleaseNameReply::Released);
122
123        let result = proxy.release_name(well_known).await.unwrap();
124        assert_eq!(result, fdo::ReleaseNameReply::NonExistent);
125
126        let _stream = proxy
127            .receive_features_changed()
128            .await
129            .filter_map(|changed| async move {
130                let v = changed.get().await.ok();
131                dbg!(v)
132            });
133    }
134
135    #[test]
136    #[timeout(15000)]
137    fn no_object_manager_signals_before_hello() {
138        crate::block_on(no_object_manager_signals_before_hello_async());
139    }
140
141    async fn no_object_manager_signals_before_hello_async() {
142        // We were emitting `InterfacesAdded` signals before `Hello` was called, which is wrong and
143        // results in us getting disconnected by the bus. This test case ensures we don't do that
144        // and also that the signals are eventually emitted.
145
146        // Let's first create an interator to get the signals (it has to be another connection).
147        let conn = zbus::Connection::session().await.unwrap();
148        let mut stream = zbus::MessageStream::for_match_rule(
149            zbus::MatchRule::builder()
150                .msg_type(zbus::message::Type::Signal)
151                .interface("org.freedesktop.DBus.ObjectManager")
152                .unwrap()
153                .path("/org/zbus/NoObjectManagerSignalsBeforeHello")
154                .unwrap()
155                .build(),
156            &conn,
157            None,
158        )
159        .await
160        .unwrap();
161
162        // Now create the service side.
163        struct TestObj;
164        #[interface(name = "org.zbus.TestObj")]
165        impl TestObj {
166            #[zbus(property)]
167            fn test(&self) -> String {
168                "test".into()
169            }
170        }
171        let _conn = zbus::conn::Builder::session()
172            .unwrap()
173            .name("org.zbus.NoObjectManagerSignalsBeforeHello")
174            .unwrap()
175            .serve_at("/org/zbus/NoObjectManagerSignalsBeforeHello/Obj", TestObj)
176            .unwrap()
177            .serve_at(
178                "/org/zbus/NoObjectManagerSignalsBeforeHello",
179                super::ObjectManager,
180            )
181            .unwrap()
182            .build()
183            .await
184            .unwrap();
185
186        // Let's see if the `InterfacesAdded` signal was emitted.
187        let msg = stream.next().await.unwrap().unwrap();
188        let signal = super::InterfacesAdded::from_message(msg).unwrap();
189        assert_eq!(
190            signal.args().unwrap().interfaces_and_properties,
191            vec![(
192                "org.zbus.TestObj".try_into().unwrap(),
193                vec![("Test", zvariant::Value::new("test"))]
194                    .into_iter()
195                    .collect()
196            )]
197            .into_iter()
198            .collect()
199        );
200    }
201}