zbus/blocking/message_iterator.rs
1use futures_lite::StreamExt;
2
3use crate::{
4 blocking::Connection, message::Message, utils::block_on, MatchRule, OwnedMatchRule, Result,
5};
6
7/// A blocking wrapper of [`crate::MessageStream`].
8///
9/// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate
10/// over this type until it's consumed or dropped.
11#[derive(Debug, Clone)]
12pub struct MessageIterator {
13 // Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed
14 // for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime
15 // context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the
16 // stream to ensure any associated match rule is deregistered before the iterator is
17 // dropped.
18 pub(crate) azync: Option<crate::MessageStream>,
19}
20
21impl MessageIterator {
22 /// Get a reference to the underlying async message stream.
23 pub fn inner(&self) -> &crate::MessageStream {
24 self.azync.as_ref().expect("Inner stream is `None`")
25 }
26
27 /// Get the underlying async message stream, consuming `self`.
28 pub fn into_inner(mut self) -> crate::MessageStream {
29 self.azync.take().expect("Inner stream is `None`")
30 }
31
32 /// Create a message iterator for the given match rule.
33 ///
34 /// This is a wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the underlying
35 /// `MessageStream`, the match rule is immediately deregistered when the iterator is dropped.
36 ///
37 /// # Example
38 ///
39 /// ```
40 /// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged};
41 ///
42 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
43 /// let conn = Connection::session()?;
44 /// let rule = MatchRule::builder()
45 /// .msg_type(zbus::message::Type::Signal)
46 /// .sender("org.freedesktop.DBus")?
47 /// .interface("org.freedesktop.DBus")?
48 /// .member("NameOwnerChanged")?
49 /// .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42")?
50 /// .build();
51 /// let mut iter = MessageIterator::for_match_rule(
52 /// rule,
53 /// &conn,
54 /// // For such a specific match rule, we don't need a big queue.
55 /// Some(1),
56 /// )?;
57 ///
58 /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
59 /// interface='org.freedesktop.DBus',member='NameOwnerChanged',\
60 /// arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'";
61 /// assert_eq!(
62 /// iter.match_rule().map(|r| r.to_string()).as_deref(),
63 /// Some(rule_str),
64 /// );
65 ///
66 /// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering
67 /// // messages based on the match rule, we'd receive method return calls for each of these 2
68 /// // calls first.
69 /// //
70 /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
71 /// // we register since we set up an arg filter.
72 /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44")?;
73 /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42")?;
74 ///
75 /// let msg = iter.next().unwrap()?;
76 /// let signal = NameOwnerChanged::from_message(msg).unwrap();
77 /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42");
78 ///
79 /// # Ok(())
80 /// # }
81 /// ```
82 ///
83 /// # Caveats
84 ///
85 /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
86 pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self>
87 where
88 R: TryInto<OwnedMatchRule>,
89 R::Error: Into<crate::Error>,
90 {
91 block_on(crate::MessageStream::for_match_rule(
92 rule,
93 conn.inner(),
94 max_queued,
95 ))
96 .map(Some)
97 .map(|s| Self { azync: s })
98 }
99
100 /// The associated match rule, if any.
101 pub fn match_rule(&self) -> Option<MatchRule<'_>> {
102 self.azync
103 .as_ref()
104 .expect("Inner stream is `None`")
105 .match_rule()
106 }
107}
108
109impl Iterator for MessageIterator {
110 type Item = Result<Message>;
111
112 fn next(&mut self) -> Option<Self::Item> {
113 block_on(self.azync.as_mut().expect("Inner stream is `None`").next())
114 }
115}
116
117impl From<Connection> for MessageIterator {
118 fn from(conn: Connection) -> Self {
119 let azync = crate::MessageStream::from(conn.into_inner());
120
121 Self { azync: Some(azync) }
122 }
123}
124
125impl From<&Connection> for MessageIterator {
126 fn from(conn: &Connection) -> Self {
127 Self::from(conn.clone())
128 }
129}
130
131impl From<MessageIterator> for Connection {
132 fn from(mut iter: MessageIterator) -> Connection {
133 Connection::from(crate::Connection::from(
134 iter.azync.take().expect("Inner stream is `None`"),
135 ))
136 }
137}
138
139impl From<&MessageIterator> for Connection {
140 fn from(iter: &MessageIterator) -> Connection {
141 Connection::from(crate::Connection::from(
142 iter.azync.as_ref().expect("Inner stream is `None`"),
143 ))
144 }
145}
146
147impl std::ops::Drop for MessageIterator {
148 fn drop(&mut self) {
149 block_on(async {
150 if let Some(azync) = self.azync.take() {
151 crate::AsyncDrop::async_drop(azync).await;
152 }
153 });
154 }
155}