1use std::io;
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
5use std::time::Duration;
6
7#[cfg(not(target_os = "redox"))]
8use rustix::event::{eventfd, EventfdFlags};
9#[cfg(not(target_os = "redox"))]
10use rustix::time::{
11 timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
12};
13
14use rustix::buffer::spare_capacity;
15use rustix::event::{epoll, Timespec};
16use rustix::fd::OwnedFd;
17use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
18use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
19use rustix::pipe::{pipe, pipe_with, PipeFlags};
20
21use crate::{Event, PollMode};
22
/// Interface to epoll.
#[derive(Debug)]
pub struct Poller {
    /// File descriptor for the epoll instance.
    epoll_fd: OwnedFd,

    /// Notifier used to wake up `wait()`; it is registered in the epoll
    /// instance under `crate::NOTIFY_KEY`.
    notifier: Notifier,

    /// File descriptor for the timerfd that produces timeouts.
    ///
    /// `None` when `timerfd_create` failed in `Poller::new`; in that case
    /// `wait()` hands the timeout to `epoll::wait` directly. This field is
    /// not compiled on Redox.
    #[cfg(not(target_os = "redox"))]
    timer_fd: Option<OwnedFd>,
}
38
39impl Poller {
40 pub fn new() -> io::Result<Poller> {
42 let epoll_fd = epoll::create(epoll::CreateFlags::CLOEXEC)?;
46
47 let notifier = Notifier::new()?;
49 #[cfg(not(target_os = "redox"))]
50 let timer_fd = timerfd_create(
51 TimerfdClockId::Monotonic,
52 TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
53 )
54 .ok();
55
56 let poller = Poller {
57 epoll_fd,
58 notifier,
59 #[cfg(not(target_os = "redox"))]
60 timer_fd,
61 };
62
63 unsafe {
64 #[cfg(not(target_os = "redox"))]
65 if let Some(ref timer_fd) = poller.timer_fd {
66 poller.add(
67 timer_fd.as_raw_fd(),
68 Event::none(crate::NOTIFY_KEY),
69 PollMode::Oneshot,
70 )?;
71 }
72
73 poller.add(
74 poller.notifier.as_fd().as_raw_fd(),
75 Event::readable(crate::NOTIFY_KEY),
76 PollMode::Oneshot,
77 )?;
78 }
79
80 tracing::trace!(
81 epoll_fd = ?poller.epoll_fd.as_raw_fd(),
82 notifier = ?poller.notifier,
83 "new",
84 );
85 Ok(poller)
86 }
87
88 pub fn supports_level(&self) -> bool {
90 true
91 }
92
93 pub fn supports_edge(&self) -> bool {
95 true
96 }
97
98 pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
105 let span = tracing::trace_span!(
106 "add",
107 epoll_fd = ?self.epoll_fd.as_raw_fd(),
108 ?fd,
109 ?ev,
110 );
111 let _enter = span.enter();
112
113 epoll::add(
114 &self.epoll_fd,
115 unsafe { rustix::fd::BorrowedFd::borrow_raw(fd) },
116 epoll::EventData::new_u64(ev.key as u64),
117 epoll_flags(&ev, mode) | ev.extra.flags,
118 )?;
119
120 Ok(())
121 }
122
123 pub fn modify(&self, fd: BorrowedFd<'_>, ev: Event, mode: PollMode) -> io::Result<()> {
125 let span = tracing::trace_span!(
126 "modify",
127 epoll_fd = ?self.epoll_fd.as_raw_fd(),
128 ?fd,
129 ?ev,
130 );
131 let _enter = span.enter();
132
133 epoll::modify(
134 &self.epoll_fd,
135 fd,
136 epoll::EventData::new_u64(ev.key as u64),
137 epoll_flags(&ev, mode) | ev.extra.flags,
138 )?;
139
140 Ok(())
141 }
142
143 pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
145 let span = tracing::trace_span!(
146 "delete",
147 epoll_fd = ?self.epoll_fd.as_raw_fd(),
148 ?fd,
149 );
150 let _enter = span.enter();
151
152 epoll::delete(&self.epoll_fd, fd)?;
153
154 Ok(())
155 }
156
157 #[allow(clippy::needless_update)]
159 pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
160 let span = tracing::trace_span!(
161 "wait",
162 epoll_fd = ?self.epoll_fd.as_raw_fd(),
163 ?timeout,
164 );
165 let _enter = span.enter();
166
167 #[cfg(not(target_os = "redox"))]
168 if let Some(ref timer_fd) = self.timer_fd {
169 let new_val = Itimerspec {
171 it_interval: TS_ZERO,
172 it_value: match timeout {
173 None => TS_ZERO,
174 Some(t) => {
175 let mut ts = TS_ZERO;
176 ts.tv_sec = t.as_secs() as _;
177 ts.tv_nsec = t.subsec_nanos() as _;
178 ts
179 }
180 },
181 ..unsafe { std::mem::zeroed() }
182 };
183
184 timerfd_settime(timer_fd, TimerfdTimerFlags::empty(), &new_val)?;
185
186 self.modify(
188 timer_fd.as_fd(),
189 Event::readable(crate::NOTIFY_KEY),
190 PollMode::Oneshot,
191 )?;
192 }
193
194 #[cfg(not(target_os = "redox"))]
195 let timer_fd = &self.timer_fd;
196 #[cfg(target_os = "redox")]
197 let timer_fd: Option<core::convert::Infallible> = None;
198
199 let timeout = match (timer_fd, timeout) {
201 (_, Some(t)) if t == Duration::from_secs(0) => Some(Timespec::default()),
202 (None, Some(t)) => Timespec::try_from(t).ok(),
203 _ => None,
204 };
205
206 epoll::wait(
208 &self.epoll_fd,
209 spare_capacity(&mut events.list),
210 timeout.as_ref(),
211 )?;
212 tracing::trace!(
213 epoll_fd = ?self.epoll_fd.as_raw_fd(),
214 res = ?events.list.len(),
215 "new events",
216 );
217
218 self.notifier.clear();
220 self.modify(
221 self.notifier.as_fd(),
222 Event::readable(crate::NOTIFY_KEY),
223 PollMode::Oneshot,
224 )?;
225 Ok(())
226 }
227
228 pub fn notify(&self) -> io::Result<()> {
230 let span = tracing::trace_span!(
231 "notify",
232 epoll_fd = ?self.epoll_fd.as_raw_fd(),
233 notifier = ?self.notifier,
234 );
235 let _enter = span.enter();
236
237 self.notifier.notify();
238 Ok(())
239 }
240}
241
242impl AsRawFd for Poller {
243 fn as_raw_fd(&self) -> RawFd {
244 self.epoll_fd.as_raw_fd()
245 }
246}
247
248impl AsFd for Poller {
249 fn as_fd(&self) -> BorrowedFd<'_> {
250 self.epoll_fd.as_fd()
251 }
252}
253
254impl Drop for Poller {
255 fn drop(&mut self) {
256 let span = tracing::trace_span!(
257 "drop",
258 epoll_fd = ?self.epoll_fd.as_raw_fd(),
259 notifier = ?self.notifier,
260 );
261 let _enter = span.enter();
262
263 #[cfg(not(target_os = "redox"))]
264 if let Some(timer_fd) = self.timer_fd.take() {
265 let _ = self.delete(timer_fd.as_fd());
266 }
267 let _ = self.delete(self.notifier.as_fd());
268 }
269}
270
/// A `Timespec` whose bytes are all zero; used as the zero value when
/// building the `Itimerspec` passed to `timerfd_settime`.
// SAFETY: NOTE(review): relies on the all-zero bit pattern being a valid
// `Timespec`. The fields assigned in this file (`tv_sec`, `tv_nsec`) are
// integers, for which zero is valid; confirm there are no other fields.
#[cfg(not(target_os = "redox"))]
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
274
275fn epoll_flags(interest: &Event, mode: PollMode) -> epoll::EventFlags {
277 let mut flags = match mode {
278 PollMode::Oneshot => epoll::EventFlags::ONESHOT,
279 PollMode::Level => epoll::EventFlags::empty(),
280 PollMode::Edge => epoll::EventFlags::ET,
281 PollMode::EdgeOneshot => epoll::EventFlags::ET | epoll::EventFlags::ONESHOT,
282 };
283 if interest.readable {
284 flags |= read_flags();
285 }
286 if interest.writable {
287 flags |= write_flags();
288 }
289 flags
290}
291
292fn read_flags() -> epoll::EventFlags {
294 use epoll::EventFlags as Epoll;
295 Epoll::IN | Epoll::HUP | Epoll::ERR | Epoll::PRI
296}
297
298fn write_flags() -> epoll::EventFlags {
300 use epoll::EventFlags as Epoll;
301 Epoll::OUT | Epoll::HUP | Epoll::ERR
302}
303
/// A list of reported I/O events.
pub struct Events {
    /// Raw epoll events; `Poller::wait` has `epoll::wait` fill this vector's
    /// spare capacity.
    list: Vec<epoll::Event>,
}
308
// SAFETY: NOTE(review): manual `Send` impl; presumably needed because
// `epoll::Event` is not automatically `Send`. Confirm the element type holds
// no thread-affine data before relying on this.
unsafe impl Send for Events {}
310
311impl Events {
312 pub fn with_capacity(cap: usize) -> Events {
314 Events {
315 list: Vec::with_capacity(cap),
316 }
317 }
318
319 pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
321 self.list.iter().map(|ev| {
322 let flags = ev.flags;
323 Event {
324 key: ev.data.u64() as usize,
325 readable: flags.intersects(read_flags()),
326 writable: flags.intersects(write_flags()),
327 extra: EventExtra { flags },
328 }
329 })
330 }
331
332 pub fn clear(&mut self) {
334 self.list.clear();
335 }
336
337 pub fn capacity(&self) -> usize {
339 self.list.capacity()
340 }
341}
342
/// Extra, epoll-specific information attached to an `Event`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventExtra {
    /// Raw epoll flags: for reported events, the flags epoll returned; for
    /// interests, additional flags OR-ed into the registration.
    flags: epoll::EventFlags,
}
348
349impl EventExtra {
350 #[inline]
352 pub const fn empty() -> EventExtra {
353 EventExtra {
354 flags: epoll::EventFlags::empty(),
355 }
356 }
357
358 #[inline]
360 pub fn set_hup(&mut self, active: bool) {
361 self.flags.set(epoll::EventFlags::HUP, active);
362 }
363
364 #[inline]
366 pub fn set_pri(&mut self, active: bool) {
367 self.flags.set(epoll::EventFlags::PRI, active);
368 }
369
370 #[inline]
372 pub fn is_hup(&self) -> bool {
373 self.flags.contains(epoll::EventFlags::HUP)
374 }
375
376 #[inline]
378 pub fn is_pri(&self) -> bool {
379 self.flags.contains(epoll::EventFlags::PRI)
380 }
381
382 #[inline]
383 pub fn is_connect_failed(&self) -> Option<bool> {
384 Some(
385 self.flags.contains(epoll::EventFlags::ERR)
386 && self.flags.contains(epoll::EventFlags::HUP),
387 )
388 }
389
390 #[inline]
391 pub fn is_err(&self) -> Option<bool> {
392 Some(self.flags.contains(epoll::EventFlags::ERR))
393 }
394}
395
/// The mechanism used to wake up a `Poller::wait` call.
///
/// `Notifier::new` tries an eventfd first (outside Redox) and falls back to
/// a pipe when that fails or when `polling_test_epoll_pipe` is set.
#[derive(Debug)]
enum Notifier {
    /// Notifications are sent and cleared through a single eventfd.
    #[cfg(not(target_os = "redox"))]
    EventFd(OwnedFd),

    /// Notifications are sent by writing to a pipe and cleared by reading it.
    Pipe {
        /// The read end; this is the descriptor registered in epoll.
        read_pipe: OwnedFd,

        /// The write end; `notify` writes a byte to it.
        write_pipe: OwnedFd,
    },
}
418
419impl Notifier {
420 fn new() -> io::Result<Self> {
422 #[cfg(not(target_os = "redox"))]
424 {
425 if !cfg!(polling_test_epoll_pipe) {
426 match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
428 Ok(fd) => {
429 tracing::trace!("created eventfd for notifier");
430 return Ok(Notifier::EventFd(fd));
431 }
432
433 Err(err) => {
434 tracing::warn!(
435 "eventfd() failed with error ({}), falling back to pipe",
436 err
437 );
438 }
439 }
440 }
441 }
442
443 let (read, write) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
444 let (read, write) = pipe()?;
445 fcntl_setfd(&read, fcntl_getfd(&read)? | FdFlags::CLOEXEC)?;
446 fcntl_setfd(&write, fcntl_getfd(&write)? | FdFlags::CLOEXEC)?;
447 io::Result::Ok((read, write))
448 })?;
449
450 fcntl_setfl(&read, fcntl_getfl(&read)? | OFlags::NONBLOCK)?;
451 Ok(Notifier::Pipe {
452 read_pipe: read,
453 write_pipe: write,
454 })
455 }
456
457 fn as_fd(&self) -> BorrowedFd<'_> {
459 match self {
460 #[cfg(not(target_os = "redox"))]
461 Notifier::EventFd(fd) => fd.as_fd(),
462 Notifier::Pipe {
463 read_pipe: read, ..
464 } => read.as_fd(),
465 }
466 }
467
468 fn notify(&self) {
470 match self {
471 #[cfg(not(target_os = "redox"))]
472 Self::EventFd(fd) => {
473 let buf: [u8; 8] = 1u64.to_ne_bytes();
474 let _ = write(fd, &buf);
475 }
476
477 Self::Pipe { write_pipe, .. } => {
478 write(write_pipe, &[0; 1]).ok();
479 }
480 }
481 }
482
483 fn clear(&self) {
485 match self {
486 #[cfg(not(target_os = "redox"))]
487 Self::EventFd(fd) => {
488 let mut buf = [0u8; 8];
489 let _ = read(fd, &mut buf);
490 }
491
492 Self::Pipe { read_pipe, .. } => while read(read_pipe, &mut [0u8; 1024]).is_ok() {},
493 }
494 }
495}