zbus/connection/
builder.rs

1#[cfg(not(feature = "tokio"))]
2use async_io::Async;
3use enumflags2::BitFlags;
4use event_listener::Event;
5#[cfg(not(feature = "tokio"))]
6use std::net::TcpStream;
7#[cfg(all(unix, not(feature = "tokio")))]
8use std::os::unix::net::UnixStream;
9use std::{
10    collections::{HashMap, HashSet},
11    vec,
12};
13#[cfg(feature = "tokio")]
14use tokio::net::TcpStream;
15#[cfg(all(unix, feature = "tokio"))]
16use tokio::net::UnixStream;
17#[cfg(feature = "tokio-vsock")]
18use tokio_vsock::VsockStream;
19#[cfg(all(windows, not(feature = "tokio")))]
20use uds_windows::UnixStream;
21#[cfg(all(feature = "vsock", not(feature = "tokio")))]
22use vsock::VsockStream;
23
24use zvariant::ObjectPath;
25
26use crate::{
27    address::{self, Address},
28    fdo::RequestNameFlags,
29    names::{InterfaceName, WellKnownName},
30    object_server::{ArcInterface, Interface},
31    Connection, Error, Executor, Guid, OwnedGuid, Result,
32};
33
34use super::{
35    handshake::{AuthMechanism, Authenticated},
36    socket::{BoxedSplit, ReadHalf, Split, WriteHalf},
37};
38
/// Queue capacity applied by [`Builder::build`] when [`Builder::max_queued`] was not called.
const DEFAULT_MAX_QUEUED: usize = 64;
40
41#[derive(Debug)]
42enum Target {
43    #[cfg(any(unix, not(feature = "tokio")))]
44    UnixStream(UnixStream),
45    TcpStream(TcpStream),
46    #[cfg(any(
47        all(feature = "vsock", not(feature = "tokio")),
48        feature = "tokio-vsock"
49    ))]
50    VsockStream(VsockStream),
51    Address(Address),
52    Socket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
53    AuthenticatedSocket(Split<Box<dyn ReadHalf>, Box<dyn WriteHalf>>),
54}
55
56type Interfaces<'a> = HashMap<ObjectPath<'a>, HashMap<InterfaceName<'static>, ArcInterface>>;
57
58/// A builder for [`zbus::Connection`].
59///
60/// The builder allows setting the flags [`RequestNameFlags::AllowReplacement`] and
61/// [`RequestNameFlags::ReplaceExisting`] when requesting names, but the flag
62/// [`RequestNameFlags::DoNotQueue`] will always be enabled. The reasons are:
63///
64/// 1. There is no indication given to the caller of [`Self::build`] that the name(s) request was
65///    enqueued and that the requested name might not be available right after building.
66///
67/// 2. The name may be acquired in between the time the name is requested and the
68///    [`crate::fdo::NameAcquiredStream`] is constructed. As a result the service can miss the
69///    [`crate::fdo::NameAcquired`] signal.
70#[derive(Debug)]
71#[must_use]
72pub struct Builder<'a> {
73    target: Option<Target>,
74    max_queued: Option<usize>,
75    // This is only set for p2p server case or pre-authenticated sockets.
76    guid: Option<Guid<'a>>,
77    #[cfg(feature = "p2p")]
78    p2p: bool,
79    internal_executor: bool,
80    interfaces: Interfaces<'a>,
81    names: HashSet<WellKnownName<'a>>,
82    auth_mechanism: Option<AuthMechanism>,
83    #[cfg(feature = "bus-impl")]
84    unique_name: Option<crate::names::UniqueName<'a>>,
85    request_name_flags: BitFlags<RequestNameFlags>,
86}
87
88impl<'a> Builder<'a> {
89    /// Create a builder for the session/user message bus connection.
90    pub fn session() -> Result<Self> {
91        Ok(Self::new(Target::Address(Address::session()?)))
92    }
93
94    /// Create a builder for the system-wide message bus connection.
95    pub fn system() -> Result<Self> {
96        Ok(Self::new(Target::Address(Address::system()?)))
97    }
98
99    /// Create a builder for a connection that will use the given [D-Bus bus address].
100    ///
101    /// # Example
102    ///
103    /// Here is an example of connecting to an IBus service:
104    ///
105    /// ```no_run
106    /// # use std::error::Error;
107    /// # use zbus::connection::Builder;
108    /// # use zbus::block_on;
109    /// #
110    /// # block_on(async {
111    /// let addr = "unix:\
112    ///     path=/home/zeenix/.cache/ibus/dbus-ET0Xzrk9,\
113    ///     guid=fdd08e811a6c7ebe1fef0d9e647230da";
114    /// let conn = Builder::address(addr)?
115    ///     .build()
116    ///     .await?;
117    ///
118    /// // Do something useful with `conn`..
119    /// #     drop(conn);
120    /// #     Ok::<(), zbus::Error>(())
121    /// # }).unwrap();
122    /// #
123    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
124    /// ```
125    ///
126    /// **Note:** The IBus address is different for each session. You can find the address for your
127    /// current session using `ibus address` command.
128    ///
129    /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses
130    pub fn address<A>(address: A) -> Result<Self>
131    where
132        A: TryInto<Address>,
133        A::Error: Into<Error>,
134    {
135        Ok(Self::new(Target::Address(
136            address.try_into().map_err(Into::into)?,
137        )))
138    }
139
140    /// Create a builder for a connection that will use the given unix stream.
141    ///
142    /// If the default `async-io` feature is disabled, this method will expect a
143    /// [`tokio::net::UnixStream`](https://docs.rs/tokio/latest/tokio/net/struct.UnixStream.html)
144    /// argument.
145    ///
146    /// Since tokio currently [does not support Unix domain sockets][tuds] on Windows, this method
147    /// is not available when the `tokio` feature is enabled and building for Windows target.
148    ///
149    /// [tuds]: https://github.com/tokio-rs/tokio/issues/2201
150    #[cfg(any(unix, not(feature = "tokio")))]
151    pub fn unix_stream(stream: UnixStream) -> Self {
152        Self::new(Target::UnixStream(stream))
153    }
154
155    /// Create a builder for a connection that will use the given TCP stream.
156    ///
157    /// If the default `async-io` feature is disabled, this method will expect a
158    /// [`tokio::net::TcpStream`](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html)
159    /// argument.
160    pub fn tcp_stream(stream: TcpStream) -> Self {
161        Self::new(Target::TcpStream(stream))
162    }
163
164    /// Create a builder for a connection that will use the given VSOCK stream.
165    ///
166    /// This method is only available when either `vsock` or `tokio-vsock` feature is enabled. The
167    /// type of `stream` is `vsock::VsockStream` with `vsock` feature and `tokio_vsock::VsockStream`
168    /// with `tokio-vsock` feature.
169    #[cfg(any(
170        all(feature = "vsock", not(feature = "tokio")),
171        feature = "tokio-vsock"
172    ))]
173    pub fn vsock_stream(stream: VsockStream) -> Self {
174        Self::new(Target::VsockStream(stream))
175    }
176
177    /// Create a builder for a connection that will use the given socket.
178    pub fn socket<S: Into<BoxedSplit>>(socket: S) -> Self {
179        Self::new(Target::Socket(socket.into()))
180    }
181
182    /// Create a builder for a connection that will use the given pre-authenticated socket.
183    ///
184    /// This is similar to [`Builder::socket`], except that the socket is either already
185    /// authenticated or does not require authentication.
186    pub fn authenticated_socket<S, G>(socket: S, guid: G) -> Result<Self>
187    where
188        S: Into<BoxedSplit>,
189        G: TryInto<Guid<'a>>,
190        G::Error: Into<Error>,
191    {
192        let mut builder = Self::new(Target::AuthenticatedSocket(socket.into()));
193        builder.guid = Some(guid.try_into().map_err(Into::into)?);
194
195        Ok(builder)
196    }
197
198    /// Specify the mechanism to use during authentication.
199    pub fn auth_mechanism(mut self, auth_mechanism: AuthMechanism) -> Self {
200        self.auth_mechanism = Some(auth_mechanism);
201
202        self
203    }
204
205    /// The to-be-created connection will be a peer-to-peer connection.
206    ///
207    /// This method is only available when the `p2p` feature is enabled.
208    #[cfg(feature = "p2p")]
209    pub fn p2p(mut self) -> Self {
210        self.p2p = true;
211
212        self
213    }
214
215    /// The to-be-created connection will be a server using the given GUID.
216    ///
217    /// The to-be-created connection will wait for incoming client authentication handshake and
218    /// negotiation messages, for peer-to-peer communications after successful creation.
219    ///
220    /// This method is only available when the `p2p` feature is enabled.
221    ///
222    /// **NOTE:** This method is redundant when using [`Builder::authenticated_socket`] since the
223    /// latter already sets the GUID for the connection and zbus doesn't differentiate between a
224    /// server and a client connection, except for authentication.
225    #[cfg(feature = "p2p")]
226    pub fn server<G>(mut self, guid: G) -> Result<Self>
227    where
228        G: TryInto<Guid<'a>>,
229        G::Error: Into<Error>,
230    {
231        self.guid = Some(guid.try_into().map_err(Into::into)?);
232
233        Ok(self)
234    }
235
236    /// Set the capacity of the main (unfiltered) queue.
237    ///
238    /// Since typically you'd want to set this at instantiation time, you can set it through the
239    /// builder.
240    ///
241    /// # Example
242    ///
243    /// ```
244    /// # use std::error::Error;
245    /// # use zbus::connection::Builder;
246    /// # use zbus::block_on;
247    /// #
248    /// # block_on(async {
249    /// let conn = Builder::session()?
250    ///     .max_queued(30)
251    ///     .build()
252    ///     .await?;
253    /// assert_eq!(conn.max_queued(), 30);
254    ///
255    /// #     Ok::<(), zbus::Error>(())
256    /// # }).unwrap();
257    /// #
258    /// // Do something useful with `conn`..
259    /// # Ok::<_, Box<dyn Error + Send + Sync>>(())
260    /// ```
261    pub fn max_queued(mut self, max: usize) -> Self {
262        self.max_queued = Some(max);
263
264        self
265    }
266
267    /// Enable or disable the internal executor thread.
268    ///
269    /// The thread is enabled by default.
270    ///
271    /// See [Connection::executor] for more details.
272    pub fn internal_executor(mut self, enabled: bool) -> Self {
273        self.internal_executor = enabled;
274
275        self
276    }
277
278    /// Register a D-Bus [`Interface`] to be served at a given path.
279    ///
280    /// This is similar to [`zbus::ObjectServer::at`], except that it allows you to have your
281    /// interfaces available immediately after the connection is established. Typically, this is
282    /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
283    /// replace any previously added interface with the same name at the same path.
284    ///
285    /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
286    /// attempt to add yours, [`Builder::build()`] will fail.
287    pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
288    where
289        I: Interface,
290        P: TryInto<ObjectPath<'a>>,
291        P::Error: Into<Error>,
292    {
293        let path = path.try_into().map_err(Into::into)?;
294        let entry = self.interfaces.entry(path).or_default();
295        entry.insert(I::name(), ArcInterface::new(iface));
296        Ok(self)
297    }
298
299    /// Register a well-known name for this connection on the bus.
300    ///
301    /// This is similar to [`zbus::Connection::request_name`], except the name is requested as part
302    /// of the connection setup ([`Builder::build`]), immediately after interfaces
303    /// registered (through [`Builder::serve_at`]) are advertised. Typically this is
304    /// exactly what you want.
305    ///
306    /// The methods [`Builder::allow_name_replacements`] and [`Builder::replace_existing_names`]
307    /// allow to set the [`zbus::fdo::RequestNameFlags`] used to request the name.
308    pub fn name<W>(mut self, well_known_name: W) -> Result<Self>
309    where
310        W: TryInto<WellKnownName<'a>>,
311        W::Error: Into<Error>,
312    {
313        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
314        self.names.insert(well_known_name);
315
316        Ok(self)
317    }
318
319    /// Whether the [`zbus::fdo::RequestNameFlags::AllowReplacement`] flag will be set when
320    /// requesting names.
321    pub fn allow_name_replacements(mut self, allow_replacement: bool) -> Self {
322        self.request_name_flags
323            .set(RequestNameFlags::AllowReplacement, allow_replacement);
324        self
325    }
326
327    /// Whether the [`zbus::fdo::RequestNameFlags::ReplaceExisting`] flag will be set when
328    /// requesting names.
329    pub fn replace_existing_names(mut self, replace_existing: bool) -> Self {
330        self.request_name_flags
331            .set(RequestNameFlags::ReplaceExisting, replace_existing);
332        self
333    }
334
335    /// Set the unique name of the connection.
336    ///
337    /// This is mainly provided for bus implementations. All other users should not need to use this
338    /// method. Hence why this method is only available when the `bus-impl` feature is enabled.
339    ///
340    /// # Panics
341    ///
342    /// It will panic if the connection is to a message bus as it's the bus that assigns
343    /// peers their unique names.
344    #[cfg(feature = "bus-impl")]
345    pub fn unique_name<U>(mut self, unique_name: U) -> Result<Self>
346    where
347        U: TryInto<crate::names::UniqueName<'a>>,
348        U::Error: Into<Error>,
349    {
350        if !self.p2p {
351            panic!("unique name can only be set for peer-to-peer connections");
352        }
353        let name = unique_name.try_into().map_err(Into::into)?;
354        self.unique_name = Some(name);
355
356        Ok(self)
357    }
358
359    /// Build the connection, consuming the builder.
360    ///
361    /// # Errors
362    ///
363    /// Until server-side bus connection is supported, attempting to build such a connection will
364    /// result in a [`Error::Unsupported`] error.
365    pub async fn build(self) -> Result<Connection> {
366        let executor = Executor::new();
367        #[cfg(not(feature = "tokio"))]
368        let internal_executor = self.internal_executor;
369        // Box the future as it's large and can cause stack overflow.
370        let conn = Box::pin(executor.run(self.build_(executor.clone()))).await?;
371
372        #[cfg(not(feature = "tokio"))]
373        start_internal_executor(&executor, internal_executor)?;
374
375        Ok(conn)
376    }
377
378    async fn build_(mut self, executor: Executor<'static>) -> Result<Connection> {
379        #[cfg(feature = "p2p")]
380        let is_bus_conn = !self.p2p;
381        #[cfg(not(feature = "p2p"))]
382        let is_bus_conn = true;
383
384        let mut auth = self.connect(is_bus_conn).await?;
385
386        // SAFETY: `Authenticated` is always built with these fields set to `Some`.
387        let socket_read = auth.socket_read.take().unwrap();
388        let already_received_bytes = auth.already_received_bytes.drain(..).collect();
389        #[cfg(unix)]
390        let already_received_fds = auth.already_received_fds.drain(..).collect();
391
392        let mut conn = Connection::new(auth, is_bus_conn, executor).await?;
393        conn.set_max_queued(self.max_queued.unwrap_or(DEFAULT_MAX_QUEUED));
394
395        if !self.interfaces.is_empty() {
396            let object_server = conn.ensure_object_server(false);
397            for (path, interfaces) in self.interfaces {
398                for (name, iface) in interfaces {
399                    let added = object_server
400                        .add_arc_interface(path.clone(), name.clone(), iface.clone())
401                        .await?;
402                    if !added {
403                        return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
404                    }
405                }
406            }
407
408            let started_event = Event::new();
409            let listener = started_event.listen();
410            conn.start_object_server(Some(started_event));
411
412            listener.await;
413        }
414
415        // Start the socket reader task.
416        conn.init_socket_reader(
417            socket_read,
418            already_received_bytes,
419            #[cfg(unix)]
420            already_received_fds,
421        );
422
423        for name in self.names {
424            conn.request_name_with_flags(name, self.request_name_flags)
425                .await?;
426        }
427
428        Ok(conn)
429    }
430
431    fn new(target: Target) -> Self {
432        Self {
433            target: Some(target),
434            #[cfg(feature = "p2p")]
435            p2p: false,
436            max_queued: None,
437            guid: None,
438            internal_executor: true,
439            interfaces: HashMap::new(),
440            names: HashSet::new(),
441            auth_mechanism: None,
442            #[cfg(feature = "bus-impl")]
443            unique_name: None,
444            request_name_flags: BitFlags::default(),
445        }
446    }
447
448    async fn connect(&mut self, is_bus_conn: bool) -> Result<Authenticated> {
449        #[cfg(not(feature = "bus-impl"))]
450        let unique_name = None;
451        #[cfg(feature = "bus-impl")]
452        let unique_name = self.unique_name.take().map(Into::into);
453
454        #[allow(unused_mut)]
455        let (mut stream, server_guid, authenticated) = self.target_connect().await?;
456        if authenticated {
457            let (socket_read, socket_write) = stream.take();
458            Ok(Authenticated {
459                #[cfg(unix)]
460                cap_unix_fd: socket_read.can_pass_unix_fd(),
461                socket_read: Some(socket_read),
462                socket_write,
463                // SAFETY: `server_guid` is provided as arg of `Builder::authenticated_socket`.
464                server_guid: server_guid.unwrap(),
465                already_received_bytes: vec![],
466                unique_name,
467                #[cfg(unix)]
468                already_received_fds: vec![],
469            })
470        } else {
471            #[cfg(feature = "p2p")]
472            match self.guid.take() {
473                None => {
474                    // SASL Handshake
475                    Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn)
476                        .await
477                }
478                Some(guid) => {
479                    if !self.p2p {
480                        return Err(Error::Unsupported);
481                    }
482
483                    let creds = stream.read_mut().peer_credentials().await?;
484                    #[cfg(unix)]
485                    let client_uid = creds.unix_user_id();
486                    #[cfg(windows)]
487                    let client_sid = creds.into_windows_sid();
488
489                    Authenticated::server(
490                        stream,
491                        guid.to_owned().into(),
492                        #[cfg(unix)]
493                        client_uid,
494                        #[cfg(windows)]
495                        client_sid,
496                        self.auth_mechanism,
497                        unique_name,
498                    )
499                    .await
500                }
501            }
502
503            #[cfg(not(feature = "p2p"))]
504            Authenticated::client(stream, server_guid, self.auth_mechanism, is_bus_conn).await
505        }
506    }
507
508    async fn target_connect(&mut self) -> Result<(BoxedSplit, Option<OwnedGuid>, bool)> {
509        let mut authenticated = false;
510        let mut guid = None;
511        // SAFETY: `self.target` is always `Some` from the beginning and this method is only called
512        // once.
513        let split = match self.target.take().unwrap() {
514            #[cfg(not(feature = "tokio"))]
515            Target::UnixStream(stream) => Async::new(stream)?.into(),
516            #[cfg(all(unix, feature = "tokio"))]
517            Target::UnixStream(stream) => stream.into(),
518            #[cfg(not(feature = "tokio"))]
519            Target::TcpStream(stream) => Async::new(stream)?.into(),
520            #[cfg(feature = "tokio")]
521            Target::TcpStream(stream) => stream.into(),
522            #[cfg(all(feature = "vsock", not(feature = "tokio")))]
523            Target::VsockStream(stream) => Async::new(stream)?.into(),
524            #[cfg(feature = "tokio-vsock")]
525            Target::VsockStream(stream) => stream.into(),
526            Target::Address(address) => {
527                guid = address.guid().map(|g| g.to_owned().into());
528                match address.connect().await? {
529                    #[cfg(any(unix, not(feature = "tokio")))]
530                    address::transport::Stream::Unix(stream) => stream.into(),
531                    #[cfg(unix)]
532                    address::transport::Stream::Unixexec(stream) => stream.into(),
533                    address::transport::Stream::Tcp(stream) => stream.into(),
534                    #[cfg(any(
535                        all(feature = "vsock", not(feature = "tokio")),
536                        feature = "tokio-vsock"
537                    ))]
538                    address::transport::Stream::Vsock(stream) => stream.into(),
539                }
540            }
541            Target::Socket(stream) => stream,
542            Target::AuthenticatedSocket(stream) => {
543                authenticated = true;
544                guid = self.guid.take().map(Into::into);
545                stream
546            }
547        };
548
549        Ok((split, guid, authenticated))
550    }
551}
552
553/// Start the internal executor thread.
554///
555/// Returns a dummy task that keep the executor ticking thread from exiting due to absence of any
556/// tasks until socket reader task kicks in.
557#[cfg(not(feature = "tokio"))]
558fn start_internal_executor(executor: &Executor<'static>, internal_executor: bool) -> Result<()> {
559    if internal_executor {
560        let executor = executor.clone();
561        std::thread::Builder::new()
562            .name("zbus::Connection executor".into())
563            .spawn(move || {
564                crate::utils::block_on(async move {
565                    // Run as long as there is a task to run.
566                    while !executor.is_empty() {
567                        executor.tick().await;
568                    }
569                })
570            })?;
571    }
572
573    Ok(())
574}