zbus/
message_stream.rs

1use std::{
2    pin::Pin,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use async_broadcast::Receiver as ActiveReceiver;
8use futures_core::stream::{self, FusedStream};
9use ordered_stream::{OrderedStream, PollResult};
10use tracing::warn;
11
12use crate::{
13    connection::ConnectionInner,
14    message::{Message, Sequence},
15    AsyncDrop, Connection, MatchRule, OwnedMatchRule, Result,
16};
17
18/// A [`stream::Stream`] implementation that yields [`Message`] items.
19///
20/// You can convert a [`Connection`] to this type and back to [`Connection`].
21///
22/// **NOTE**: You must ensure a `MessageStream` is continuously polled or you will experience hangs.
23/// If you don't need to continuously poll the `MessageStream` but need to keep it around for later
24/// use, keep the connection around and convert it into a `MessageStream` when needed. The
25/// conversion is not an expensive operation so you don't need to worry about performance, unless
26/// you do it very frequently. If you need to convert back and forth frequently, you may want to
27/// consider keeping both a connection and stream around.
28#[derive(Clone, Debug)]
29#[must_use = "streams do nothing unless polled"]
30pub struct MessageStream {
31    inner: Inner,
32}
33
34impl MessageStream {
35    /// Create a message stream for the given match rule.
36    ///
37    /// If `conn` is a bus connection and match rule is for a signal, the match rule will be
38    /// registered with the bus and queued for deregistration when the stream is dropped. If you'd
39    /// like immediate deregistration, use [`AsyncDrop::async_drop`]. The reason match rules are
40    /// only registered with the bus for signals is that the D-Bus specification only allows signals
41    /// to be broadcasted and unicast messages are always sent to their destination (regardless
42    /// of any match rules registered by the destination) by the bus. Hence there is no need to
43    /// register match rules for non-signal messages with the bus.
44    ///
45    /// Having said that, streams created by this method can still be very useful as it allows you
46    /// to avoid needless task wakeups and simplify your stream consuming code.
47    ///
48    /// You can optionally also request the capacity of the underlying message queue through
49    /// `max_queued`. If specified, the capacity is guaranteed to be at least `max_queued`. If not
50    /// specified, the default of 64 is assumed. The capacity can also be changed later through
51    /// [`MessageStream::set_max_queued`].
52    ///
53    /// # Example
54    ///
55    /// ```
56    /// use async_io::Timer;
57    /// use zbus::{AsyncDrop, Connection, MatchRule, MessageStream, fdo::NameOwnerChanged};
58    /// use futures_util::{TryStreamExt, future::select, future::Either::{Left, Right}, pin_mut};
59    ///
60    /// # zbus::block_on(async {
61    /// let conn = Connection::session().await?;
62    /// let rule = MatchRule::builder()
63    ///     .msg_type(zbus::message::Type::Signal)
64    ///     .sender("org.freedesktop.DBus")?
65    ///     .interface("org.freedesktop.DBus")?
66    ///     .member("NameOwnerChanged")?
67    ///     .add_arg("org.freedesktop.zbus.MatchRuleStreamTest42")?
68    ///     .build();
69    /// let mut stream = MessageStream::for_match_rule(
70    ///     rule,
71    ///     &conn,
72    ///     // For such a specific match rule, we don't need a big queue.
73    ///     Some(1),
74    /// ).await?;
75    ///
76    /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
77    ///                 interface='org.freedesktop.DBus',member='NameOwnerChanged',\
78    ///                 arg0='org.freedesktop.zbus.MatchRuleStreamTest42'";
79    /// assert_eq!(
80    ///     stream.match_rule().map(|r| r.to_string()).as_deref(),
81    ///     Some(rule_str),
82    /// );
83    ///
84    /// // We register 2 names, starting with the uninteresting one. If `stream` wasn't filtering
85    /// // messages based on the match rule, we'd receive method return calls for each of these 2
86    /// // calls first.
87    /// //
88    /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
89    /// // we register since we setup an arg filter.
90    /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest44")
91    ///     .await?;
92    /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest42")
93    ///     .await?;
94    ///
95    /// let msg = stream.try_next().await?.unwrap();
96    /// let signal = NameOwnerChanged::from_message(msg).unwrap();
97    /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleStreamTest42");
98    /// stream.async_drop().await;
99    ///
100    /// // Ensure the match rule is deregistered and this connection doesn't receive
101    /// // `NameOwnerChanged` signals.
102    /// let stream = MessageStream::from(&conn).try_filter_map(|msg| async move {
103    ///     Ok(NameOwnerChanged::from_message(msg))
104    /// });
105    /// conn.release_name("org.freedesktop.zbus.MatchRuleStreamTest42").await?;
106    ///
107    /// pin_mut!(stream);
108    /// let next = stream.try_next();
109    /// pin_mut!(next);
110    /// let timeout = Timer::after(std::time::Duration::from_millis(50));
111    /// pin_mut!(timeout);
112    /// match select(next, timeout).await {
113    ///    Left((msg, _)) => unreachable!("unexpected message: {:?}", msg),
114    ///    Right((_, _)) => (),
115    /// }
116    ///
117    /// # Ok::<(), zbus::Error>(())
118    /// # }).unwrap();
119    /// ```
120    ///
121    /// # Caveats
122    ///
123    /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
124    pub async fn for_match_rule<R>(
125        rule: R,
126        conn: &Connection,
127        max_queued: Option<usize>,
128    ) -> Result<Self>
129    where
130        R: TryInto<OwnedMatchRule>,
131        R::Error: Into<crate::Error>,
132    {
133        let rule = rule.try_into().map_err(Into::into)?;
134        let msg_receiver = conn.add_match(rule.clone(), max_queued).await?;
135
136        Ok(Self::for_subscription_channel(
137            msg_receiver,
138            Some(rule),
139            conn,
140        ))
141    }
142
143    /// The associated match rule, if any.
144    pub fn match_rule(&self) -> Option<MatchRule<'_>> {
145        self.inner.match_rule.as_deref().cloned()
146    }
147
148    /// The maximum number of messages to queue for this stream.
149    pub fn max_queued(&self) -> usize {
150        self.inner.msg_receiver.capacity()
151    }
152
153    /// Set the maximum number of messages to queue for this stream.
154    ///
155    /// After this call, the capacity is guaranteed to be at least `max_queued`.
156    pub fn set_max_queued(&mut self, max_queued: usize) {
157        if max_queued <= self.max_queued() {
158            return;
159        }
160        self.inner.msg_receiver.set_capacity(max_queued);
161    }
162
163    pub(crate) fn for_subscription_channel(
164        msg_receiver: ActiveReceiver<Result<Message>>,
165        rule: Option<OwnedMatchRule>,
166        conn: &Connection,
167    ) -> Self {
168        let conn_inner = conn.inner.clone();
169
170        Self {
171            inner: Inner {
172                conn_inner,
173                msg_receiver,
174                match_rule: rule,
175            },
176        }
177    }
178}
179
180impl stream::Stream for MessageStream {
181    type Item = Result<Message>;
182
183    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
184        let this = self.get_mut();
185
186        Pin::new(&mut this.inner.msg_receiver).poll_next(cx)
187    }
188}
189
190impl OrderedStream for MessageStream {
191    type Data = Result<Message>;
192    type Ordering = Sequence;
193
194    fn poll_next_before(
195        self: Pin<&mut Self>,
196        cx: &mut Context<'_>,
197        before: Option<&Self::Ordering>,
198    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
199        let this = self.get_mut();
200
201        match stream::Stream::poll_next(Pin::new(this), cx) {
202            Poll::Pending if before.is_some() => {
203                // Assume the provided Sequence in before was obtained from a Message
204                // associated with our Connection (because that's the only supported use case).
205                // Because there is only one socket-reader task, any messages that would have been
206                // ordered before that message would have already been sitting in the broadcast
207                // queue (and we would have seen Ready in our poll).  Because we didn't, we can
208                // guarantee that we won't ever produce a message whose sequence is before that
209                // provided value, and so we can return NoneBefore.
210                //
211                // This ensures that ordered_stream::Join will never return Pending while it
212                // has a message buffered.
213                Poll::Ready(PollResult::NoneBefore)
214            }
215            Poll::Pending => Poll::Pending,
216            Poll::Ready(Some(Ok(msg))) => Poll::Ready(PollResult::Item {
217                ordering: msg.recv_position(),
218                data: Ok(msg),
219            }),
220            Poll::Ready(Some(Err(e))) => Poll::Ready(PollResult::Item {
221                ordering: Sequence::LAST,
222                data: Err(e),
223            }),
224            Poll::Ready(None) => Poll::Ready(PollResult::Terminated),
225        }
226    }
227}
228
229impl FusedStream for MessageStream {
230    fn is_terminated(&self) -> bool {
231        self.inner.msg_receiver.is_terminated()
232    }
233}
234
235impl From<Connection> for MessageStream {
236    fn from(conn: Connection) -> Self {
237        let conn_inner = conn.inner;
238        let msg_receiver = conn_inner.msg_receiver.activate_cloned();
239
240        Self {
241            inner: Inner {
242                conn_inner,
243                msg_receiver,
244                match_rule: None,
245            },
246        }
247    }
248}
249
250impl From<&Connection> for MessageStream {
251    fn from(conn: &Connection) -> Self {
252        Self::from(conn.clone())
253    }
254}
255
256impl From<MessageStream> for Connection {
257    fn from(stream: MessageStream) -> Connection {
258        Connection::from(&stream)
259    }
260}
261
262impl From<&MessageStream> for Connection {
263    fn from(stream: &MessageStream) -> Connection {
264        Connection {
265            inner: stream.inner.conn_inner.clone(),
266        }
267    }
268}
269
270#[derive(Clone, Debug)]
271struct Inner {
272    conn_inner: Arc<ConnectionInner>,
273    msg_receiver: ActiveReceiver<Result<Message>>,
274    match_rule: Option<OwnedMatchRule>,
275}
276
277impl Drop for Inner {
278    fn drop(&mut self) {
279        let conn = Connection {
280            inner: self.conn_inner.clone(),
281        };
282
283        if let Some(rule) = self.match_rule.take() {
284            conn.queue_remove_match(rule);
285        }
286    }
287}
288
289#[async_trait::async_trait]
290impl AsyncDrop for MessageStream {
291    async fn async_drop(mut self) {
292        let conn = Connection {
293            inner: self.inner.conn_inner.clone(),
294        };
295
296        if let Some(rule) = self.inner.match_rule.take() {
297            if let Err(e) = conn.remove_match(rule).await {
298                warn!("Failed to remove match rule: {}", e);
299            }
300        }
301    }
302}