iced_winit/
proxy.rs

1use crate::futures::futures::{
2    channel::mpsc,
3    select,
4    task::{Context, Poll},
5    Future, Sink, StreamExt,
6};
7use crate::program::Event;
8use crate::runtime::Action;
9use std::pin::Pin;
10
11/// An event loop proxy with backpressure that implements `Sink`.
12#[derive(Debug)]
13pub struct Proxy<T: 'static> {
14    pub(crate) raw: winit::event_loop::EventLoopProxy,
15    sender: mpsc::Sender<Action<T>>,
16    event_sender: mpsc::UnboundedSender<Event<T>>,
17    notifier: mpsc::Sender<usize>,
18}
19
20impl<T: 'static> Clone for Proxy<T> {
21    fn clone(&self) -> Self {
22        Self {
23            raw: self.raw.clone(),
24            sender: self.sender.clone(),
25            notifier: self.notifier.clone(),
26            event_sender: self.event_sender.clone(),
27        }
28    }
29}
30
31impl<T: 'static> Proxy<T> {
32    const MAX_SIZE: usize = 100;
33
34    /// Creates a new [`Proxy`] from an `EventLoopProxy`.
35    pub fn new(
36        raw: winit::event_loop::EventLoopProxy,
37        event_sender: mpsc::UnboundedSender<Event<T>>,
38    ) -> (Self, impl Future<Output = ()>) {
39        let (notifier, mut processed) = mpsc::channel(Self::MAX_SIZE);
40        let (sender, mut receiver): (mpsc::Sender<Action<T>>, _) =
41            mpsc::channel(Self::MAX_SIZE);
42        let proxy = raw.clone();
43        let event_sender_clone = event_sender.clone();
44
45        let worker = async move {
46            let mut count = 0;
47
48            loop {
49                if count < Self::MAX_SIZE {
50                    select! {
51                        message = receiver.select_next_some() => {
52                            let _ = event_sender_clone.unbounded_send(Event::UserEvent(message));
53                            let _ = proxy.wake_up();
54                            count += 1;
55
56                        }
57                        amount = processed.select_next_some() => {
58                            count = count.saturating_sub(amount);
59                        }
60                        complete => break,
61                    }
62                } else {
63                    select! {
64                        amount = processed.select_next_some() => {
65                            count = count.saturating_sub(amount);
66                        }
67                        complete => break,
68                    }
69                }
70            }
71        };
72
73        (
74            Self {
75                raw,
76                sender,
77                notifier,
78                event_sender,
79            },
80            worker,
81        )
82    }
83
84    /// Sends a value to the event loop.
85    ///
86    /// Note: This skips the backpressure mechanism with an unbounded
87    /// channel. Use sparingly!
88    pub fn send(&mut self, value: T)
89    where
90        T: std::fmt::Debug,
91    {
92        self.send_action(Action::Output(value));
93    }
94
95    /// Sends an action to the event loop.
96    ///
97    /// Note: This skips the backpressure mechanism with an unbounded
98    /// channel. Use sparingly!
99    pub fn send_action(&mut self, action: Action<T>)
100    where
101        T: std::fmt::Debug,
102    {
103        self.event_sender
104            .unbounded_send(Event::UserEvent(action))
105            .expect("Send message to event loop");
106    }
107
108    /// Frees an amount of slots for additional messages to be queued in
109    /// this [`Proxy`].
110    pub fn free_slots(&mut self, amount: usize) {
111        let _ = self.notifier.start_send(amount);
112    }
113}
114
115impl<T: 'static> Sink<Action<T>> for Proxy<T> {
116    type Error = mpsc::SendError;
117
118    fn poll_ready(
119        mut self: Pin<&mut Self>,
120        cx: &mut Context<'_>,
121    ) -> Poll<Result<(), Self::Error>> {
122        self.sender.poll_ready(cx)
123    }
124
125    fn start_send(
126        mut self: Pin<&mut Self>,
127        action: Action<T>,
128    ) -> Result<(), Self::Error> {
129        self.sender.start_send(action)
130    }
131
132    fn poll_flush(
133        mut self: Pin<&mut Self>,
134        cx: &mut Context<'_>,
135    ) -> Poll<Result<(), Self::Error>> {
136        match self.sender.poll_ready(cx) {
137            Poll::Ready(Err(ref e)) if e.is_disconnected() => {
138                // If the receiver disconnected, we consider the sink to be flushed.
139                Poll::Ready(Ok(()))
140            }
141            x => x,
142        }
143    }
144
145    fn poll_close(
146        mut self: Pin<&mut Self>,
147        _cx: &mut Context<'_>,
148    ) -> Poll<Result<(), Self::Error>> {
149        self.sender.disconnect();
150        Poll::Ready(Ok(()))
151    }
152}