zbus/blocking/
message_iterator.rs

1use futures_util::StreamExt;
2use static_assertions::assert_impl_all;
3use std::{convert::TryInto, sync::Arc};
4
5use crate::{blocking::Connection, utils::block_on, MatchRule, Message, OwnedMatchRule, Result};
6
7/// A blocking wrapper of [`crate::MessageStream`].
8///
9/// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate
10/// over this type until it's consumed or dropped.
11#[derive(derivative::Derivative, Clone)]
12#[derivative(Debug)]
13pub struct MessageIterator {
14    // Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed
15    // for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime
16    // context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the
17    // stream to ensure any associated match rule is deregistered before the iterator is
18    // dropped.
19    pub(crate) azync: Option<crate::MessageStream>,
20}
21
22assert_impl_all!(MessageIterator: Send, Sync, Unpin);
23
24impl MessageIterator {
25    /// Get a reference to the underlying async message stream.
26    pub fn inner(&self) -> &crate::MessageStream {
27        self.azync.as_ref().expect("Inner stream is `None`")
28    }
29
30    /// Get the underlying async message stream, consuming `self`.
31    pub fn into_inner(mut self) -> crate::MessageStream {
32        self.azync.take().expect("Inner stream is `None`")
33    }
34
35    /// Create a message iterator for the given match rule.
36    ///
37    /// This is a wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the underlying
38    /// `MessageStream`, the match rule is immediately deregistered when the iterator is dropped.
39    ///
40    /// # Example
41    ///
42    /// ```
43    /// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged};
44    ///
45    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
46    /// let conn = Connection::session()?;
47    /// let rule = MatchRule::builder()
48    ///     .msg_type(zbus::MessageType::Signal)
49    ///     .sender("org.freedesktop.DBus")?
50    ///     .interface("org.freedesktop.DBus")?
51    ///     .member("NameOwnerChanged")?
52    ///     .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42")?
53    ///     .build();
54    /// let mut iter = MessageIterator::for_match_rule(
55    ///     rule,
56    ///     &conn,
57    ///     // For such a specific match rule, we don't need a big queue.
58    ///     Some(1),
59    /// )?;
60    ///
61    /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
62    ///                 interface='org.freedesktop.DBus',member='NameOwnerChanged',\
63    ///                 arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'";
64    /// assert_eq!(
65    ///     iter.match_rule().map(|r| r.to_string()).as_deref(),
66    ///     Some(rule_str),
67    /// );
68    ///
69    /// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering
70    /// // messages based on the match rule, we'd receive method return call for each of these 2
71    /// // calls first.
72    /// //
73    /// // Note that the `NameOwnerChanged` signal will not be sent by the bus  for the first name
74    /// // we register since we setup an arg filter.
75    /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44")?;
76    /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42")?;
77    ///
78    /// let msg = iter.next().unwrap()?;
79    /// let signal = NameOwnerChanged::from_message(msg).unwrap();
80    /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42");
81    ///
82    /// # Ok(())
83    /// # }
84    /// ```
85    ///
86    /// # Caveats
87    ///
88    /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
89    pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self>
90    where
91        R: TryInto<OwnedMatchRule>,
92        R::Error: Into<crate::Error>,
93    {
94        block_on(crate::MessageStream::for_match_rule(
95            rule,
96            conn.inner(),
97            max_queued,
98        ))
99        .map(Some)
100        .map(|s| Self { azync: s })
101    }
102
103    /// The associated match rule, if any.
104    pub fn match_rule(&self) -> Option<MatchRule<'_>> {
105        self.azync
106            .as_ref()
107            .expect("Inner stream is `None`")
108            .match_rule()
109    }
110}
111
112impl Iterator for MessageIterator {
113    type Item = Result<Arc<Message>>;
114
115    fn next(&mut self) -> Option<Self::Item> {
116        block_on(self.azync.as_mut().expect("Inner stream is `None`").next())
117    }
118}
119
120impl From<Connection> for MessageIterator {
121    fn from(conn: Connection) -> Self {
122        let azync = crate::MessageStream::from(conn.into_inner());
123
124        Self { azync: Some(azync) }
125    }
126}
127
128impl From<&Connection> for MessageIterator {
129    fn from(conn: &Connection) -> Self {
130        Self::from(conn.clone())
131    }
132}
133
134impl From<MessageIterator> for Connection {
135    fn from(mut iter: MessageIterator) -> Connection {
136        Connection::from(crate::Connection::from(
137            iter.azync.take().expect("Inner stream is `None`"),
138        ))
139    }
140}
141
142impl From<&MessageIterator> for Connection {
143    fn from(iter: &MessageIterator) -> Connection {
144        Connection::from(crate::Connection::from(
145            iter.azync.as_ref().expect("Inner stream is `None`"),
146        ))
147    }
148}
149
150impl std::ops::Drop for MessageIterator {
151    fn drop(&mut self) {
152        block_on(async {
153            if let Some(azync) = self.azync.take() {
154                crate::AsyncDrop::async_drop(azync).await;
155            }
156        });
157    }
158}