use crate::futures::futures::{
channel::mpsc,
select,
task::{Context, Poll},
Future, Sink, StreamExt,
};
use crate::program::Event;
use crate::runtime::Action;
use std::pin::Pin;
/// A cloneable handle for delivering [`Action`]s to the winit event loop.
///
/// Actions can either be queued with backpressure through the `Sink`
/// implementation (bounded `sender`, drained by the worker future returned
/// from `Proxy::new`) or pushed immediately through `send` / `send_action`
/// (unbounded `event_sender`).
#[derive(Debug)]
pub struct Proxy<T: 'static> {
/// Raw winit proxy; the worker created in `Proxy::new` calls `wake_up` on a
/// clone of it after forwarding each action.
pub(crate) raw: winit::event_loop::EventLoopProxy,
/// Bounded channel backing the `Sink` implementation; its receiving half is
/// owned by the worker future.
sender: mpsc::Sender<Action<T>>,
/// Unbounded channel on which actions are delivered as `Event::UserEvent`.
event_sender: mpsc::UnboundedSender<Event<T>>,
/// Channel used by `free_slots` to tell the worker how many forwarded
/// actions have been handled.
notifier: mpsc::Sender<usize>,
}
impl<T: 'static> Clone for Proxy<T> {
    /// Produces another handle that shares the same channels and event loop
    /// proxy as `self`.
    ///
    /// Written by hand rather than derived so that no `T: Clone` bound is
    /// imposed on the message type.
    fn clone(&self) -> Self {
        let Self {
            raw,
            sender,
            event_sender,
            notifier,
        } = self;

        Self {
            raw: raw.clone(),
            sender: sender.clone(),
            notifier: notifier.clone(),
            event_sender: event_sender.clone(),
        }
    }
}
impl<T: 'static> Proxy<T> {
    /// Capacity of both bounded channels, and the number of forwarded actions
    /// that may be outstanding (not yet released through
    /// [`Proxy::free_slots`]) before the worker stops forwarding new ones.
    const MAX_SIZE: usize = 100;

    /// Creates a [`Proxy`] together with the worker future that drives its
    /// [`Sink`] side.
    ///
    /// The worker must be polled by the caller. While fewer than `MAX_SIZE`
    /// forwarded actions are outstanding, it takes actions queued through the
    /// `Sink` implementation, delivers each one as an `Event::UserEvent` on
    /// `event_sender` and wakes the event loop. Once the limit is reached it
    /// only listens for [`Proxy::free_slots`] notifications until slots are
    /// released. The worker finishes when the streams it is selecting over
    /// have completed.
    pub fn new(
        raw: winit::event_loop::EventLoopProxy,
        event_sender: mpsc::UnboundedSender<Event<T>>,
    ) -> (Self, impl Future<Output = ()>) {
        let (notifier, mut processed) = mpsc::channel(Self::MAX_SIZE);
        let (sender, mut receiver): (mpsc::Sender<Action<T>>, _) =
            mpsc::channel(Self::MAX_SIZE);

        // The worker owns its own clones so the originals can be stored in
        // the returned `Proxy`.
        let proxy = raw.clone();
        let event_sender_clone = event_sender.clone();

        let worker = async move {
            // Number of actions forwarded to the event loop that have not yet
            // been released through `free_slots`.
            let mut count = 0;

            loop {
                if count < Self::MAX_SIZE {
                    select! {
                        message = receiver.select_next_some() => {
                            // Delivery is best-effort: a send failure is ignored.
                            let _ = event_sender_clone.unbounded_send(Event::UserEvent(message));
                            let _ = proxy.wake_up();
                            count += 1;
                        }
                        amount = processed.select_next_some() => {
                            count = count.saturating_sub(amount);
                        }
                        complete => break,
                    }
                } else {
                    // Limit reached: stop pulling new actions and wait for
                    // slots to be released.
                    select! {
                        amount = processed.select_next_some() => {
                            count = count.saturating_sub(amount);
                        }
                        complete => break,
                    }
                }
            }
        };

        (
            Self {
                raw,
                sender,
                notifier,
                event_sender,
            },
            worker,
        )
    }

    /// Sends `value` to the event loop immediately, wrapped in
    /// [`Action::Output`].
    ///
    /// This bypasses the bounded `Sink` queue; see [`Proxy::send_action`].
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be queued (see [`Proxy::send_action`]).
    pub fn send(&mut self, value: T)
    where
        T: std::fmt::Debug,
    {
        self.send_action(Action::Output(value));
    }

    /// Sends `action` to the event loop immediately, bypassing the bounded
    /// `Sink` queue, and wakes the event loop so the action is noticed.
    ///
    /// # Panics
    ///
    /// Panics if `unbounded_send` on the event channel returns an error.
    pub fn send_action(&mut self, action: Action<T>)
    where
        T: std::fmt::Debug,
    {
        self.event_sender
            .unbounded_send(Event::UserEvent(action))
            .expect("Send message to event loop");

        // Mirror the worker path: after queueing a user event, wake the event
        // loop so the event is not left waiting for an unrelated wake-up.
        let _ = self.raw.wake_up();
    }

    /// Tells the worker that `amount` forwarded actions have been handled,
    /// letting it forward that many more.
    ///
    /// The notification is best-effort: a failure to queue it is ignored.
    pub fn free_slots(&mut self, amount: usize) {
        let _ = self.notifier.start_send(amount);
    }
}
impl<T: 'static> Sink<Action<T>> for Proxy<T> {
    type Error = mpsc::SendError;

    /// Reports whether the bounded action channel can accept another item.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();

        this.sender.poll_ready(cx)
    }

    /// Queues `action` on the bounded channel read by the worker future.
    fn start_send(
        self: Pin<&mut Self>,
        action: Action<T>,
    ) -> Result<(), Self::Error> {
        let this = self.get_mut();

        this.sender.start_send(action)
    }

    /// Flushes the sink by waiting for the bounded channel to report
    /// readiness.
    ///
    /// A disconnected channel is treated as successfully flushed; every other
    /// outcome of the readiness check is returned unchanged.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let readiness = self.get_mut().sender.poll_ready(cx);

        if let Poll::Ready(Err(error)) = &readiness {
            if error.is_disconnected() {
                return Poll::Ready(Ok(()));
            }
        }

        readiness
    }

    /// Disconnects this handle's sender from the bounded channel and reports
    /// the sink as closed right away.
    fn poll_close(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.get_mut().sender.disconnect();

        Poll::Ready(Ok(()))
    }
}