zbus/
connection.rs

1use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender as Broadcaster};
2use enumflags2::BitFlags;
3use event_listener::{Event, EventListener};
4use once_cell::sync::OnceCell;
5use ordered_stream::{OrderedFuture, OrderedStream, PollResult};
6use static_assertions::assert_impl_all;
7use std::{
8    collections::HashMap,
9    convert::TryInto,
10    io::{self, ErrorKind},
11    ops::Deref,
12    pin::Pin,
13    sync::{
14        self,
15        atomic::{AtomicU32, Ordering::SeqCst},
16        Arc, Weak,
17    },
18    task::{Context, Poll},
19};
20use tracing::{debug, info_span, instrument, trace, trace_span, warn, Instrument};
21use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
22use zvariant::ObjectPath;
23
24use futures_core::{ready, Future};
25use futures_sink::Sink;
26use futures_util::{sink::SinkExt, StreamExt};
27
28use crate::{
29    async_lock::Mutex,
30    blocking,
31    fdo::{self, ConnectionCredentials, RequestNameFlags, RequestNameReply},
32    raw::{Connection as RawConnection, Socket},
33    socket_reader::SocketReader,
34    Authenticated, CacheProperties, ConnectionBuilder, DBusError, Error, Executor, Guid, MatchRule,
35    Message, MessageBuilder, MessageFlags, MessageStream, MessageType, ObjectServer,
36    OwnedMatchRule, Result, Task,
37};
38
39const DEFAULT_MAX_QUEUED: usize = 64;
40const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8;
41
42/// Inner state shared by Connection and WeakConnection
43#[derive(Debug)]
44pub(crate) struct ConnectionInner {
45    server_guid: Guid,
46    #[cfg(unix)]
47    cap_unix_fd: bool,
48    bus_conn: bool,
49    unique_name: OnceCell<OwnedUniqueName>,
50    registered_names: Mutex<HashMap<WellKnownName<'static>, NameStatus>>,
51
52    raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
53
54    // Serial number for next outgoing message
55    serial: AtomicU32,
56
57    // Our executor
58    executor: Executor<'static>,
59
60    // Socket reader task
61    #[allow(unused)]
62    socket_reader_task: OnceCell<Task<()>>,
63
64    pub(crate) msg_receiver: InactiveReceiver<Result<Arc<Message>>>,
65    pub(crate) method_return_receiver: InactiveReceiver<Result<Arc<Message>>>,
66    msg_senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
67
68    subscriptions: Mutex<Subscriptions>,
69
70    object_server: OnceCell<blocking::ObjectServer>,
71    object_server_dispatch_task: OnceCell<Task<()>>,
72}
73
74type Subscriptions = HashMap<OwnedMatchRule, (u64, InactiveReceiver<Result<Arc<Message>>>)>;
75
76pub(crate) type MsgBroadcaster = Broadcaster<Result<Arc<Message>>>;
77
78/// A D-Bus connection.
79///
80/// A connection to a D-Bus bus, or a direct peer.
81///
82/// Once created, the connection is authenticated and negotiated and messages can be sent or
83/// received, such as [method calls] or [signals].
84///
85/// For higher-level message handling (typed functions, introspection, documentation reasons etc),
86/// it is recommended to wrap the low-level D-Bus messages into Rust functions with the
87/// [`dbus_proxy`] and [`dbus_interface`] macros instead of doing it directly on a `Connection`.
88///
89/// Typically, a connection is made to the session bus with [`Connection::session`], or to the
90/// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`]
91/// instances or the on-demand [`ObjectServer`] instance that can be accessed through
92/// [`Connection::object_server`].
93///
94/// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying
95/// data is not cloned. This makes it very convenient to share the connection between different
96/// parts of your code. `Connection` also implements [`std::marker::Sync`] and [`std::marker::Send`]
97/// so you can send and share a connection instance across threads as well.
98///
99/// `Connection` keeps internal queues of incoming message. The default capacity of each of these is
100/// 64. The capacity of the main (unfiltered) queue is configurable through the [`set_max_queued`]
101/// method. When the queue is full, no more messages can be received until room is created for more.
102/// This is why it's important to ensure that all [`crate::MessageStream`] and
103/// [`crate::blocking::MessageIterator`] instances are continuously polled and iterated on,
104/// respectively.
105///
106/// For sending messages you can either use [`Connection::send_message`] method or make use of the
107/// [`Sink`] implementation. For latter, you might find [`SinkExt`] API very useful. Keep in mind
108/// that [`Connection`] will not manage the serial numbers (cookies) on the messages for you when
109/// they are sent through the [`Sink`] implementation. You can manually assign unique serial numbers
110/// to them using the [`Connection::assign_serial_num`] method before sending them off, if needed.
111/// Having said that, the [`Sink`] is mainly useful for sending out signals, as they do not expect
112/// a reply, and serial numbers are not very useful for signals either for the same reason.
113///
114/// Since you do not need exclusive access to a `zbus::Connection` to send messages on the bus,
115/// [`Sink`] is also implemented on `&Connection`.
116///
117/// # Caveats
118///
119/// At the moment, a simultaneous [flush request] from multiple tasks/threads could
120/// potentially create a busy loop, thus wasting CPU time. This limitation may be removed in the
121/// future.
122///
123/// [flush request]: https://docs.rs/futures/0.3.15/futures/sink/trait.SinkExt.html#method.flush
124///
125/// [method calls]: struct.Connection.html#method.call_method
126/// [signals]: struct.Connection.html#method.emit_signal
127/// [`dbus_proxy`]: attr.dbus_proxy.html
128/// [`dbus_interface`]: attr.dbus_interface.html
129/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
130/// [`set_max_queued`]: struct.Connection.html#method.set_max_queued
131///
132/// ### Examples
133///
134/// #### Get the session bus ID
135///
136/// ```
137/// # zbus::block_on(async {
138/// use zbus::Connection;
139///
140/// let connection = Connection::session().await?;
141///
142/// let reply = connection
143///     .call_method(
144///         Some("org.freedesktop.DBus"),
145///         "/org/freedesktop/DBus",
146///         Some("org.freedesktop.DBus"),
147///         "GetId",
148///         &(),
149///     )
150///     .await?;
151///
152/// let id: &str = reply.body()?;
153/// println!("Unique ID of the bus: {}", id);
154/// # Ok::<(), zbus::Error>(())
155/// # }).unwrap();
156/// ```
157///
158/// #### Monitoring all messages
159///
160/// Let's eavesdrop on the session bus 😈 using the [Monitor] interface:
161///
162/// ```rust,no_run
163/// # zbus::block_on(async {
164/// use futures_util::stream::TryStreamExt;
165/// use zbus::{Connection, MessageStream};
166///
167/// let connection = Connection::session().await?;
168///
169/// connection
170///     .call_method(
171///         Some("org.freedesktop.DBus"),
172///         "/org/freedesktop/DBus",
173///         Some("org.freedesktop.DBus.Monitoring"),
174///         "BecomeMonitor",
175///         &(&[] as &[&str], 0u32),
176///     )
177///     .await?;
178///
179/// let mut stream = MessageStream::from(connection);
180/// while let Some(msg) = stream.try_next().await? {
181///     println!("Got message: {}", msg);
182/// }
183///
184/// # Ok::<(), zbus::Error>(())
185/// # }).unwrap();
186/// ```
187///
188/// This should print something like:
189///
190/// ```console
191/// Got message: Signal NameAcquired from org.freedesktop.DBus
192/// Got message: Signal NameLost from org.freedesktop.DBus
193/// Got message: Method call GetConnectionUnixProcessID from :1.1324
194/// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
195///              Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
196/// Got message: Method call AddMatch from :1.918
197/// Got message: Method return from org.freedesktop.DBus
198/// ```
199///
200/// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
201#[derive(Clone, Debug)]
202#[must_use = "Dropping a `Connection` will close the underlying socket."]
203pub struct Connection {
204    pub(crate) inner: Arc<ConnectionInner>,
205}
206
207assert_impl_all!(Connection: Send, Sync, Unpin);
208
209/// A method call whose completion can be awaited or joined with other streams.
210///
211/// This is useful for cache population method calls, where joining the [`JoinableStream`] with
212/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
213/// population whose task is scheduled later.
214#[derive(Debug)]
215pub(crate) struct PendingMethodCall {
216    stream: Option<MessageStream>,
217    serial: u32,
218}
219
220impl Future for PendingMethodCall {
221    type Output = Result<Arc<Message>>;
222
223    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
224        self.poll_before(cx, None).map(|ret| {
225            ret.map(|(_, r)| r).unwrap_or_else(|| {
226                Err(crate::Error::InputOutput(
227                    io::Error::new(ErrorKind::BrokenPipe, "socket closed").into(),
228                ))
229            })
230        })
231    }
232}
233
234impl OrderedFuture for PendingMethodCall {
235    type Output = Result<Arc<Message>>;
236    type Ordering = zbus::MessageSequence;
237
238    fn poll_before(
239        self: Pin<&mut Self>,
240        cx: &mut Context<'_>,
241        before: Option<&Self::Ordering>,
242    ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
243        let this = self.get_mut();
244        if let Some(stream) = &mut this.stream {
245            loop {
246                match Pin::new(&mut *stream).poll_next_before(cx, before) {
247                    Poll::Ready(PollResult::Item {
248                        data: Ok(msg),
249                        ordering,
250                    }) => {
251                        if msg.reply_serial() != Some(this.serial) {
252                            continue;
253                        }
254                        let res = match msg.message_type() {
255                            MessageType::Error => Err(msg.into()),
256                            MessageType::MethodReturn => Ok(msg),
257                            _ => continue,
258                        };
259                        this.stream = None;
260                        return Poll::Ready(Some((ordering, res)));
261                    }
262                    Poll::Ready(PollResult::Item {
263                        data: Err(e),
264                        ordering,
265                    }) => {
266                        return Poll::Ready(Some((ordering, Err(e))));
267                    }
268
269                    Poll::Ready(PollResult::NoneBefore) => {
270                        return Poll::Ready(None);
271                    }
272                    Poll::Ready(PollResult::Terminated) => {
273                        return Poll::Ready(None);
274                    }
275                    Poll::Pending => return Poll::Pending,
276                }
277            }
278        }
279        Poll::Ready(None)
280    }
281}
282
283impl Connection {
284    /// Send `msg` to the peer.
285    ///
286    /// Unlike our [`Sink`] implementation, this method sets a unique (to this connection) serial
287    /// number on the message before sending it off, for you.
288    ///
289    /// On successfully sending off `msg`, the assigned serial number is returned.
290    pub async fn send_message(&self, mut msg: Message) -> Result<u32> {
291        let serial = self.assign_serial_num(&mut msg)?;
292
293        trace!("Sending message: {:?}", msg);
294        (&mut &*self).send(msg).await?;
295        trace!("Sent message with serial: {}", serial);
296
297        Ok(serial)
298    }
299
300    /// Send a method call.
301    ///
302    /// Create a method-call message, send it over the connection, then wait for the reply.
303    ///
304    /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
305    /// error replies are returned as [`Error::MethodError`].
306    pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
307        &self,
308        destination: Option<D>,
309        path: P,
310        interface: Option<I>,
311        method_name: M,
312        body: &B,
313    ) -> Result<Arc<Message>>
314    where
315        D: TryInto<BusName<'d>>,
316        P: TryInto<ObjectPath<'p>>,
317        I: TryInto<InterfaceName<'i>>,
318        M: TryInto<MemberName<'m>>,
319        D::Error: Into<Error>,
320        P::Error: Into<Error>,
321        I::Error: Into<Error>,
322        M::Error: Into<Error>,
323        B: serde::ser::Serialize + zvariant::DynamicType,
324    {
325        self.call_method_raw(
326            destination,
327            path,
328            interface,
329            method_name,
330            BitFlags::empty(),
331            body,
332        )
333        .await?
334        .expect("no reply")
335        .await
336    }
337
338    /// Send a method call.
339    ///
340    /// Send the given message, which must be a method call, over the connection and return an
341    /// object that allows the reply to be retrieved.  Typically you'd want to use
342    /// [`Connection::call_method`] instead.
343    ///
344    /// If the `flags` do not contain `MethodFlags::NoReplyExpected`, the return value is
345    /// guaranteed to be `Ok(Some(_))`, if there was no error encountered.
346    ///
347    /// INTERNAL NOTE: If this method is ever made pub, flags should become `BitFlags<MethodFlags>`.
348    pub(crate) async fn call_method_raw<'d, 'p, 'i, 'm, D, P, I, M, B>(
349        &self,
350        destination: Option<D>,
351        path: P,
352        interface: Option<I>,
353        method_name: M,
354        flags: BitFlags<MessageFlags>,
355        body: &B,
356    ) -> Result<Option<PendingMethodCall>>
357    where
358        D: TryInto<BusName<'d>>,
359        P: TryInto<ObjectPath<'p>>,
360        I: TryInto<InterfaceName<'i>>,
361        M: TryInto<MemberName<'m>>,
362        D::Error: Into<Error>,
363        P::Error: Into<Error>,
364        I::Error: Into<Error>,
365        M::Error: Into<Error>,
366        B: serde::ser::Serialize + zvariant::DynamicType,
367    {
368        let mut builder = MessageBuilder::method_call(path, method_name)?;
369        if let Some(sender) = self.unique_name() {
370            builder = builder.sender(sender)?
371        }
372        if let Some(destination) = destination {
373            builder = builder.destination(destination)?
374        }
375        if let Some(interface) = interface {
376            builder = builder.interface(interface)?
377        }
378        for flag in flags {
379            builder = builder.with_flags(flag)?;
380        }
381        let msg = builder.build(body)?;
382
383        let msg_receiver = self.inner.method_return_receiver.activate_cloned();
384        let stream = Some(MessageStream::for_subscription_channel(
385            msg_receiver,
386            // This is a lie but we only use the stream internally so it's fine.
387            None,
388            self,
389        ));
390        let serial = self.send_message(msg).await?;
391        if flags.contains(MessageFlags::NoReplyExpected) {
392            Ok(None)
393        } else {
394            Ok(Some(PendingMethodCall { stream, serial }))
395        }
396    }
397
398    /// Emit a signal.
399    ///
400    /// Create a signal message, and send it over the connection.
401    pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
402        &self,
403        destination: Option<D>,
404        path: P,
405        interface: I,
406        signal_name: M,
407        body: &B,
408    ) -> Result<()>
409    where
410        D: TryInto<BusName<'d>>,
411        P: TryInto<ObjectPath<'p>>,
412        I: TryInto<InterfaceName<'i>>,
413        M: TryInto<MemberName<'m>>,
414        D::Error: Into<Error>,
415        P::Error: Into<Error>,
416        I::Error: Into<Error>,
417        M::Error: Into<Error>,
418        B: serde::ser::Serialize + zvariant::DynamicType,
419    {
420        let m = Message::signal(
421            self.unique_name(),
422            destination,
423            path,
424            interface,
425            signal_name,
426            body,
427        )?;
428
429        self.send_message(m).await.map(|_| ())
430    }
431
432    /// Reply to a message.
433    ///
434    /// Given an existing message (likely a method call), send a reply back to the caller with the
435    /// given `body`.
436    ///
437    /// Returns the message serial number.
438    pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<u32>
439    where
440        B: serde::ser::Serialize + zvariant::DynamicType,
441    {
442        let m = Message::method_reply(self.unique_name(), call, body)?;
443        self.send_message(m).await
444    }
445
446    /// Reply an error to a message.
447    ///
448    /// Given an existing message (likely a method call), send an error reply back to the caller
449    /// with the given `error_name` and `body`.
450    ///
451    /// Returns the message serial number.
452    pub async fn reply_error<'e, E, B>(
453        &self,
454        call: &Message,
455        error_name: E,
456        body: &B,
457    ) -> Result<u32>
458    where
459        B: serde::ser::Serialize + zvariant::DynamicType,
460        E: TryInto<ErrorName<'e>>,
461        E::Error: Into<Error>,
462    {
463        let m = Message::method_error(self.unique_name(), call, error_name, body)?;
464        self.send_message(m).await
465    }
466
467    /// Reply an error to a message.
468    ///
469    /// Given an existing message (likely a method call), send an error reply back to the caller
470    /// using one of the standard interface reply types.
471    ///
472    /// Returns the message serial number.
473    pub async fn reply_dbus_error(
474        &self,
475        call: &zbus::MessageHeader<'_>,
476        err: impl DBusError,
477    ) -> Result<u32> {
478        let m = err.create_reply(call);
479        self.send_message(m?).await
480    }
481
482    /// Register a well-known name for this connection.
483    ///
484    /// When connecting to a bus, the name is requested from the bus. In case of p2p connection, the
485    /// name (if requested) is used of self-identification.
486    ///
487    /// You can request multiple names for the same connection. Use [`Connection::release_name`] for
488    /// deregistering names registered through this method.
489    ///
490    /// Note that exclusive ownership without queueing is requested (using
491    /// [`RequestNameFlags::ReplaceExisting`] and [`RequestNameFlags::DoNotQueue`] flags) since that
492    /// is the most typical case. If that is not what you want, you should use
493    /// [`Connection::request_name_with_flags`] instead (but make sure then that name is requested
494    /// **after** you've setup your service implementation with the `ObjectServer`).
495    ///
496    /// # Caveats
497    ///
498    /// The associated `ObjectServer` will only handle method calls destined for the unique name of
499    /// this connection or any of the registered well-known names. If no well-known name is
500    /// registered, the method calls destined to all well-known names will be handled.
501    ///
502    /// Since names registered through any other means than `Connection` or [`ConnectionBuilder`]
503    /// API are not known to the connection, method calls destined to those names will only be
504    /// handled by the associated `ObjectServer` if none of the names are registered through
505    /// `Connection*` API. Simply put, either register all the names through `Connection*` API or
506    /// none of them.
507    ///
508    /// # Errors
509    ///
510    /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer.
511    pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
512    where
513        W: TryInto<WellKnownName<'w>>,
514        W::Error: Into<Error>,
515    {
516        self.request_name_with_flags(
517            well_known_name,
518            RequestNameFlags::ReplaceExisting | RequestNameFlags::DoNotQueue,
519        )
520        .await
521        .map(|_| ())
522    }
523
524    /// Register a well-known name for this connection.
525    ///
526    /// This is the same as [`Connection::request_name`] but allows to specify the flags to use when
527    /// requesting the name.
528    ///
529    /// If the [`RequestNameFlags::DoNotQueue`] flag is not specified and request ends up in the
530    /// queue, you can use [`fdo::NameAcquiredStream`] to be notified when the name is acquired. A
531    /// queued name request can be cancelled using [`Connection::release_name`].
532    ///
533    /// If the [`RequestNameFlags::AllowReplacement`] flag is specified, the requested name can be
534    /// lost if another peer requests the same name. You can use [`fdo::NameLostStream`] to be
535    /// notified when the name is lost
536    ///
537    /// # Example
538    ///
539    /// ```
540    /// #
541    /// # zbus::block_on(async {
542    /// use zbus::{Connection, fdo::{DBusProxy, RequestNameFlags, RequestNameReply}};
543    /// use enumflags2::BitFlags;
544    /// use futures_util::stream::StreamExt;
545    ///
546    /// let name = "org.freedesktop.zbus.QueuedNameTest";
547    /// let conn1 = Connection::session().await?;
548    /// // This should just work right away.
549    /// conn1.request_name(name).await?;
550    ///
551    /// let conn2 = Connection::session().await?;
552    /// // A second request from the another connection will fail with `DoNotQueue` flag, which is
553    /// // implicit with `request_name` method.
554    /// assert!(conn2.request_name(name).await.is_err());
555    ///
556    /// // Now let's try w/o `DoNotQueue` and we should be queued.
557    /// let reply = conn2
558    ///     .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into())
559    ///     .await?;
560    /// assert_eq!(reply, RequestNameReply::InQueue);
561    /// // Another request should just give us the same response.
562    /// let reply = conn2
563    ///     // The flags on subsequent requests will however be ignored.
564    ///     .request_name_with_flags(name, BitFlags::empty())
565    ///     .await?;
566    /// assert_eq!(reply, RequestNameReply::InQueue);
567    /// let mut acquired_stream = DBusProxy::new(&conn2)
568    ///     .await?
569    ///     .receive_name_acquired()
570    ///     .await?;
571    /// assert!(conn1.release_name(name).await?);
572    /// // This would have waited forever if `conn1` hadn't just release the name.
573    /// let acquired = acquired_stream.next().await.unwrap();
574    /// assert_eq!(acquired.args().unwrap().name, name);
575    ///
576    /// // conn2 made the mistake of being too nice and allowed name replacemnt, so conn1 should be
577    /// // able to take it back.
578    /// let mut lost_stream = DBusProxy::new(&conn2)
579    ///     .await?
580    ///     .receive_name_lost()
581    ///     .await?;
582    /// conn1.request_name(name).await?;
583    /// let lost = lost_stream.next().await.unwrap();
584    /// assert_eq!(lost.args().unwrap().name, name);
585    ///
586    /// # Ok::<(), zbus::Error>(())
587    /// # }).unwrap();
588    /// ```
589    ///
590    /// # Caveats
591    ///
592    /// * Same as that of [`Connection::request_name`].
593    /// * If you wish to track changes to name ownership after this call, make sure that the
594    /// [`fdo::NameAcquired`] and/or [`fdo::NameLostStream`] instance(s) are created **before**
595    /// calling this method. Otherwise, you may loose the signal if it's emitted after this call but
596    /// just before the stream instance get created.
597    pub async fn request_name_with_flags<'w, W>(
598        &self,
599        well_known_name: W,
600        flags: BitFlags<RequestNameFlags>,
601    ) -> Result<RequestNameReply>
602    where
603        W: TryInto<WellKnownName<'w>>,
604        W::Error: Into<Error>,
605    {
606        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
607        // We keep the lock until the end of this function so that the (possibly) spawned task
608        // doesn't end up accessing the name entry before it's inserted.
609        let mut names = self.inner.registered_names.lock().await;
610
611        match names.get(&well_known_name) {
612            Some(NameStatus::Owner(_)) => return Ok(RequestNameReply::AlreadyOwner),
613            Some(NameStatus::Queued(_)) => return Ok(RequestNameReply::InQueue),
614            None => (),
615        }
616
617        if !self.is_bus() {
618            names.insert(well_known_name.to_owned(), NameStatus::Owner(None));
619
620            return Ok(RequestNameReply::PrimaryOwner);
621        }
622
623        let dbus_proxy = fdo::DBusProxy::builder(self)
624            .cache_properties(CacheProperties::No)
625            .build()
626            .await?;
627        let mut acquired_stream = dbus_proxy.receive_name_acquired().await?;
628        let mut lost_stream = dbus_proxy.receive_name_lost().await?;
629        let reply = dbus_proxy
630            .request_name(well_known_name.clone(), flags)
631            .await?;
632        let lost_task_name = format!("monitor name {well_known_name} lost");
633        let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
634            let weak_conn = WeakConnection::from(self);
635            let well_known_name = well_known_name.to_owned();
636            Some(
637                async move {
638                    loop {
639                        let signal = lost_stream.next().await;
640                        let inner = match weak_conn.upgrade() {
641                            Some(conn) => conn.inner.clone(),
642                            None => break,
643                        };
644
645                        match signal {
646                            Some(signal) => match signal.args() {
647                                Ok(args) if args.name == well_known_name => {
648                                    tracing::info!(
649                                        "Connection `{}` lost name `{}`",
650                                        // SAFETY: This is bus connection so unique name can't be
651                                        // None.
652                                        inner.unique_name.get().unwrap(),
653                                        well_known_name
654                                    );
655                                    inner.registered_names.lock().await.remove(&well_known_name);
656
657                                    break;
658                                }
659                                Ok(_) => (),
660                                Err(e) => warn!("Failed to parse `NameLost` signal: {}", e),
661                            },
662                            None => {
663                                trace!("`NameLost` signal stream closed");
664                                // This is a very strange state we end up in. Now the name is
665                                // question remains in the queue
666                                // forever. Maybe we can do better here but I
667                                // think it's a very unlikely scenario anyway.
668                                //
669                                // Can happen if the connection is lost/dropped but then the whole
670                                // `Connection` instance will go away soon anyway and hence this
671                                // strange state along with it.
672                                break;
673                            }
674                        }
675                    }
676                }
677                .instrument(info_span!("{}", lost_task_name)),
678            )
679        } else {
680            None
681        };
682        let status = match reply {
683            RequestNameReply::InQueue => {
684                let weak_conn = WeakConnection::from(self);
685                let well_known_name = well_known_name.to_owned();
686                let task_name = format!("monitor name {well_known_name} acquired");
687                let task = self.executor().spawn(
688                    async move {
689                        loop {
690                            let signal = acquired_stream.next().await;
691                            let inner = match weak_conn.upgrade() {
692                                Some(conn) => conn.inner.clone(),
693                                None => break,
694                            };
695                            match signal {
696                                Some(signal) => match signal.args() {
697                                    Ok(args) if args.name == well_known_name => {
698                                        let mut names = inner.registered_names.lock().await;
699                                        if let Some(status) = names.get_mut(&well_known_name) {
700                                            let task = name_lost_fut.map(|fut| {
701                                                inner.executor.spawn(fut, &lost_task_name)
702                                            });
703                                            *status = NameStatus::Owner(task);
704
705                                            break;
706                                        }
707                                        // else the name was released in the meantime. :shrug:
708                                    }
709                                    Ok(_) => (),
710                                    Err(e) => warn!("Failed to parse `NameAcquired` signal: {}", e),
711                                },
712                                None => {
713                                    trace!("`NameAcquired` signal stream closed");
714                                    // See comment above for similar state in case of `NameLost`
715                                    // stream.
716                                    break;
717                                }
718                            }
719                        }
720                    }
721                    .instrument(info_span!("{}", task_name)),
722                    &task_name,
723                );
724
725                NameStatus::Queued(task)
726            }
727            RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
728                let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));
729
730                NameStatus::Owner(task)
731            }
732            RequestNameReply::Exists => return Err(Error::NameTaken),
733        };
734
735        names.insert(well_known_name.to_owned(), status);
736
737        Ok(reply)
738    }
739
740    /// Deregister a previously registered well-known name for this service on the bus.
741    ///
742    /// Use this method to deregister a well-known name, registered through
743    /// [`Connection::request_name`].
744    ///
745    /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
746    /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
747    /// was not previously registered or already deregistered.
748    pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
749    where
750        W: TryInto<WellKnownName<'w>>,
751        W::Error: Into<Error>,
752    {
753        let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;
754        let mut names = self.inner.registered_names.lock().await;
755        // FIXME: Should be possible to avoid cloning/allocation here
756        if names.remove(&well_known_name.to_owned()).is_none() {
757            return Ok(false);
758        };
759
760        if !self.is_bus() {
761            return Ok(true);
762        }
763
764        fdo::DBusProxy::builder(self)
765            .cache_properties(CacheProperties::No)
766            .build()
767            .await?
768            .release_name(well_known_name)
769            .await
770            .map(|_| true)
771            .map_err(Into::into)
772    }
773
774    /// Checks if `self` is a connection to a message bus.
775    ///
776    /// This will return `false` for p2p connections.
777    pub fn is_bus(&self) -> bool {
778        self.inner.bus_conn
779    }
780
781    /// Assigns a serial number to `msg` that is unique to this connection.
782    ///
783    /// This method can fail if `msg` is corrupted.
784    pub fn assign_serial_num(&self, msg: &mut Message) -> Result<u32> {
785        let mut serial = 0;
786        msg.modify_primary_header(|primary| {
787            serial = *primary.serial_num_or_init(|| self.next_serial());
788            Ok(())
789        })?;
790
791        Ok(serial)
792    }
793
794    /// The unique name of the connection, if set/applicable.
795    ///
796    /// The unique name is assigned by the message bus or set manually using
797    /// [`Connection::set_unique_name`].
798    pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
799        self.inner.unique_name.get()
800    }
801
802    /// Sets the unique name of the connection (if not already set).
803    ///
804    /// # Panics
805    ///
806    /// This method panics if the unique name is already set. It will always panic if the connection
807    /// is to a message bus as it's the bus that assigns peers their unique names. This is mainly
808    /// provided for bus implementations. All other users should not need to use this method.
809    pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()>
810    where
811        U: TryInto<OwnedUniqueName>,
812        U::Error: Into<Error>,
813    {
814        let name = unique_name.try_into().map_err(Into::into)?;
815        self.inner
816            .unique_name
817            .set(name)
818            .expect("unique name already set");
819
820        Ok(())
821    }
822
823    /// The capacity of the main (unfiltered) queue.
824    pub fn max_queued(&self) -> usize {
825        self.inner.msg_receiver.capacity()
826    }
827
828    /// Set the capacity of the main (unfiltered) queue.
829    pub fn set_max_queued(&mut self, max: usize) {
830        self.inner.msg_receiver.clone().set_capacity(max);
831    }
832
833    /// The server's GUID.
834    pub fn server_guid(&self) -> &str {
835        self.inner.server_guid.as_str()
836    }
837
838    /// The underlying executor.
839    ///
840    /// When a connection is built with internal_executor set to false, zbus will not spawn a
841    /// thread to run the executor. You're responsible to continuously [tick the executor][tte].
842    /// Failure to do so will result in hangs.
843    ///
844    /// # Examples
845    ///
846    /// Here is how one would typically run the zbus executor through async-std's single-threaded
847    /// scheduler:
848    ///
849    /// ```
850    /// # // Disable on windows because somehow it triggers a stack overflow there:
851    /// # // https://gitlab.freedesktop.org/zeenix/zbus/-/jobs/34023494
852    /// # #[cfg(all(not(feature = "tokio"), not(target_os = "windows")))]
853    /// # {
854    /// use zbus::ConnectionBuilder;
855    /// use async_std::task::{block_on, spawn};
856    ///
857    /// # struct SomeIface;
858    /// #
859    /// # #[zbus::dbus_interface]
860    /// # impl SomeIface {
861    /// # }
862    /// #
863    /// block_on(async {
864    ///     let conn = ConnectionBuilder::session()
865    ///         .unwrap()
866    ///         .internal_executor(false)
867    /// #         // This is only for testing a deadlock that used to happen with this combo.
868    /// #         .serve_at("/some/iface", SomeIface)
869    /// #         .unwrap()
870    ///         .build()
871    ///         .await
872    ///         .unwrap();
873    ///     {
874    ///        let conn = conn.clone();
875    ///        spawn(async move {
876    ///            loop {
877    ///                conn.executor().tick().await;
878    ///            }
879    ///        });
880    ///     }
881    ///
882    ///     // All your other async code goes here.
883    /// });
884    /// # }
885    /// ```
886    ///
887    /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use
888    /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable
889    /// `tokio` feature. You should also disable the (default) `async-io` feature in your
890    /// `Cargo.toml` to avoid unused dependencies. Also note that **prior** to zbus 3.0, disabling
891    /// `async-io` was required to enable tight `tokio` integration.
892    ///
893    /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
894    pub fn executor(&self) -> &Executor<'static> {
895        &self.inner.executor
896    }
897
898    /// Get a reference to the associated [`ObjectServer`].
899    ///
900    /// The `ObjectServer` is created on-demand.
901    ///
902    /// **Note**: Once the `ObjectServer` is created, it will be replying to all method calls
903    /// received on `self`. If you want to manually reply to method calls, do not use this
904    /// method (or any of the `ObjectServer` related API).
905    pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
906        // FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for
907        // crate::ObjectServer instead of this wrapper?
908        struct Wrapper<'a>(&'a blocking::ObjectServer);
909        impl<'a> Deref for Wrapper<'a> {
910            type Target = ObjectServer;
911
912            fn deref(&self) -> &Self::Target {
913                self.0.inner()
914            }
915        }
916
917        Wrapper(self.sync_object_server(true, None))
918    }
919
920    pub(crate) fn sync_object_server(
921        &self,
922        start: bool,
923        started_event: Option<Event>,
924    ) -> &blocking::ObjectServer {
925        self.inner
926            .object_server
927            .get_or_init(move || self.setup_object_server(start, started_event))
928    }
929
930    fn setup_object_server(
931        &self,
932        start: bool,
933        started_event: Option<Event>,
934    ) -> blocking::ObjectServer {
935        if start {
936            self.start_object_server(started_event);
937        }
938
939        blocking::ObjectServer::new(self)
940    }
941
942    #[instrument(skip(self))]
943    pub(crate) fn start_object_server(&self, started_event: Option<Event>) {
944        self.inner.object_server_dispatch_task.get_or_init(|| {
945            trace!("starting ObjectServer task");
946            let weak_conn = WeakConnection::from(self);
947
948            let obj_server_task_name = "ObjectServer task";
949            self.inner.executor.spawn(
950                async move {
951                    let mut stream = match weak_conn.upgrade() {
952                        Some(conn) => {
953                            let mut builder = MatchRule::builder().msg_type(MessageType::MethodCall);
954                            if let Some(unique_name) = conn.unique_name() {
955                                builder = builder.destination(&**unique_name).expect("unique name");
956                            }
957                            let rule = builder.build();
958                            match conn.add_match(rule.into(), None).await {
959                                Ok(stream) => stream,
960                                Err(e) => {
961                                    // Very unlikely but can happen I guess if connection is closed.
962                                    debug!("Failed to create message stream: {}", e);
963
964                                    return;
965                                }
966                            }
967                        }
968                        None => {
969                            trace!("Connection is gone, stopping associated object server task");
970
971                            return;
972                        }
973                    };
974                    if let Some(started_event) = started_event {
975                        started_event.notify(1);
976                    }
977
978                    trace!("waiting for incoming method call messages..");
979                    while let Some(msg) = stream.next().await.and_then(|m| {
980                        if let Err(e) = &m {
981                            debug!("Error while reading from object server stream: {:?}", e);
982                        }
983                        m.ok()
984                    }) {
985                        if let Some(conn) = weak_conn.upgrade() {
986                            let hdr = match msg.header() {
987                                Ok(hdr) => hdr,
988                                Err(e) => {
989                                    warn!("Failed to parse header: {}", e);
990
991                                    continue;
992                                }
993                            };
994                            match hdr.destination() {
995                                // Unique name is already checked by the match rule.
996                                Ok(Some(BusName::Unique(_))) | Ok(None) => (),
997                                Ok(Some(BusName::WellKnown(dest))) => {
998                                    let names = conn.inner.registered_names.lock().await;
999                                    // destination doesn't matter if no name has been registered
1000                                    // (probably means name it's registered through external means).
1001                                    if !names.is_empty() && !names.contains_key(dest) {
1002                                        trace!("Got a method call for a different destination: {}", dest);
1003
1004                                        continue;
1005                                    }
1006                                }
1007                                Err(e) => {
1008                                    warn!("Failed to parse destination: {}", e);
1009
1010                                    continue;
1011                                }
1012                            }
1013                            let member = match msg.member() {
1014                                Some(member) => member,
1015                                None => {
1016                                    warn!("Got a method call with no `MEMBER` field: {}", msg);
1017
1018                                    continue;
1019                                }
1020                            };
1021                            trace!("Got `{}`. Will spawn a task for dispatch..", msg);
1022                            let executor = conn.inner.executor.clone();
1023                            let task_name = format!("`{member}` method dispatcher");
1024                            executor
1025                                .spawn(
1026                                    async move {
1027                                        trace!("spawned a task to dispatch `{}`.", msg);
1028                                        let server = conn.object_server();
1029                                        if let Err(e) = server.dispatch_message(&msg).await {
1030                                            debug!(
1031                                                "Error dispatching message. Message: {:?}, error: {:?}",
1032                                                msg, e
1033                                            );
1034                                        }
1035                                    }
1036                                    .instrument(trace_span!("{}", task_name)),
1037                                    &task_name,
1038                                )
1039                                .detach();
1040                        } else {
1041                            // If connection is completely gone, no reason to keep running the task anymore.
1042                            trace!("Connection is gone, stopping associated object server task");
1043                            break;
1044                        }
1045                    }
1046                }
1047                .instrument(info_span!("{}", obj_server_task_name)),
1048                obj_server_task_name,
1049            )
1050        });
1051    }
1052
1053    pub(crate) async fn add_match(
1054        &self,
1055        rule: OwnedMatchRule,
1056        max_queued: Option<usize>,
1057    ) -> Result<Receiver<Result<Arc<Message>>>> {
1058        use std::collections::hash_map::Entry;
1059
1060        if self.inner.msg_senders.lock().await.is_empty() {
1061            // This only happens if socket reader task has errored out.
1062            return Err(Error::InputOutput(Arc::new(io::Error::new(
1063                io::ErrorKind::BrokenPipe,
1064                "Socket reader task has errored out",
1065            ))));
1066        }
1067
1068        let mut subscriptions = self.inner.subscriptions.lock().await;
1069        let msg_type = rule.msg_type().unwrap_or(MessageType::Signal);
1070        match subscriptions.entry(rule.clone()) {
1071            Entry::Vacant(e) => {
1072                let max_queued = max_queued.unwrap_or(DEFAULT_MAX_QUEUED);
1073                let (sender, mut receiver) = broadcast(max_queued);
1074                receiver.set_await_active(false);
1075                if self.is_bus() && msg_type == MessageType::Signal {
1076                    fdo::DBusProxy::builder(self)
1077                        .cache_properties(CacheProperties::No)
1078                        .build()
1079                        .await?
1080                        .add_match_rule(e.key().inner().clone())
1081                        .await?;
1082                }
1083                e.insert((1, receiver.clone().deactivate()));
1084                self.inner
1085                    .msg_senders
1086                    .lock()
1087                    .await
1088                    .insert(Some(rule), sender);
1089
1090                Ok(receiver)
1091            }
1092            Entry::Occupied(mut e) => {
1093                let (num_subscriptions, receiver) = e.get_mut();
1094                *num_subscriptions += 1;
1095                if let Some(max_queued) = max_queued {
1096                    if max_queued > receiver.capacity() {
1097                        receiver.set_capacity(max_queued);
1098                    }
1099                }
1100
1101                Ok(receiver.activate_cloned())
1102            }
1103        }
1104    }
1105
1106    pub(crate) async fn remove_match(&self, rule: OwnedMatchRule) -> Result<bool> {
1107        use std::collections::hash_map::Entry;
1108        let mut subscriptions = self.inner.subscriptions.lock().await;
1109        // TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str
1110        // (both here and in add_match)
1111        let msg_type = rule.msg_type().unwrap_or(MessageType::Signal);
1112        match subscriptions.entry(rule) {
1113            Entry::Vacant(_) => Ok(false),
1114            Entry::Occupied(mut e) => {
1115                let rule = e.key().inner().clone();
1116                e.get_mut().0 -= 1;
1117                if e.get().0 == 0 {
1118                    if self.is_bus() && msg_type == MessageType::Signal {
1119                        fdo::DBusProxy::builder(self)
1120                            .cache_properties(CacheProperties::No)
1121                            .build()
1122                            .await?
1123                            .remove_match_rule(rule.clone())
1124                            .await?;
1125                    }
1126                    e.remove();
1127                    self.inner
1128                        .msg_senders
1129                        .lock()
1130                        .await
1131                        .remove(&Some(rule.into()));
1132                }
1133                Ok(true)
1134            }
1135        }
1136    }
1137
1138    pub(crate) fn queue_remove_match(&self, rule: OwnedMatchRule) {
1139        let conn = self.clone();
1140        let task_name = format!("Remove match `{}`", *rule);
1141        let remove_match =
1142            async move { conn.remove_match(rule).await }.instrument(trace_span!("{}", task_name));
1143        self.inner.executor.spawn(remove_match, &task_name).detach()
1144    }
1145
1146    pub(crate) async fn hello_bus(&self) -> Result<()> {
1147        let dbus_proxy = fdo::DBusProxy::builder(self)
1148            .cache_properties(CacheProperties::No)
1149            .build()
1150            .await?;
1151        let name = dbus_proxy.hello().await?;
1152
1153        self.inner
1154            .unique_name
1155            .set(name)
1156            // programmer (probably our) error if this fails.
1157            .expect("Attempted to set unique_name twice");
1158
1159        Ok(())
1160    }
1161
1162    pub(crate) async fn new(
1163        auth: Authenticated<Box<dyn Socket>>,
1164        bus_connection: bool,
1165        executor: Executor<'static>,
1166    ) -> Result<Self> {
1167        #[cfg(unix)]
1168        let cap_unix_fd = auth.cap_unix_fd;
1169
1170        macro_rules! create_msg_broadcast_channel {
1171            ($size:expr) => {{
1172                let (msg_sender, msg_receiver) = broadcast($size);
1173                let mut msg_receiver = msg_receiver.deactivate();
1174                msg_receiver.set_await_active(false);
1175
1176                (msg_sender, msg_receiver)
1177            }};
1178        }
1179        // The unfiltered message channel.
1180        let (msg_sender, msg_receiver) = create_msg_broadcast_channel!(DEFAULT_MAX_QUEUED);
1181        let mut msg_senders = HashMap::new();
1182        msg_senders.insert(None, msg_sender);
1183
1184        // The special method return & error channel.
1185        let (method_return_sender, method_return_receiver) =
1186            create_msg_broadcast_channel!(DEFAULT_MAX_METHOD_RETURN_QUEUED);
1187        let rule = MatchRule::builder()
1188            .msg_type(MessageType::MethodReturn)
1189            .build()
1190            .into();
1191        msg_senders.insert(Some(rule), method_return_sender.clone());
1192        let rule = MatchRule::builder()
1193            .msg_type(MessageType::Error)
1194            .build()
1195            .into();
1196        msg_senders.insert(Some(rule), method_return_sender);
1197        let msg_senders = Arc::new(Mutex::new(msg_senders));
1198        let subscriptions = Mutex::new(HashMap::new());
1199
1200        let raw_conn = Arc::new(sync::Mutex::new(auth.conn));
1201
1202        let connection = Self {
1203            inner: Arc::new(ConnectionInner {
1204                raw_conn,
1205                server_guid: auth.server_guid,
1206                #[cfg(unix)]
1207                cap_unix_fd,
1208                bus_conn: bus_connection,
1209                serial: AtomicU32::new(1),
1210                unique_name: OnceCell::new(),
1211                subscriptions,
1212                object_server: OnceCell::new(),
1213                object_server_dispatch_task: OnceCell::new(),
1214                executor,
1215                socket_reader_task: OnceCell::new(),
1216                msg_senders,
1217                msg_receiver,
1218                method_return_receiver,
1219                registered_names: Mutex::new(HashMap::new()),
1220            }),
1221        };
1222
1223        Ok(connection)
1224    }
1225
1226    fn next_serial(&self) -> u32 {
1227        self.inner.serial.fetch_add(1, SeqCst)
1228    }
1229
1230    /// Create a `Connection` to the session/user message bus.
1231    pub async fn session() -> Result<Self> {
1232        ConnectionBuilder::session()?.build().await
1233    }
1234
1235    /// Create a `Connection` to the system-wide message bus.
1236    pub async fn system() -> Result<Self> {
1237        ConnectionBuilder::system()?.build().await
1238    }
1239
1240    /// Returns a listener, notified on various connection activity.
1241    ///
1242    /// This function is meant for the caller to implement idle or timeout on inactivity.
1243    pub fn monitor_activity(&self) -> EventListener {
1244        self.inner
1245            .raw_conn
1246            .lock()
1247            .expect("poisoned lock")
1248            .monitor_activity()
1249    }
1250
1251    /// Returns the peer process ID, or Ok(None) if it cannot be returned for the associated socket.
1252    #[deprecated(
1253        since = "3.13.0",
1254        note = "Use `peer_credentials` instead, which returns `ConnectionCredentials` which includes
1255                the peer PID."
1256    )]
1257    pub fn peer_pid(&self) -> io::Result<Option<u32>> {
1258        self.inner
1259            .raw_conn
1260            .lock()
1261            .expect("poisoned lock")
1262            .socket()
1263            .peer_pid()
1264    }
1265
1266    /// Returns the peer credentials.
1267    ///
1268    /// The fields are populated on the best effort basis. Some or all fields may not even make
1269    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
1270    ///
1271    /// # Caveats
1272    ///
1273    /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
1274    #[allow(deprecated)]
1275    pub async fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
1276        let raw_conn = self.inner.raw_conn.lock().expect("poisoned lock");
1277        let socket = raw_conn.socket();
1278
1279        Ok(ConnectionCredentials {
1280            process_id: socket.peer_pid()?,
1281            #[cfg(unix)]
1282            unix_user_id: socket.uid()?,
1283            #[cfg(not(unix))]
1284            unix_user_id: None,
1285            // Should we beother providing all the groups of user? What's the use case?
1286            unix_group_ids: None,
1287            #[cfg(windows)]
1288            windows_sid: socket.peer_sid(),
1289            #[cfg(not(windows))]
1290            windows_sid: None,
1291            // TODO: Populate this field (see the field docs for pointers).
1292            linux_security_label: None,
1293        })
1294    }
1295
1296    pub(crate) fn init_socket_reader(&self) {
1297        let inner = &self.inner;
1298        inner
1299            .socket_reader_task
1300            .set(
1301                SocketReader::new(inner.raw_conn.clone(), inner.msg_senders.clone())
1302                    .spawn(&inner.executor),
1303            )
1304            .expect("Attempted to set `socket_reader_task` twice");
1305    }
1306}
1307
1308impl<T> Sink<T> for Connection
1309where
1310    T: Into<Arc<Message>>,
1311{
1312    type Error = Error;
1313
1314    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1315        <&Connection as Sink<Arc<Message>>>::poll_ready(Pin::new(&mut &*self), cx)
1316    }
1317
1318    fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()> {
1319        Pin::new(&mut &*self).start_send(msg)
1320    }
1321
1322    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1323        <&Connection as Sink<Arc<Message>>>::poll_flush(Pin::new(&mut &*self), cx)
1324    }
1325
1326    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1327        <&Connection as Sink<Arc<Message>>>::poll_close(Pin::new(&mut &*self), cx)
1328    }
1329}
1330
1331impl<'a, T> Sink<T> for &'a Connection
1332where
1333    T: Into<Arc<Message>>,
1334{
1335    type Error = Error;
1336
1337    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
1338        // TODO: We should have a max queue length in raw::Socket for outgoing messages.
1339        Poll::Ready(Ok(()))
1340    }
1341
1342    fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()> {
1343        let msg = msg.into();
1344
1345        #[cfg(unix)]
1346        if !msg.fds().is_empty() && !self.inner.cap_unix_fd {
1347            return Err(Error::Unsupported);
1348        }
1349
1350        self.inner
1351            .raw_conn
1352            .lock()
1353            .expect("poisoned lock")
1354            .enqueue_message(msg);
1355
1356        Ok(())
1357    }
1358
1359    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1360        self.inner.raw_conn.lock().expect("poisoned lock").flush(cx)
1361    }
1362
1363    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1364        let mut raw_conn = self.inner.raw_conn.lock().expect("poisoned lock");
1365        let res = raw_conn.flush(cx);
1366        match ready!(res) {
1367            Ok(_) => (),
1368            Err(e) => return Poll::Ready(Err(e)),
1369        }
1370
1371        Poll::Ready(raw_conn.close())
1372    }
1373}
1374
1375impl From<crate::blocking::Connection> for Connection {
1376    fn from(conn: crate::blocking::Connection) -> Self {
1377        conn.into_inner()
1378    }
1379}
1380
1381// Internal API that allows keeping a weak connection ref around.
1382#[derive(Debug)]
1383pub(crate) struct WeakConnection {
1384    inner: Weak<ConnectionInner>,
1385}
1386
1387impl WeakConnection {
1388    /// Upgrade to a Connection.
1389    pub fn upgrade(&self) -> Option<Connection> {
1390        self.inner.upgrade().map(|inner| Connection { inner })
1391    }
1392}
1393
1394impl From<&Connection> for WeakConnection {
1395    fn from(conn: &Connection) -> Self {
1396        Self {
1397            inner: Arc::downgrade(&conn.inner),
1398        }
1399    }
1400}
1401
1402#[derive(Debug)]
1403enum NameStatus {
1404    // The task waits for name lost signal if owner allows replacement.
1405    Owner(#[allow(unused)] Option<Task<()>>),
1406    // The task waits for name acquisition signal.
1407    Queued(#[allow(unused)] Task<()>),
1408}
1409
1410#[cfg(test)]
1411mod tests {
1412    use futures_util::stream::TryStreamExt;
1413    use ntest::timeout;
1414    use test_log::test;
1415
1416    use crate::{fdo::DBusProxy, AuthMechanism};
1417
1418    use super::*;
1419
1420    // Same numbered client and server are already paired up. We make use of the
1421    // `futures_util::stream::Forward` to connect the two pipes and hence test one of the benefits
1422    // of our Stream and Sink impls.
1423    async fn test_p2p(
1424        server1: Connection,
1425        client1: Connection,
1426        server2: Connection,
1427        client2: Connection,
1428    ) -> Result<()> {
1429        let forward1 = MessageStream::from(server1.clone()).forward(client2.clone());
1430        let forward2 = MessageStream::from(&client2).forward(server1);
1431        let _forward_task = client1.executor().spawn(
1432            async move { futures_util::try_join!(forward1, forward2) },
1433            "forward_task",
1434        );
1435
1436        let server_ready = Event::new();
1437        let server_ready_listener = server_ready.listen();
1438        let client_done = Event::new();
1439        let client_done_listener = client_done.listen();
1440
1441        let server_future = async move {
1442            let mut stream = MessageStream::from(&server2);
1443            server_ready.notify(1);
1444            let method = loop {
1445                let m = stream.try_next().await?.unwrap();
1446                if m.to_string() == "Method call Test" {
1447                    break m;
1448                }
1449            };
1450
1451            // Send another message first to check the queueing function on client side.
1452            server2
1453                .emit_signal(None::<()>, "/", "org.zbus.p2p", "ASignalForYou", &())
1454                .await?;
1455            server2.reply(&method, &("yay")).await?;
1456            client_done_listener.await;
1457
1458            Ok(())
1459        };
1460
1461        let client_future = async move {
1462            let mut stream = MessageStream::from(&client1);
1463            server_ready_listener.await;
1464            let reply = client1
1465                .call_method(None::<()>, "/", Some("org.zbus.p2p"), "Test", &())
1466                .await?;
1467            assert_eq!(reply.to_string(), "Method return");
1468            // Check we didn't miss the signal that was sent during the call.
1469            let m = stream.try_next().await?.unwrap();
1470            client_done.notify(1);
1471            assert_eq!(m.to_string(), "Signal ASignalForYou");
1472            reply.body::<String>()
1473        };
1474
1475        let (val, _) = futures_util::try_join!(client_future, server_future,)?;
1476        assert_eq!(val, "yay");
1477
1478        Ok(())
1479    }
1480
1481    #[test]
1482    #[timeout(15000)]
1483    fn tcp_p2p() {
1484        crate::utils::block_on(test_tcp_p2p()).unwrap();
1485    }
1486
1487    async fn test_tcp_p2p() -> Result<()> {
1488        let (server1, client1) = tcp_p2p_pipe().await?;
1489        let (server2, client2) = tcp_p2p_pipe().await?;
1490
1491        test_p2p(server1, client1, server2, client2).await
1492    }
1493
1494    async fn tcp_p2p_pipe() -> Result<(Connection, Connection)> {
1495        let guid = Guid::generate();
1496
1497        #[cfg(not(feature = "tokio"))]
1498        let (server_conn_builder, client_conn_builder) = {
1499            let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
1500            let addr = listener.local_addr().unwrap();
1501            let p1 = std::net::TcpStream::connect(addr).unwrap();
1502            let p0 = listener.incoming().next().unwrap().unwrap();
1503
1504            (
1505                ConnectionBuilder::tcp_stream(p0)
1506                    .server(&guid)
1507                    .p2p()
1508                    .auth_mechanisms(&[AuthMechanism::Anonymous]),
1509                ConnectionBuilder::tcp_stream(p1).p2p(),
1510            )
1511        };
1512
1513        #[cfg(feature = "tokio")]
1514        let (server_conn_builder, client_conn_builder) = {
1515            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1516            let addr = listener.local_addr().unwrap();
1517            let p1 = tokio::net::TcpStream::connect(addr).await.unwrap();
1518            let p0 = listener.accept().await.unwrap().0;
1519
1520            (
1521                ConnectionBuilder::tcp_stream(p0)
1522                    .server(&guid)
1523                    .p2p()
1524                    .auth_mechanisms(&[AuthMechanism::Anonymous]),
1525                ConnectionBuilder::tcp_stream(p1).p2p(),
1526            )
1527        };
1528
1529        futures_util::try_join!(server_conn_builder.build(), client_conn_builder.build())
1530    }
1531
1532    #[cfg(unix)]
1533    #[test]
1534    #[timeout(15000)]
1535    fn unix_p2p() {
1536        crate::utils::block_on(test_unix_p2p()).unwrap();
1537    }
1538
1539    #[cfg(unix)]
1540    async fn test_unix_p2p() -> Result<()> {
1541        let (server1, client1) = unix_p2p_pipe().await?;
1542        let (server2, client2) = unix_p2p_pipe().await?;
1543
1544        test_p2p(server1, client1, server2, client2).await
1545    }
1546
1547    #[cfg(unix)]
1548    async fn unix_p2p_pipe() -> Result<(Connection, Connection)> {
1549        #[cfg(not(feature = "tokio"))]
1550        use std::os::unix::net::UnixStream;
1551        #[cfg(feature = "tokio")]
1552        use tokio::net::UnixStream;
1553        #[cfg(all(windows, not(feature = "tokio")))]
1554        use uds_windows::UnixStream;
1555
1556        let guid = Guid::generate();
1557
1558        let (p0, p1) = UnixStream::pair().unwrap();
1559
1560        futures_util::try_join!(
1561            ConnectionBuilder::unix_stream(p1).p2p().build(),
1562            ConnectionBuilder::unix_stream(p0)
1563                .server(&guid)
1564                .p2p()
1565                .build(),
1566        )
1567    }
1568
1569    // Compile-test only since we don't have a VM setup to run this with/in.
1570    #[cfg(any(
1571        all(feature = "vsock", not(feature = "tokio")),
1572        feature = "tokio-vsock"
1573    ))]
1574    #[test]
1575    #[timeout(15000)]
1576    #[ignore]
1577    fn vsock_p2p() {
1578        crate::utils::block_on(test_vsock_p2p()).unwrap();
1579    }
1580
1581    #[cfg(any(
1582        all(feature = "vsock", not(feature = "tokio")),
1583        feature = "tokio-vsock"
1584    ))]
1585    async fn test_vsock_p2p() -> Result<()> {
1586        let (server1, client1) = vsock_p2p_pipe().await?;
1587        let (server2, client2) = vsock_p2p_pipe().await?;
1588
1589        test_p2p(server1, client1, server2, client2).await
1590    }
1591
1592    #[cfg(all(feature = "vsock", not(feature = "tokio")))]
1593    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
1594        let guid = Guid::generate();
1595
1596        let listener = vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, 42).unwrap();
1597        let addr = listener.local_addr().unwrap();
1598        let client = vsock::VsockStream::connect(&addr).unwrap();
1599        let server = listener.incoming().next().unwrap().unwrap();
1600
1601        futures_util::try_join!(
1602            ConnectionBuilder::vsock_stream(server)
1603                .server(&guid)
1604                .p2p()
1605                .auth_mechanisms(&[AuthMechanism::Anonymous])
1606                .build(),
1607            ConnectionBuilder::vsock_stream(client).p2p().build(),
1608        )
1609    }
1610
1611    #[cfg(feature = "tokio-vsock")]
1612    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
1613        let guid = Guid::generate();
1614
1615        let listener = tokio_vsock::VsockListener::bind(2, 42).unwrap();
1616        let client = tokio_vsock::VsockStream::connect(3, 42).await.unwrap();
1617        let server = listener.incoming().next().await.unwrap().unwrap();
1618
1619        futures_util::try_join!(
1620            ConnectionBuilder::vsock_stream(server)
1621                .server(&guid)
1622                .p2p()
1623                .auth_mechanisms(&[AuthMechanism::Anonymous])
1624                .build(),
1625            ConnectionBuilder::vsock_stream(client).p2p().build(),
1626        )
1627    }
1628
1629    #[test]
1630    #[timeout(15000)]
1631    fn serial_monotonically_increases() {
1632        crate::utils::block_on(test_serial_monotonically_increases());
1633    }
1634
1635    async fn test_serial_monotonically_increases() {
1636        let c = Connection::session().await.unwrap();
1637        let serial = c.next_serial() + 1;
1638
1639        for next in serial..serial + 10 {
1640            assert_eq!(next, c.next_serial());
1641        }
1642    }
1643
1644    #[cfg(all(windows, feature = "windows-gdbus"))]
1645    #[test]
1646    fn connect_gdbus_session_bus() {
1647        let addr = crate::win32::windows_autolaunch_bus_address()
1648            .expect("Unable to get GDBus session bus address");
1649
1650        crate::block_on(async { addr.connect().await }).expect("Unable to connect to session bus");
1651    }
1652
1653    #[cfg(target_os = "macos")]
1654    #[test]
1655    fn connect_launchd_session_bus() {
1656        crate::block_on(async {
1657            let addr = crate::address::macos_launchd_bus_address("DBUS_LAUNCHD_SESSION_BUS_SOCKET")
1658                .await
1659                .expect("Unable to get Launchd session bus address");
1660            addr.connect().await
1661        })
1662        .expect("Unable to connect to session bus");
1663    }
1664
1665    #[test]
1666    #[timeout(15000)]
1667    fn disconnect_on_drop() {
1668        // Reproducer for https://github.com/dbus2/zbus/issues/308 where setting up the
1669        // objectserver would cause the connection to not disconnect on drop.
1670        crate::utils::block_on(test_disconnect_on_drop());
1671    }
1672
1673    async fn test_disconnect_on_drop() {
1674        #[derive(Default)]
1675        struct MyInterface {}
1676
1677        #[crate::dbus_interface(name = "dev.peelz.FooBar.Baz")]
1678        impl MyInterface {
1679            fn do_thing(&self) {}
1680        }
1681        let name = "dev.peelz.foobar";
1682        let connection = ConnectionBuilder::session()
1683            .unwrap()
1684            .name(name)
1685            .unwrap()
1686            .serve_at("/dev/peelz/FooBar", MyInterface::default())
1687            .unwrap()
1688            .build()
1689            .await
1690            .unwrap();
1691
1692        let connection2 = Connection::session().await.unwrap();
1693        let dbus = DBusProxy::new(&connection2).await.unwrap();
1694        let mut stream = dbus
1695            .receive_name_owner_changed_with_args(&[(0, name), (2, "")])
1696            .await
1697            .unwrap();
1698
1699        drop(connection);
1700
1701        // If the connection is not dropped, this will hang forever.
1702        stream.next().await.unwrap();
1703
1704        // Let's still make sure the name is gone.
1705        let name_has_owner = dbus.name_has_owner(name.try_into().unwrap()).await.unwrap();
1706        assert!(!name_has_owner);
1707    }
1708
1709    #[cfg(any(unix, not(feature = "tokio")))]
1710    #[test]
1711    #[timeout(15000)]
1712    fn unix_p2p_cookie_auth() {
1713        use crate::utils::block_on;
1714        use std::{
1715            fs::{create_dir_all, remove_file, write},
1716            time::{SystemTime as Time, UNIX_EPOCH},
1717        };
1718        #[cfg(unix)]
1719        use std::{
1720            fs::{set_permissions, Permissions},
1721            os::unix::fs::PermissionsExt,
1722        };
1723        use xdg_home::home_dir;
1724
1725        let cookie_context = "zbus-test-cookie-context";
1726        let cookie_id = 123456789;
1727        let cookie = hex::encode(b"our cookie");
1728
1729        // Ensure cookie directory exists.
1730        let cookie_dir = home_dir().unwrap().join(".dbus-keyrings");
1731        create_dir_all(&cookie_dir).unwrap();
1732        #[cfg(unix)]
1733        set_permissions(&cookie_dir, Permissions::from_mode(0o700)).unwrap();
1734
1735        // Create a cookie file.
1736        let cookie_file = cookie_dir.join(cookie_context);
1737        let ts = Time::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
1738        let cookie_entry = format!("{cookie_id} {ts} {cookie}");
1739        write(&cookie_file, cookie_entry).unwrap();
1740
1741        // Explicit cookie ID.
1742        let res1 = block_on(test_unix_p2p_cookie_auth(cookie_context, Some(cookie_id)));
1743        // Implicit cookie ID (first one should be picked).
1744        let res2 = block_on(test_unix_p2p_cookie_auth(cookie_context, None));
1745
1746        // Remove the cookie file.
1747        remove_file(&cookie_file).unwrap();
1748
1749        res1.unwrap();
1750        res2.unwrap();
1751    }
1752
1753    #[cfg(any(unix, not(feature = "tokio")))]
1754    async fn test_unix_p2p_cookie_auth(
1755        cookie_context: &'static str,
1756        cookie_id: Option<usize>,
1757    ) -> Result<()> {
1758        #[cfg(all(unix, not(feature = "tokio")))]
1759        use std::os::unix::net::UnixStream;
1760        #[cfg(all(unix, feature = "tokio"))]
1761        use tokio::net::UnixStream;
1762        #[cfg(all(windows, not(feature = "tokio")))]
1763        use uds_windows::UnixStream;
1764
1765        let guid = Guid::generate();
1766
1767        let (p0, p1) = UnixStream::pair().unwrap();
1768        let mut server_builder = ConnectionBuilder::unix_stream(p0)
1769            .server(&guid)
1770            .p2p()
1771            .auth_mechanisms(&[AuthMechanism::Cookie])
1772            .cookie_context(cookie_context)
1773            .unwrap();
1774        if let Some(cookie_id) = cookie_id {
1775            server_builder = server_builder.cookie_id(cookie_id);
1776        }
1777
1778        futures_util::try_join!(
1779            ConnectionBuilder::unix_stream(p1).p2p().build(),
1780            server_builder.build(),
1781        )
1782        .map(|_| ())
1783    }
1784}