zbus/blocking/
message_iterator.rs

1use futures_lite::StreamExt;
2
3use crate::{
4    blocking::Connection, message::Message, utils::block_on, MatchRule, OwnedMatchRule, Result,
5};
6
7/// A blocking wrapper of [`crate::MessageStream`].
8///
9/// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate
10/// over this type until it's consumed or dropped.
11#[derive(Debug, Clone)]
12pub struct MessageIterator {
13    // Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed
14    // for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime
15    // context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the
16    // stream to ensure any associated match rule is deregistered before the iterator is
17    // dropped.
18    pub(crate) azync: Option<crate::MessageStream>,
19}
20
21impl MessageIterator {
22    /// Get a reference to the underlying async message stream.
23    pub fn inner(&self) -> &crate::MessageStream {
24        self.azync.as_ref().expect("Inner stream is `None`")
25    }
26
27    /// Get the underlying async message stream, consuming `self`.
28    pub fn into_inner(mut self) -> crate::MessageStream {
29        self.azync.take().expect("Inner stream is `None`")
30    }
31
32    /// Create a message iterator for the given match rule.
33    ///
34    /// This is a wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the underlying
35    /// `MessageStream`, the match rule is immediately deregistered when the iterator is dropped.
36    ///
37    /// # Example
38    ///
39    /// ```
40    /// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged};
41    ///
42    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
43    /// let conn = Connection::session()?;
44    /// let rule = MatchRule::builder()
45    ///     .msg_type(zbus::message::Type::Signal)
46    ///     .sender("org.freedesktop.DBus")?
47    ///     .interface("org.freedesktop.DBus")?
48    ///     .member("NameOwnerChanged")?
49    ///     .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42")?
50    ///     .build();
51    /// let mut iter = MessageIterator::for_match_rule(
52    ///     rule,
53    ///     &conn,
54    ///     // For such a specific match rule, we don't need a big queue.
55    ///     Some(1),
56    /// )?;
57    ///
58    /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
59    ///                 interface='org.freedesktop.DBus',member='NameOwnerChanged',\
60    ///                 arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'";
61    /// assert_eq!(
62    ///     iter.match_rule().map(|r| r.to_string()).as_deref(),
63    ///     Some(rule_str),
64    /// );
65    ///
66    /// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering
67    /// // messages based on the match rule, we'd receive method return calls for each of these 2
68    /// // calls first.
69    /// //
70    /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
71    /// // we register since we set up an arg filter.
72    /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44")?;
73    /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42")?;
74    ///
75    /// let msg = iter.next().unwrap()?;
76    /// let signal = NameOwnerChanged::from_message(msg).unwrap();
77    /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42");
78    ///
79    /// # Ok(())
80    /// # }
81    /// ```
82    ///
83    /// # Caveats
84    ///
85    /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
86    pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self>
87    where
88        R: TryInto<OwnedMatchRule>,
89        R::Error: Into<crate::Error>,
90    {
91        block_on(crate::MessageStream::for_match_rule(
92            rule,
93            conn.inner(),
94            max_queued,
95        ))
96        .map(Some)
97        .map(|s| Self { azync: s })
98    }
99
100    /// The associated match rule, if any.
101    pub fn match_rule(&self) -> Option<MatchRule<'_>> {
102        self.azync
103            .as_ref()
104            .expect("Inner stream is `None`")
105            .match_rule()
106    }
107}
108
109impl Iterator for MessageIterator {
110    type Item = Result<Message>;
111
112    fn next(&mut self) -> Option<Self::Item> {
113        block_on(self.azync.as_mut().expect("Inner stream is `None`").next())
114    }
115}
116
117impl From<Connection> for MessageIterator {
118    fn from(conn: Connection) -> Self {
119        let azync = crate::MessageStream::from(conn.into_inner());
120
121        Self { azync: Some(azync) }
122    }
123}
124
125impl From<&Connection> for MessageIterator {
126    fn from(conn: &Connection) -> Self {
127        Self::from(conn.clone())
128    }
129}
130
131impl From<MessageIterator> for Connection {
132    fn from(mut iter: MessageIterator) -> Connection {
133        Connection::from(crate::Connection::from(
134            iter.azync.take().expect("Inner stream is `None`"),
135        ))
136    }
137}
138
139impl From<&MessageIterator> for Connection {
140    fn from(iter: &MessageIterator) -> Connection {
141        Connection::from(crate::Connection::from(
142            iter.azync.as_ref().expect("Inner stream is `None`"),
143        ))
144    }
145}
146
147impl std::ops::Drop for MessageIterator {
148    fn drop(&mut self) {
149        block_on(async {
150            if let Some(azync) = self.azync.take() {
151                crate::AsyncDrop::async_drop(azync).await;
152            }
153        });
154    }
155}