zbus/message_stream.rs
1use std::{
2 pin::Pin,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use async_broadcast::Receiver as ActiveReceiver;
8use futures_core::stream::{self, FusedStream};
9use ordered_stream::{OrderedStream, PollResult};
10use tracing::warn;
11
12use crate::{
13 connection::ConnectionInner,
14 message::{Message, Sequence},
15 AsyncDrop, Connection, MatchRule, OwnedMatchRule, Result,
16};
17
/// A [`stream::Stream`] implementation that yields [`Message`] items.
///
/// You can convert a [`Connection`] to this type and back to [`Connection`].
///
/// **NOTE**: You must ensure a `MessageStream` is continuously polled or you will experience hangs.
/// If you don't need to continuously poll the `MessageStream` but need to keep it around for later
/// use, keep the connection around and convert it into a `MessageStream` when needed. The
/// conversion is not an expensive operation so you don't need to worry about performance, unless
/// you do it very frequently. If you need to convert back and forth frequently, you may want to
/// consider keeping both a connection and stream around.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct MessageStream {
    // Connection state, message receiver and optional match rule. They live in a separate
    // `Inner` type, which carries the `Drop` impl that queues removal of the match rule.
    inner: Inner,
}
33
34impl MessageStream {
35 /// Create a message stream for the given match rule.
36 ///
37 /// If `conn` is a bus connection and match rule is for a signal, the match rule will be
38 /// registered with the bus and queued for deregistration when the stream is dropped. If you'd
39 /// like immediate deregistration, use [`AsyncDrop::async_drop`]. The reason match rules are
40 /// only registered with the bus for signals is that the D-Bus specification only allows signals
41 /// to be broadcast, and unicast messages are always sent to their destination (regardless
42 /// of any match rules registered by the destination) by the bus. Hence there is no need to
43 /// register match rules for non-signal messages with the bus.
44 ///
45 /// Having said that, streams created by this method can still be very useful as it allows you
46 /// to avoid needless task wakeups and simplify your stream consuming code.
47 ///
48 /// You can optionally also request the capacity of the underlying message queue through
49 /// `max_queued`. If specified, the capacity is guaranteed to be at least `max_queued`. If not
50 /// specified, the default of 64 is assumed. The capacity can also be changed later through
51 /// [`MessageStream::set_max_queued`].
52 ///
53 /// # Example
54 ///
55 /// ```
56 /// use async_io::Timer;
57 /// use zbus::{AsyncDrop, Connection, MatchRule, MessageStream, fdo::NameOwnerChanged};
58 /// use futures_util::{TryStreamExt, future::select, future::Either::{Left, Right}, pin_mut};
59 ///
60 /// # zbus::block_on(async {
61 /// let conn = Connection::session().await?;
62 /// let rule = MatchRule::builder()
63 /// .msg_type(zbus::message::Type::Signal)
64 /// .sender("org.freedesktop.DBus")?
65 /// .interface("org.freedesktop.DBus")?
66 /// .member("NameOwnerChanged")?
67 /// .add_arg("org.freedesktop.zbus.MatchRuleStreamTest42")?
68 /// .build();
69 /// let mut stream = MessageStream::for_match_rule(
70 /// rule,
71 /// &conn,
72 /// // For such a specific match rule, we don't need a big queue.
73 /// Some(1),
74 /// ).await?;
75 ///
76 /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
77 /// interface='org.freedesktop.DBus',member='NameOwnerChanged',\
78 /// arg0='org.freedesktop.zbus.MatchRuleStreamTest42'";
79 /// assert_eq!(
80 /// stream.match_rule().map(|r| r.to_string()).as_deref(),
81 /// Some(rule_str),
82 /// );
83 ///
84 /// // We register 2 names, starting with the uninteresting one. If `stream` wasn't filtering
85 /// // messages based on the match rule, we'd receive method return calls for each of these 2
86 /// // calls first.
87 /// //
88 /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
89 /// // we register since we set up an arg filter.
90 /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest44")
91 /// .await?;
92 /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest42")
93 /// .await?;
94 ///
95 /// let msg = stream.try_next().await?.unwrap();
96 /// let signal = NameOwnerChanged::from_message(msg).unwrap();
97 /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleStreamTest42");
98 /// stream.async_drop().await;
99 ///
100 /// // Ensure the match rule is deregistered and this connection doesn't receive
101 /// // `NameOwnerChanged` signals.
102 /// let stream = MessageStream::from(&conn).try_filter_map(|msg| async move {
103 /// Ok(NameOwnerChanged::from_message(msg))
104 /// });
105 /// conn.release_name("org.freedesktop.zbus.MatchRuleStreamTest42").await?;
106 ///
107 /// pin_mut!(stream);
108 /// let next = stream.try_next();
109 /// pin_mut!(next);
110 /// let timeout = Timer::after(std::time::Duration::from_millis(50));
111 /// pin_mut!(timeout);
112 /// match select(next, timeout).await {
113 /// Left((msg, _)) => unreachable!("unexpected message: {:?}", msg),
114 /// Right((_, _)) => (),
115 /// }
116 ///
117 /// # Ok::<(), zbus::Error>(())
118 /// # }).unwrap();
119 /// ```
120 ///
121 /// # Caveats
122 ///
123 /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
124 pub async fn for_match_rule<R>(
125 rule: R,
126 conn: &Connection,
127 max_queued: Option<usize>,
128 ) -> Result<Self>
129 where
130 R: TryInto<OwnedMatchRule>,
131 R::Error: Into<crate::Error>,
132 {
133 let rule = rule.try_into().map_err(Into::into)?;
134 let msg_receiver = conn.add_match(rule.clone(), max_queued).await?;
135
136 Ok(Self::for_subscription_channel(
137 msg_receiver,
138 Some(rule),
139 conn,
140 ))
141 }
142
143 /// The associated match rule, if any.
144 pub fn match_rule(&self) -> Option<MatchRule<'_>> {
145 self.inner.match_rule.as_deref().cloned()
146 }
147
148 /// The maximum number of messages to queue for this stream.
149 pub fn max_queued(&self) -> usize {
150 self.inner.msg_receiver.capacity()
151 }
152
153 /// Set the maximum number of messages to queue for this stream.
154 ///
155 /// After this call, the capacity is guaranteed to be at least `max_queued`.
156 pub fn set_max_queued(&mut self, max_queued: usize) {
157 if max_queued <= self.max_queued() {
158 return;
159 }
160 self.inner.msg_receiver.set_capacity(max_queued);
161 }
162
163 pub(crate) fn for_subscription_channel(
164 msg_receiver: ActiveReceiver<Result<Message>>,
165 rule: Option<OwnedMatchRule>,
166 conn: &Connection,
167 ) -> Self {
168 let conn_inner = conn.inner.clone();
169
170 Self {
171 inner: Inner {
172 conn_inner,
173 msg_receiver,
174 match_rule: rule,
175 },
176 }
177 }
178}
179
180impl stream::Stream for MessageStream {
181 type Item = Result<Message>;
182
183 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
184 let this = self.get_mut();
185
186 Pin::new(&mut this.inner.msg_receiver).poll_next(cx)
187 }
188}
189
190impl OrderedStream for MessageStream {
191 type Data = Result<Message>;
192 type Ordering = Sequence;
193
194 fn poll_next_before(
195 self: Pin<&mut Self>,
196 cx: &mut Context<'_>,
197 before: Option<&Self::Ordering>,
198 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
199 let this = self.get_mut();
200
201 match stream::Stream::poll_next(Pin::new(this), cx) {
202 Poll::Pending if before.is_some() => {
203 // Assume the provided Sequence in before was obtained from a Message
204 // associated with our Connection (because that's the only supported use case).
205 // Because there is only one socket-reader task, any messages that would have been
206 // ordered before that message would have already been sitting in the broadcast
207 // queue (and we would have seen Ready in our poll). Because we didn't, we can
208 // guarantee that we won't ever produce a message whose sequence is before that
209 // provided value, and so we can return NoneBefore.
210 //
211 // This ensures that ordered_stream::Join will never return Pending while it
212 // has a message buffered.
213 Poll::Ready(PollResult::NoneBefore)
214 }
215 Poll::Pending => Poll::Pending,
216 Poll::Ready(Some(Ok(msg))) => Poll::Ready(PollResult::Item {
217 ordering: msg.recv_position(),
218 data: Ok(msg),
219 }),
220 Poll::Ready(Some(Err(e))) => Poll::Ready(PollResult::Item {
221 ordering: Sequence::LAST,
222 data: Err(e),
223 }),
224 Poll::Ready(None) => Poll::Ready(PollResult::Terminated),
225 }
226 }
227}
228
229impl FusedStream for MessageStream {
230 fn is_terminated(&self) -> bool {
231 self.inner.msg_receiver.is_terminated()
232 }
233}
234
235impl From<Connection> for MessageStream {
236 fn from(conn: Connection) -> Self {
237 let conn_inner = conn.inner;
238 let msg_receiver = conn_inner.msg_receiver.activate_cloned();
239
240 Self {
241 inner: Inner {
242 conn_inner,
243 msg_receiver,
244 match_rule: None,
245 },
246 }
247 }
248}
249
250impl From<&Connection> for MessageStream {
251 fn from(conn: &Connection) -> Self {
252 Self::from(conn.clone())
253 }
254}
255
256impl From<MessageStream> for Connection {
257 fn from(stream: MessageStream) -> Connection {
258 Connection::from(&stream)
259 }
260}
261
262impl From<&MessageStream> for Connection {
263 fn from(stream: &MessageStream) -> Connection {
264 Connection {
265 inner: stream.inner.conn_inner.clone(),
266 }
267 }
268}
269
// State behind a `MessageStream`. It is a separate type so that the `Drop` impl, which queues
// removal of the match rule, is attached here rather than to the public stream type.
//
// NOTE(review): `Clone` is derived, so every clone carries its own copy of `match_rule` and
// queues a removal for it when dropped. Confirm that the connection's match bookkeeping
// expects one removal per clone.
#[derive(Clone, Debug)]
struct Inner {
    /// Shared state of the connection this stream belongs to.
    conn_inner: Arc<ConnectionInner>,
    /// Receiver the stream polls for incoming messages.
    msg_receiver: ActiveReceiver<Result<Message>>,
    /// Match rule associated with the stream, if any. It is taken out when the stream is dropped.
    match_rule: Option<OwnedMatchRule>,
}
276
277impl Drop for Inner {
278 fn drop(&mut self) {
279 let conn = Connection {
280 inner: self.conn_inner.clone(),
281 };
282
283 if let Some(rule) = self.match_rule.take() {
284 conn.queue_remove_match(rule);
285 }
286 }
287}
288
289#[async_trait::async_trait]
290impl AsyncDrop for MessageStream {
291 async fn async_drop(mut self) {
292 let conn = Connection {
293 inner: self.inner.conn_inner.clone(),
294 };
295
296 if let Some(rule) = self.inner.match_rule.take() {
297 if let Err(e) = conn.remove_match(rule).await {
298 warn!("Failed to remove match rule: {}", e);
299 }
300 }
301 }
302}