zbus/blocking/connection/
mod.rs1use enumflags2::BitFlags;
4use event_listener::EventListener;
5use std::{io, ops::Deref};
6use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
7use zvariant::ObjectPath;
8
9use crate::{
10 blocking::ObjectServer,
11 fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply},
12 message::Message,
13 utils::block_on,
14 DBusError, Error, Result,
15};
16
17mod builder;
18pub use builder::Builder;
19
20#[derive(Debug, Clone)]
24#[must_use = "Dropping a `Connection` will close the underlying socket."]
25pub struct Connection {
26 inner: crate::Connection,
27}
28
29impl Connection {
30 pub fn session() -> Result<Self> {
32 block_on(crate::Connection::session()).map(Self::from)
33 }
34
35 pub fn system() -> Result<Self> {
37 block_on(crate::Connection::system()).map(Self::from)
38 }
39
40 pub fn max_queued(&self) -> usize {
42 self.inner.max_queued()
43 }
44
45 pub fn set_max_queued(mut self, max: usize) {
47 self.inner.set_max_queued(max)
48 }
49
50 pub fn server_guid(&self) -> &str {
52 self.inner.server_guid()
53 }
54
55 pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
57 self.inner.unique_name()
58 }
59
60 pub fn send(&self, msg: &Message) -> Result<()> {
62 block_on(self.inner.send(msg))
63 }
64
65 pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
72 &self,
73 destination: Option<D>,
74 path: P,
75 iface: Option<I>,
76 method_name: M,
77 body: &B,
78 ) -> Result<Message>
79 where
80 D: TryInto<BusName<'d>>,
81 P: TryInto<ObjectPath<'p>>,
82 I: TryInto<InterfaceName<'i>>,
83 M: TryInto<MemberName<'m>>,
84 D::Error: Into<Error>,
85 P::Error: Into<Error>,
86 I::Error: Into<Error>,
87 M::Error: Into<Error>,
88 B: serde::ser::Serialize + zvariant::DynamicType,
89 {
90 block_on(
91 self.inner
92 .call_method(destination, path, iface, method_name, body),
93 )
94 }
95
96 pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
100 &self,
101 destination: Option<D>,
102 path: P,
103 iface: I,
104 signal_name: M,
105 body: &B,
106 ) -> Result<()>
107 where
108 D: TryInto<BusName<'d>>,
109 P: TryInto<ObjectPath<'p>>,
110 I: TryInto<InterfaceName<'i>>,
111 M: TryInto<MemberName<'m>>,
112 D::Error: Into<Error>,
113 P::Error: Into<Error>,
114 I::Error: Into<Error>,
115 M::Error: Into<Error>,
116 B: serde::ser::Serialize + zvariant::DynamicType,
117 {
118 block_on(
119 self.inner
120 .emit_signal(destination, path, iface, signal_name, body),
121 )
122 }
123
124 pub fn reply<B>(&self, call: &zbus::message::Header<'_>, body: &B) -> Result<()>
129 where
130 B: serde::ser::Serialize + zvariant::DynamicType,
131 {
132 block_on(self.inner.reply(call, body))
133 }
134
135 pub fn reply_error<'e, E, B>(
142 &self,
143 call: &zbus::message::Header<'_>,
144 error_name: E,
145 body: &B,
146 ) -> Result<()>
147 where
148 B: serde::ser::Serialize + zvariant::DynamicType,
149 E: TryInto<ErrorName<'e>>,
150 E::Error: Into<Error>,
151 {
152 block_on(self.inner.reply_error(call, error_name, body))
153 }
154
155 pub fn reply_dbus_error(
162 &self,
163 call: &zbus::message::Header<'_>,
164 err: impl DBusError,
165 ) -> Result<()> {
166 block_on(self.inner.reply_dbus_error(call, err))
167 }
168
169 pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
174 where
175 W: TryInto<WellKnownName<'w>>,
176 W::Error: Into<Error>,
177 {
178 block_on(self.inner.request_name(well_known_name))
179 }
180
181 pub fn request_name_with_flags<'w, W>(
186 &self,
187 well_known_name: W,
188 flags: BitFlags<RequestNameFlags>,
189 ) -> Result<RequestNameReply>
190 where
191 W: TryInto<WellKnownName<'w>>,
192 W::Error: Into<Error>,
193 {
194 block_on(self.inner.request_name_with_flags(well_known_name, flags))
195 }
196
197 pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
206 where
207 W: TryInto<WellKnownName<'w>>,
208 W::Error: Into<Error>,
209 {
210 block_on(self.inner.release_name(well_known_name))
211 }
212
213 pub fn is_bus(&self) -> bool {
217 self.inner.is_bus()
218 }
219
220 pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
224 struct Wrapper(ObjectServer);
225 impl Deref for Wrapper {
226 type Target = ObjectServer;
227
228 fn deref(&self) -> &Self::Target {
229 &self.0
230 }
231 }
232
233 Wrapper(ObjectServer::new(&self.inner))
234 }
235
236 pub fn inner(&self) -> &crate::Connection {
238 &self.inner
239 }
240
241 pub fn into_inner(self) -> crate::Connection {
243 self.inner
244 }
245
246 pub fn monitor_activity(&self) -> EventListener {
250 self.inner.monitor_activity()
251 }
252
253 pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
262 block_on(self.inner.peer_credentials())
263 }
264
265 pub fn close(self) -> Result<()> {
269 block_on(self.inner.close())
270 }
271
272 pub fn graceful_shutdown(self) {
277 block_on(self.inner.graceful_shutdown())
278 }
279}
280
281impl From<crate::Connection> for Connection {
282 fn from(conn: crate::Connection) -> Self {
283 Self { inner: conn }
284 }
285}
286
287#[cfg(feature = "p2p")]
288#[cfg(all(test, unix))]
289mod tests {
290 use event_listener::Listener;
291 use ntest::timeout;
292 #[cfg(all(unix, not(feature = "tokio")))]
293 use std::os::unix::net::UnixStream;
294 use std::thread;
295 use test_log::test;
296 #[cfg(all(unix, feature = "tokio"))]
297 use tokio::net::UnixStream;
298 #[cfg(all(windows, not(feature = "tokio")))]
299 use uds_windows::UnixStream;
300
301 use crate::{
302 blocking::{connection::Builder, MessageIterator},
303 Guid,
304 };
305
// Peer-to-peer round trip over a socket pair: the server side (on its own thread) calls
// `Test`, the client side answers with "yay", and the activity listener is checked afterwards.
#[test]
#[timeout(15000)]
fn unix_p2p() {
    let guid = Guid::generate();

    // The pair is created inside `block_on`; presumably the tokio `UnixStream` needs a
    // runtime to be active when it is constructed — TODO confirm.
    let (server_sock, client_sock) =
        crate::utils::block_on(async { UnixStream::pair().unwrap() });

    // Channel used to hold the server back until the client is ready to receive.
    let (ready_tx, ready_rx) = std::sync::mpsc::channel();
    let server_thread = thread::spawn(move || {
        let server = Builder::unix_stream(server_sock)
            .server(guid)
            .unwrap()
            .p2p()
            .build()
            .unwrap();
        ready_rx.recv().unwrap();
        let reply = server
            .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
            .unwrap();
        assert_eq!(reply.to_string(), "Method return");
        let answer: String = reply.body().deserialize().unwrap();
        answer
    });

    let client = Builder::unix_stream(client_sock).p2p().build().unwrap();

    // Registered before any traffic so the `wait()` below observes the exchange.
    let first_activity = client.monitor_activity();

    let mut incoming = MessageIterator::from(&client);
    ready_tx.send(()).unwrap();
    let call = incoming.next().unwrap().unwrap();
    assert_eq!(call.to_string(), "Method call Test");
    client.reply(&call.header(), &("yay")).unwrap();

    // Drain whatever is left on the iterator until it ends.
    incoming.for_each(drop);

    let val = server_thread.join().expect("failed to join server thread");
    assert_eq!(val, "yay");

    // Some activity must have been recorded by now.
    first_activity.wait();
    // Keep polling with a fresh listener until a 10 ms window passes with no activity.
    while client
        .monitor_activity()
        .wait_timeout(std::time::Duration::from_millis(10))
        .is_some()
    {}
}
359}