event_listener/
lib.rs

//! Notify async tasks or threads.
//!
//! This is a synchronization primitive similar to [eventcounts] invented by Dmitry Vyukov.
//!
//! You can use this crate to turn non-blocking data structures into async or blocking data
//! structures. See a [simple mutex] implementation that exposes an async and a blocking interface
//! for acquiring locks.
//!
//! [eventcounts]: http://www.1024cores.net/home/lock-free-algorithms/eventcounts
//! [simple mutex]: https://github.com/smol-rs/event-listener/blob/master/examples/mutex.rs
//!
//! # Examples
//!
//! Wait until another thread sets a boolean flag:
//!
//! ```
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use std::thread;
//! use std::time::Duration;
//! use std::usize;
//! use event_listener::Event;
//!
//! let flag = Arc::new(AtomicBool::new(false));
//! let event = Arc::new(Event::new());
//!
//! // Spawn a thread that will set the flag after 1 second.
//! thread::spawn({
//!     let flag = flag.clone();
//!     let event = event.clone();
//!     move || {
//!         // Wait for a second.
//!         thread::sleep(Duration::from_secs(1));
//!
//!         // Set the flag.
//!         flag.store(true, Ordering::SeqCst);
//!
//!         // Notify all listeners that the flag has been set.
//!         event.notify(usize::MAX);
//!     }
//! });
//!
//! // Wait until the flag is set.
//! loop {
//!     // Check the flag.
//!     if flag.load(Ordering::SeqCst) {
//!         break;
//!     }
//!
//!     // Start listening for events.
//!     let listener = event.listen();
//!
//!     // Check the flag again after creating the listener.
//!     if flag.load(Ordering::SeqCst) {
//!         break;
//!     }
//!
//!     // Wait for a notification and continue the loop.
//!     listener.wait();
//! }
//! ```
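//!
//! The same pattern can also be driven asynchronously, since a listener is a future. Below is a
//! minimal sketch of that variant; the executor that ultimately polls the future is assumed and
//! not shown, and the function name is illustrative only:
//!
//! ```
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::Arc;
//! use event_listener::Event;
//!
//! async fn wait_until_set(flag: Arc<AtomicBool>, event: Arc<Event>) {
//!     loop {
//!         // Check the flag.
//!         if flag.load(Ordering::SeqCst) {
//!             break;
//!         }
//!
//!         // Start listening for events, then check the flag again.
//!         let listener = event.listen();
//!         if flag.load(Ordering::SeqCst) {
//!             break;
//!         }
//!
//!         // `EventListener` implements `Future<Output = ()>`, so it can simply be awaited.
//!         listener.await;
//!     }
//! }
//! ```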

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]

use std::cell::{Cell, UnsafeCell};
use std::fmt;
use std::future::Future;
use std::mem::{self, ManuallyDrop};
use std::ops::{Deref, DerefMut};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::ptr::{self, NonNull};
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};
use std::usize;

/// Inner state of [`Event`].
struct Inner {
    /// The number of notified entries, or `usize::MAX` if all of them have been notified.
    ///
    /// If there are no entries, this value is set to `usize::MAX`.
    notified: AtomicUsize,

    /// A linked list holding registered listeners.
    list: Mutex<List>,

    /// A single cached list entry to avoid allocations on the fast path of the insertion.
    cache: UnsafeCell<Entry>,
}

impl Inner {
    /// Locks the list.
    fn lock(&self) -> ListGuard<'_> {
        ListGuard {
            inner: self,
            guard: self.list.lock().unwrap(),
        }
    }

    /// Returns the pointer to the single cached list entry.
    #[inline(always)]
    fn cache_ptr(&self) -> NonNull<Entry> {
        unsafe { NonNull::new_unchecked(self.cache.get()) }
    }
}

/// A synchronization primitive for notifying async tasks and threads.
///
/// Listeners can be registered using [`Event::listen()`]. There are two ways to notify listeners:
///
/// 1. [`Event::notify()`] notifies a number of listeners.
/// 2. [`Event::notify_additional()`] notifies a number of previously unnotified listeners.
///
/// If there are no active listeners at the time a notification is sent, it simply gets lost.
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
///
/// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness.
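///
/// A brief sketch of the difference between the two notification methods (the listener names are
/// illustrative only):
///
/// ```
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener1 = event.listen();
/// let listener2 = event.listen();
///
/// // `notify()` counts listeners that are already notified: the second call sees one
/// // notified listener and does nothing more, so only `listener1` is woken here.
/// event.notify(1);
/// event.notify(1);
///
/// // `notify_additional()` always targets previously unnotified listeners, so this
/// // wakes `listener2` as well.
/// event.notify_additional(1);
/// ```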
pub struct Event {
    /// A pointer to heap-allocated inner state.
    ///
    /// This pointer is initially null and gets lazily initialized on first use. Semantically, it
    /// is an `Arc<Inner>` so it's important to keep in mind that it contributes to the [`Arc`]'s
    /// reference count.
    inner: AtomicPtr<Inner>,
}

unsafe impl Send for Event {}
unsafe impl Sync for Event {}

impl UnwindSafe for Event {}
impl RefUnwindSafe for Event {}

impl Event {
    /// Creates a new [`Event`].
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// ```
    #[inline]
    pub const fn new() -> Event {
        Event {
            inner: AtomicPtr::new(ptr::null_mut()),
        }
    }

    /// Returns a guard listening for a notification.
    ///
    /// This method emits a `SeqCst` fence after registering a listener.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    /// ```
    #[cold]
    pub fn listen(&self) -> EventListener {
        let inner = self.inner();
        let listener = EventListener {
            inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) },
            entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) },
        };

        // Make sure the listener is registered before whatever happens next.
        full_fence();
        listener
    }

    /// Notifies a number of active listeners.
    ///
    /// The number is allowed to be zero or exceed the current number of listeners.
    ///
    /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
    /// listeners among the active ones are notified.
    ///
    /// This method emits a `SeqCst` fence before notifying listeners.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    ///
    /// // This notification gets lost because there are no listeners.
    /// event.notify(1);
    ///
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    /// let listener3 = event.listen();
    ///
    /// // Notifies two listeners.
    /// //
    /// // Listener queueing is fair, which means `listener1` and `listener2`
    /// // get notified here since they start listening before `listener3`.
    /// event.notify(2);
    /// ```
    #[inline]
    pub fn notify(&self, n: usize) {
        // Make sure the notification comes after whatever triggered it.
        full_fence();

        if let Some(inner) = self.try_inner() {
            // Notify if there is at least one unnotified listener and the number of notified
            // listeners is less than `n`.
            if inner.notified.load(Ordering::Acquire) < n {
                inner.lock().notify(n);
            }
        }
    }

    /// Notifies a number of active listeners without emitting a `SeqCst` fence.
    ///
    /// The number is allowed to be zero or exceed the current number of listeners.
    ///
    /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n`
    /// listeners among the active ones are notified.
    ///
    /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    /// use std::sync::atomic::{self, Ordering};
    ///
    /// let event = Event::new();
    ///
    /// // This notification gets lost because there are no listeners.
    /// event.notify(1);
    ///
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    /// let listener3 = event.listen();
    ///
    /// // We should emit a fence manually when using relaxed notifications.
    /// atomic::fence(Ordering::SeqCst);
    ///
    /// // Notifies two listeners.
    /// //
    /// // Listener queueing is fair, which means `listener1` and `listener2`
    /// // get notified here since they start listening before `listener3`.
    /// event.notify_relaxed(2);
    /// ```
    #[inline]
    pub fn notify_relaxed(&self, n: usize) {
        if let Some(inner) = self.try_inner() {
            // Notify if there is at least one unnotified listener and the number of notified
            // listeners is less than `n`.
            if inner.notified.load(Ordering::Acquire) < n {
                inner.lock().notify(n);
            }
        }
    }

    /// Notifies a number of active and still unnotified listeners.
    ///
    /// The number is allowed to be zero or exceed the current number of listeners.
    ///
    /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
    /// were previously unnotified.
    ///
    /// This method emits a `SeqCst` fence before notifying listeners.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    ///
    /// // This notification gets lost because there are no listeners.
    /// event.notify(1);
    ///
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    /// let listener3 = event.listen();
    ///
    /// // Notifies two listeners.
    /// //
    /// // Listener queueing is fair, which means `listener1` and `listener2`
    /// // get notified here since they start listening before `listener3`.
    /// event.notify_additional(1);
    /// event.notify_additional(1);
    /// ```
    #[inline]
    pub fn notify_additional(&self, n: usize) {
        // Make sure the notification comes after whatever triggered it.
        full_fence();

        if let Some(inner) = self.try_inner() {
            // Notify if there is at least one unnotified listener.
            if inner.notified.load(Ordering::Acquire) < usize::MAX {
                inner.lock().notify_additional(n);
            }
        }
    }

    /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst`
    /// fence.
    ///
    /// The number is allowed to be zero or exceed the current number of listeners.
    ///
    /// In contrast to [`Event::notify()`], this method will notify `n` *additional* listeners that
    /// were previously unnotified.
    ///
    /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    /// use std::sync::atomic::{self, Ordering};
    ///
    /// let event = Event::new();
    ///
    /// // This notification gets lost because there are no listeners.
    /// event.notify(1);
    ///
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    /// let listener3 = event.listen();
    ///
    /// // We should emit a fence manually when using relaxed notifications.
    /// atomic::fence(Ordering::SeqCst);
    ///
    /// // Notifies two listeners.
    /// //
    /// // Listener queueing is fair, which means `listener1` and `listener2`
    /// // get notified here since they start listening before `listener3`.
    /// event.notify_additional_relaxed(1);
    /// event.notify_additional_relaxed(1);
    /// ```
    #[inline]
    pub fn notify_additional_relaxed(&self, n: usize) {
        if let Some(inner) = self.try_inner() {
            // Notify if there is at least one unnotified listener.
            if inner.notified.load(Ordering::Acquire) < usize::MAX {
                inner.lock().notify_additional(n);
            }
        }
    }

    /// Returns a reference to the inner state if it was initialized.
    #[inline]
    fn try_inner(&self) -> Option<&Inner> {
        let inner = self.inner.load(Ordering::Acquire);
        unsafe { inner.as_ref() }
    }

    /// Returns a raw pointer to the inner state, initializing it if necessary.
    ///
    /// This returns a raw pointer instead of a reference because `from_raw`
    /// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339>
    fn inner(&self) -> *const Inner {
        let mut inner = self.inner.load(Ordering::Acquire);

        // Initialize the state if this is its first use.
        if inner.is_null() {
            // Allocate on the heap.
            let new = Arc::new(Inner {
                notified: AtomicUsize::new(usize::MAX),
                list: std::sync::Mutex::new(List {
                    head: None,
                    tail: None,
                    start: None,
                    len: 0,
                    notified: 0,
                    cache_used: false,
                }),
                cache: UnsafeCell::new(Entry {
                    state: Cell::new(State::Created),
                    prev: Cell::new(None),
                    next: Cell::new(None),
                }),
            });
            // Convert the heap-allocated state into a raw pointer.
            let new = Arc::into_raw(new) as *mut Inner;

            // Attempt to replace the null-pointer with the new state pointer.
            inner = self
                .inner
                .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
                .unwrap_or_else(|x| x);

            // Check if the old pointer value was indeed null.
            if inner.is_null() {
                // If yes, then use the new state pointer.
                inner = new;
            } else {
                // If not, that means a concurrent operation has initialized the state.
                // In that case, use the old pointer and deallocate the new one.
                unsafe {
                    drop(Arc::from_raw(new));
                }
            }
        }

        inner
    }
}

impl Drop for Event {
    #[inline]
    fn drop(&mut self) {
        let inner: *mut Inner = *self.inner.get_mut();

        // If the state pointer has been initialized, deallocate it.
        if !inner.is_null() {
            unsafe {
                drop(Arc::from_raw(inner));
            }
        }
    }
}

impl fmt::Debug for Event {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Event { .. }")
    }
}

impl Default for Event {
    fn default() -> Event {
        Event::new()
    }
}

/// A guard waiting for a notification from an [`Event`].
///
/// There are two ways for a listener to wait for a notification:
///
/// 1. In an asynchronous manner using `.await`.
/// 2. In a blocking manner by calling [`EventListener::wait()`] on it.
///
/// If a notified listener is dropped without receiving a notification, dropping will notify
/// another active listener. Whether one *additional* listener will be notified depends on what
/// kind of notification was delivered.
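///
/// For instance, a brief sketch of the drop behavior described above (the timeout is only there
/// to keep the example from blocking):
///
/// ```
/// use std::time::Duration;
/// use event_listener::Event;
///
/// let event = Event::new();
/// let listener1 = event.listen();
/// let listener2 = event.listen();
///
/// // `listener1` receives the notification.
/// event.notify(1);
///
/// // Dropping `listener1` without waiting on it passes the notification on to `listener2`.
/// drop(listener1);
/// assert!(listener2.wait_timeout(Duration::from_millis(100)));
/// ```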
pub struct EventListener {
    /// A reference to [`Event`]'s inner state.
    inner: Arc<Inner>,

    /// A pointer to this listener's entry in the linked list.
    entry: Option<NonNull<Entry>>,
}

unsafe impl Send for EventListener {}
unsafe impl Sync for EventListener {}

impl UnwindSafe for EventListener {}
impl RefUnwindSafe for EventListener {}

impl EventListener {
    /// Blocks until a notification is received.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// // Notify `listener`.
    /// event.notify(1);
    ///
    /// // Receive the notification.
    /// listener.wait();
    /// ```
    pub fn wait(self) {
        self.wait_internal(None);
    }

    /// Blocks until a notification is received or a timeout is reached.
    ///
    /// Returns `true` if a notification was received.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::Duration;
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// // There are no notifications, so this times out.
    /// assert!(!listener.wait_timeout(Duration::from_secs(1)));
    /// ```
    pub fn wait_timeout(self, timeout: Duration) -> bool {
        self.wait_internal(Some(Instant::now() + timeout))
    }

    /// Blocks until a notification is received or a deadline is reached.
    ///
    /// Returns `true` if a notification was received.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::time::{Duration, Instant};
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// // There are no notifications, so this times out.
    /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1)));
    /// ```
    pub fn wait_deadline(self, deadline: Instant) -> bool {
        self.wait_internal(Some(deadline))
    }

    /// Drops this listener and discards its notification (if any) without notifying another
    /// active listener.
    ///
    /// Returns `true` if a notification was discarded.
    ///
    /// # Examples
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    ///
    /// event.notify(1);
    ///
    /// assert!(listener1.discard());
    /// assert!(!listener2.discard());
    /// ```
    pub fn discard(mut self) -> bool {
        // If this listener has never picked up a notification...
        if let Some(entry) = self.entry.take() {
            let mut list = self.inner.lock();
            // Remove the listener from the list and return `true` if it was notified.
            if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) {
                return true;
            }
        }
        false
    }

    /// Returns `true` if this listener listens to the given `Event`.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener = event.listen();
    ///
    /// assert!(listener.listens_to(&event));
    /// ```
    #[inline]
    pub fn listens_to(&self, event: &Event) -> bool {
        ptr::eq::<Inner>(&*self.inner, event.inner.load(Ordering::Acquire))
    }

    /// Returns `true` if both listeners listen to the same `Event`.
    ///
    /// # Examples
    ///
    /// ```
    /// use event_listener::Event;
    ///
    /// let event = Event::new();
    /// let listener1 = event.listen();
    /// let listener2 = event.listen();
    ///
    /// assert!(listener1.same_event(&listener2));
    /// ```
    pub fn same_event(&self, other: &EventListener) -> bool {
        ptr::eq::<Inner>(&*self.inner, &*other.inner)
    }

    fn wait_internal(mut self, deadline: Option<Instant>) -> bool {
        // Take out the entry pointer and set it to `None`.
        let entry = match self.entry.take() {
            None => unreachable!("cannot wait twice on an `EventListener`"),
            Some(entry) => entry,
        };

        // Set this listener's state to `Waiting`.
        {
            let mut list = self.inner.lock();
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified(false)) {
                State::Notified(_) => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry, self.inner.cache_ptr());
                    return true;
                }
                // Otherwise, set the state to `Waiting`.
                _ => e.state.set(State::Waiting(thread::current())),
            }
        }

        // Wait until a notification is received or the timeout is reached.
        loop {
            match deadline {
                None => thread::park(),

                Some(deadline) => {
                    // Check for timeout.
                    let now = Instant::now();
                    if now >= deadline {
                        // Remove the entry and check if notified.
                        return self
                            .inner
                            .lock()
                            .remove(entry, self.inner.cache_ptr())
                            .is_notified();
                    }

                    // Park until the deadline.
                    thread::park_timeout(deadline - now);
                }
            }

            let mut list = self.inner.lock();
            let e = unsafe { entry.as_ref() };

            // Do a dummy replace operation in order to take out the state.
            match e.state.replace(State::Notified(false)) {
                State::Notified(_) => {
                    // If this listener has been notified, remove it from the list and return.
                    list.remove(entry, self.inner.cache_ptr());
                    return true;
                }
                // Otherwise, set the state back to `Waiting`.
                state => e.state.set(state),
            }
        }
    }
}

impl fmt::Debug for EventListener {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("EventListener { .. }")
    }
}

impl Future for EventListener {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut list = self.inner.lock();

        let entry = match self.entry {
            None => unreachable!("cannot poll a completed `EventListener` future"),
            Some(entry) => entry,
        };
        let state = unsafe { &entry.as_ref().state };

        // Do a dummy replace operation in order to take out the state.
        match state.replace(State::Notified(false)) {
            State::Notified(_) => {
                // If this listener has been notified, remove it from the list and return.
                list.remove(entry, self.inner.cache_ptr());
                drop(list);
                self.entry = None;
                return Poll::Ready(());
            }
            State::Created => {
                // If the listener was just created, put it in the `Polling` state.
                state.set(State::Polling(cx.waker().clone()));
            }
            State::Polling(w) => {
                // If the listener was in the `Polling` state, update the waker.
                if w.will_wake(cx.waker()) {
                    state.set(State::Polling(w));
                } else {
                    state.set(State::Polling(cx.waker().clone()));
                }
            }
            State::Waiting(_) => {
                unreachable!("cannot poll and wait on `EventListener` at the same time")
            }
        }

        Poll::Pending
    }
}

impl Drop for EventListener {
    fn drop(&mut self) {
        // If this listener has never picked up a notification...
        if let Some(entry) = self.entry.take() {
            let mut list = self.inner.lock();

            // But if a notification was delivered to it...
            if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) {
                // Then pass it on to another active listener.
                if additional {
                    list.notify_additional(1);
                } else {
                    list.notify(1);
                }
            }
        }
    }
}

/// A guard holding the linked list locked.
struct ListGuard<'a> {
    /// A reference to [`Event`]'s inner state.
    inner: &'a Inner,

    /// The actual guard that acquired the linked list.
    guard: MutexGuard<'a, List>,
}

impl Drop for ListGuard<'_> {
    #[inline]
    fn drop(&mut self) {
        let list = &mut **self;

        // Update the atomic `notified` counter.
        let notified = if list.notified < list.len {
            list.notified
        } else {
            usize::MAX
        };
        self.inner.notified.store(notified, Ordering::Release);
    }
}

impl Deref for ListGuard<'_> {
    type Target = List;

    #[inline]
    fn deref(&self) -> &List {
        &*self.guard
    }
}

impl DerefMut for ListGuard<'_> {
    #[inline]
    fn deref_mut(&mut self) -> &mut List {
        &mut *self.guard
    }
}

/// The state of a listener.
enum State {
    /// It has just been created.
    Created,

    /// It has received a notification.
    ///
    /// The `bool` is `true` if this was an "additional" notification.
    Notified(bool),

    /// An async task is polling it.
    Polling(Waker),

    /// A thread is blocked on it.
    Waiting(Thread),
}

impl State {
    /// Returns `true` if this is the `Notified` state.
    #[inline]
    fn is_notified(&self) -> bool {
        match self {
            State::Notified(_) => true,
            State::Created | State::Polling(_) | State::Waiting(_) => false,
        }
    }
}

/// An entry representing a registered listener.
struct Entry {
    /// The state of this listener.
    state: Cell<State>,

    /// Previous entry in the linked list.
    prev: Cell<Option<NonNull<Entry>>>,

    /// Next entry in the linked list.
    next: Cell<Option<NonNull<Entry>>>,
}

/// A linked list of entries.
struct List {
    /// First entry in the list.
    head: Option<NonNull<Entry>>,

    /// Last entry in the list.
    tail: Option<NonNull<Entry>>,

    /// The first unnotified entry in the list.
    start: Option<NonNull<Entry>>,

    /// Total number of entries in the list.
    len: usize,

    /// The number of notified entries in the list.
    notified: usize,

    /// Whether the cached entry is used.
    cache_used: bool,
}

impl List {
    /// Inserts a new entry into the list.
    fn insert(&mut self, cache: NonNull<Entry>) -> NonNull<Entry> {
        unsafe {
            let entry = Entry {
                state: Cell::new(State::Created),
                prev: Cell::new(self.tail),
                next: Cell::new(None),
            };

            let entry = if self.cache_used {
                // Allocate an entry that is going to become the new tail.
                NonNull::new_unchecked(Box::into_raw(Box::new(entry)))
            } else {
                // No need to allocate - we can use the cached entry.
                self.cache_used = true;
                cache.as_ptr().write(entry);
                cache
            };

            // Replace the tail with the new entry.
            match mem::replace(&mut self.tail, Some(entry)) {
                None => self.head = Some(entry),
                Some(t) => t.as_ref().next.set(Some(entry)),
            }

            // If there were no unnotified entries, this one is the first now.
            if self.start.is_none() {
                self.start = self.tail;
            }

            // Bump the entry count.
            self.len += 1;

            entry
        }
    }

    /// Removes an entry from the list and returns its state.
    fn remove(&mut self, entry: NonNull<Entry>, cache: NonNull<Entry>) -> State {
        unsafe {
            let prev = entry.as_ref().prev.get();
            let next = entry.as_ref().next.get();

            // Unlink from the previous entry.
            match prev {
                None => self.head = next,
                Some(p) => p.as_ref().next.set(next),
            }

            // Unlink from the next entry.
            match next {
                None => self.tail = prev,
                Some(n) => n.as_ref().prev.set(prev),
            }

            // If this was the first unnotified entry, move the pointer to the next one.
            if self.start == Some(entry) {
                self.start = next;
            }

            // Extract the state.
            let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) {
                // Free the cached entry.
                self.cache_used = false;
                entry.as_ref().state.replace(State::Created)
            } else {
                // Deallocate the entry.
                Box::from_raw(entry.as_ptr()).state.into_inner()
            };

            // Update the counters.
            if state.is_notified() {
                self.notified -= 1;
            }
            self.len -= 1;

            state
        }
    }

    /// Notifies a number of entries.
    #[cold]
    fn notify(&mut self, mut n: usize) {
        if n <= self.notified {
            return;
        }
        n -= self.notified;

        while n > 0 {
            n -= 1;

            // Notify the first unnotified entry.
            match self.start {
                None => break,
                Some(e) => {
                    // Get the entry and move the pointer forward.
                    let e = unsafe { e.as_ref() };
                    self.start = e.next.get();

                    // Set the state of this entry to `Notified` and notify.
                    match e.state.replace(State::Notified(false)) {
                        State::Notified(_) => {}
                        State::Created => {}
                        State::Polling(w) => w.wake(),
                        State::Waiting(t) => t.unpark(),
                    }

                    // Update the counter.
                    self.notified += 1;
                }
            }
        }
    }

    /// Notifies a number of additional entries.
    #[cold]
    fn notify_additional(&mut self, mut n: usize) {
        while n > 0 {
            n -= 1;

            // Notify the first unnotified entry.
            match self.start {
                None => break,
                Some(e) => {
                    // Get the entry and move the pointer forward.
                    let e = unsafe { e.as_ref() };
                    self.start = e.next.get();

                    // Set the state of this entry to `Notified` and notify.
                    match e.state.replace(State::Notified(true)) {
                        State::Notified(_) => {}
                        State::Created => {}
                        State::Polling(w) => w.wake(),
                        State::Waiting(t) => t.unpark(),
                    }

                    // Update the counter.
                    self.notified += 1;
                }
            }
        }
    }
}

/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
    if cfg!(all(
        any(target_arch = "x86", target_arch = "x86_64"),
        not(miri)
    )) {
        // HACK(stjepang): On x86 architectures there are two different ways of executing
        // a `SeqCst` fence.
        //
        // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
        // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction.
        //
        // Both instructions have the effect of a full barrier, but empirical benchmarks have shown
        // that the second one is sometimes a bit faster.
        //
        // The ideal solution here would be to use inline assembly, but we're instead creating a
        // temporary atomic variable and compare-and-exchanging its value. No sane compiler
        // targeting x86 platforms is going to optimize this away.
        atomic::compiler_fence(Ordering::SeqCst);
        let a = AtomicUsize::new(0);
        let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
        atomic::compiler_fence(Ordering::SeqCst);
    } else {
        atomic::fence(Ordering::SeqCst);
    }
}