// notify/inotify.rs

1//! Watcher implementation for the inotify Linux API
2//!
3//! The inotify API provides a mechanism for monitoring filesystem events.  Inotify can be used to
4//! monitor individual files, or to monitor directories.  When a directory is monitored, inotify
5//! will return events for the directory itself, and for files inside the directory.
6
7use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
/// Poll token under which the inotify file descriptor is registered (see `EventLoop::new`).
const INOTIFY: mio::Token = mio::Token(0);
/// Poll token used by the `mio::Waker` that signals pending `EventLoopMsg`s.
const MESSAGE: mio::Token = mio::Token(1);
24
25// The EventLoop will set up a mio::Poll and use it to wait for the following:
26//
27// -  messages telling it what to do
28//
29// -  events telling it that something has happened on one of the watched files.
30
/// State owned by the background thread that services a single inotify instance.
struct EventLoop {
    /// Set to `false` when a `Shutdown` message is handled; the loop exits once it is cleared.
    running: bool,
    /// Poll instance the loop blocks on; the inotify fd and the waker are registered with it.
    poll: mio::Poll,
    /// Waker registered under the `MESSAGE` token; a clone is handed to `INotifyWatcher`.
    event_loop_waker: Arc<mio::Waker>,
    /// Sending half of the control channel; a clone is handed to `INotifyWatcher`.
    event_loop_tx: Sender<EventLoopMsg>,
    /// Receiving half of the control channel, drained by `handle_messages`.
    event_loop_rx: Receiver<EventLoopMsg>,
    /// The inotify instance; taken (left as `None`) and closed on shutdown.
    inotify: Option<Inotify>,
    /// Callback receiving every translated event and every reported read error.
    event_handler: Box<dyn EventHandler>,
    /// PathBuf -> (WatchDescriptor, WatchMask, is_recursive, is_dir)
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
    /// Reverse lookup: WatchDescriptor -> watched path, used to resolve incoming events.
    paths: HashMap<WatchDescriptor, PathBuf>,
    /// Last `MOVED_FROM` event seen, kept so that a `MOVED_TO` carrying the same cookie can be
    /// reported as a combined rename.
    rename_event: Option<Event>,
    /// Passed to `WalkDir::follow_links` when walking a directory for a recursive watch.
    follow_links: bool,
}
45
/// Watcher implementation based on inotify
///
/// This handle only talks to the event loop thread: requests are sent over `channel` and the
/// thread is then woken through `waker`.
#[derive(Debug)]
pub struct INotifyWatcher {
    /// Control channel to the event loop thread.
    channel: Sender<EventLoopMsg>,
    /// Wakes the event loop's poll so that it drains `channel`.
    waker: Arc<mio::Waker>,
}
52
/// Requests sent from an `INotifyWatcher` handle to its event loop thread.
enum EventLoopMsg {
    /// Watch the path with the given recursion mode; the outcome is sent back on the sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching the path; the outcome is sent back on the sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close the inotify instance and stop the loop.
    Shutdown,
    /// Configuration request; the loop replies on the bounded sender.
    Configure(Config, BoundSender<Result<bool>>),
}
59
60#[inline]
61fn add_watch_by_event(
62    path: &Option<PathBuf>,
63    event: &inotify_sys::Event<&OsStr>,
64    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
65    add_watches: &mut Vec<PathBuf>,
66) {
67    if let Some(ref path) = *path {
68        if event.mask.contains(EventMask::ISDIR) {
69            if let Some(parent_path) = path.parent() {
70                if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
71                    if is_recursive {
72                        add_watches.push(path.to_owned());
73                    }
74                }
75            }
76        }
77    }
78}
79
80#[inline]
81fn remove_watch_by_event(
82    path: &Option<PathBuf>,
83    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
84    remove_watches: &mut Vec<PathBuf>,
85) {
86    if let Some(ref path) = *path {
87        if watches.contains_key(path) {
88            remove_watches.push(path.to_owned());
89        }
90    }
91}
92
93impl EventLoop {
94    pub fn new(
95        inotify: Inotify,
96        event_handler: Box<dyn EventHandler>,
97        follow_links: bool,
98    ) -> Result<Self> {
99        let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
100        let poll = mio::Poll::new()?;
101
102        let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
103
104        let inotify_fd = inotify.as_raw_fd();
105        let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
106        poll.registry()
107            .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
108
109        let event_loop = EventLoop {
110            running: true,
111            poll,
112            event_loop_waker,
113            event_loop_tx,
114            event_loop_rx,
115            inotify: Some(inotify),
116            event_handler,
117            watches: HashMap::new(),
118            paths: HashMap::new(),
119            rename_event: None,
120            follow_links,
121        };
122        Ok(event_loop)
123    }
124
125    // Run the event loop.
126    pub fn run(self) {
127        let _ = thread::Builder::new()
128            .name("notify-rs inotify loop".to_string())
129            .spawn(|| self.event_loop_thread());
130    }
131
132    fn event_loop_thread(mut self) {
133        let mut events = mio::Events::with_capacity(16);
134        loop {
135            // Wait for something to happen.
136            match self.poll.poll(&mut events, None) {
137                Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
138                    // System call was interrupted, we will retry
139                    // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
140                }
141                Err(e) => panic!("poll failed: {}", e),
142                Ok(()) => {}
143            }
144
145            // Process whatever happened.
146            for event in &events {
147                self.handle_event(event);
148            }
149
150            // Stop, if we're done.
151            if !self.running {
152                break;
153            }
154        }
155    }
156
157    // Handle a single event.
158    fn handle_event(&mut self, event: &mio::event::Event) {
159        match event.token() {
160            MESSAGE => {
161                // The channel is readable - handle messages.
162                self.handle_messages()
163            }
164            INOTIFY => {
165                // inotify has something to tell us.
166                self.handle_inotify()
167            }
168            _ => unreachable!(),
169        }
170    }
171
172    fn handle_messages(&mut self) {
173        while let Ok(msg) = self.event_loop_rx.try_recv() {
174            match msg {
175                EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
176                    let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
177                }
178                EventLoopMsg::RemoveWatch(path, tx) => {
179                    let _ = tx.send(self.remove_watch(path, false));
180                }
181                EventLoopMsg::Shutdown => {
182                    let _ = self.remove_all_watches();
183                    if let Some(inotify) = self.inotify.take() {
184                        let _ = inotify.close();
185                    }
186                    self.running = false;
187                    break;
188                }
189                EventLoopMsg::Configure(config, tx) => {
190                    self.configure_raw_mode(config, tx);
191                }
192            }
193        }
194    }
195
196    fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
197        tx.send(Ok(false))
198            .expect("configuration channel disconnected");
199    }
200
201    fn handle_inotify(&mut self) {
202        let mut add_watches = Vec::new();
203        let mut remove_watches = Vec::new();
204
205        if let Some(ref mut inotify) = self.inotify {
206            let mut buffer = [0; 1024];
207            // Read all buffers available.
208            loop {
209                match inotify.read_events(&mut buffer) {
210                    Ok(events) => {
211                        let mut num_events = 0;
212                        for event in events {
213                            log::trace!("inotify event: {event:?}");
214
215                            num_events += 1;
216                            if event.mask.contains(EventMask::Q_OVERFLOW) {
217                                let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
218                                self.event_handler.handle_event(ev);
219                            }
220
221                            let path = match event.name {
222                                Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
223                                None => self.paths.get(&event.wd).cloned(),
224                            };
225
226                            let mut evs = Vec::new();
227
228                            if event.mask.contains(EventMask::MOVED_FROM) {
229                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
230
231                                let event = Event::new(EventKind::Modify(ModifyKind::Name(
232                                    RenameMode::From,
233                                )))
234                                .add_some_path(path.clone())
235                                .set_tracker(event.cookie as usize);
236
237                                self.rename_event = Some(event.clone());
238
239                                evs.push(event);
240                            } else if event.mask.contains(EventMask::MOVED_TO) {
241                                evs.push(
242                                    Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
243                                        .set_tracker(event.cookie as usize)
244                                        .add_some_path(path.clone()),
245                                );
246
247                                let trackers_match = self
248                                    .rename_event
249                                    .as_ref()
250                                    .and_then(|e| e.tracker())
251                                    .map_or(false, |from_tracker| {
252                                        from_tracker == event.cookie as usize
253                                    });
254
255                                if trackers_match {
256                                    let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
257                                    evs.push(
258                                        Event::new(EventKind::Modify(ModifyKind::Name(
259                                            RenameMode::Both,
260                                        )))
261                                        .set_tracker(event.cookie as usize)
262                                        .add_some_path(rename_event.paths.first().cloned())
263                                        .add_some_path(path.clone()),
264                                    );
265                                }
266                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
267                            }
268                            if event.mask.contains(EventMask::MOVE_SELF) {
269                                evs.push(
270                                    Event::new(EventKind::Modify(ModifyKind::Name(
271                                        RenameMode::From,
272                                    )))
273                                    .add_some_path(path.clone()),
274                                );
275                                // TODO stat the path and get to new path
276                                // - emit To and Both events
277                                // - change prefix for further events
278                            }
279                            if event.mask.contains(EventMask::CREATE) {
280                                evs.push(
281                                    Event::new(EventKind::Create(
282                                        if event.mask.contains(EventMask::ISDIR) {
283                                            CreateKind::Folder
284                                        } else {
285                                            CreateKind::File
286                                        },
287                                    ))
288                                    .add_some_path(path.clone()),
289                                );
290                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
291                            }
292                            if event.mask.contains(EventMask::DELETE) {
293                                evs.push(
294                                    Event::new(EventKind::Remove(
295                                        if event.mask.contains(EventMask::ISDIR) {
296                                            RemoveKind::Folder
297                                        } else {
298                                            RemoveKind::File
299                                        },
300                                    ))
301                                    .add_some_path(path.clone()),
302                                );
303                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
304                            }
305                            if event.mask.contains(EventMask::DELETE_SELF) {
306                                let remove_kind = match &path {
307                                    Some(watched_path) => {
308                                        let current_watch = self.watches.get(watched_path);
309                                        match current_watch {
310                                            Some(&(_, _, _, true)) => RemoveKind::Folder,
311                                            Some(&(_, _, _, false)) => RemoveKind::File,
312                                            None => RemoveKind::Other,
313                                        }
314                                    }
315                                    None => {
316                                        log::trace!(
317                                            "No patch for DELETE_SELF event, may be a bug?"
318                                        );
319                                        RemoveKind::Other
320                                    }
321                                };
322                                evs.push(
323                                    Event::new(EventKind::Remove(remove_kind))
324                                        .add_some_path(path.clone()),
325                                );
326                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
327                            }
328                            if event.mask.contains(EventMask::MODIFY) {
329                                evs.push(
330                                    Event::new(EventKind::Modify(ModifyKind::Data(
331                                        DataChange::Any,
332                                    )))
333                                    .add_some_path(path.clone()),
334                                );
335                            }
336                            if event.mask.contains(EventMask::CLOSE_WRITE) {
337                                evs.push(
338                                    Event::new(EventKind::Access(AccessKind::Close(
339                                        AccessMode::Write,
340                                    )))
341                                    .add_some_path(path.clone()),
342                                );
343                            }
344                            if event.mask.contains(EventMask::CLOSE_NOWRITE) {
345                                evs.push(
346                                    Event::new(EventKind::Access(AccessKind::Close(
347                                        AccessMode::Read,
348                                    )))
349                                    .add_some_path(path.clone()),
350                                );
351                            }
352                            if event.mask.contains(EventMask::ATTRIB) {
353                                evs.push(
354                                    Event::new(EventKind::Modify(ModifyKind::Metadata(
355                                        MetadataKind::Any,
356                                    )))
357                                    .add_some_path(path.clone()),
358                                );
359                            }
360                            if event.mask.contains(EventMask::OPEN) {
361                                evs.push(
362                                    Event::new(EventKind::Access(AccessKind::Open(
363                                        AccessMode::Any,
364                                    )))
365                                    .add_some_path(path.clone()),
366                                );
367                            }
368
369                            for ev in evs {
370                                self.event_handler.handle_event(Ok(ev));
371                            }
372                        }
373
374                        // All events read. Break out.
375                        if num_events == 0 {
376                            break;
377                        }
378                    }
379                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
380                        // No events read. Break out.
381                        break;
382                    }
383                    Err(e) => {
384                        self.event_handler.handle_event(Err(Error::io(e)));
385                    }
386                }
387            }
388        }
389
390        for path in remove_watches {
391            self.remove_watch(path, true).ok();
392        }
393
394        for path in add_watches {
395            self.add_watch(path, true, false).ok();
396        }
397    }
398
399    fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
400        // If the watch is not recursive, or if we determine (by stat'ing the path to get its
401        // metadata) that the watched path is not a directory, add a single path watch.
402        if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
403            return self.add_single_watch(path, false, true);
404        }
405
406        for entry in WalkDir::new(path)
407            .follow_links(self.follow_links)
408            .into_iter()
409            .filter_map(filter_dir)
410        {
411            self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
412            watch_self = false;
413        }
414
415        Ok(())
416    }
417
418    fn add_single_watch(
419        &mut self,
420        path: PathBuf,
421        is_recursive: bool,
422        watch_self: bool,
423    ) -> Result<()> {
424        let mut watchmask = WatchMask::ATTRIB
425            | WatchMask::CREATE
426            | WatchMask::OPEN
427            | WatchMask::DELETE
428            | WatchMask::CLOSE_WRITE
429            | WatchMask::MODIFY
430            | WatchMask::MOVED_FROM
431            | WatchMask::MOVED_TO;
432
433        if watch_self {
434            watchmask.insert(WatchMask::DELETE_SELF);
435            watchmask.insert(WatchMask::MOVE_SELF);
436        }
437
438        if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
439            watchmask.insert(old_watchmask);
440            watchmask.insert(WatchMask::MASK_ADD);
441        }
442
443        if let Some(ref mut inotify) = self.inotify {
444            log::trace!("adding inotify watch: {}", path.display());
445
446            match inotify.watches().add(&path, watchmask) {
447                Err(e) => {
448                    Err(if e.raw_os_error() == Some(libc::ENOSPC) {
449                        // do not report inotify limits as "no more space" on linux #266
450                        Error::new(ErrorKind::MaxFilesWatch)
451                    } else {
452                        Error::io(e)
453                    }
454                    .add_path(path))
455                }
456                Ok(w) => {
457                    watchmask.remove(WatchMask::MASK_ADD);
458                    let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
459                    self.watches
460                        .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
461                    self.paths.insert(w, path);
462                    Ok(())
463                }
464            }
465        } else {
466            Ok(())
467        }
468    }
469
470    fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
471        match self.watches.remove(&path) {
472            None => return Err(Error::watch_not_found().add_path(path)),
473            Some((w, _, is_recursive, _)) => {
474                if let Some(ref mut inotify) = self.inotify {
475                    let mut inotify_watches = inotify.watches();
476                    log::trace!("removing inotify watch: {}", path.display());
477
478                    inotify_watches
479                        .remove(w.clone())
480                        .map_err(|e| Error::io(e).add_path(path.clone()))?;
481                    self.paths.remove(&w);
482
483                    if is_recursive || remove_recursive {
484                        let mut remove_list = Vec::new();
485                        for (w, p) in &self.paths {
486                            if p.starts_with(&path) {
487                                inotify_watches
488                                    .remove(w.clone())
489                                    .map_err(|e| Error::io(e).add_path(p.into()))?;
490                                self.watches.remove(p);
491                                remove_list.push(w.clone());
492                            }
493                        }
494                        for w in remove_list {
495                            self.paths.remove(&w);
496                        }
497                    }
498                }
499            }
500        }
501        Ok(())
502    }
503
504    fn remove_all_watches(&mut self) -> Result<()> {
505        if let Some(ref mut inotify) = self.inotify {
506            let mut inotify_watches = inotify.watches();
507            for (w, p) in &self.paths {
508                inotify_watches
509                    .remove(w.clone())
510                    .map_err(|e| Error::io(e).add_path(p.into()))?;
511            }
512            self.watches.clear();
513            self.paths.clear();
514        }
515        Ok(())
516    }
517}
518
519/// return `DirEntry` when it is a directory
520fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
521    if let Ok(e) = e {
522        if let Ok(metadata) = e.metadata() {
523            if metadata.is_dir() {
524                return Some(e);
525            }
526        }
527    }
528    None
529}
530
531impl INotifyWatcher {
532    fn from_event_handler(
533        event_handler: Box<dyn EventHandler>,
534        follow_links: bool,
535    ) -> Result<Self> {
536        let inotify = Inotify::init()?;
537        let event_loop = EventLoop::new(inotify, event_handler, follow_links)?;
538        let channel = event_loop.event_loop_tx.clone();
539        let waker = event_loop.event_loop_waker.clone();
540        event_loop.run();
541        Ok(INotifyWatcher { channel, waker })
542    }
543
544    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
545        let pb = if path.is_absolute() {
546            path.to_owned()
547        } else {
548            let p = env::current_dir().map_err(Error::io)?;
549            p.join(path)
550        };
551        let (tx, rx) = unbounded();
552        let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
553
554        // we expect the event loop to live and reply => unwraps must not panic
555        self.channel.send(msg).unwrap();
556        self.waker.wake().unwrap();
557        rx.recv().unwrap()
558    }
559
560    fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
561        let pb = if path.is_absolute() {
562            path.to_owned()
563        } else {
564            let p = env::current_dir().map_err(Error::io)?;
565            p.join(path)
566        };
567        let (tx, rx) = unbounded();
568        let msg = EventLoopMsg::RemoveWatch(pb, tx);
569
570        // we expect the event loop to live and reply => unwraps must not panic
571        self.channel.send(msg).unwrap();
572        self.waker.wake().unwrap();
573        rx.recv().unwrap()
574    }
575}
576
impl Watcher for INotifyWatcher {
    /// Create a new watcher.
    ///
    /// Only `config.follow_symlinks()` is read from `config` here; it is forwarded to the event
    /// loop as its `follow_links` setting.
    fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
        Self::from_event_handler(Box::new(event_handler), config.follow_symlinks())
    }

    /// Begin watching `path`; delegates to `watch_inner`.
    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
        self.watch_inner(path, recursive_mode)
    }

    /// Stop watching `path`; delegates to `unwatch_inner`.
    fn unwatch(&mut self, path: &Path) -> Result<()> {
        self.unwatch_inner(path)
    }

    /// Forwards `config` to the event loop thread and returns its reply.
    ///
    /// Failures to send the request, wake the loop or receive the reply are propagated as
    /// errors.
    fn configure(&mut self, config: Config) -> Result<bool> {
        // Capacity 1: exactly one reply is expected for this request.
        let (tx, rx) = bounded(1);
        self.channel.send(EventLoopMsg::Configure(config, tx))?;
        self.waker.wake()?;
        rx.recv()?
    }

    /// Identifies this backend as the inotify watcher.
    fn kind() -> crate::WatcherKind {
        crate::WatcherKind::Inotify
    }
}
602
603impl Drop for INotifyWatcher {
604    fn drop(&mut self) {
605        // we expect the event loop to live => unwrap must not panic
606        self.channel.send(EventLoopMsg::Shutdown).unwrap();
607        self.waker.wake().unwrap();
608    }
609}
610
/// Compile-time check that `INotifyWatcher` is `Send + Sync`; the test body does nothing at
/// run time.
#[test]
fn inotify_watcher_is_send_and_sync() {
    // Instantiating `check` only type-checks if `T` satisfies both bounds.
    fn check<T: Send + Sync>() {}
    check::<INotifyWatcher>();
}