1use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
// Token under which the inotify file descriptor is registered with the mio poll.
const INOTIFY: mio::Token = mio::Token(0);
// Token under which the event-loop waker is registered with the mio poll.
const MESSAGE: mio::Token = mio::Token(1);
24
/// State owned by the background thread that services inotify events and
/// control messages sent by [`INotifyWatcher`].
struct EventLoop {
    /// `true` until a `Shutdown` message is handled; the loop exits once it is `false`.
    running: bool,
    /// mio poll the loop thread blocks on.
    poll: mio::Poll,
    /// Waker registered under the `MESSAGE` token; cloned into the watcher handle.
    event_loop_waker: Arc<mio::Waker>,
    /// Sending half of the control channel; cloned into the watcher handle.
    event_loop_tx: Sender<EventLoopMsg>,
    /// Receiving half of the control channel, drained by `handle_messages`.
    event_loop_rx: Receiver<EventLoopMsg>,
    /// The inotify instance; taken out and closed when shutting down.
    inotify: Option<Inotify>,
    /// Callback that receives every translated event and every reported error.
    event_handler: Box<dyn EventHandler>,
    /// Watched path -> (descriptor, watch mask, is_recursive, is_dir).
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
    /// Reverse lookup: descriptor -> watched path.
    paths: HashMap<WatchDescriptor, PathBuf>,
    /// Last `MOVED_FROM` event seen, kept so a `MOVED_TO` with the same cookie
    /// can be reported as a combined rename.
    rename_event: Option<Event>,
    /// Passed to `WalkDir::follow_links` when adding recursive watches.
    follow_links: bool,
}
45
#[derive(Debug)]
/// Watcher handle backed by inotify.
///
/// It only holds the means to talk to the event-loop thread: requests are
/// sent over `channel` and the thread is woken through `waker`.
pub struct INotifyWatcher {
    /// Sender for control messages consumed by the event loop.
    channel: Sender<EventLoopMsg>,
    /// Wakes the event loop so it notices newly queued messages.
    waker: Arc<mio::Waker>,
}
52
/// Control messages sent from the watcher handle to the event-loop thread.
enum EventLoopMsg {
    /// Watch a path with the given recursion mode; the outcome is sent back on the sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching a path; the outcome is sent back on the sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close inotify and stop the loop.
    Shutdown,
    /// Apply a configuration; the reply is sent back on the bounded sender.
    Configure(Config, BoundSender<Result<bool>>),
}
59
60#[inline]
61fn add_watch_by_event(
62 path: &PathBuf,
63 event: &inotify_sys::Event<&OsStr>,
64 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
65 add_watches: &mut Vec<PathBuf>,
66) {
67 if event.mask.contains(EventMask::ISDIR) {
68 if let Some(parent_path) = path.parent() {
69 if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
70 if is_recursive {
71 add_watches.push(path.to_owned());
72 }
73 }
74 }
75 }
76}
77
78#[inline]
79fn remove_watch_by_event(
80 path: &PathBuf,
81 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
82 remove_watches: &mut Vec<PathBuf>,
83) {
84 if watches.contains_key(path) {
85 remove_watches.push(path.to_owned());
86 }
87}
88
89impl EventLoop {
90 pub fn new(
91 inotify: Inotify,
92 event_handler: Box<dyn EventHandler>,
93 follow_links: bool,
94 ) -> Result<Self> {
95 let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
96 let poll = mio::Poll::new()?;
97
98 let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
99
100 let inotify_fd = inotify.as_raw_fd();
101 let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
102 poll.registry()
103 .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
104
105 let event_loop = EventLoop {
106 running: true,
107 poll,
108 event_loop_waker,
109 event_loop_tx,
110 event_loop_rx,
111 inotify: Some(inotify),
112 event_handler,
113 watches: HashMap::new(),
114 paths: HashMap::new(),
115 rename_event: None,
116 follow_links,
117 };
118 Ok(event_loop)
119 }
120
121 pub fn run(self) {
123 let _ = thread::Builder::new()
124 .name("notify-rs inotify loop".to_string())
125 .spawn(|| self.event_loop_thread());
126 }
127
128 fn event_loop_thread(mut self) {
129 let mut events = mio::Events::with_capacity(16);
130 loop {
131 match self.poll.poll(&mut events, None) {
133 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
134 }
137 Err(e) => panic!("poll failed: {}", e),
138 Ok(()) => {}
139 }
140
141 for event in &events {
143 self.handle_event(event);
144 }
145
146 if !self.running {
148 break;
149 }
150 }
151 }
152
153 fn handle_event(&mut self, event: &mio::event::Event) {
155 match event.token() {
156 MESSAGE => {
157 self.handle_messages()
159 }
160 INOTIFY => {
161 self.handle_inotify()
163 }
164 _ => unreachable!(),
165 }
166 }
167
168 fn handle_messages(&mut self) {
169 while let Ok(msg) = self.event_loop_rx.try_recv() {
170 match msg {
171 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
172 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
173 }
174 EventLoopMsg::RemoveWatch(path, tx) => {
175 let _ = tx.send(self.remove_watch(path, false));
176 }
177 EventLoopMsg::Shutdown => {
178 let _ = self.remove_all_watches();
179 if let Some(inotify) = self.inotify.take() {
180 let _ = inotify.close();
181 }
182 self.running = false;
183 break;
184 }
185 EventLoopMsg::Configure(config, tx) => {
186 self.configure_raw_mode(config, tx);
187 }
188 }
189 }
190 }
191
192 fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
193 tx.send(Ok(false))
194 .expect("configuration channel disconnected");
195 }
196
197 fn handle_inotify(&mut self) {
198 let mut add_watches = Vec::new();
199 let mut remove_watches = Vec::new();
200
201 if let Some(ref mut inotify) = self.inotify {
202 let mut buffer = [0; 1024];
203 loop {
205 match inotify.read_events(&mut buffer) {
206 Ok(events) => {
207 let mut num_events = 0;
208 for event in events {
209 log::trace!("inotify event: {event:?}");
210
211 num_events += 1;
212 if event.mask.contains(EventMask::Q_OVERFLOW) {
213 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
214 self.event_handler.handle_event(ev);
215 }
216
217 let path = match event.name {
218 Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
219 None => self.paths.get(&event.wd).cloned(),
220 };
221
222 let path = match path {
223 Some(path) => path,
224 None => {
225 log::debug!("inotify event with unknown descriptor: {event:?}");
226 continue;
227 }
228 };
229
230 let mut evs = Vec::new();
231
232 if event.mask.contains(EventMask::MOVED_FROM) {
233 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
234
235 let event = Event::new(EventKind::Modify(ModifyKind::Name(
236 RenameMode::From,
237 )))
238 .add_path(path.clone())
239 .set_tracker(event.cookie as usize);
240
241 self.rename_event = Some(event.clone());
242
243 evs.push(event);
244 } else if event.mask.contains(EventMask::MOVED_TO) {
245 evs.push(
246 Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
247 .set_tracker(event.cookie as usize)
248 .add_path(path.clone()),
249 );
250
251 let trackers_match =
252 self.rename_event.as_ref().and_then(|e| e.tracker())
253 == Some(event.cookie as usize);
254
255 if trackers_match {
256 let rename_event = self.rename_event.take().unwrap(); evs.push(
258 Event::new(EventKind::Modify(ModifyKind::Name(
259 RenameMode::Both,
260 )))
261 .set_tracker(event.cookie as usize)
262 .add_some_path(rename_event.paths.first().cloned())
263 .add_path(path.clone()),
264 );
265 }
266 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
267 }
268 if event.mask.contains(EventMask::MOVE_SELF) {
269 evs.push(
270 Event::new(EventKind::Modify(ModifyKind::Name(
271 RenameMode::From,
272 )))
273 .add_path(path.clone()),
274 );
275 }
279 if event.mask.contains(EventMask::CREATE) {
280 evs.push(
281 Event::new(EventKind::Create(
282 if event.mask.contains(EventMask::ISDIR) {
283 CreateKind::Folder
284 } else {
285 CreateKind::File
286 },
287 ))
288 .add_path(path.clone()),
289 );
290 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
291 }
292 if event.mask.contains(EventMask::DELETE) {
293 evs.push(
294 Event::new(EventKind::Remove(
295 if event.mask.contains(EventMask::ISDIR) {
296 RemoveKind::Folder
297 } else {
298 RemoveKind::File
299 },
300 ))
301 .add_path(path.clone()),
302 );
303 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
304 }
305 if event.mask.contains(EventMask::DELETE_SELF) {
306 let remove_kind = match self.watches.get(&path) {
307 Some(&(_, _, _, true)) => RemoveKind::Folder,
308 Some(&(_, _, _, false)) => RemoveKind::File,
309 None => RemoveKind::Other,
310 };
311 evs.push(
312 Event::new(EventKind::Remove(remove_kind))
313 .add_path(path.clone()),
314 );
315 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
316 }
317 if event.mask.contains(EventMask::MODIFY) {
318 evs.push(
319 Event::new(EventKind::Modify(ModifyKind::Data(
320 DataChange::Any,
321 )))
322 .add_path(path.clone()),
323 );
324 }
325 if event.mask.contains(EventMask::CLOSE_WRITE) {
326 evs.push(
327 Event::new(EventKind::Access(AccessKind::Close(
328 AccessMode::Write,
329 )))
330 .add_path(path.clone()),
331 );
332 }
333 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
334 evs.push(
335 Event::new(EventKind::Access(AccessKind::Close(
336 AccessMode::Read,
337 )))
338 .add_path(path.clone()),
339 );
340 }
341 if event.mask.contains(EventMask::ATTRIB) {
342 evs.push(
343 Event::new(EventKind::Modify(ModifyKind::Metadata(
344 MetadataKind::Any,
345 )))
346 .add_path(path.clone()),
347 );
348 }
349 if event.mask.contains(EventMask::OPEN) {
350 evs.push(
351 Event::new(EventKind::Access(AccessKind::Open(
352 AccessMode::Any,
353 )))
354 .add_path(path.clone()),
355 );
356 }
357
358 for ev in evs {
359 self.event_handler.handle_event(Ok(ev));
360 }
361 }
362
363 if num_events == 0 {
365 break;
366 }
367 }
368 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
369 break;
371 }
372 Err(e) => {
373 self.event_handler.handle_event(Err(Error::io(e)));
374 }
375 }
376 }
377 }
378
379 for path in remove_watches {
380 self.remove_watch(path, true).ok();
381 }
382
383 for path in add_watches {
384 if let Err(add_watch_error) = self.add_watch(path, true, false) {
385 if let ErrorKind::MaxFilesWatch = add_watch_error.kind {
389 self.event_handler.handle_event(Err(add_watch_error));
390
391 break;
395 }
396 }
397 }
398 }
399
400 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
401 if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
404 return self.add_single_watch(path, false, true);
405 }
406
407 for entry in WalkDir::new(path)
408 .follow_links(self.follow_links)
409 .into_iter()
410 .filter_map(filter_dir)
411 {
412 self.add_single_watch(entry.into_path(), is_recursive, watch_self)?;
413 watch_self = false;
414 }
415
416 Ok(())
417 }
418
419 fn add_single_watch(
420 &mut self,
421 path: PathBuf,
422 is_recursive: bool,
423 watch_self: bool,
424 ) -> Result<()> {
425 let mut watchmask = WatchMask::ATTRIB
426 | WatchMask::CREATE
427 | WatchMask::OPEN
428 | WatchMask::DELETE
429 | WatchMask::CLOSE_WRITE
430 | WatchMask::MODIFY
431 | WatchMask::MOVED_FROM
432 | WatchMask::MOVED_TO;
433
434 if watch_self {
435 watchmask.insert(WatchMask::DELETE_SELF);
436 watchmask.insert(WatchMask::MOVE_SELF);
437 }
438
439 if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
440 watchmask.insert(old_watchmask);
441 watchmask.insert(WatchMask::MASK_ADD);
442 }
443
444 if let Some(ref mut inotify) = self.inotify {
445 log::trace!("adding inotify watch: {}", path.display());
446
447 match inotify.watches().add(&path, watchmask) {
448 Err(e) => {
449 Err(if e.raw_os_error() == Some(libc::ENOSPC) {
450 Error::new(ErrorKind::MaxFilesWatch)
452 } else if e.kind() == std::io::ErrorKind::NotFound {
453 Error::new(ErrorKind::PathNotFound)
454 } else {
455 Error::io(e)
456 }
457 .add_path(path))
458 }
459 Ok(w) => {
460 watchmask.remove(WatchMask::MASK_ADD);
461 let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
462 self.watches
463 .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
464 self.paths.insert(w, path);
465 Ok(())
466 }
467 }
468 } else {
469 Ok(())
470 }
471 }
472
473 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
474 match self.watches.remove(&path) {
475 None => return Err(Error::watch_not_found().add_path(path)),
476 Some((w, _, is_recursive, _)) => {
477 if let Some(ref mut inotify) = self.inotify {
478 let mut inotify_watches = inotify.watches();
479 log::trace!("removing inotify watch: {}", path.display());
480
481 inotify_watches
482 .remove(w.clone())
483 .map_err(|e| Error::io(e).add_path(path.clone()))?;
484 self.paths.remove(&w);
485
486 if is_recursive || remove_recursive {
487 let mut remove_list = Vec::new();
488 for (w, p) in &self.paths {
489 if p.starts_with(&path) {
490 inotify_watches
491 .remove(w.clone())
492 .map_err(|e| Error::io(e).add_path(p.into()))?;
493 self.watches.remove(p);
494 remove_list.push(w.clone());
495 }
496 }
497 for w in remove_list {
498 self.paths.remove(&w);
499 }
500 }
501 }
502 }
503 }
504 Ok(())
505 }
506
507 fn remove_all_watches(&mut self) -> Result<()> {
508 if let Some(ref mut inotify) = self.inotify {
509 let mut inotify_watches = inotify.watches();
510 for (w, p) in &self.paths {
511 inotify_watches
512 .remove(w.clone())
513 .map_err(|e| Error::io(e).add_path(p.into()))?;
514 }
515 self.watches.clear();
516 self.paths.clear();
517 }
518 Ok(())
519 }
520}
521
522fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
524 if let Ok(e) = e {
525 if let Ok(metadata) = e.metadata() {
526 if metadata.is_dir() {
527 return Some(e);
528 }
529 }
530 }
531 None
532}
533
534impl INotifyWatcher {
535 fn from_event_handler(
536 event_handler: Box<dyn EventHandler>,
537 follow_links: bool,
538 ) -> Result<Self> {
539 let inotify = Inotify::init()?;
540 let event_loop = EventLoop::new(inotify, event_handler, follow_links)?;
541 let channel = event_loop.event_loop_tx.clone();
542 let waker = event_loop.event_loop_waker.clone();
543 event_loop.run();
544 Ok(INotifyWatcher { channel, waker })
545 }
546
547 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
548 let pb = if path.is_absolute() {
549 path.to_owned()
550 } else {
551 let p = env::current_dir().map_err(Error::io)?;
552 p.join(path)
553 };
554 let (tx, rx) = unbounded();
555 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
556
557 self.channel.send(msg).unwrap();
559 self.waker.wake().unwrap();
560 rx.recv().unwrap()
561 }
562
563 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
564 let pb = if path.is_absolute() {
565 path.to_owned()
566 } else {
567 let p = env::current_dir().map_err(Error::io)?;
568 p.join(path)
569 };
570 let (tx, rx) = unbounded();
571 let msg = EventLoopMsg::RemoveWatch(pb, tx);
572
573 self.channel.send(msg).unwrap();
575 self.waker.wake().unwrap();
576 rx.recv().unwrap()
577 }
578}
579
580impl Watcher for INotifyWatcher {
581 fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
583 Self::from_event_handler(Box::new(event_handler), config.follow_symlinks())
584 }
585
586 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
587 self.watch_inner(path, recursive_mode)
588 }
589
590 fn unwatch(&mut self, path: &Path) -> Result<()> {
591 self.unwatch_inner(path)
592 }
593
594 fn configure(&mut self, config: Config) -> Result<bool> {
595 let (tx, rx) = bounded(1);
596 self.channel.send(EventLoopMsg::Configure(config, tx))?;
597 self.waker.wake()?;
598 rx.recv()?
599 }
600
601 fn kind() -> crate::WatcherKind {
602 crate::WatcherKind::Inotify
603 }
604}
605
606impl Drop for INotifyWatcher {
607 fn drop(&mut self) {
608 self.channel.send(EventLoopMsg::Shutdown).unwrap();
610 self.waker.wake().unwrap();
611 }
612}
613
614#[cfg(test)]
615mod tests {
616 use std::{
617 sync::{atomic::AtomicBool, mpsc},
618 thread::available_parallelism,
619 };
620
621 use super::*;
622
/// Compile-time check that the watcher handle is `Send + Sync`.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<INotifyWatcher>();
}
628
/// Watching a path that does not exist must yield `ErrorKind::PathNotFound`.
#[test]
fn native_error_type_on_missing_path() {
    let missing = PathBuf::from("/some/non/existant/path");
    let mut watcher = INotifyWatcher::new(|_| {}, Config::default()).unwrap();

    let result = watcher.watch(&missing, RecursiveMode::NonRecursive);

    let is_path_not_found = matches!(
        result,
        Err(Error {
            kind: ErrorKind::PathNotFound,
            ..
        })
    );
    assert!(is_path_not_found)
}
646
// Checks that the handler receives `ErrorKind::MaxFilesWatch` when a
// directory created inside a recursively watched tree cannot be watched.
// Ignored by default: per the ignore reason, the sysctl
// fs.inotify.max_user_watches has to be changed while the test is running.
#[test]
#[ignore = "requires changing sysctl fs.inotify.max_user_watches while test is running"]
fn recursive_watch_calls_handler_if_creating_a_file_raises_max_files_watch() {
    use std::time::Duration;

    let tmpdir = tempfile::tempdir().unwrap();
    // `tx`/`rx` carry errors reported to the handler; `proc_changed_*` signals
    // that an event for the sysctl file was seen.
    let (tx, rx) = std::sync::mpsc::channel();
    let (proc_changed_tx, proc_changed_rx) = std::sync::mpsc::channel();
    let proc_path = Path::new("/proc/sys/fs/inotify/max_user_watches");
    let mut watcher = INotifyWatcher::new(
        move |result: Result<Event>| match result {
            Ok(event) => {
                if event.paths.first().is_some_and(|path| path == proc_path) {
                    proc_changed_tx.send(()).unwrap();
                }
            }
            Err(e) => tx.send(e).unwrap(),
        },
        Config::default(),
    )
    .unwrap();

    watcher
        .watch(tmpdir.path(), RecursiveMode::Recursive)
        .unwrap();
    watcher
        .watch(proc_path, RecursiveMode::NonRecursive)
        .unwrap();

    // Block (up to 30 s) until an event for the sysctl file arrives.
    proc_changed_rx
        .recv_timeout(Duration::from_secs(30))
        .unwrap();

    // Creating a directory makes the recursive watch try to add a new watch.
    let child_dir = tmpdir.path().join("child");
    std::fs::create_dir(child_dir).unwrap();

    let result = rx.recv_timeout(Duration::from_millis(500));

    assert!(
        matches!(
            &result,
            Ok(Error {
                kind: ErrorKind::MaxFilesWatch,
                paths: _,
            })
        ),
        "expected {:?}, found: {:#?}",
        ErrorKind::MaxFilesWatch,
        result
    );
}
706
// Stress test: while several threads keep opening a file in a directory,
// the directory is repeatedly watched and unwatched. The handler forwards
// only successful events that carry no path, and the test asserts that no
// such event was ever delivered.
#[test]
fn race_condition_on_unwatch_and_pending_events_with_deleted_descriptor() {
    let tmpdir = tempfile::tempdir().expect("tmpdir");
    let (tx, rx) = mpsc::channel();
    let mut inotify = INotifyWatcher::new(
        move |e: Result<Event>| {
            // Only path-less `Ok` events are of interest; everything else is dropped.
            let e = match e {
                Ok(e) if e.paths.is_empty() => e,
                Ok(_) | Err(_) => return,
            };
            let _ = tx.send(e);
        },
        Config::default(),
    )
    .expect("inotify creation");

    let dir_path = tmpdir.path();
    let file_path = dir_path.join("foo");
    std::fs::File::create(&file_path).unwrap();

    let stop = Arc::new(AtomicBool::new(false));

    // At least four threads that open the file in a loop until told to stop.
    let handles: Vec<_> = (0..available_parallelism().unwrap().get().max(4))
        .map(|_| {
            let file_path = file_path.clone();
            let stop = stop.clone();
            thread::spawn(move || {
                while !stop.load(std::sync::atomic::Ordering::Relaxed) {
                    let _ = std::fs::File::open(&file_path).unwrap();
                }
            })
        })
        .collect();

    let non_recursive = RecursiveMode::NonRecursive;
    for _ in 0..(handles.len() * 4) {
        inotify.watch(dir_path, non_recursive).unwrap();
        inotify.unwatch(dir_path).unwrap();
    }

    stop.store(true, std::sync::atomic::Ordering::Relaxed);
    handles
        .into_iter()
        .for_each(|handle| handle.join().ok().unwrap_or_default());

    // Dropping the watcher sends the shutdown request to the event loop.
    drop(inotify);

    let events: Vec<_> = rx.into_iter().map(|e| format!("{e:?}")).collect();

    // Cap how many offending events are printed on failure.
    const LOG_LEN: usize = 10;
    let events_len = events.len();
    assert!(
        events.is_empty(),
        "expected no events without path, but got {events_len}. first 10: {:#?}",
        &events[..LOG_LEN.min(events_len)]
    );
}
765}