polling/
epoll.rs

//! Bindings to epoll (Linux, Android, Redox).
2
3use std::io;
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
5use std::time::Duration;
6
7#[cfg(not(target_os = "redox"))]
8use rustix::event::{eventfd, EventfdFlags};
9#[cfg(not(target_os = "redox"))]
10use rustix::time::{
11    timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
12};
13
14use rustix::buffer::spare_capacity;
15use rustix::event::{epoll, Timespec};
16use rustix::fd::OwnedFd;
17use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
18use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
19use rustix::pipe::{pipe, pipe_with, PipeFlags};
20
21use crate::{Event, PollMode};
22
/// Interface to epoll.
///
/// Owns the epoll instance together with the helper file descriptors it needs: a notifier
/// used to wake up a blocked `wait()`, and (except on Redox) an optional timerfd used to
/// implement timeouts.
#[derive(Debug)]
pub struct Poller {
    /// File descriptor for the epoll instance.
    epoll_fd: OwnedFd,

    /// Notifier used to wake up epoll.
    ///
    /// Its file descriptor is registered with the epoll instance under `crate::NOTIFY_KEY`.
    notifier: Notifier,

    /// File descriptor for the timerfd that produces timeouts.
    ///
    /// `None` if the timerfd could not be created, in which case timeouts are passed to
    /// `epoll_wait` directly instead. Redox does not support timerfd.
    #[cfg(not(target_os = "redox"))]
    timer_fd: Option<OwnedFd>,
}
38
39impl Poller {
40    /// Creates a new poller.
41    pub fn new() -> io::Result<Poller> {
42        // Create an epoll instance.
43        //
44        // Use `epoll_create1` with `EPOLL_CLOEXEC`.
45        let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
46
47        // Set up notifier and timerfd.
48        let notifier = Notifier::new()?;
49        #[cfg(not(target_os = "redox"))]
50        let timer_fd = timerfd_create(
51            TimerfdClockId::Monotonic,
52            TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
53        )
54        .ok();
55
56        let poller = Poller {
57            epoll_fd,
58            notifier,
59            #[cfg(not(target_os = "redox"))]
60            timer_fd,
61        };
62
63        unsafe {
64            #[cfg(not(target_os = "redox"))]
65            if let Some(ref timer_fd) = poller.timer_fd {
66                poller.add(
67                    timer_fd.as_raw_fd(),
68                    Event::none(crate::NOTIFY_KEY),
69                    PollMode::Oneshot,
70                )?;
71            }
72
73            poller.add(
74                poller.notifier.as_fd().as_raw_fd(),
75                Event::readable(crate::NOTIFY_KEY),
76                PollMode::Oneshot,
77            )?;
78        }
79
80        tracing::trace!(
81            epoll_fd = ?poller.epoll_fd.as_raw_fd(),
82            notifier = ?poller.notifier,
83            "new",
84        );
85        Ok(poller)
86    }
87
88    /// Whether this poller supports level-triggered events.
89    pub fn supports_level(&self) -> bool {
90        true
91    }
92
93    /// Whether the poller supports edge-triggered events.
94    pub fn supports_edge(&self) -> bool {
95        true
96    }
97
98    /// Adds a new file descriptor.
99    ///
100    /// # Safety
101    ///
102    /// The `fd` must be a valid file descriptor. The usual condition of remaining registered in
103    /// the `Poller` doesn't apply to `epoll`.
104    pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
105        let span = tracing::trace_span!(
106            "add",
107            epoll_fd = ?self.epoll_fd.as_raw_fd(),
108            ?fd,
109            ?ev,
110        );
111        let _enter = span.enter();
112
113        epoll::add(
114            &self.epoll_fd,
115            unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
116            epoll::EventData::new_u64(ev.key as u64),
117            epoll_flags(&ev, mode) | ev.extra.flags,
118        )?;
119
120        Ok(())
121    }
122
123    /// Modifies an existing file descriptor.
124    pub fn modify(&self, fd: BorrowedFd<'_>, ev: Event, mode: PollMode) -> io::Result<()> {
125        let span = tracing::trace_span!(
126            "modify",
127            epoll_fd = ?self.epoll_fd.as_raw_fd(),
128            ?fd,
129            ?ev,
130        );
131        let _enter = span.enter();
132
133        epoll::modify(
134            &self.epoll_fd,
135            fd,
136            epoll::EventData::new_u64(ev.key as u64),
137            epoll_flags(&ev, mode) | ev.extra.flags,
138        )?;
139
140        Ok(())
141    }
142
143    /// Deletes a file descriptor.
144    pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
145        let span = tracing::trace_span!(
146            "delete",
147            epoll_fd = ?self.epoll_fd.as_raw_fd(),
148            ?fd,
149        );
150        let _enter = span.enter();
151
152        epoll::delete(&self.epoll_fd, fd)?;
153
154        Ok(())
155    }
156
157    /// Waits for I/O events with an optional timeout.
158    #[allow(clippy::needless_update)]
159    pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
160        let span = tracing::trace_span!(
161            "wait",
162            epoll_fd = ?self.epoll_fd.as_raw_fd(),
163            ?timeout,
164        );
165        let _enter = span.enter();
166
167        #[cfg(not(target_os = "redox"))]
168        if let Some(ref timer_fd) = self.timer_fd {
169            // Configure the timeout using timerfd.
170            let new_val = Itimerspec {
171                it_interval: TS_ZERO,
172                it_value: match timeout {
173                    None => TS_ZERO,
174                    Some(t) => {
175                        let mut ts = TS_ZERO;
176                        ts.tv_sec = t.as_secs() as _;
177                        ts.tv_nsec = t.subsec_nanos() as _;
178                        ts
179                    }
180                },
181                ..unsafe { std::mem::zeroed() }
182            };
183
184            timerfd_settime(timer_fd, TimerfdTimerFlags::empty(), &new_val)?;
185
186            // Set interest in timerfd.
187            self.modify(
188                timer_fd.as_fd(),
189                Event::readable(crate::NOTIFY_KEY),
190                PollMode::Oneshot,
191            )?;
192        }
193
194        #[cfg(not(target_os = "redox"))]
195        let timer_fd = &self.timer_fd;
196        #[cfg(target_os = "redox")]
197        let timer_fd: Option<core::convert::Infallible> = None;
198
199        // Timeout for epoll. In case of overflow, use no timeout.
200        let timeout = match (timer_fd, timeout) {
201            (_, Some(t)) if t == Duration::from_secs(0) => Some(Timespec::default()),
202            (None, Some(t)) => Timespec::try_from(t).ok(),
203            _ => None,
204        };
205
206        // Wait for I/O events.
207        epoll::wait(
208            &self.epoll_fd,
209            spare_capacity(&mut events.list),
210            timeout.as_ref(),
211        )?;
212        tracing::trace!(
213            epoll_fd = ?self.epoll_fd.as_raw_fd(),
214            res = ?events.list.len(),
215            "new events",
216        );
217
218        // Clear the notification (if received) and re-register interest in it.
219        self.notifier.clear();
220        self.modify(
221            self.notifier.as_fd(),
222            Event::readable(crate::NOTIFY_KEY),
223            PollMode::Oneshot,
224        )?;
225        Ok(())
226    }
227
228    /// Sends a notification to wake up the current or next `wait()` call.
229    pub fn notify(&self) -> io::Result<()> {
230        let span = tracing::trace_span!(
231            "notify",
232            epoll_fd = ?self.epoll_fd.as_raw_fd(),
233            notifier = ?self.notifier,
234        );
235        let _enter = span.enter();
236
237        self.notifier.notify();
238        Ok(())
239    }
240}
241
242impl AsRawFd for Poller {
243    fn as_raw_fd(&self) -> RawFd {
244        self.epoll_fd.as_raw_fd()
245    }
246}
247
248impl AsFd for Poller {
249    fn as_fd(&self) -> BorrowedFd<'_> {
250        self.epoll_fd.as_fd()
251    }
252}
253
254impl Drop for Poller {
255    fn drop(&mut self) {
256        let span = tracing::trace_span!(
257            "drop",
258            epoll_fd = ?self.epoll_fd.as_raw_fd(),
259            notifier = ?self.notifier,
260        );
261        let _enter = span.enter();
262
263        #[cfg(not(target_os = "redox"))]
264        if let Some(timer_fd) = self.timer_fd.take() {
265            let _ = self.delete(timer_fd.as_fd());
266        }
267        let _ = self.delete(self.notifier.as_fd());
268    }
269}
270
/// `timespec` value that equals zero.
///
/// Used as the timerfd's `it_interval` (no periodic re-arming) and as its `it_value` when
/// there is no timeout. NOTE(review): it is built from zeroed bytes rather than a struct
/// literal, presumably so it stays valid if `Timespec` carries extra/padding fields on some
/// targets — confirm.
#[cfg(not(target_os = "redox"))]
// SAFETY: `Timespec` holds integer seconds/nanoseconds fields, for which the all-zero bit
// pattern is a valid value.
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
274
275/// Get the EPOLL flags for the interest.
276fn epoll_flags(interest: &Event, mode: PollMode) -> epoll::EventFlags {
277    let mut flags = match mode {
278        PollMode::Oneshot => epoll::EventFlags::ONESHOT,
279        PollMode::Level => epoll::EventFlags::empty(),
280        PollMode::Edge => epoll::EventFlags::ET,
281        PollMode::EdgeOneshot => epoll::EventFlags::ET | epoll::EventFlags::ONESHOT,
282    };
283    if interest.readable {
284        flags |= read_flags();
285    }
286    if interest.writable {
287        flags |= write_flags();
288    }
289    flags
290}
291
292/// Epoll flags for all possible readability events.
293fn read_flags() -> epoll::EventFlags {
294    use epoll::EventFlags as Epoll;
295    Epoll::IN | Epoll::HUP | Epoll::ERR | Epoll::PRI
296}
297
298/// Epoll flags for all possible writability events.
299fn write_flags() -> epoll::EventFlags {
300    use epoll::EventFlags as Epoll;
301    Epoll::OUT | Epoll::HUP | Epoll::ERR
302}
303
/// A list of reported I/O events.
///
/// `Poller::wait` writes new events into the spare capacity of `list`.
pub struct Events {
    /// Raw epoll events as filled in by `epoll_wait`.
    list: Vec<epoll::Event>,
}

// SAFETY: `Events` is only a buffer of `epoll::Event` values. This module only ever
// registers event data created with `EventData::new_u64` and reads it back with `u64()`,
// so the data is never interpreted as a pointer. NOTE(review): presumably `epoll::Event`
// is `!Send` because its data field can also hold a raw pointer — confirm against the
// rustix version in use.
unsafe impl Send for Events {}
310
311impl Events {
312    /// Creates an empty list.
313    pub fn with_capacity(cap: usize) -> Events {
314        Events {
315            list: Vec::with_capacity(cap),
316        }
317    }
318
319    /// Iterates over I/O events.
320    pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
321        self.list.iter().map(|ev| {
322            let flags = ev.flags;
323            Event {
324                key: ev.data.u64() as usize,
325                readable: flags.intersects(read_flags()),
326                writable: flags.intersects(write_flags()),
327                extra: EventExtra { flags },
328            }
329        })
330    }
331
332    /// Clear the list.
333    pub fn clear(&mut self) {
334        self.list.clear();
335    }
336
337    /// Get the capacity of the list.
338    pub fn capacity(&self) -> usize {
339        self.list.capacity()
340    }
341}
342
/// Extra information about this event.
///
/// On epoll this wraps raw `epoll::EventFlags`. Flags set here are OR-ed into the
/// registration flags by `add`/`modify`. For reported events, it holds the flags returned
/// by the kernel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventExtra {
    /// Raw epoll flags.
    flags: epoll::EventFlags,
}
348
349impl EventExtra {
350    /// Create an empty version of the data.
351    #[inline]
352    pub const fn empty() -> EventExtra {
353        EventExtra {
354            flags: epoll::EventFlags::empty(),
355        }
356    }
357
358    /// Add the interrupt flag to this event.
359    #[inline]
360    pub fn set_hup(&mut self, active: bool) {
361        self.flags.set(epoll::EventFlags::HUP, active);
362    }
363
364    /// Add the priority flag to this event.
365    #[inline]
366    pub fn set_pri(&mut self, active: bool) {
367        self.flags.set(epoll::EventFlags::PRI, active);
368    }
369
370    /// Tell if the interrupt flag is set.
371    #[inline]
372    pub fn is_hup(&self) -> bool {
373        self.flags.contains(epoll::EventFlags::HUP)
374    }
375
376    /// Tell if the priority flag is set.
377    #[inline]
378    pub fn is_pri(&self) -> bool {
379        self.flags.contains(epoll::EventFlags::PRI)
380    }
381
382    #[inline]
383    pub fn is_connect_failed(&self) -> Option<bool> {
384        Some(
385            self.flags.contains(epoll::EventFlags::ERR)
386                && self.flags.contains(epoll::EventFlags::HUP),
387        )
388    }
389
390    #[inline]
391    pub fn is_err(&self) -> Option<bool> {
392        Some(self.flags.contains(epoll::EventFlags::ERR))
393    }
394}
395
/// The notifier for Linux.
///
/// Wakes up a blocked `wait()` by making a file descriptor that is registered with the epoll
/// instance readable.
///
/// Certain container runtimes do not expose eventfd to the client, as it relies on the host and
/// can be used to "escape" the container under certain conditions. Gramine is the prime example,
/// see [here][gramine]. In this case, fall back to using a pipe.
///
/// [gramine]: https://gramine.readthedocs.io/en/stable/manifest-syntax.html#allowing-eventfd
#[derive(Debug)]
enum Notifier {
    /// The primary notifier, using eventfd.
    #[cfg(not(target_os = "redox"))]
    EventFd(OwnedFd),

    /// The fallback notifier, using a pipe.
    ///
    /// This is the only variant available on Redox.
    Pipe {
        /// The read end of the pipe.
        ///
        /// This is the end registered with the epoll instance.
        read_pipe: OwnedFd,

        /// The write end of the pipe.
        write_pipe: OwnedFd,
    },
}
418
419impl Notifier {
420    /// Create a new notifier.
421    fn new() -> io::Result<Self> {
422        // Skip eventfd for testing if necessary.
423        #[cfg(not(target_os = "redox"))]
424        {
425            if !cfg!(polling_test_epoll_pipe) {
426                // Try to create an eventfd.
427                match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
428                    Ok(fd) => {
429                        tracing::trace!("created eventfd for notifier");
430                        return Ok(Notifier::EventFd(fd));
431                    }
432
433                    Err(err) => {
434                        tracing::warn!(
435                            "eventfd() failed with error ({}), falling back to pipe",
436                            err
437                        );
438                    }
439                }
440            }
441        }
442
443        let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
444            let (read, write) = pipe()?;
445            fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?;
446            fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?;
447            io::Result::Ok((read, write))
448        })?;
449
450        fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?;
451        Ok(Notifier::Pipe {
452            read_pipe: read,
453            write_pipe: write,
454        })
455    }
456
457    /// The file descriptor to register in the poller.
458    fn as_fd(&self) -> BorrowedFd<'_> {
459        match self {
460            #[cfg(not(target_os = "redox"))]
461            Notifier::EventFd(fd) => fd.as_fd(),
462            Notifier::Pipe {
463                read_pipe: read, ..
464            } => read.as_fd(),
465        }
466    }
467
468    /// Notify the poller.
469    fn notify(&self) {
470        match self {
471            #[cfg(not(target_os = "redox"))]
472            Self::EventFd(fd) => {
473                let buf: [u8; 8] = 1u64.to_ne_bytes();
474                let _ = write(fd, &buf);
475            }
476
477            Self::Pipe { write_pipe, .. } => {
478                write(write_pipe, &[0; 1]).ok();
479            }
480        }
481    }
482
483    /// Clear the notification.
484    fn clear(&self) {
485        match self {
486            #[cfg(not(target_os = "redox"))]
487            Self::EventFd(fd) => {
488                let mut buf = [0u8; 8];
489                let _ = read(fd, &mut buf);
490            }
491
492            Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {},
493        }
494    }
495}