notify/
inotify.rs

1//! Watcher implementation for the inotify Linux API
2//!
3//! The inotify API provides a mechanism for monitoring filesystem events.  Inotify can be used to
4//! monitor individual files, or to monitor directories.  When a directory is monitored, inotify
5//! will return events for the directory itself, and for files inside the directory.
6
7use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
22const INOTIFY: mio::Token = mio::Token(0);
23const MESSAGE: mio::Token = mio::Token(1);
24
25// The EventLoop will set up a mio::Poll and use it to wait for the following:
26//
27// -  messages telling it what to do
28//
29// -  events telling it that something has happened on one of the watched files.
30
31struct EventLoop {
32    running: bool,
33    poll: mio::Poll,
34    event_loop_waker: Arc<mio::Waker>,
35    event_loop_tx: Sender<EventLoopMsg>,
36    event_loop_rx: Receiver<EventLoopMsg>,
37    inotify: Option<Inotify>,
38    event_handler: Box<dyn EventHandler>,
39    /// PathBuf -> (WatchDescriptor, WatchMask, is_recursive, is_dir)
40    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
41    paths: HashMap<WatchDescriptor, PathBuf>,
42    rename_event: Option<Event>,
43    follow_links: bool,
44}
45
46/// Watcher implementation based on inotify
47#[derive(Debug)]
48pub struct INotifyWatcher {
49    channel: Sender<EventLoopMsg>,
50    waker: Arc<mio::Waker>,
51}
52
53enum EventLoopMsg {
54    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
55    RemoveWatch(PathBuf, Sender<Result<()>>),
56    Shutdown,
57    Configure(Config, BoundSender<Result<bool>>),
58}
59
60#[inline]
61fn add_watch_by_event(
62    path: &PathBuf,
63    event: &inotify_sys::Event<&OsStr>,
64    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
65    add_watches: &mut Vec<PathBuf>,
66) {
67    if event.mask.contains(EventMask::ISDIR) {
68        if let Some(parent_path) = path.parent() {
69            if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
70                if is_recursive {
71                    add_watches.push(path.to_owned());
72                }
73            }
74        }
75    }
76}
77
78#[inline]
79fn remove_watch_by_event(
80    path: &PathBuf,
81    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
82    remove_watches: &mut Vec<PathBuf>,
83) {
84    if watches.contains_key(path) {
85        remove_watches.push(path.to_owned());
86    }
87}
88
89impl EventLoop {
90    pub fn new(
91        inotify: Inotify,
92        event_handler: Box<dyn EventHandler>,
93        follow_links: bool,
94    ) -> Result<Self> {
95        let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
96        let poll = mio::Poll::new()?;
97
98        let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
99
100        let inotify_fd = inotify.as_raw_fd();
101        let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
102        poll.registry()
103            .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
104
105        let event_loop = EventLoop {
106            running: true,
107            poll,
108            event_loop_waker,
109            event_loop_tx,
110            event_loop_rx,
111            inotify: Some(inotify),
112            event_handler,
113            watches: HashMap::new(),
114            paths: HashMap::new(),
115            rename_event: None,
116            follow_links,
117        };
118        Ok(event_loop)
119    }
120
121    // Run the event loop.
122    pub fn run(self) {
123        let _ = thread::Builder::new()
124            .name("notify-rs inotify loop".to_string())
125            .spawn(|| self.event_loop_thread());
126    }
127
128    fn event_loop_thread(mut self) {
129        let mut events = mio::Events::with_capacity(16);
130        loop {
131            // Wait for something to happen.
132            match self.poll.poll(&mut events, None) {
133                Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
134                    // System call was interrupted, we will retry
135                    // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
136                }
137                Err(e) => panic!("poll failed: {}", e),
138                Ok(()) => {}
139            }
140
141            // Process whatever happened.
142            for event in &events {
143                self.handle_event(event);
144            }
145
146            // Stop, if we're done.
147            if !self.running {
148                break;
149            }
150        }
151    }
152
153    // Handle a single event.
154    fn handle_event(&mut self, event: &mio::event::Event) {
155        match event.token() {
156            MESSAGE => {
157                // The channel is readable - handle messages.
158                self.handle_messages()
159            }
160            INOTIFY => {
161                // inotify has something to tell us.
162                self.handle_inotify()
163            }
164            _ => unreachable!(),
165        }
166    }
167
168    fn handle_messages(&mut self) {
169        while let Ok(msg) = self.event_loop_rx.try_recv() {
170            match msg {
171                EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
172                    let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
173                }
174                EventLoopMsg::RemoveWatch(path, tx) => {
175                    let _ = tx.send(self.remove_watch(path, false));
176                }
177                EventLoopMsg::Shutdown => {
178                    let _ = self.remove_all_watches();
179                    if let Some(inotify) = self.inotify.take() {
180                        let _ = inotify.close();
181                    }
182                    self.running = false;
183                    break;
184                }
185                EventLoopMsg::Configure(config, tx) => {
186                    self.configure_raw_mode(config, tx);
187                }
188            }
189        }
190    }
191
192    fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
193        tx.send(Ok(false))
194            .expect("configuration channel disconnected");
195    }
196
197    fn handle_inotify(&mut self) {
198        let mut add_watches = Vec::new();
199        let mut remove_watches = Vec::new();
200
201        if let Some(ref mut inotify) = self.inotify {
202            let mut buffer = [0; 1024];
203            // Read all buffers available.
204            loop {
205                match inotify.read_events(&mut buffer) {
206                    Ok(events) => {
207                        let mut num_events = 0;
208                        for event in events {
209                            log::trace!("inotify event: {event:?}");
210
211                            num_events += 1;
212                            if event.mask.contains(EventMask::Q_OVERFLOW) {
213                                let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
214                                self.event_handler.handle_event(ev);
215                            }
216
217                            let path = match event.name {
218                                Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
219                                None => self.paths.get(&event.wd).cloned(),
220                            };
221
222                            let path = match path {
223                                Some(path) => path,
224                                None => {
225                                    log::debug!("inotify event with unknown descriptor: {event:?}");
226                                    continue;
227                                }
228                            };
229
230                            let mut evs = Vec::new();
231
232                            if event.mask.contains(EventMask::MOVED_FROM) {
233                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
234
235                                let event = Event::new(EventKind::Modify(ModifyKind::Name(
236                                    RenameMode::From,
237                                )))
238                                .add_path(path.clone())
239                                .set_tracker(event.cookie as usize);
240
241                                self.rename_event = Some(event.clone());
242
243                                evs.push(event);
244                            } else if event.mask.contains(EventMask::MOVED_TO) {
245                                evs.push(
246                                    Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
247                                        .set_tracker(event.cookie as usize)
248                                        .add_path(path.clone()),
249                                );
250
251                                let trackers_match =
252                                    self.rename_event.as_ref().and_then(|e| e.tracker())
253                                        == Some(event.cookie as usize);
254
255                                if trackers_match {
256                                    let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
257                                    evs.push(
258                                        Event::new(EventKind::Modify(ModifyKind::Name(
259                                            RenameMode::Both,
260                                        )))
261                                        .set_tracker(event.cookie as usize)
262                                        .add_some_path(rename_event.paths.first().cloned())
263                                        .add_path(path.clone()),
264                                    );
265                                }
266                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
267                            }
268                            if event.mask.contains(EventMask::MOVE_SELF) {
269                                evs.push(
270                                    Event::new(EventKind::Modify(ModifyKind::Name(
271                                        RenameMode::From,
272                                    )))
273                                    .add_path(path.clone()),
274                                );
275                                // TODO stat the path and get to new path
276                                // - emit To and Both events
277                                // - change prefix for further events
278                            }
279                            if event.mask.contains(EventMask::CREATE) {
280                                evs.push(
281                                    Event::new(EventKind::Create(
282                                        if event.mask.contains(EventMask::ISDIR) {
283                                            CreateKind::Folder
284                                        } else {
285                                            CreateKind::File
286                                        },
287                                    ))
288                                    .add_path(path.clone()),
289                                );
290                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
291                            }
292                            if event.mask.contains(EventMask::DELETE) {
293                                evs.push(
294                                    Event::new(EventKind::Remove(
295                                        if event.mask.contains(EventMask::ISDIR) {
296                                            RemoveKind::Folder
297                                        } else {
298                                            RemoveKind::File
299                                        },
300                                    ))
301                                    .add_path(path.clone()),
302                                );
303                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
304                            }
305                            if event.mask.contains(EventMask::DELETE_SELF) {
306                                let remove_kind = match self.watches.get(&path) {
307                                    Some(&(_, _, _, true)) => RemoveKind::Folder,
308                                    Some(&(_, _, _, false)) => RemoveKind::File,
309                                    None => RemoveKind::Other,
310                                };
311                                evs.push(
312                                    Event::new(EventKind::Remove(remove_kind))
313                                        .add_path(path.clone()),
314                                );
315                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
316                            }
317                            if event.mask.contains(EventMask::MODIFY) {
318                                evs.push(
319                                    Event::new(EventKind::Modify(ModifyKind::Data(
320                                        DataChange::Any,
321                                    )))
322                                    .add_path(path.clone()),
323                                );
324                            }
325                            if event.mask.contains(EventMask::CLOSE_WRITE) {
326                                evs.push(
327                                    Event::new(EventKind::Access(AccessKind::Close(
328                                        AccessMode::Write,
329                                    )))
330                                    .add_path(path.clone()),
331                                );
332                            }
333                            if event.mask.contains(EventMask::CLOSE_NOWRITE) {
334                                evs.push(
335                                    Event::new(EventKind::Access(AccessKind::Close(
336                                        AccessMode::Read,
337                                    )))
338                                    .add_path(path.clone()),
339                                );
340                            }
341                            if event.mask.contains(EventMask::ATTRIB) {
342                                evs.push(
343                                    Event::new(EventKind::Modify(ModifyKind::Metadata(
344                                        MetadataKind::Any,
345                                    )))
346                                    .add_path(path.clone()),
347                                );
348                            }
349                            if event.mask.contains(EventMask::OPEN) {
350                                evs.push(
351                                    Event::new(EventKind::Access(AccessKind::Open(
352                                        AccessMode::Any,
353                                    )))
354                                    .add_path(path.clone()),
355                                );
356                            }
357
358                            for ev in evs {
359                                self.event_handler.handle_event(Ok(ev));
360                            }
361                        }
362
363                        // All events read. Break out.
364                        if num_events == 0 {
365                            break;
366                        }
367                    }
368                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
369                        // No events read. Break out.
370                        break;
371                    }
372                    Err(e) => {
373                        self.event_handler.handle_event(Err(Error::io(e)));
374                    }
375                }
376            }
377        }
378
379        for path in remove_watches {
380            self.remove_watch(path, true).ok();
381        }
382
383        for path in add_watches {
384            if let Err(add_watch_error) = self.add_watch(path, true, false) {
385                // The handler should be notified if we have reached the limit.
386                // Otherwise, the user might expect that a recursive watch
387                // is continuing to work correctly, but it's not.
388                if let ErrorKind::MaxFilesWatch = add_watch_error.kind {
389                    self.event_handler.handle_event(Err(add_watch_error));
390
391                    // After that kind of a error we should stop adding watches,
392                    // because the limit has already reached and all next calls
393                    // will return us only the same error.
394                    break;
395                }
396            }
397        }
398    }
399
400    fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
401        // If the watch is not recursive, or if we determine (by stat'ing the path to get its
402        // metadata) that the watched path is not a directory, add a single path watch.
403        if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
404            return self.add_single_watch(path, false, true);
405        }
406
407        for entry in WalkDir::new(path)
408            .follow_links(self.follow_links)
409            .into_iter()
410            .filter_map(filter_dir)
411        {
412            self.add_single_watch(entry.into_path(), is_recursive, watch_self)?;
413            watch_self = false;
414        }
415
416        Ok(())
417    }
418
419    fn add_single_watch(
420        &mut self,
421        path: PathBuf,
422        is_recursive: bool,
423        watch_self: bool,
424    ) -> Result<()> {
425        let mut watchmask = WatchMask::ATTRIB
426            | WatchMask::CREATE
427            | WatchMask::OPEN
428            | WatchMask::DELETE
429            | WatchMask::CLOSE_WRITE
430            | WatchMask::MODIFY
431            | WatchMask::MOVED_FROM
432            | WatchMask::MOVED_TO;
433
434        if watch_self {
435            watchmask.insert(WatchMask::DELETE_SELF);
436            watchmask.insert(WatchMask::MOVE_SELF);
437        }
438
439        if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
440            watchmask.insert(old_watchmask);
441            watchmask.insert(WatchMask::MASK_ADD);
442        }
443
444        if let Some(ref mut inotify) = self.inotify {
445            log::trace!("adding inotify watch: {}", path.display());
446
447            match inotify.watches().add(&path, watchmask) {
448                Err(e) => {
449                    Err(if e.raw_os_error() == Some(libc::ENOSPC) {
450                        // do not report inotify limits as "no more space" on linux #266
451                        Error::new(ErrorKind::MaxFilesWatch)
452                    } else if e.kind() == std::io::ErrorKind::NotFound {
453                        Error::new(ErrorKind::PathNotFound)
454                    } else {
455                        Error::io(e)
456                    }
457                    .add_path(path))
458                }
459                Ok(w) => {
460                    watchmask.remove(WatchMask::MASK_ADD);
461                    let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
462                    self.watches
463                        .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
464                    self.paths.insert(w, path);
465                    Ok(())
466                }
467            }
468        } else {
469            Ok(())
470        }
471    }
472
473    fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
474        match self.watches.remove(&path) {
475            None => return Err(Error::watch_not_found().add_path(path)),
476            Some((w, _, is_recursive, _)) => {
477                if let Some(ref mut inotify) = self.inotify {
478                    let mut inotify_watches = inotify.watches();
479                    log::trace!("removing inotify watch: {}", path.display());
480
481                    inotify_watches
482                        .remove(w.clone())
483                        .map_err(|e| Error::io(e).add_path(path.clone()))?;
484                    self.paths.remove(&w);
485
486                    if is_recursive || remove_recursive {
487                        let mut remove_list = Vec::new();
488                        for (w, p) in &self.paths {
489                            if p.starts_with(&path) {
490                                inotify_watches
491                                    .remove(w.clone())
492                                    .map_err(|e| Error::io(e).add_path(p.into()))?;
493                                self.watches.remove(p);
494                                remove_list.push(w.clone());
495                            }
496                        }
497                        for w in remove_list {
498                            self.paths.remove(&w);
499                        }
500                    }
501                }
502            }
503        }
504        Ok(())
505    }
506
507    fn remove_all_watches(&mut self) -> Result<()> {
508        if let Some(ref mut inotify) = self.inotify {
509            let mut inotify_watches = inotify.watches();
510            for (w, p) in &self.paths {
511                inotify_watches
512                    .remove(w.clone())
513                    .map_err(|e| Error::io(e).add_path(p.into()))?;
514            }
515            self.watches.clear();
516            self.paths.clear();
517        }
518        Ok(())
519    }
520}
521
522/// return `DirEntry` when it is a directory
523fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
524    if let Ok(e) = e {
525        if let Ok(metadata) = e.metadata() {
526            if metadata.is_dir() {
527                return Some(e);
528            }
529        }
530    }
531    None
532}
533
534impl INotifyWatcher {
535    fn from_event_handler(
536        event_handler: Box<dyn EventHandler>,
537        follow_links: bool,
538    ) -> Result<Self> {
539        let inotify = Inotify::init()?;
540        let event_loop = EventLoop::new(inotify, event_handler, follow_links)?;
541        let channel = event_loop.event_loop_tx.clone();
542        let waker = event_loop.event_loop_waker.clone();
543        event_loop.run();
544        Ok(INotifyWatcher { channel, waker })
545    }
546
547    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
548        let pb = if path.is_absolute() {
549            path.to_owned()
550        } else {
551            let p = env::current_dir().map_err(Error::io)?;
552            p.join(path)
553        };
554        let (tx, rx) = unbounded();
555        let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
556
557        // we expect the event loop to live and reply => unwraps must not panic
558        self.channel.send(msg).unwrap();
559        self.waker.wake().unwrap();
560        rx.recv().unwrap()
561    }
562
563    fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
564        let pb = if path.is_absolute() {
565            path.to_owned()
566        } else {
567            let p = env::current_dir().map_err(Error::io)?;
568            p.join(path)
569        };
570        let (tx, rx) = unbounded();
571        let msg = EventLoopMsg::RemoveWatch(pb, tx);
572
573        // we expect the event loop to live and reply => unwraps must not panic
574        self.channel.send(msg).unwrap();
575        self.waker.wake().unwrap();
576        rx.recv().unwrap()
577    }
578}
579
580impl Watcher for INotifyWatcher {
581    /// Create a new watcher.
582    fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
583        Self::from_event_handler(Box::new(event_handler), config.follow_symlinks())
584    }
585
586    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
587        self.watch_inner(path, recursive_mode)
588    }
589
590    fn unwatch(&mut self, path: &Path) -> Result<()> {
591        self.unwatch_inner(path)
592    }
593
594    fn configure(&mut self, config: Config) -> Result<bool> {
595        let (tx, rx) = bounded(1);
596        self.channel.send(EventLoopMsg::Configure(config, tx))?;
597        self.waker.wake()?;
598        rx.recv()?
599    }
600
601    fn kind() -> crate::WatcherKind {
602        crate::WatcherKind::Inotify
603    }
604}
605
606impl Drop for INotifyWatcher {
607    fn drop(&mut self) {
608        // we expect the event loop to live => unwrap must not panic
609        self.channel.send(EventLoopMsg::Shutdown).unwrap();
610        self.waker.wake().unwrap();
611    }
612}
613
614#[cfg(test)]
615mod tests {
616    use std::{
617        sync::{atomic::AtomicBool, mpsc},
618        thread::available_parallelism,
619    };
620
621    use super::*;
622
623    #[test]
624    fn inotify_watcher_is_send_and_sync() {
625        fn check<T: Send + Sync>() {}
626        check::<INotifyWatcher>();
627    }
628
629    #[test]
630    fn native_error_type_on_missing_path() {
631        let mut watcher = INotifyWatcher::new(|_| {}, Config::default()).unwrap();
632
633        let result = watcher.watch(
634            &PathBuf::from("/some/non/existant/path"),
635            RecursiveMode::NonRecursive,
636        );
637
638        assert!(matches!(
639            result,
640            Err(Error {
641                paths: _,
642                kind: ErrorKind::PathNotFound
643            })
644        ))
645    }
646
647    /// Runs manually.
648    ///
649    /// * Save actual value of the limit: `MAX_USER_WATCHES=$(sysctl -n fs.inotify.max_user_watches)`
650    /// * Run the test.
651    /// * Set the limit to 0: `sudo sysctl fs.inotify.max_user_watches=0` while test is running
652    /// * Wait for the test to complete
653    /// * Restore the limit `sudo sysctl fs.inotify.max_user_watches=$MAX_USER_WATCHES`
654    #[test]
655    #[ignore = "requires changing sysctl fs.inotify.max_user_watches while test is running"]
656    fn recursive_watch_calls_handler_if_creating_a_file_raises_max_files_watch() {
657        use std::time::Duration;
658
659        let tmpdir = tempfile::tempdir().unwrap();
660        let (tx, rx) = std::sync::mpsc::channel();
661        let (proc_changed_tx, proc_changed_rx) = std::sync::mpsc::channel();
662        let proc_path = Path::new("/proc/sys/fs/inotify/max_user_watches");
663        let mut watcher = INotifyWatcher::new(
664            move |result: Result<Event>| match result {
665                Ok(event) => {
666                    if event.paths.first().is_some_and(|path| path == proc_path) {
667                        proc_changed_tx.send(()).unwrap();
668                    }
669                }
670                Err(e) => tx.send(e).unwrap(),
671            },
672            Config::default(),
673        )
674        .unwrap();
675
676        watcher
677            .watch(tmpdir.path(), RecursiveMode::Recursive)
678            .unwrap();
679        watcher
680            .watch(proc_path, RecursiveMode::NonRecursive)
681            .unwrap();
682
683        // give the time to set the limit
684        proc_changed_rx
685            .recv_timeout(Duration::from_secs(30))
686            .unwrap();
687
688        let child_dir = tmpdir.path().join("child");
689        std::fs::create_dir(child_dir).unwrap();
690
691        let result = rx.recv_timeout(Duration::from_millis(500));
692
693        assert!(
694            matches!(
695                &result,
696                Ok(Error {
697                    kind: ErrorKind::MaxFilesWatch,
698                    paths: _,
699                })
700            ),
701            "expected {:?}, found: {:#?}",
702            ErrorKind::MaxFilesWatch,
703            result
704        );
705    }
706
707    /// https://github.com/notify-rs/notify/issues/678
708    #[test]
709    fn race_condition_on_unwatch_and_pending_events_with_deleted_descriptor() {
710        let tmpdir = tempfile::tempdir().expect("tmpdir");
711        let (tx, rx) = mpsc::channel();
712        let mut inotify = INotifyWatcher::new(
713            move |e: Result<Event>| {
714                let e = match e {
715                    Ok(e) if e.paths.is_empty() => e,
716                    Ok(_) | Err(_) => return,
717                };
718                let _ = tx.send(e);
719            },
720            Config::default(),
721        )
722        .expect("inotify creation");
723
724        let dir_path = tmpdir.path();
725        let file_path = dir_path.join("foo");
726        std::fs::File::create(&file_path).unwrap();
727
728        let stop = Arc::new(AtomicBool::new(false));
729
730        let handles: Vec<_> = (0..available_parallelism().unwrap().get().max(4))
731            .map(|_| {
732                let file_path = file_path.clone();
733                let stop = stop.clone();
734                thread::spawn(move || {
735                    while !stop.load(std::sync::atomic::Ordering::Relaxed) {
736                        let _ = std::fs::File::open(&file_path).unwrap();
737                    }
738                })
739            })
740            .collect();
741
742        let non_recursive = RecursiveMode::NonRecursive;
743        for _ in 0..(handles.len() * 4) {
744            inotify.watch(dir_path, non_recursive).unwrap();
745            inotify.unwatch(dir_path).unwrap();
746        }
747
748        stop.store(true, std::sync::atomic::Ordering::Relaxed);
749        handles
750            .into_iter()
751            .for_each(|handle| handle.join().ok().unwrap_or_default());
752
753        drop(inotify);
754
755        let events: Vec<_> = rx.into_iter().map(|e| format!("{e:?}")).collect();
756
757        const LOG_LEN: usize = 10;
758        let events_len = events.len();
759        assert!(
760            events.is_empty(),
761            "expected no events without path, but got {events_len}. first 10: {:#?}",
762            &events[..LOG_LEN.min(events_len)]
763        );
764    }
765}