zbus/connection/
builder.rs

1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use enumflags2::BitFlags;
4use event_listener::Event;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10    collections::{HashMap, HashSet},
11    vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::ObjectPath;
25
26use crate::{
27    address::{self, Address},
28    fdo::RequestNameFlags,
29    names::{InterfaceName, WellKnownName},
30    object_server::{ArcInterface, Interface},
31    Connection, Error, Executor, Guid, OwnedGuid, Result,
32};
33
34use super::{
35    handshake::{AuthMechanism, Authenticated},
36    socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
37};
38
/// Queue capacity applied to a new connection when [`Builder::max_queued`] was never called.
const DEFAULT_MAX_QUEUED: usize = 64;
40
/// Where the transport of the to-be-built connection comes from.
///
/// A `Target` is stored in [`Builder`] at construction time and consumed by
/// `Builder::target_connect`, which turns every variant into a boxed split socket.
#[derive(Debug)]
enum Target {
    /// A Unix domain socket stream supplied by the caller.
    #[cfg(any(unix, not(feature = "tokio")))]
    UnixStream(UnixStream),
    /// A TCP stream supplied by the caller.
    TcpStream(TcpStream),
    /// A VSOCK stream supplied by the caller.
    #[cfg(any(
        all(feature = "vsock", not(feature = "tokio")),
        feature = "tokio-vsock"
    ))]
    VsockStream(VsockStream),
    /// A D-Bus address; the stream is opened from it while the connection is being built.
    Address(Address),
    /// A caller-supplied socket on which the authentication handshake still has to run.
    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
    /// A caller-supplied socket for which the authentication handshake is skipped.
    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
}
55
/// Interfaces queued through [`Builder::serve_at`], keyed first by object path and then by
/// interface name.
type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
57
/// A builder for [`zbus::Connection`].
///
/// The builder allows setting the flags [`RequestNameFlags::AllowReplacement`] and
/// [`RequestNameFlags::ReplaceExisting`] when requesting names, but the flag
/// [`RequestNameFlags::DoNotQueue`] will always be enabled. The reasons are:
///
/// 1. There is no indication given to the caller of [`Self::build`] that the name(s) request was
///    enqueued and that the requested name might not be available right after building.
///
/// 2. The name may be acquired in between the time the name is requested and the
///    [`crate::fdo::NameAcquiredStream`] is constructed. As a result the service can miss the
///    [`crate::fdo::NameAcquired`] signal.
#[derive(Debug)]
#[must_use]
pub struct Builder<'a> {
    // Source of the transport. Always `Some` after construction; taken out (left `None`) when
    // the connection is established.
    target: Option<Target>,
    // Queue capacity override; `DEFAULT_MAX_QUEUED` is used when this is `None`.
    max_queued: Option<usize>,
    // This is only set for p2p server case or pre-authenticated sockets.
    guid: Option<Guid<'a>>,
    // Whether a peer-to-peer connection (as opposed to a bus connection) was requested.
    #[cfg(feature = "p2p")]
    p2p: bool,
    // Whether `build` should spawn the internal executor thread (non-tokio builds only).
    internal_executor: bool,
    // Interfaces to register on the object server while building.
    interfaces: Interfaces<'a>,
    // Well-known names to request once the connection is up.
    names: HashSet<WellKnownName<'a>>,
    // Authentication mechanism handed to the handshake, if one was chosen explicitly.
    auth_mechanism: Option<AuthMechanism>,
    // Unique name to assign to the connection; see `Builder::unique_name`.
    #[cfg(feature = "bus-impl")]
    unique_name: Option<crate::names::UniqueName<'a>>,
    // Flags passed along with every name request made while building.
    request_name_flags: BitFlags<RequestNameFlags>,
    // Timeout handed to the connection for method calls, if any.
    method_timeout: Option<std::time::Duration>,
}
88
89impl<'a> Builder<'a> {
90    /// Create a builder for the session/user message bus connection.
91    pub fn session() -> Result<Self> {
92        Ok(Self::new(Target::Address(Address::session()?)))
93    }
94
95    /// Create a builder for the system-wide message bus connection.
96    pub fn system() -> Result<Self> {
97        Ok(Self::new(Target::Address(Address::system()?)))
98    }
99
100    /// Create a builder for a connection that will use the given [D-Bus bus address].
101    ///
102    /// # Example
103    ///
104    /// Here is an example of connecting to an IBus service:
105    ///
106    /// ```no_run
107    /// # use std::error::Error;
108    /// # use zbus::connection::Builder;
109    /// # use zbus::block_on;
110    /// #
111    /// # block_on(async {
112    /// let addr = "unix:\
113    ///     path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
114    ///     guid=fdd08e811a6c7ebe1fef0d9e647230da";
115    /// let conn = Builder::address(addr)?
116    ///     .build()
117    ///     .await?;
118    ///
119    /// // Do something useful with `conn`..
120    /// #     drop(conn);
121    /// #     Ok::<(), zbus::Error>(())
122    /// # }).unwrap();
123    /// #
124    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
125    /// ```
126    ///
127    /// **Note:** The IBus address is different for each session. You can find the address for your
128    /// current session using `ibus address` command.
129    ///
130    /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
131    pub fn address<A>(address: A) -> Result<Self>
132    where
133        A: TryInto<Address>,
134        A::Error: Into<Error>,
135    {
136        Ok(Self::new(Target::Address(
137            address.try_into().map_err(Into::into)?,
138        )))
139    }
140
141    /// Create a builder for a connection that will use the given unix stream.
142    ///
143    /// If the default `async-io` feature is disabled, this method will expect a
144    /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
145    /// argument.
146    ///
147    /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
148    /// is not available when the `tokio` feature is enabled and building for Windows target.
149    ///
150    /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
151    #[cfg(any(unix, not(feature = "tokio")))]
152    pub fn unix_stream(stream: UnixStream) -> Self {
153        Self::new(Target::UnixStream(stream))
154    }
155
156    /// Create a builder for a connection that will use the given TCP stream.
157    ///
158    /// If the default `async-io` feature is disabled, this method will expect a
159    /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
160    /// argument.
161    pub fn tcp_stream(stream: TcpStream) -> Self {
162        Self::new(Target::TcpStream(stream))
163    }
164
165    /// Create a builder for a connection that will use the given VSOCK stream.
166    ///
167    /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
168    /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
169    /// with `tokio-vsock` feature.
170    #[cfg(any(
171        all(feature = "vsock", not(feature = "tokio")),
172        feature = "tokio-vsock"
173    ))]
174    pub fn vsock_stream(stream: VsockStream) -> Self {
175        Self::new(Target::VsockStream(stream))
176    }
177
178    /// Create a builder for a connection that will use the given socket.
179    pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
180        Self::new(Target::Socket(socket.into()))
181    }
182
183    /// Create a builder for a connection that will use the given pre-authenticated socket.
184    ///
185    /// This is similar to [`Builder::socket`], except that the socket is either already
186    /// authenticated or does not require authentication.
187    pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
188    where
189        S: Into<BoxedSplit>,
190        G: TryInto<Guid<'a>>,
191        G::Error: Into<Error>,
192    {
193        let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
194        builder.guid = Some(guid.try_into().map_err(Into::into)?);
195
196        Ok(builder)
197    }
198
199    /// Specify the mechanism to use during authentication.
200    pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
201        self.auth_mechanism = Some(auth_mechanism);
202
203        self
204    }
205
206    /// The to-be-created connection will be a peer-to-peer connection.
207    ///
208    /// This method is only available when the `p2p` feature is enabled.
209    #[cfg(feature = "p2p")]
210    pub fn p2p(mut self) -> Self {
211        self.p2p = true;
212
213        self
214    }
215
216    /// The to-be-created connection will be a server using the given GUID.
217    ///
218    /// The to-be-created connection will wait for incoming client authentication handshake and
219    /// negotiation messages, for peer-to-peer communications after successful creation.
220    ///
221    /// This method is only available when the `p2p` feature is enabled.
222    ///
223    /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
224    /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
225    /// server and a client connection, except for authentication.
226    #[cfg(feature = "p2p")]
227    pub fn server<G>(mut self, guid: G) -> Result<Self>
228    where
229        G: TryInto<Guid<'a>>,
230        G::Error: Into<Error>,
231    {
232        self.guid = Some(guid.try_into().map_err(Into::into)?);
233
234        Ok(self)
235    }
236
237    /// Set the capacity of the main (unfiltered) queue.
238    ///
239    /// Since typically you'd want to set this at instantiation time, you can set it through the
240    /// builder.
241    ///
242    /// # Example
243    ///
244    /// ```
245    /// # use std::error::Error;
246    /// # use zbus::connection::Builder;
247    /// # use zbus::block_on;
248    /// #
249    /// # block_on(async {
250    /// let conn = Builder::session()?
251    ///     .max_queued(30)
252    ///     .build()
253    ///     .await?;
254    /// assert_eq!(conn.max_queued(), 30);
255    ///
256    /// #     Ok::<(), zbus::Error>(())
257    /// # }).unwrap();
258    /// #
259    /// // Do something useful with `conn`..
260    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
261    /// ```
262    pub fn max_queued(mut self, max: usize) -> Self {
263        self.max_queued = Some(max);
264
265        self
266    }
267
268    /// Enable or disable the internal executor thread.
269    ///
270    /// The thread is enabled by default.
271    ///
272    /// See [Connection::executor] for more details.
273    pub fn internal_executor(mut self, enabled: bool) -> Self {
274        self.internal_executor = enabled;
275
276        self
277    }
278
279    /// Register a D-Bus [`Interface`] to be served at a given path.
280    ///
281    /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
282    /// interfaces available immediately after the connection is established. Typically, this is
283    /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
284    /// replace any previously added interface with the same name at the same path.
285    ///
286    /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
287    /// attempt to add yours, [`Builder::build()`] will fail.
288    pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
289    where
290        I: Interface,
291        P: TryInto<ObjectPath<'a>>,
292        P::Error: Into<Error>,
293    {
294        let path = path.try_into().map_err(Into::into)?;
295        let entry = self.interfaces.entry(path).or_default();
296        entry.insert(I::name(), ArcInterface::new(iface));
297        Ok(self)
298    }
299
300    /// Register a well-known name for this connection on the bus.
301    ///
302    /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
303    /// of the connection setup ([`Builder::build`]), immediately after interfaces
304    /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
305    /// exactly what you want.
306    ///
307    /// The methods [`Builder::allow_name_replacements`] and [`Builder::replace_existing_names`]
308    /// allow to set the [`zbus::fdo::RequestNameFlags`] used to request the name.
309    pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
310    where
311        W: TryInto<WellKnownName<'a>>,
312        W::Error: Into<Error>,
313    {
314        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
315        self.names.insert(well_known_name);
316
317        Ok(self)
318    }
319
320    /// Whether the [`zbus::fdo::RequestNameFlags::AllowReplacement`] flag will be set when
321    /// requesting names.
322    pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
323        self.request_name_flags
324            .set(RequestNameFlags::AllowReplacement, allow_replacement);
325        self
326    }
327
328    /// Whether the [`zbus::fdo::RequestNameFlags::ReplaceExisting`] flag will be set when
329    /// requesting names.
330    pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
331        self.request_name_flags
332            .set(RequestNameFlags::ReplaceExisting, replace_existing);
333        self
334    }
335
336    /// Set the unique name of the connection.
337    ///
338    /// This is mainly provided for bus implementations. All other users should not need to use this
339    /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
340    ///
341    /// # Panics
342    ///
343    /// It will panic if the connection is to a message bus as it's the bus that assigns
344    /// peers their unique names.
345    #[cfg(feature = "bus-impl")]
346    pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
347    where
348        U: TryInto<crate::names::UniqueName<'a>>,
349        U::Error: Into<Error>,
350    {
351        if !self.p2p {
352            panic!("unique name can only be set for peer-to-peer connections");
353        }
354        let name = unique_name.try_into().map_err(Into::into)?;
355        self.unique_name = Some(name);
356
357        Ok(self)
358    }
359
360    /// Set a timeout for method calls.
361    ///
362    /// Method calls will return
363    /// `zbus::Error::InputOutput(std::io::Error(kind: ErrorKind::TimedOut))` if a client does not
364    /// receive an answer from a service in time.
365    pub fn method_timeout(mut self, timeout: std::time::Duration) -> Self {
366        self.method_timeout = Some(timeout);
367
368        self
369    }
370
371    /// Build the connection, consuming the builder.
372    ///
373    /// # Errors
374    ///
375    /// Until server-side bus connection is supported, attempting to build such a connection will
376    /// result in a [`Error::Unsupported`] error.
377    pub async fn build(self) -> Result<Connection> {
378        let executor = Executor::new();
379        #[cfg(not(feature = "tokio"))]
380        let internal_executor = self.internal_executor;
381        // Box the future as it's large and can cause stack overflow.
382        let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
383
384        #[cfg(not(feature = "tokio"))]
385        start_internal_executor(&executor, internal_executor)?;
386
387        Ok(conn)
388    }
389
390    async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
391        #[cfg(feature = "p2p")]
392        let is_bus_conn = !self.p2p;
393        #[cfg(not(feature = "p2p"))]
394        let is_bus_conn = true;
395
396        let mut auth = self.connect(is_bus_conn).await?;
397
398        // SAFETY: `Authenticated` is always built with these fields set to `Some`.
399        let socket_read = auth.socket_read.take().unwrap();
400        let already_received_bytes = auth.already_received_bytes.drain(..).collect();
401        #[cfg(unix)]
402        let already_received_fds = auth.already_received_fds.drain(..).collect();
403
404        let mut conn = Connection::new(auth, is_bus_conn, executor, self.method_timeout).await?;
405        conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
406
407        if !self.interfaces.is_empty() {
408            let object_server = conn.ensure_object_server(false);
409            for (path, interfaces) in self.interfaces {
410                for (name, iface) in interfaces {
411                    let added = object_server
412                        .add_arc_interface(path.clone(), name.clone(), iface.clone())
413                        .await?;
414                    if !added {
415                        return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
416                    }
417                }
418            }
419
420            let started_event = Event::new();
421            let listener = started_event.listen();
422            conn.start_object_server(Some(started_event));
423
424            listener.await;
425        }
426
427        // Start the socket reader task.
428        conn.init_socket_reader(
429            socket_read,
430            already_received_bytes,
431            #[cfg(unix)]
432            already_received_fds,
433        );
434
435        for name in self.names {
436            conn.request_name_with_flags(name, self.request_name_flags)
437                .await?;
438        }
439
440        Ok(conn)
441    }
442
443    fn new(target: Target) -> Self {
444        Self {
445            target: Some(target),
446            #[cfg(feature = "p2p")]
447            p2p: false,
448            max_queued: None,
449            guid: None,
450            internal_executor: true,
451            interfaces: HashMap::new(),
452            names: HashSet::new(),
453            auth_mechanism: None,
454            #[cfg(feature = "bus-impl")]
455            unique_name: None,
456            request_name_flags: BitFlags::default(),
457            method_timeout: None,
458        }
459    }
460
    /// Obtain the transport and turn it into an [`Authenticated`] socket.
    ///
    /// The stream comes from `Self::target_connect`. For a pre-authenticated socket the handshake
    /// is skipped and the `Authenticated` value is assembled directly. Otherwise a client
    /// handshake is run, unless the `p2p` feature is enabled and a GUID was set, in which case a
    /// server handshake is run instead.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Unsupported`] if a GUID was set on a connection that is not peer-to-peer,
    /// and propagates any error from connecting, querying the peer credentials or the handshake.
    async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
        // A unique name can only have been set when the `bus-impl` feature is enabled.
        #[cfg(not(feature = "bus-impl"))]
        let unique_name = None;
        #[cfg(feature = "bus-impl")]
        let unique_name = self.unique_name.take().map(Into::into);

        #[allow(unused_mut)]
        let (mut stream, server_guid, authenticated) = self.target_connect().await?;
        if authenticated {
            // No handshake took place, so nothing has been received ahead of time.
            let (socket_read, socket_write) = stream.take();
            Ok(Authenticated {
                #[cfg(unix)]
                cap_unix_fd: socket_read.can_pass_unix_fd(),
                socket_read: Some(socket_read),
                socket_write,
                // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
                server_guid: server_guid.unwrap(),
                already_received_bytes: vec![],
                unique_name,
                #[cfg(unix)]
                already_received_fds: vec![],
            })
        } else {
            #[cfg(feature = "p2p")]
            match self.guid.take() {
                None => {
                    // SASL Handshake
                    Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn)
                        .await
                }
                Some(guid) => {
                    // A GUID without `p2p` would mean acting as the server of a bus connection.
                    if !self.p2p {
                        return Err(Error::Unsupported);
                    }

                    // The peer's credentials are handed to the server side of the handshake.
                    let creds = stream.read_mut().peer_credentials().await?;
                    #[cfg(unix)]
                    let client_uid = creds.unix_user_id();
                    #[cfg(windows)]
                    let client_sid = creds.into_windows_sid();

                    Authenticated::server(
                        stream,
                        guid.to_owned().into(),
                        #[cfg(unix)]
                        client_uid,
                        #[cfg(windows)]
                        client_sid,
                        self.auth_mechanism,
                        unique_name,
                    )
                    .await
                }
            }

            // Without the `p2p` feature only the client side of the handshake exists.
            #[cfg(not(feature = "p2p"))]
            Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn).await
        }
    }
520
    /// Turn the stored [`Target`] into a boxed split socket.
    ///
    /// Returns a tuple of:
    ///
    /// * the split socket,
    /// * the GUID, if known: taken from the address for [`Target::Address`] and from the builder
    ///   for [`Target::AuthenticatedSocket`], `None` for every other target,
    /// * whether the socket counts as already authenticated, which is only the case for
    ///   [`Target::AuthenticatedSocket`].
    ///
    /// # Errors
    ///
    /// Propagates errors from wrapping a stream for async I/O and from connecting to an address.
    ///
    /// # Panics
    ///
    /// Panics if called more than once, since the target is taken out of the builder.
    async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
        let mut authenticated = false;
        let mut guid = None;
        // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
        // once.
        let split = match self.target.take().unwrap() {
            // Without tokio, caller-supplied streams are wrapped in `Async` first.
            #[cfg(not(feature = "tokio"))]
            Target::UnixStream(stream) => Async::new(stream)?.into(),
            #[cfg(all(unix, feature = "tokio"))]
            Target::UnixStream(stream) => stream.into(),
            #[cfg(not(feature = "tokio"))]
            Target::TcpStream(stream) => Async::new(stream)?.into(),
            #[cfg(feature = "tokio")]
            Target::TcpStream(stream) => stream.into(),
            #[cfg(all(feature = "vsock", not(feature = "tokio")))]
            Target::VsockStream(stream) => Async::new(stream)?.into(),
            #[cfg(feature = "tokio-vsock")]
            Target::VsockStream(stream) => stream.into(),
            Target::Address(address) => {
                // Read the GUID before `connect` is called on the address.
                guid = address.guid().map(|g| g.to_owned().into());
                match address.connect().await? {
                    #[cfg(any(unix, not(feature = "tokio")))]
                    address::transport::Stream::Unix(stream) => stream.into(),
                    #[cfg(unix)]
                    address::transport::Stream::Unixexec(stream) => stream.into(),
                    address::transport::Stream::Tcp(stream) => stream.into(),
                    #[cfg(any(
                        all(feature = "vsock", not(feature = "tokio")),
                        feature = "tokio-vsock"
                    ))]
                    address::transport::Stream::Vsock(stream) => stream.into(),
                }
            }
            Target::Socket(stream) => stream,
            Target::AuthenticatedSocket(stream) => {
                authenticated = true;
                // The GUID was stored by `Builder::authenticated_socket`.
                guid = self.guid.take().map(Into::into);
                stream
            }
        };

        Ok((split, guid, authenticated))
    }
564}
565
566/// Start the internal executor thread.
567///
568/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
569/// tasks until socket reader task kicks in.
570#[cfg(not(feature = "tokio"))]
571fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
572    if internal_executor {
573        let executor = executor.clone();
574        std::thread::Builder::new()
575            .name("zbus::Connection executor".into())
576            .spawn(move || {
577                crate::utils::block_on(async move {
578                    // Run as long as there is a task to run.
579                    while !executor.is_empty() {
580                        executor.tick().await;
581                    }
582                })
583            })?;
584    }
585
586    Ok(())
587}