zbus/blocking/connection/
mod.rs1use enumflags2::BitFlags;
4use event_listener::EventListener;
5use std::{io, ops::Deref};
6use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
7use zvariant::ObjectPath;
8
9use crate::{
10 blocking::ObjectServer,
11 fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
12 message::Message,
13 utils::block_on,
14 DBusError, Error, Result,
15};
16
17mod builder;
18pub use builder::Builder;
19
20#[derive(Debug, Clone)]
24#[must_use = "Dropping a `Connection` will close the underlying socket."]
25pub struct Connection {
26 inner: crate::Connection,
27}
28
29impl Connection {
30 pub fn session() -> Result<Self> {
32 block_on(crate::Connection::session()).map(Self::from)
33 }
34
35 pub fn system() -> Result<Self> {
37 block_on(crate::Connection::system()).map(Self::from)
38 }
39
40 pub fn max_queued(&self) -> usize {
42 self.inner.max_queued()
43 }
44
45 pub fn set_max_queued(mut self, max: usize) {
47 self.inner.set_max_queued(max)
48 }
49
50 pub fn server_guid(&self) -> &str {
52 self.inner.server_guid()
53 }
54
55 pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
57 self.inner.unique_name()
58 }
59
60 pub fn send(&self, msg: &Message) -> Result<()> {
62 block_on(self.inner.send(msg))
63 }
64
65 pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
72 &self,
73 destination: Option<D>,
74 path: P,
75 iface: Option<I>,
76 method_name: M,
77 body: &B,
78 ) -> Result<Message>
79 where
80 D: TryInto<BusName<'d>>,
81 P: TryInto<ObjectPath<'p>>,
82 I: TryInto<InterfaceName<'i>>,
83 M: TryInto<MemberName<'m>>,
84 D::Error: Into<Error>,
85 P::Error: Into<Error>,
86 I::Error: Into<Error>,
87 M::Error: Into<Error>,
88 B: serde::ser::Serialize + zvariant::DynamicType,
89 {
90 block_on(
91 self.inner
92 .call_method(destination, path, iface, method_name, body),
93 )
94 }
95
96 pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
100 &self,
101 destination: Option<D>,
102 path: P,
103 iface: I,
104 signal_name: M,
105 body: &B,
106 ) -> Result<()>
107 where
108 D: TryInto<BusName<'d>>,
109 P: TryInto<ObjectPath<'p>>,
110 I: TryInto<InterfaceName<'i>>,
111 M: TryInto<MemberName<'m>>,
112 D::Error: Into<Error>,
113 P::Error: Into<Error>,
114 I::Error: Into<Error>,
115 M::Error: Into<Error>,
116 B: serde::ser::Serialize + zvariant::DynamicType,
117 {
118 block_on(
119 self.inner
120 .emit_signal(destination, path, iface, signal_name, body),
121 )
122 }
123
124 pub fn reply<B>(&self, call: &zbus::message::Header<'_>, body: &B) -> Result<()>
129 where
130 B: serde::ser::Serialize + zvariant::DynamicType,
131 {
132 block_on(self.inner.reply(call, body))
133 }
134
135 pub fn reply_error<'e, E, B>(
142 &self,
143 call: &zbus::message::Header<'_>,
144 error_name: E,
145 body: &B,
146 ) -> Result<()>
147 where
148 B: serde::ser::Serialize + zvariant::DynamicType,
149 E: TryInto<ErrorName<'e>>,
150 E::Error: Into<Error>,
151 {
152 block_on(self.inner.reply_error(call, error_name, body))
153 }
154
155 pub fn reply_dbus_error(
162 &self,
163 call: &zbus::message::Header<'_>,
164 err: impl DBusError,
165 ) -> Result<()> {
166 block_on(self.inner.reply_dbus_error(call, err))
167 }
168
169 pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
174 where
175 W: TryInto<WellKnownName<'w>>,
176 W::Error: Into<Error>,
177 {
178 block_on(self.inner.request_name(well_known_name))
179 }
180
181 pub fn request_name_with_flags<'w, W>(
186 &self,
187 well_known_name: W,
188 flags: BitFlags<RequestNameFlags>,
189 ) -> Result<RequestNameReply>
190 where
191 W: TryInto<WellKnownName<'w>>,
192 W::Error: Into<Error>,
193 {
194 block_on(self.inner.request_name_with_flags(well_known_name, flags))
195 }
196
197 pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
206 where
207 W: TryInto<WellKnownName<'w>>,
208 W::Error: Into<Error>,
209 {
210 block_on(self.inner.release_name(well_known_name))
211 }
212
213 pub fn is_bus(&self) -> bool {
217 self.inner.is_bus()
218 }
219
220 pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
224 struct Wrapper(ObjectServer);
225 impl Deref for Wrapper {
226 type Target = ObjectServer;
227
228 fn deref(&self) -> &Self::Target {
229 &self.0
230 }
231 }
232
233 Wrapper(ObjectServer::new(&self.inner))
234 }
235
236 pub fn inner(&self) -> &crate::Connection {
238 &self.inner
239 }
240
241 pub fn into_inner(self) -> crate::Connection {
243 self.inner
244 }
245
246 pub fn monitor_activity(&self) -> EventListener {
250 self.inner.monitor_activity()
251 }
252
253 pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
262 block_on(self.inner.peer_credentials())
263 }
264
265 pub fn close(self) -> Result<()> {
269 block_on(self.inner.close())
270 }
271
272 pub fn graceful_shutdown(self) {
277 block_on(self.inner.graceful_shutdown())
278 }
279
280 pub fn method_timeout(&self) -> Option<std::time::Duration> {
282 self.inner.method_timeout()
283 }
284}
285
286impl From<crate::Connection> for Connection {
287 fn from(conn: crate::Connection) -> Self {
288 Self { inner: conn }
289 }
290}
291
292#[cfg(feature = "p2p")]
293#[cfg(all(test, unix))]
294mod tests {
295 use event_listener::Listener;
296 use ntest::timeout;
297 #[cfg(all(unix, not(feature = "tokio")))]
298 use std::os::unix::net::UnixStream;
299 use std::thread;
300 use test_log::test;
301 #[cfg(all(unix, feature = "tokio"))]
302 use tokio::net::UnixStream;
303 #[cfg(all(windows, not(feature = "tokio")))]
304 use uds_windows::UnixStream;
305
306 use crate::{
307 blocking::{connection::Builder, MessageIterator},
308 Guid,
309 };
310
311 #[test]
312 #[timeout(15000)]
313 fn unix_p2p() {
314 let guid = Guid::generate();
315
316 let (p0, p1) = crate::utils::block_on(async { UnixStream::pair().unwrap() });
318
319 let (tx, rx) = std::sync::mpsc::channel();
320 let server_thread = thread::spawn(move || {
321 let c = Builder::unix_stream(p0)
322 .server(guid)
323 .unwrap()
324 .p2p()
325 .build()
326 .unwrap();
327 rx.recv().unwrap();
328 let reply = c
329 .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
330 .unwrap();
331 assert_eq!(reply.to_string(), "Method return");
332 let val: String = reply.body().deserialize().unwrap();
333 val
334 });
335
336 let c = Builder::unix_stream(p1).p2p().build().unwrap();
337
338 let listener = c.monitor_activity();
339
340 let mut s = MessageIterator::from(&c);
341 tx.send(()).unwrap();
342 let m = s.next().unwrap().unwrap();
343 assert_eq!(m.to_string(), "Method call Test");
344 c.reply(&m.header(), &("yay")).unwrap();
345
346 for _ in s {}
347
348 let val = server_thread.join().expect("failed to join server thread");
349 assert_eq!(val, "yay");
350
351 listener.wait();
353 loop {
355 let listener = c.monitor_activity();
356 if listener
357 .wait_timeout(std::time::Duration::from_millis(10))
358 .is_none()
359 {
360 break;
361 }
362 }
363 }
364}