1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
//! Run commands and keep track of subscriptions.
use crate::subscription;
use crate::{BoxFuture, BoxStream, Executor, MaybeSend};
use futures::{channel::mpsc, Sink};
use std::marker::PhantomData;
/// A batteries-included runtime of commands and subscriptions.
///
/// If you have an [`Executor`], a [`Runtime`] can be leveraged to run any
/// `Command` or [`Subscription`] and get notified of the results!
///
/// [`Subscription`]: crate::Subscription
#[derive(Debug)]
pub struct Runtime<Executor, Sender, Message> {
executor: Executor,
sender: Sender,
subscriptions: subscription::Tracker,
_message: PhantomData<Message>,
}
impl<Executor, Sender, Message> Runtime<Executor, Sender, Message>
where
Executor: self::Executor,
Sender: Sink<Message, Error = mpsc::SendError>
+ Unpin
+ MaybeSend
+ Clone
+ 'static,
Message: MaybeSend + 'static,
{
/// Creates a new empty [`Runtime`].
///
/// You need to provide:
/// - an [`Executor`] to spawn futures
/// - a `Sender` implementing `Sink` to receive the results
pub fn new(executor: Executor, sender: Sender) -> Self {
Self {
executor,
sender,
subscriptions: subscription::Tracker::new(),
_message: PhantomData,
}
}
/// Runs the given closure inside the [`Executor`] of the [`Runtime`].
///
/// See [`Executor::enter`] to learn more.
pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
self.executor.enter(f)
}
/// Spawns a [`Future`] in the [`Runtime`].
///
/// The resulting `Message` will be forwarded to the `Sender` of the
/// [`Runtime`].
///
/// [`Future`]: BoxFuture
pub fn spawn(&mut self, future: BoxFuture<Message>) {
use futures::{FutureExt, SinkExt};
let mut sender = self.sender.clone();
let future = future.then(|message| async move {
let _ = sender.send(message).await;
});
self.executor.spawn(future);
}
/// Runs a [`Stream`] in the [`Runtime`] until completion.
///
/// The resulting `Message`s will be forwarded to the `Sender` of the
/// [`Runtime`].
///
/// [`Stream`]: BoxStream
pub fn run(&mut self, stream: BoxStream<Message>) {
use futures::{FutureExt, StreamExt};
let sender = self.sender.clone();
let future =
stream.map(Ok).forward(sender).map(|result| match result {
Ok(()) => (),
Err(error) => {
log::warn!(
"Stream could not run until completion: {error}"
);
}
});
self.executor.spawn(future);
}
/// Tracks a [`Subscription`] in the [`Runtime`].
///
/// It will spawn new streams or close old ones as necessary! See
/// [`Tracker::update`] to learn more about this!
///
/// [`Tracker::update`]: subscription::Tracker::update
/// [`Subscription`]: crate::Subscription
pub fn track(
&mut self,
recipes: impl IntoIterator<
Item = Box<dyn subscription::Recipe<Output = Message>>,
>,
) {
let Runtime {
executor,
subscriptions,
sender,
..
} = self;
let futures = executor.enter(|| {
subscriptions.update(recipes.into_iter(), sender.clone())
});
for future in futures {
executor.spawn(future);
}
}
/// Broadcasts an event to all the subscriptions currently alive in the
/// [`Runtime`].
///
/// See [`Tracker::broadcast`] to learn more.
///
/// [`Tracker::broadcast`]: subscription::Tracker::broadcast
pub fn broadcast(&mut self, event: subscription::Event) {
self.subscriptions.broadcast(event);
}
}