calloop/sources/ping/eventfd.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
//! Eventfd based implementation of the ping event source.
//!
//! # Implementation notes
//!
//! The eventfd is a much lighter signalling mechanism provided by the Linux
//! kernel. Rather than write an arbitrary sequence of bytes, it only has a
//! 64-bit counter.
//!
//! To avoid closing the eventfd early, `make_ping()` places it in an `Arc`
//! that is shared by the event source and by every sender, so the fd is only
//! closed once all of them have been dropped. When all the senders are
//! dropped, the wrapper `FlagOnDrop` handles signalling this to the event
//! source by writing a close event to the eventfd. The senders hold their own
//! reference to the eventfd, so if the source is dropped before the senders,
//! their writes still reach the eventfd but nothing reads them any more (a
//! message is only logged if a write fails).
//!
//! To differentiate between regular ping events and close ping events, we add 2
//! to the counter for regular events and 1 for close events. In the source we
//! can then check the LSB and if it's set, we know it was a close event. This
//! only works if a close event never fires more than once.
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
use std::sync::Arc;
use rustix::event::{eventfd, EventfdFlags};
use rustix::io::{read, write, Errno};
use super::PingError;
use crate::{
generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
};
// Amounts added to the eventfd counter. These are increments, not bit flags!
// Pings only ever add an even amount and the close event is written a single
// time, so the lowest bit of the counter can only come from the close event.
// That is what allows INCREMENT_CLOSE to double as a mask when reading.
const INCREMENT_CLOSE: u64 = 1;
const INCREMENT_PING: u64 = 2;
/// Creates a `Ping` sender and the `PingSource` it wakes up, both backed by
/// the same eventfd.
///
/// # Errors
///
/// Returns the OS error if the eventfd cannot be created.
#[inline]
pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
    // A single fd serves both halves. If either half closed it on drop, the
    // other half would be left holding a closed fd, so it lives in an `Arc`
    // and is only closed once every holder has let go of it.
    let shared_fd = Arc::new(eventfd(
        0,
        EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK,
    )?);

    let source = PingSource {
        event: Generic::new(
            ArcAsFd(Arc::clone(&shared_fd)),
            Interest::READ,
            Mode::Level,
        ),
    };
    let ping = Ping {
        event: Arc::new(FlagOnDrop(shared_fd)),
    };

    Ok((ping, source))
}
// Helper functions for the event source IO.

/// Adds `count` to the eventfd counter behind `fd`.
///
/// # Panics
///
/// Panics if `count` is zero.
///
/// # Errors
///
/// Returns any write error other than `EAGAIN`.
#[inline]
fn send_ping(fd: BorrowedFd<'_>, count: u64) -> std::io::Result<()> {
    assert!(count > 0);

    let payload = count.to_ne_bytes();
    match write(fd, &payload) {
        // Either the write went through and will wake up the loop, or the
        // counter is already at its cap, in which case the earlier writes
        // that filled it will wake up the loop instead.
        Ok(_) | Err(Errno::AGAIN) => Ok(()),
        // Every other failure is a genuine error.
        Err(err) => Err(err.into()),
    }
}
/// Reads the eventfd counter behind `fd` and returns its value.
///
/// # Errors
///
/// Returns any error reported by the read.
#[inline]
fn drain_ping(fd: BorrowedFd<'_>) -> std::io::Result<u64> {
    // The eventfd counter is effectively a u64, so the buffer is sized to
    // hold exactly one.
    let mut raw = [0u8; std::mem::size_of::<u64>()];

    let nread = read(fd, &mut raw)?;

    // Reading from an eventfd should only ever produce the full 8 bytes, so
    // there is no need to loop for the remainder.
    if nread != raw.len() {
        unreachable!();
    }

    Ok(u64::from_ne_bytes(raw))
}
// Newtype that lets an `Arc<OwnedFd>` be handed to code requiring `AsFd`.
// Rust 1.64.0 adds an `AsFd` implementation for `Arc`, at which point this
// wrapper is no longer needed.
#[derive(Debug)]
struct ArcAsFd(Arc<OwnedFd>);

impl AsFd for ArcAsFd {
    /// Borrows the file descriptor held inside the `Arc`.
    fn as_fd(&self) -> BorrowedFd<'_> {
        let Self(inner) = self;
        inner.as_fd()
    }
}
/// The receiving half of a ping pair.
///
/// The event source is simply a generic source wrapping the eventfd that is
/// shared with the `Ping` senders.
#[derive(Debug)]
pub struct PingSource {
// Generic readable source over the shared eventfd; all of the `EventSource`
// methods on `PingSource` delegate to it.
event: Generic<ArcAsFd>,
}
impl EventSource for PingSource {
    type Event = ();
    type Metadata = ();
    type Ret = ();
    type Error = PingError;

    /// Drains the eventfd counter and decodes it.
    ///
    /// `callback` is invoked once if at least one regular ping was recorded.
    /// `PostAction::Remove` is returned if the close event was recorded, and
    /// `PostAction::Continue` otherwise. Errors from the underlying source
    /// are wrapped in `PingError`.
    fn process_events<C>(
        &mut self,
        readiness: Readiness,
        token: Token,
        mut callback: C,
    ) -> Result<PostAction, Self::Error>
    where
        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        let outcome = self.event.process_events(readiness, token, |_, fd| {
            let counter = drain_ping(fd.as_fd())?;

            // Every bit above the lowest one is accumulated ping increments.
            // A ping and a close can both be present in the same read, so the
            // two checks are independent of each other.
            if (counter & !INCREMENT_CLOSE) != 0 {
                callback((), &mut ());
            }

            // A set lowest bit means the last sender was dropped.
            let action = if (counter & INCREMENT_CLOSE) == 0 {
                PostAction::Continue
            } else {
                PostAction::Remove
            };
            Ok(action)
        });

        outcome.map_err(|e| PingError(e.into()))
    }

    /// Registers the underlying generic source with `poll`.
    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
        self.event.register(poll, token_factory)
    }

    /// Re-registers the underlying generic source with `poll`.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> crate::Result<()> {
        self.event.reregister(poll, token_factory)
    }

    /// Unregisters the underlying generic source from `poll`.
    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
        self.event.unregister(poll)
    }
}
/// The sending half of a ping pair. Clones all signal the same `PingSource`.
#[derive(Clone, Debug)]
pub struct Ping {
    // Held in an `Arc` because every clone shares it. When the final clone
    // goes away the `FlagOnDrop` is dropped too, and that is what tells the
    // event source, via the eventfd, that no senders remain.
    event: Arc<FlagOnDrop>,
}

impl Ping {
    /// Send a ping to the `PingSource`.
    ///
    /// A failed write is not reported to the caller; it is logged as a
    /// warning instead.
    pub fn ping(&self) {
        match send_ping(self.event.0.as_fd(), INCREMENT_PING) {
            Ok(()) => {}
            Err(err) => {
                log::warn!("[calloop] Failed to write a ping: {:?}", err);
            }
        }
    }
}
/// Tells the `PingSource` that the sending side is gone by writing the close
/// event to the eventfd when dropped. There should only ever be one of these
/// per `PingSource`.
#[derive(Debug)]
struct FlagOnDrop(Arc<OwnedFd>);

impl Drop for FlagOnDrop {
    /// Writes the close event; a failure can only be logged from here.
    fn drop(&mut self) {
        let close_result = send_ping(self.0.as_fd(), INCREMENT_CLOSE);
        match close_result {
            Ok(()) => {}
            Err(err) => {
                log::warn!("[calloop] Failed to send close ping: {:?}", err);
            }
        }
    }
}