zbus/blocking/connection/
mod.rs

1//! Blocking connection API.
2
3use enumflags2::BitFlags;
4use event_listener::EventListener;
5use std::{io, ops::Deref};
6use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
7use zvariant::ObjectPath;
8
9use crate::{
10    blocking::ObjectServer,
11    fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
12    message::Message,
13    utils::block_on,
14    DBusError, Error, Result,
15};
16
17mod builder;
18pub use builder::Builder;
19
20/// A blocking wrapper of [`zbus::Connection`].
21///
22/// Most of the API is very similar to [`zbus::Connection`], except it's blocking.
23#[derive(Debug, Clone)]
24#[must_use = "Dropping a `Connection` will close the underlying socket."]
25pub struct Connection {
26    inner: crate::Connection,
27}
28
29impl Connection {
30    /// Create a `Connection` to the session/user message bus.
31    pub fn session() -> Result<Self> {
32        block_on(crate::Connection::session()).map(Self::from)
33    }
34
35    /// Create a `Connection` to the system-wide message bus.
36    pub fn system() -> Result<Self> {
37        block_on(crate::Connection::system()).map(Self::from)
38    }
39
40    /// The capacity of the main (unfiltered) queue.
41    pub fn max_queued(&self) -> usize {
42        self.inner.max_queued()
43    }
44
45    /// Set the capacity of the main (unfiltered) queue.
46    pub fn set_max_queued(mut self, max: usize) {
47        self.inner.set_max_queued(max)
48    }
49
50    /// The server's GUID.
51    pub fn server_guid(&self) -> &str {
52        self.inner.server_guid()
53    }
54
55    /// The unique name as assigned by the message bus or `None` if not a message bus connection.
56    pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
57        self.inner.unique_name()
58    }
59
60    /// Send `msg` to the peer.
61    pub fn send(&self, msg: &Message) -> Result<()> {
62        block_on(self.inner.send(msg))
63    }
64
65    /// Send a method call.
66    ///
67    /// Create a method-call message, send it over the connection, then wait for the reply.
68    ///
69    /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
70    /// error replies are returned as [`Error::MethodError`].
71    pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
72        &self,
73        destination: Option<D>,
74        path: P,
75        iface: Option<I>,
76        method_name: M,
77        body: &B,
78    ) -> Result<Message>
79    where
80        D: TryInto<BusName<'d>>,
81        P: TryInto<ObjectPath<'p>>,
82        I: TryInto<InterfaceName<'i>>,
83        M: TryInto<MemberName<'m>>,
84        D::Error: Into<Error>,
85        P::Error: Into<Error>,
86        I::Error: Into<Error>,
87        M::Error: Into<Error>,
88        B: serde::ser::Serialize + zvariant::DynamicType,
89    {
90        block_on(
91            self.inner
92                .call_method(destination, path, iface, method_name, body),
93        )
94    }
95
96    /// Emit a signal.
97    ///
98    /// Create a signal message, and send it over the connection.
99    pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
100        &self,
101        destination: Option<D>,
102        path: P,
103        iface: I,
104        signal_name: M,
105        body: &B,
106    ) -> Result<()>
107    where
108        D: TryInto<BusName<'d>>,
109        P: TryInto<ObjectPath<'p>>,
110        I: TryInto<InterfaceName<'i>>,
111        M: TryInto<MemberName<'m>>,
112        D::Error: Into<Error>,
113        P::Error: Into<Error>,
114        I::Error: Into<Error>,
115        M::Error: Into<Error>,
116        B: serde::ser::Serialize + zvariant::DynamicType,
117    {
118        block_on(
119            self.inner
120                .emit_signal(destination, path, iface, signal_name, body),
121        )
122    }
123
124    /// Reply to a message.
125    ///
126    /// Given an existing message (likely a method call), send a reply back to the caller with the
127    /// given `body`.
128    pub fn reply<B>(&self, call: &zbus::message::Header<'_>, body: &B) -> Result<()>
129    where
130        B: serde::ser::Serialize + zvariant::DynamicType,
131    {
132        block_on(self.inner.reply(call, body))
133    }
134
135    /// Reply an error to a message.
136    ///
137    /// Given an existing message (likely a method call), send an error reply back to the caller
138    /// with the given `error_name` and `body`.
139    ///
140    /// Returns the message serial number.
141    pub fn reply_error<'e, E, B>(
142        &self,
143        call: &zbus::message::Header<'_>,
144        error_name: E,
145        body: &B,
146    ) -> Result<()>
147    where
148        B: serde::ser::Serialize + zvariant::DynamicType,
149        E: TryInto<ErrorName<'e>>,
150        E::Error: Into<Error>,
151    {
152        block_on(self.inner.reply_error(call, error_name, body))
153    }
154
155    /// Reply to a method call with an error.
156    ///
157    /// Given an existing method call message header, send an error reply back to the caller
158    /// using one of the standard interface reply types.
159    ///
160    /// Returns the message serial number.
161    pub fn reply_dbus_error(
162        &self,
163        call: &zbus::message::Header<'_>,
164        err: impl DBusError,
165    ) -> Result<()> {
166        block_on(self.inner.reply_dbus_error(call, err))
167    }
168
169    /// Register a well-known name for this service on the bus.
170    ///
171    /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details
172    /// and caveats.
173    pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
174    where
175        W: TryInto<WellKnownName<'w>>,
176        W::Error: Into<Error>,
177    {
178        block_on(self.inner.request_name(well_known_name))
179    }
180
181    /// Register a well-known name for this service on the bus.
182    ///
183    /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more
184    /// details and caveats.
185    pub fn request_name_with_flags<'w, W>(
186        &self,
187        well_known_name: W,
188        flags: BitFlags<RequestNameFlags>,
189    ) -> Result<RequestNameReply>
190    where
191        W: TryInto<WellKnownName<'w>>,
192        W::Error: Into<Error>,
193    {
194        block_on(self.inner.request_name_with_flags(well_known_name, flags))
195    }
196
197    /// Deregister a previously registered well-known name for this service on the bus.
198    ///
199    /// Use this method to deregister a well-known name, registered through
200    /// [`Connection::request_name`].
201    ///
202    /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
203    /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
204    /// was not previously registered or already deregistered.
205    pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
206    where
207        W: TryInto<WellKnownName<'w>>,
208        W::Error: Into<Error>,
209    {
210        block_on(self.inner.release_name(well_known_name))
211    }
212
213    /// Check if `self` is a connection to a message bus.
214    ///
215    /// This will return `false` for p2p connections.
216    pub fn is_bus(&self) -> bool {
217        self.inner.is_bus()
218    }
219
220    /// Get a reference to the associated [`ObjectServer`].
221    ///
222    /// The `ObjectServer` is created on-demand.
223    pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
224        struct Wrapper(ObjectServer);
225        impl Deref for Wrapper {
226            type Target = ObjectServer;
227
228            fn deref(&self) -> &Self::Target {
229                &self.0
230            }
231        }
232
233        Wrapper(ObjectServer::new(&self.inner))
234    }
235
236    /// Get a reference to the underlying async Connection.
237    pub fn inner(&self) -> &crate::Connection {
238        &self.inner
239    }
240
241    /// Get the underlying async Connection, consuming `self`.
242    pub fn into_inner(self) -> crate::Connection {
243        self.inner
244    }
245
246    /// Return a listener, notified on various connection activity.
247    ///
248    /// This function is meant for the caller to implement idle or timeout on inactivity.
249    pub fn monitor_activity(&self) -> EventListener {
250        self.inner.monitor_activity()
251    }
252
253    /// Return the peer credentials.
254    ///
255    /// The fields are populated on the best effort basis. Some or all fields may not even make
256    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
257    ///
258    /// # Caveats
259    ///
260    /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
261    pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
262        block_on(self.inner.peer_credentials())
263    }
264
265    /// Close the connection.
266    ///
267    /// After this call, all reading and writing operations will fail.
268    pub fn close(self) -> Result<()> {
269        block_on(self.inner.close())
270    }
271
272    /// Gracefully close the connection, waiting for all other references to be dropped.
273    ///
274    /// Blocking version of [`crate::Connection::graceful_shutdown`]. See docs there for
275    /// more details and caveats.
276    pub fn graceful_shutdown(self) {
277        block_on(self.inner.graceful_shutdown())
278    }
279}
280
281impl From<crate::Connection> for Connection {
282    fn from(conn: crate::Connection) -> Self {
283        Self { inner: conn }
284    }
285}
286
287#[cfg(feature = "p2p")]
288#[cfg(all(test, unix))]
289mod tests {
290    use event_listener::Listener;
291    use ntest::timeout;
292    #[cfg(all(unix, not(feature = "tokio")))]
293    use std::os::unix::net::UnixStream;
294    use std::thread;
295    use test_log::test;
296    #[cfg(all(unix, feature = "tokio"))]
297    use tokio::net::UnixStream;
298    #[cfg(all(windows, not(feature = "tokio")))]
299    use uds_windows::UnixStream;
300
301    use crate::{
302        blocking::{connection::Builder, MessageIterator},
303        Guid,
304    };
305
306    #[test]
307    #[timeout(15000)]
308    fn unix_p2p() {
309        let guid = Guid::generate();
310
311        // Tokio needs us to call the sync function from async context. :shrug:
312        let (p0, p1) = crate::utils::block_on(async { UnixStream::pair().unwrap() });
313
314        let (tx, rx) = std::sync::mpsc::channel();
315        let server_thread = thread::spawn(move || {
316            let c = Builder::unix_stream(p0)
317                .server(guid)
318                .unwrap()
319                .p2p()
320                .build()
321                .unwrap();
322            rx.recv().unwrap();
323            let reply = c
324                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
325                .unwrap();
326            assert_eq!(reply.to_string(), "Method return");
327            let val: String = reply.body().deserialize().unwrap();
328            val
329        });
330
331        let c = Builder::unix_stream(p1).p2p().build().unwrap();
332
333        let listener = c.monitor_activity();
334
335        let mut s = MessageIterator::from(&c);
336        tx.send(()).unwrap();
337        let m = s.next().unwrap().unwrap();
338        assert_eq!(m.to_string(), "Method call Test");
339        c.reply(&m.header(), &("yay")).unwrap();
340
341        for _ in s {}
342
343        let val = server_thread.join().expect("failed to join server thread");
344        assert_eq!(val, "yay");
345
346        // there was some activity
347        listener.wait();
348        // eventually, nothing happens and it will timeout
349        loop {
350            let listener = c.monitor_activity();
351            if listener
352                .wait_timeout(std::time::Duration::from_millis(10))
353                .is_none()
354            {
355                break;
356            }
357        }
358    }
359}