zbus/blocking/message_iterator.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
use futures_util::StreamExt;
use static_assertions::assert_impl_all;
use std::{convert::TryInto, sync::Arc};
use crate::{blocking::Connection, utils::block_on, MatchRule, Message, OwnedMatchRule, Result};
/// A blocking wrapper of [`crate::MessageStream`].
///
/// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate
/// over this type until it's consumed or dropped.
#[derive(derivative::Derivative, Clone)]
#[derivative(Debug)]
pub struct MessageIterator {
// Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed
// for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime
// context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the
// stream to ensure any associated match rule is deregistered before the iterator is
// dropped.
pub(crate) azync: Option<crate::MessageStream>,
}
assert_impl_all!(MessageIterator: Send, Sync, Unpin);
impl MessageIterator {
/// Get a reference to the underlying async message stream.
pub fn inner(&self) -> &crate::MessageStream {
self.azync.as_ref().expect("Inner stream is `None`")
}
/// Get the underlying async message stream, consuming `self`.
pub fn into_inner(mut self) -> crate::MessageStream {
self.azync.take().expect("Inner stream is `None`")
}
/// Create a message iterator for the given match rule.
///
/// This is a wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the underlying
/// `MessageStream`, the match rule is immediately deregistered when the iterator is dropped.
///
/// # Example
///
/// ```
/// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged};
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let conn = Connection::session()?;
/// let rule = MatchRule::builder()
/// .msg_type(zbus::MessageType::Signal)
/// .sender("org.freedesktop.DBus")?
/// .interface("org.freedesktop.DBus")?
/// .member("NameOwnerChanged")?
/// .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42")?
/// .build();
/// let mut iter = MessageIterator::for_match_rule(
/// rule,
/// &conn,
/// // For such a specific match rule, we don't need a big queue.
/// Some(1),
/// )?;
///
/// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
/// interface='org.freedesktop.DBus',member='NameOwnerChanged',\
/// arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'";
/// assert_eq!(
/// iter.match_rule().map(|r| r.to_string()).as_deref(),
/// Some(rule_str),
/// );
///
/// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering
/// // messages based on the match rule, we'd receive method return call for each of these 2
/// // calls first.
/// //
/// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
/// // we register since we setup an arg filter.
/// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44")?;
/// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42")?;
///
/// let msg = iter.next().unwrap()?;
/// let signal = NameOwnerChanged::from_message(msg).unwrap();
/// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42");
///
/// # Ok(())
/// # }
/// ```
///
/// # Caveats
///
/// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self>
where
R: TryInto<OwnedMatchRule>,
R::Error: Into<crate::Error>,
{
block_on(crate::MessageStream::for_match_rule(
rule,
conn.inner(),
max_queued,
))
.map(Some)
.map(|s| Self { azync: s })
}
/// The associated match rule, if any.
pub fn match_rule(&self) -> Option<MatchRule<'_>> {
self.azync
.as_ref()
.expect("Inner stream is `None`")
.match_rule()
}
}
impl Iterator for MessageIterator {
type Item = Result<Arc<Message>>;
fn next(&mut self) -> Option<Self::Item> {
block_on(self.azync.as_mut().expect("Inner stream is `None`").next())
}
}
impl From<Connection> for MessageIterator {
fn from(conn: Connection) -> Self {
let azync = crate::MessageStream::from(conn.into_inner());
Self { azync: Some(azync) }
}
}
impl From<&Connection> for MessageIterator {
fn from(conn: &Connection) -> Self {
Self::from(conn.clone())
}
}
impl From<MessageIterator> for Connection {
fn from(mut iter: MessageIterator) -> Connection {
Connection::from(crate::Connection::from(
iter.azync.take().expect("Inner stream is `None`"),
))
}
}
impl From<&MessageIterator> for Connection {
fn from(iter: &MessageIterator) -> Connection {
Connection::from(crate::Connection::from(
iter.azync.as_ref().expect("Inner stream is `None`"),
))
}
}
impl std::ops::Drop for MessageIterator {
fn drop(&mut self) {
block_on(async {
if let Some(azync) = self.azync.take() {
crate::AsyncDrop::async_drop(azync).await;
}
});
}
}