1use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
22const INOTIFY: mio::Token = mio::Token(0);
23const MESSAGE: mio::Token = mio::Token(1);
24
25struct EventLoop {
32 running: bool,
33 poll: mio::Poll,
34 event_loop_waker: Arc<mio::Waker>,
35 event_loop_tx: Sender<EventLoopMsg>,
36 event_loop_rx: Receiver<EventLoopMsg>,
37 inotify: Option<Inotify>,
38 event_handler: Box<dyn EventHandler>,
39 watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
41 paths: HashMap<WatchDescriptor, PathBuf>,
42 rename_event: Option<Event>,
43 follow_links: bool,
44}
45
46#[derive(Debug)]
48pub struct INotifyWatcher {
49 channel: Sender<EventLoopMsg>,
50 waker: Arc<mio::Waker>,
51}
52
53enum EventLoopMsg {
54 AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
55 RemoveWatch(PathBuf, Sender<Result<()>>),
56 Shutdown,
57 Configure(Config, BoundSender<Result<bool>>),
58}
59
60#[inline]
61fn add_watch_by_event(
62 path: &Option<PathBuf>,
63 event: &inotify_sys::Event<&OsStr>,
64 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
65 add_watches: &mut Vec<PathBuf>,
66) {
67 if let Some(ref path) = *path {
68 if event.mask.contains(EventMask::ISDIR) {
69 if let Some(parent_path) = path.parent() {
70 if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
71 if is_recursive {
72 add_watches.push(path.to_owned());
73 }
74 }
75 }
76 }
77 }
78}
79
80#[inline]
81fn remove_watch_by_event(
82 path: &Option<PathBuf>,
83 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
84 remove_watches: &mut Vec<PathBuf>,
85) {
86 if let Some(ref path) = *path {
87 if watches.contains_key(path) {
88 remove_watches.push(path.to_owned());
89 }
90 }
91}
92
93impl EventLoop {
94 pub fn new(
95 inotify: Inotify,
96 event_handler: Box<dyn EventHandler>,
97 follow_links: bool,
98 ) -> Result<Self> {
99 let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
100 let poll = mio::Poll::new()?;
101
102 let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
103
104 let inotify_fd = inotify.as_raw_fd();
105 let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
106 poll.registry()
107 .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
108
109 let event_loop = EventLoop {
110 running: true,
111 poll,
112 event_loop_waker,
113 event_loop_tx,
114 event_loop_rx,
115 inotify: Some(inotify),
116 event_handler,
117 watches: HashMap::new(),
118 paths: HashMap::new(),
119 rename_event: None,
120 follow_links,
121 };
122 Ok(event_loop)
123 }
124
125 pub fn run(self) {
127 let _ = thread::Builder::new()
128 .name("notify-rs inotify loop".to_string())
129 .spawn(|| self.event_loop_thread());
130 }
131
132 fn event_loop_thread(mut self) {
133 let mut events = mio::Events::with_capacity(16);
134 loop {
135 match self.poll.poll(&mut events, None) {
137 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
138 }
141 Err(e) => panic!("poll failed: {}", e),
142 Ok(()) => {}
143 }
144
145 for event in &events {
147 self.handle_event(event);
148 }
149
150 if !self.running {
152 break;
153 }
154 }
155 }
156
157 fn handle_event(&mut self, event: &mio::event::Event) {
159 match event.token() {
160 MESSAGE => {
161 self.handle_messages()
163 }
164 INOTIFY => {
165 self.handle_inotify()
167 }
168 _ => unreachable!(),
169 }
170 }
171
172 fn handle_messages(&mut self) {
173 while let Ok(msg) = self.event_loop_rx.try_recv() {
174 match msg {
175 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
176 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
177 }
178 EventLoopMsg::RemoveWatch(path, tx) => {
179 let _ = tx.send(self.remove_watch(path, false));
180 }
181 EventLoopMsg::Shutdown => {
182 let _ = self.remove_all_watches();
183 if let Some(inotify) = self.inotify.take() {
184 let _ = inotify.close();
185 }
186 self.running = false;
187 break;
188 }
189 EventLoopMsg::Configure(config, tx) => {
190 self.configure_raw_mode(config, tx);
191 }
192 }
193 }
194 }
195
196 fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
197 tx.send(Ok(false))
198 .expect("configuration channel disconnected");
199 }
200
201 fn handle_inotify(&mut self) {
202 let mut add_watches = Vec::new();
203 let mut remove_watches = Vec::new();
204
205 if let Some(ref mut inotify) = self.inotify {
206 let mut buffer = [0; 1024];
207 loop {
209 match inotify.read_events(&mut buffer) {
210 Ok(events) => {
211 let mut num_events = 0;
212 for event in events {
213 log::trace!("inotify event: {event:?}");
214
215 num_events += 1;
216 if event.mask.contains(EventMask::Q_OVERFLOW) {
217 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
218 self.event_handler.handle_event(ev);
219 }
220
221 let path = match event.name {
222 Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
223 None => self.paths.get(&event.wd).cloned(),
224 };
225
226 let mut evs = Vec::new();
227
228 if event.mask.contains(EventMask::MOVED_FROM) {
229 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
230
231 let event = Event::new(EventKind::Modify(ModifyKind::Name(
232 RenameMode::From,
233 )))
234 .add_some_path(path.clone())
235 .set_tracker(event.cookie as usize);
236
237 self.rename_event = Some(event.clone());
238
239 evs.push(event);
240 } else if event.mask.contains(EventMask::MOVED_TO) {
241 evs.push(
242 Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
243 .set_tracker(event.cookie as usize)
244 .add_some_path(path.clone()),
245 );
246
247 let trackers_match = self
248 .rename_event
249 .as_ref()
250 .and_then(|e| e.tracker())
251 .map_or(false, |from_tracker| {
252 from_tracker == event.cookie as usize
253 });
254
255 if trackers_match {
256 let rename_event = self.rename_event.take().unwrap(); evs.push(
258 Event::new(EventKind::Modify(ModifyKind::Name(
259 RenameMode::Both,
260 )))
261 .set_tracker(event.cookie as usize)
262 .add_some_path(rename_event.paths.first().cloned())
263 .add_some_path(path.clone()),
264 );
265 }
266 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
267 }
268 if event.mask.contains(EventMask::MOVE_SELF) {
269 evs.push(
270 Event::new(EventKind::Modify(ModifyKind::Name(
271 RenameMode::From,
272 )))
273 .add_some_path(path.clone()),
274 );
275 }
279 if event.mask.contains(EventMask::CREATE) {
280 evs.push(
281 Event::new(EventKind::Create(
282 if event.mask.contains(EventMask::ISDIR) {
283 CreateKind::Folder
284 } else {
285 CreateKind::File
286 },
287 ))
288 .add_some_path(path.clone()),
289 );
290 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
291 }
292 if event.mask.contains(EventMask::DELETE) {
293 evs.push(
294 Event::new(EventKind::Remove(
295 if event.mask.contains(EventMask::ISDIR) {
296 RemoveKind::Folder
297 } else {
298 RemoveKind::File
299 },
300 ))
301 .add_some_path(path.clone()),
302 );
303 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
304 }
305 if event.mask.contains(EventMask::DELETE_SELF) {
306 let remove_kind = match &path {
307 Some(watched_path) => {
308 let current_watch = self.watches.get(watched_path);
309 match current_watch {
310 Some(&(_, _, _, true)) => RemoveKind::Folder,
311 Some(&(_, _, _, false)) => RemoveKind::File,
312 None => RemoveKind::Other,
313 }
314 }
315 None => {
316 log::trace!(
317 "No patch for DELETE_SELF event, may be a bug?"
318 );
319 RemoveKind::Other
320 }
321 };
322 evs.push(
323 Event::new(EventKind::Remove(remove_kind))
324 .add_some_path(path.clone()),
325 );
326 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
327 }
328 if event.mask.contains(EventMask::MODIFY) {
329 evs.push(
330 Event::new(EventKind::Modify(ModifyKind::Data(
331 DataChange::Any,
332 )))
333 .add_some_path(path.clone()),
334 );
335 }
336 if event.mask.contains(EventMask::CLOSE_WRITE) {
337 evs.push(
338 Event::new(EventKind::Access(AccessKind::Close(
339 AccessMode::Write,
340 )))
341 .add_some_path(path.clone()),
342 );
343 }
344 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
345 evs.push(
346 Event::new(EventKind::Access(AccessKind::Close(
347 AccessMode::Read,
348 )))
349 .add_some_path(path.clone()),
350 );
351 }
352 if event.mask.contains(EventMask::ATTRIB) {
353 evs.push(
354 Event::new(EventKind::Modify(ModifyKind::Metadata(
355 MetadataKind::Any,
356 )))
357 .add_some_path(path.clone()),
358 );
359 }
360 if event.mask.contains(EventMask::OPEN) {
361 evs.push(
362 Event::new(EventKind::Access(AccessKind::Open(
363 AccessMode::Any,
364 )))
365 .add_some_path(path.clone()),
366 );
367 }
368
369 for ev in evs {
370 self.event_handler.handle_event(Ok(ev));
371 }
372 }
373
374 if num_events == 0 {
376 break;
377 }
378 }
379 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
380 break;
382 }
383 Err(e) => {
384 self.event_handler.handle_event(Err(Error::io(e)));
385 }
386 }
387 }
388 }
389
390 for path in remove_watches {
391 self.remove_watch(path, true).ok();
392 }
393
394 for path in add_watches {
395 self.add_watch(path, true, false).ok();
396 }
397 }
398
399 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
400 if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
403 return self.add_single_watch(path, false, true);
404 }
405
406 for entry in WalkDir::new(path)
407 .follow_links(self.follow_links)
408 .into_iter()
409 .filter_map(filter_dir)
410 {
411 self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
412 watch_self = false;
413 }
414
415 Ok(())
416 }
417
418 fn add_single_watch(
419 &mut self,
420 path: PathBuf,
421 is_recursive: bool,
422 watch_self: bool,
423 ) -> Result<()> {
424 let mut watchmask = WatchMask::ATTRIB
425 | WatchMask::CREATE
426 | WatchMask::OPEN
427 | WatchMask::DELETE
428 | WatchMask::CLOSE_WRITE
429 | WatchMask::MODIFY
430 | WatchMask::MOVED_FROM
431 | WatchMask::MOVED_TO;
432
433 if watch_self {
434 watchmask.insert(WatchMask::DELETE_SELF);
435 watchmask.insert(WatchMask::MOVE_SELF);
436 }
437
438 if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
439 watchmask.insert(old_watchmask);
440 watchmask.insert(WatchMask::MASK_ADD);
441 }
442
443 if let Some(ref mut inotify) = self.inotify {
444 log::trace!("adding inotify watch: {}", path.display());
445
446 match inotify.watches().add(&path, watchmask) {
447 Err(e) => {
448 Err(if e.raw_os_error() == Some(libc::ENOSPC) {
449 Error::new(ErrorKind::MaxFilesWatch)
451 } else {
452 Error::io(e)
453 }
454 .add_path(path))
455 }
456 Ok(w) => {
457 watchmask.remove(WatchMask::MASK_ADD);
458 let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
459 self.watches
460 .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
461 self.paths.insert(w, path);
462 Ok(())
463 }
464 }
465 } else {
466 Ok(())
467 }
468 }
469
470 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
471 match self.watches.remove(&path) {
472 None => return Err(Error::watch_not_found().add_path(path)),
473 Some((w, _, is_recursive, _)) => {
474 if let Some(ref mut inotify) = self.inotify {
475 let mut inotify_watches = inotify.watches();
476 log::trace!("removing inotify watch: {}", path.display());
477
478 inotify_watches
479 .remove(w.clone())
480 .map_err(|e| Error::io(e).add_path(path.clone()))?;
481 self.paths.remove(&w);
482
483 if is_recursive || remove_recursive {
484 let mut remove_list = Vec::new();
485 for (w, p) in &self.paths {
486 if p.starts_with(&path) {
487 inotify_watches
488 .remove(w.clone())
489 .map_err(|e| Error::io(e).add_path(p.into()))?;
490 self.watches.remove(p);
491 remove_list.push(w.clone());
492 }
493 }
494 for w in remove_list {
495 self.paths.remove(&w);
496 }
497 }
498 }
499 }
500 }
501 Ok(())
502 }
503
504 fn remove_all_watches(&mut self) -> Result<()> {
505 if let Some(ref mut inotify) = self.inotify {
506 let mut inotify_watches = inotify.watches();
507 for (w, p) in &self.paths {
508 inotify_watches
509 .remove(w.clone())
510 .map_err(|e| Error::io(e).add_path(p.into()))?;
511 }
512 self.watches.clear();
513 self.paths.clear();
514 }
515 Ok(())
516 }
517}
518
519fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
521 if let Ok(e) = e {
522 if let Ok(metadata) = e.metadata() {
523 if metadata.is_dir() {
524 return Some(e);
525 }
526 }
527 }
528 None
529}
530
531impl INotifyWatcher {
532 fn from_event_handler(
533 event_handler: Box<dyn EventHandler>,
534 follow_links: bool,
535 ) -> Result<Self> {
536 let inotify = Inotify::init()?;
537 let event_loop = EventLoop::new(inotify, event_handler, follow_links)?;
538 let channel = event_loop.event_loop_tx.clone();
539 let waker = event_loop.event_loop_waker.clone();
540 event_loop.run();
541 Ok(INotifyWatcher { channel, waker })
542 }
543
544 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
545 let pb = if path.is_absolute() {
546 path.to_owned()
547 } else {
548 let p = env::current_dir().map_err(Error::io)?;
549 p.join(path)
550 };
551 let (tx, rx) = unbounded();
552 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
553
554 self.channel.send(msg).unwrap();
556 self.waker.wake().unwrap();
557 rx.recv().unwrap()
558 }
559
560 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
561 let pb = if path.is_absolute() {
562 path.to_owned()
563 } else {
564 let p = env::current_dir().map_err(Error::io)?;
565 p.join(path)
566 };
567 let (tx, rx) = unbounded();
568 let msg = EventLoopMsg::RemoveWatch(pb, tx);
569
570 self.channel.send(msg).unwrap();
572 self.waker.wake().unwrap();
573 rx.recv().unwrap()
574 }
575}
576
577impl Watcher for INotifyWatcher {
578 fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
580 Self::from_event_handler(Box::new(event_handler), config.follow_symlinks())
581 }
582
583 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
584 self.watch_inner(path, recursive_mode)
585 }
586
587 fn unwatch(&mut self, path: &Path) -> Result<()> {
588 self.unwatch_inner(path)
589 }
590
591 fn configure(&mut self, config: Config) -> Result<bool> {
592 let (tx, rx) = bounded(1);
593 self.channel.send(EventLoopMsg::Configure(config, tx))?;
594 self.waker.wake()?;
595 rx.recv()?
596 }
597
598 fn kind() -> crate::WatcherKind {
599 crate::WatcherKind::Inotify
600 }
601}
602
603impl Drop for INotifyWatcher {
604 fn drop(&mut self) {
605 self.channel.send(EventLoopMsg::Shutdown).unwrap();
607 self.waker.wake().unwrap();
608 }
609}
610
611#[test]
612fn inotify_watcher_is_send_and_sync() {
613 fn check<T: Send + Sync>() {}
614 check::<INotifyWatcher>();
615}