zbus/blocking/
connection.rs

1use enumflags2::BitFlags;
2use event_listener::EventListener;
3use static_assertions::assert_impl_all;
4use std::{convert::TryInto, io, ops::Deref, sync::Arc};
5use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
6use zvariant::ObjectPath;
7
8use crate::{
9    blocking::ObjectServer,
10    fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
11    utils::block_on,
12    DBusError, Error, Message, Result,
13};
14
15/// A blocking wrapper of [`zbus::Connection`].
16///
17/// Most of the API is very similar to [`zbus::Connection`], except it's blocking. One
18/// notable difference is that there is no equivalent of [`Sink`] implementation provided.
19///
20/// [`Sink`]: https://docs.rs/futures/0.3.17/futures/sink/trait.Sink.html
21#[derive(derivative::Derivative, Clone)]
22#[derivative(Debug)]
23#[must_use = "Dropping a `Connection` will close the underlying socket."]
24pub struct Connection {
25    inner: crate::Connection,
26}
27
28assert_impl_all!(Connection: Send, Sync, Unpin);
29
30impl Connection {
31    /// Create a `Connection` to the session/user message bus.
32    pub fn session() -> Result<Self> {
33        block_on(crate::Connection::session()).map(Self::from)
34    }
35
36    /// Create a `Connection` to the system-wide message bus.
37    pub fn system() -> Result<Self> {
38        block_on(crate::Connection::system()).map(Self::from)
39    }
40
41    /// The capacity of the main (unfiltered) queue.
42    pub fn max_queued(&self) -> usize {
43        self.inner.max_queued()
44    }
45
46    /// Set the capacity of the main (unfiltered) queue.
47    pub fn set_max_queued(mut self, max: usize) {
48        self.inner.set_max_queued(max)
49    }
50
51    /// The server's GUID.
52    pub fn server_guid(&self) -> &str {
53        self.inner.server_guid()
54    }
55
56    /// The unique name as assigned by the message bus or `None` if not a message bus connection.
57    pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
58        self.inner.unique_name()
59    }
60
61    /// Send `msg` to the peer.
62    ///
63    /// The connection sets a unique serial number on the message before sending it off.
64    ///
65    /// On successfully sending off `msg`, the assigned serial number is returned.
66    pub fn send_message(&self, msg: Message) -> Result<u32> {
67        block_on(self.inner.send_message(msg))
68    }
69
70    /// Send a method call.
71    ///
72    /// Create a method-call message, send it over the connection, then wait for the reply. Incoming
73    /// messages are received through [`receive_message`] until the matching method reply (error or
74    /// return) is received.
75    ///
76    /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
77    /// error replies are returned as [`MethodError`].
78    ///
79    /// [`receive_message`]: struct.Connection.html#method.receive_message
80    /// [`MethodError`]: enum.Error.html#variant.MethodError
81    pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
82        &self,
83        destination: Option<D>,
84        path: P,
85        iface: Option<I>,
86        method_name: M,
87        body: &B,
88    ) -> Result<Arc<Message>>
89    where
90        D: TryInto<BusName<'d>>,
91        P: TryInto<ObjectPath<'p>>,
92        I: TryInto<InterfaceName<'i>>,
93        M: TryInto<MemberName<'m>>,
94        D::Error: Into<Error>,
95        P::Error: Into<Error>,
96        I::Error: Into<Error>,
97        M::Error: Into<Error>,
98        B: serde::ser::Serialize + zvariant::DynamicType,
99    {
100        block_on(
101            self.inner
102                .call_method(destination, path, iface, method_name, body),
103        )
104    }
105
106    /// Emit a signal.
107    ///
108    /// Create a signal message, and send it over the connection.
109    pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
110        &self,
111        destination: Option<D>,
112        path: P,
113        iface: I,
114        signal_name: M,
115        body: &B,
116    ) -> Result<()>
117    where
118        D: TryInto<BusName<'d>>,
119        P: TryInto<ObjectPath<'p>>,
120        I: TryInto<InterfaceName<'i>>,
121        M: TryInto<MemberName<'m>>,
122        D::Error: Into<Error>,
123        P::Error: Into<Error>,
124        I::Error: Into<Error>,
125        M::Error: Into<Error>,
126        B: serde::ser::Serialize + zvariant::DynamicType,
127    {
128        block_on(
129            self.inner
130                .emit_signal(destination, path, iface, signal_name, body),
131        )
132    }
133
134    /// Reply to a message.
135    ///
136    /// Given an existing message (likely a method call), send a reply back to the caller with the
137    /// given `body`.
138    ///
139    /// Returns the message serial number.
140    pub fn reply<B>(&self, call: &Message, body: &B) -> Result<u32>
141    where
142        B: serde::ser::Serialize + zvariant::DynamicType,
143    {
144        block_on(self.inner.reply(call, body))
145    }
146
147    /// Reply an error to a message.
148    ///
149    /// Given an existing message (likely a method call), send an error reply back to the caller
150    /// with the given `error_name` and `body`.
151    ///
152    /// Returns the message serial number.
153    pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<u32>
154    where
155        B: serde::ser::Serialize + zvariant::DynamicType,
156        E: TryInto<ErrorName<'e>>,
157        E::Error: Into<Error>,
158    {
159        block_on(self.inner.reply_error(call, error_name, body))
160    }
161
162    /// Reply to a method call with an error.
163    ///
164    /// Given an existing method call message header, send an error reply back to the caller
165    /// using one of the standard interface reply types.
166    ///
167    /// Returns the message serial number.
168    pub fn reply_dbus_error(
169        &self,
170        call: &zbus::MessageHeader<'_>,
171        err: impl DBusError,
172    ) -> Result<u32> {
173        block_on(self.inner.reply_dbus_error(call, err))
174    }
175
176    /// Register a well-known name for this service on the bus.
177    ///
178    /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details
179    /// and caveats.
180    pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
181    where
182        W: TryInto<WellKnownName<'w>>,
183        W::Error: Into<Error>,
184    {
185        block_on(self.inner.request_name(well_known_name))
186    }
187
188    /// Register a well-known name for this service on the bus.
189    ///
190    /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more
191    /// details and caveats.
192    pub fn request_name_with_flags<'w, W>(
193        &self,
194        well_known_name: W,
195        flags: BitFlags<RequestNameFlags>,
196    ) -> Result<RequestNameReply>
197    where
198        W: TryInto<WellKnownName<'w>>,
199        W::Error: Into<Error>,
200    {
201        block_on(self.inner.request_name_with_flags(well_known_name, flags))
202    }
203
204    /// Deregister a previously registered well-known name for this service on the bus.
205    ///
206    /// Use this method to deregister a well-known name, registered through
207    /// [`Connection::request_name`].
208    ///
209    /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
210    /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
211    /// was not previously registered or already deregistered.
212    pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
213    where
214        W: TryInto<WellKnownName<'w>>,
215        W::Error: Into<Error>,
216    {
217        block_on(self.inner.release_name(well_known_name))
218    }
219
220    /// Checks if `self` is a connection to a message bus.
221    ///
222    /// This will return `false` for p2p connections.
223    pub fn is_bus(&self) -> bool {
224        self.inner.is_bus()
225    }
226
227    /// Get a reference to the associated [`ObjectServer`].
228    ///
229    /// The `ObjectServer` is created on-demand.
230    pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
231        self.inner.sync_object_server(true, None)
232    }
233
234    /// Get a reference to the underlying async Connection.
235    pub fn inner(&self) -> &crate::Connection {
236        &self.inner
237    }
238
239    /// Get the underlying async Connection, consuming `self`.
240    pub fn into_inner(self) -> crate::Connection {
241        self.inner
242    }
243
244    /// Returns a listener, notified on various connection activity.
245    ///
246    /// This function is meant for the caller to implement idle or timeout on inactivity.
247    pub fn monitor_activity(&self) -> EventListener {
248        self.inner.monitor_activity()
249    }
250
251    /// Returns the peer credentials.
252    ///
253    /// The fields are populated on the best effort basis. Some or all fields may not even make
254    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
255    ///
256    /// # Caveats
257    ///
258    /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
259    pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
260        block_on(self.inner.peer_credentials())
261    }
262}
263
264impl From<crate::Connection> for Connection {
265    fn from(conn: crate::Connection) -> Self {
266        Self { inner: conn }
267    }
268}
269
270#[cfg(all(test, unix))]
271mod tests {
272    use ntest::timeout;
273    #[cfg(all(unix, not(feature = "tokio")))]
274    use std::os::unix::net::UnixStream;
275    use std::thread;
276    use test_log::test;
277    #[cfg(all(unix, feature = "tokio"))]
278    use tokio::net::UnixStream;
279    #[cfg(all(windows, not(feature = "tokio")))]
280    use uds_windows::UnixStream;
281
282    use crate::{
283        blocking::{ConnectionBuilder, MessageIterator},
284        Guid,
285    };
286
287    #[test]
288    #[timeout(15000)]
289    fn unix_p2p() {
290        let guid = Guid::generate();
291
292        // Tokio needs us to call the sync function from async context. :shrug:
293        let (p0, p1) = crate::utils::block_on(async { UnixStream::pair().unwrap() });
294
295        let (tx, rx) = std::sync::mpsc::channel();
296        let server_thread = thread::spawn(move || {
297            let c = ConnectionBuilder::unix_stream(p0)
298                .server(&guid)
299                .p2p()
300                .build()
301                .unwrap();
302            rx.recv().unwrap();
303            let reply = c
304                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
305                .unwrap();
306            assert_eq!(reply.to_string(), "Method return");
307            let val: String = reply.body().unwrap();
308            val
309        });
310
311        let c = ConnectionBuilder::unix_stream(p1).p2p().build().unwrap();
312        let listener = c.monitor_activity();
313        let mut s = MessageIterator::from(&c);
314        tx.send(()).unwrap();
315        let m = s.next().unwrap().unwrap();
316        assert_eq!(m.to_string(), "Method call Test");
317        c.reply(&m, &("yay")).unwrap();
318
319        for _ in s {}
320
321        let val = server_thread.join().expect("failed to join server thread");
322        assert_eq!(val, "yay");
323
324        // there was some activity
325        listener.wait();
326        // eventually, nothing happens and it will timeout
327        loop {
328            let listener = c.monitor_activity();
329            if !listener.wait_timeout(std::time::Duration::from_millis(10)) {
330                break;
331            }
332        }
333    }
334}