zbus/blocking/
message_iterator.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use futures_util::StreamExt;
use static_assertions::assert_impl_all;
use std::{convert::TryInto, sync::Arc};

use crate::{blocking::Connection, utils::block_on, MatchRule, Message, OwnedMatchRule, Result};

/// A blocking wrapper of [`crate::MessageStream`].
///
/// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate
/// over this type until it's consumed or dropped.
///
/// Dropping a `MessageIterator` blocks the current thread while the wrapped stream is
/// asynchronously dropped (see the `Drop` implementation of this type).
#[derive(derivative::Derivative, Clone)]
#[derivative(Debug)]
pub struct MessageIterator {
    // Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed
    // for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime
    // context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the
    // stream to ensure any associated match rule is deregistered before the iterator is
    // dropped.
    //
    // The field is only ever set to `None` by code paths that consume or destroy the iterator
    // (`into_inner`, the by-value conversion into `Connection`, and `Drop`), which is why the
    // accessors treat a `None` here as a broken invariant and panic.
    pub(crate) azync: Option<crate::MessageStream>,
}

// Compile-time assertion that `MessageIterator` implements `Send`, `Sync` and `Unpin`.
assert_impl_all!(MessageIterator: Send, Sync, Unpin);

impl MessageIterator {
    /// Borrow the async message stream wrapped by this iterator.
    ///
    /// # Panics
    ///
    /// Panics if the inner stream has already been taken out of `self`.
    pub fn inner(&self) -> &crate::MessageStream {
        let stream = self.azync.as_ref();

        stream.expect("Inner stream is `None`")
    }

    /// Consume `self` and hand back the async message stream it wraps.
    ///
    /// # Panics
    ///
    /// Panics if the inner stream has already been taken out of `self`.
    pub fn into_inner(mut self) -> crate::MessageStream {
        // Taking the stream leaves `None` behind, so the `Drop` impl has nothing left to drop.
        let stream = self.azync.take();

        stream.expect("Inner stream is `None`")
    }

    /// Create a message iterator for the given match rule.
    ///
    /// This is a blocking wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the
    /// underlying `MessageStream`, the match rule is immediately deregistered when the iterator
    /// is dropped.
    ///
    /// # Example
    ///
    /// ```
    /// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let conn = Connection::session()?;
    /// let rule = MatchRule::builder()
    ///     .msg_type(zbus::MessageType::Signal)
    ///     .sender("org.freedesktop.DBus")?
    ///     .interface("org.freedesktop.DBus")?
    ///     .member("NameOwnerChanged")?
    ///     .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42")?
    ///     .build();
    /// let mut iter = MessageIterator::for_match_rule(
    ///     rule,
    ///     &conn,
    ///     // For such a specific match rule, we don't need a big queue.
    ///     Some(1),
    /// )?;
    ///
    /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
    ///                 interface='org.freedesktop.DBus',member='NameOwnerChanged',\
    ///                 arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'";
    /// assert_eq!(
    ///     iter.match_rule().map(|r| r.to_string()).as_deref(),
    ///     Some(rule_str),
    /// );
    ///
    /// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering
    /// // messages based on the match rule, we'd receive method return call for each of these 2
    /// // calls first.
    /// //
    /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
    /// // we register since we setup an arg filter.
    /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44")?;
    /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42")?;
    ///
    /// let msg = iter.next().unwrap()?;
    /// let signal = NameOwnerChanged::from_message(msg).unwrap();
    /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42");
    ///
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns whatever error [`crate::MessageStream::for_match_rule`] reports.
    ///
    /// # Caveats
    ///
    /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
    pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self>
    where
        R: TryInto<OwnedMatchRule>,
        R::Error: Into<crate::Error>,
    {
        // Drive the async constructor to completion on the current thread.
        let stream = block_on(crate::MessageStream::for_match_rule(
            rule,
            conn.inner(),
            max_queued,
        ))?;

        Ok(Self {
            azync: Some(stream),
        })
    }

    /// The match rule associated with the underlying stream, if there is one.
    ///
    /// # Panics
    ///
    /// Panics if the inner stream has already been taken out of `self`.
    pub fn match_rule(&self) -> Option<MatchRule<'_>> {
        self.inner().match_rule()
    }
}

impl Iterator for MessageIterator {
    type Item = Result<Arc<Message>>;

    /// Block the current thread until the underlying stream yields its next item.
    ///
    /// Panics if the inner stream has already been taken out of `self`.
    fn next(&mut self) -> Option<Self::Item> {
        let stream = self.azync.as_mut().expect("Inner stream is `None`");

        block_on(stream.next())
    }
}

impl From<Connection> for MessageIterator {
    /// Build an iterator from a blocking connection, consuming the connection.
    fn from(conn: Connection) -> Self {
        // Unwrap the blocking connection to its async counterpart and convert that into a stream.
        Self {
            azync: Some(crate::MessageStream::from(conn.into_inner())),
        }
    }
}

impl From<&Connection> for MessageIterator {
    /// Build an iterator from a borrowed connection by cloning it and delegating to the
    /// by-value conversion.
    fn from(conn: &Connection) -> Self {
        conn.clone().into()
    }
}

impl From<MessageIterator> for Connection {
    /// Convert the iterator into a blocking connection, consuming the iterator.
    ///
    /// Panics if the inner stream has already been taken out of `iter`.
    fn from(mut iter: MessageIterator) -> Connection {
        // Move the stream out so that dropping `iter` at the end of this function finds `None`.
        let stream = iter.azync.take().expect("Inner stream is `None`");
        let async_conn = crate::Connection::from(stream);

        Connection::from(async_conn)
    }
}

impl From<&MessageIterator> for Connection {
    /// Obtain a blocking connection from a borrowed iterator, leaving the iterator usable.
    ///
    /// Panics if the inner stream has already been taken out of `iter`.
    fn from(iter: &MessageIterator) -> Connection {
        let stream = iter.azync.as_ref().expect("Inner stream is `None`");
        let async_conn = crate::Connection::from(stream);

        Connection::from(async_conn)
    }
}

impl std::ops::Drop for MessageIterator {
    /// Asynchronously drop the wrapped stream, if it is still present, from within `block_on`.
    fn drop(&mut self) {
        block_on(async {
            match self.azync.take() {
                // The stream is still owned by us: drop it via `AsyncDrop` and wait for that to
                // finish.
                Some(stream) => {
                    crate::AsyncDrop::async_drop(stream).await;
                }
                // The stream was already moved out (e.g. by `into_inner`); nothing to do.
                None => {}
            }
        });
    }
}