notify/
poll.rs

1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc, Mutex,
13    },
14    thread,
15    time::Duration,
16};
17
18/// Event sent for registered handlers on initial directory scans
19pub type ScanEvent = crate::Result<PathBuf>;
20
21/// Handler trait for receivers of [`ScanEvent`].
22/// Very much the same as [`EventHandler`], but including the Result.
23///
24/// See the full example for more information.
25pub trait ScanEventHandler: Send + 'static {
26    /// Handles an event.
27    fn handle_event(&mut self, event: ScanEvent);
28}
29
30impl<F> ScanEventHandler for F
31where
32    F: FnMut(ScanEvent) + Send + 'static,
33{
34    fn handle_event(&mut self, event: ScanEvent) {
35        (self)(event);
36    }
37}
38
/// Forwards every scan event into a `crossbeam_channel` sender.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Best effort: a failed send is discarded on purpose.
        self.send(event).ok();
    }
}
45
46impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
47    fn handle_event(&mut self, event: ScanEvent) {
48        let _ = self.send(event);
49    }
50}
51
52impl ScanEventHandler for () {
53    fn handle_event(&mut self, _event: ScanEvent) {}
54}
55
56use data::{DataBuilder, WatchData};
57mod data {
58    use crate::{
59        event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
60        EventHandler,
61    };
62    use filetime::FileTime;
63    use std::{
64        cell::RefCell,
65        collections::{hash_map::RandomState, HashMap},
66        fmt::{self, Debug},
67        fs::{self, File, Metadata},
68        hash::{BuildHasher, Hasher},
69        io::{self, Read},
70        path::{Path, PathBuf},
71        time::Instant,
72    };
73    use walkdir::WalkDir;
74
75    use super::ScanEventHandler;
76
77    /// Builder for [`WatchData`] & [`PathData`].
78    pub(super) struct DataBuilder {
79        emitter: EventEmitter,
80        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
81
82        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
83        // in future.
84        build_hasher: Option<RandomState>,
85
86        // current timestamp for building Data.
87        now: Instant,
88    }
89
90    impl DataBuilder {
91        pub(super) fn new<F, G>(
92            event_handler: F,
93            compare_content: bool,
94            scan_emitter: Option<G>,
95        ) -> Self
96        where
97            F: EventHandler,
98            G: ScanEventHandler,
99        {
100            let scan_emitter = match scan_emitter {
101                None => None,
102                Some(v) => {
103                    // workaround for a weird type resolution bug when directly going to dyn Trait
104                    let intermediate: Box<RefCell<dyn ScanEventHandler>> =
105                        Box::new(RefCell::new(v));
106                    Some(intermediate)
107                }
108            };
109            Self {
110                emitter: EventEmitter::new(event_handler),
111                scan_emitter,
112                build_hasher: compare_content.then(RandomState::default),
113                now: Instant::now(),
114            }
115        }
116
117        /// Update internal timestamp.
118        pub(super) fn update_timestamp(&mut self) {
119            self.now = Instant::now();
120        }
121
122        /// Create [`WatchData`].
123        ///
124        /// This function will return `Err(_)` if can not retrieve metadata from
125        /// the path location. (e.g., not found).
126        pub(super) fn build_watch_data(
127            &self,
128            root: PathBuf,
129            is_recursive: bool,
130            follow_symlinks: bool,
131        ) -> Option<WatchData> {
132            WatchData::new(self, root, is_recursive, follow_symlinks)
133        }
134
135        /// Create [`PathData`].
136        fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
137            PathData::new(self, meta_path)
138        }
139    }
140
141    impl Debug for DataBuilder {
142        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143            f.debug_struct("DataBuilder")
144                .field("build_hasher", &self.build_hasher)
145                .field("now", &self.now)
146                .finish()
147        }
148    }
149
150    #[derive(Debug)]
151    pub(super) struct WatchData {
152        // config part, won't change.
153        root: PathBuf,
154        is_recursive: bool,
155        follow_symlinks: bool,
156
157        // current status part.
158        all_path_data: HashMap<PathBuf, PathData>,
159    }
160
161    impl WatchData {
162        /// Scan filesystem and create a new `WatchData`.
163        ///
164        /// # Side effect
165        ///
166        /// This function may send event by `data_builder.emitter`.
167        fn new(
168            data_builder: &DataBuilder,
169            root: PathBuf,
170            is_recursive: bool,
171            follow_symlinks: bool,
172        ) -> Option<Self> {
173            // If metadata read error at `root` path, it will emit
174            // a error event and stop to create the whole `WatchData`.
175            //
176            // QUESTION: inconsistent?
177            //
178            // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
179            // if `root` path hit an io error, then watcher will reject to
180            // create this new watch.
181            //
182            // This may inconsistent with *POLLING* a watch. When watcher
183            // continue polling, io error at root path will not delete
184            // a existing watch. polling still working.
185            //
186            // So, consider a config file may not exists at first time but may
187            // create after a while, developer cannot watch it.
188            //
189            // FIXME: Can we always allow to watch a path, even file not
190            // found at this path?
191            if let Err(e) = fs::metadata(&root) {
192                data_builder.emitter.emit_io_err(e, Some(&root));
193                return None;
194            }
195
196            let all_path_data = Self::scan_all_path_data(
197                data_builder,
198                root.clone(),
199                is_recursive,
200                follow_symlinks,
201                true,
202            )
203            .collect();
204
205            Some(Self {
206                root,
207                is_recursive,
208                follow_symlinks,
209                all_path_data,
210            })
211        }
212
213        /// Rescan filesystem and update this `WatchData`.
214        ///
215        /// # Side effect
216        ///
217        /// This function may emit event by `data_builder.emitter`.
218        pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
219            // scan current filesystem.
220            for (path, new_path_data) in Self::scan_all_path_data(
221                data_builder,
222                self.root.clone(),
223                self.is_recursive,
224                self.follow_symlinks,
225                false,
226            ) {
227                let old_path_data = self
228                    .all_path_data
229                    .insert(path.clone(), new_path_data.clone());
230
231                // emit event
232                let event =
233                    PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
234                if let Some(event) = event {
235                    data_builder.emitter.emit_ok(event);
236                }
237            }
238
239            // scan for disappeared paths.
240            let mut disappeared_paths = Vec::new();
241            for (path, path_data) in self.all_path_data.iter() {
242                if path_data.last_check < data_builder.now {
243                    disappeared_paths.push(path.clone());
244                }
245            }
246
247            // remove disappeared paths
248            for path in disappeared_paths {
249                let old_path_data = self.all_path_data.remove(&path);
250
251                // emit event
252                let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
253                if let Some(event) = event {
254                    data_builder.emitter.emit_ok(event);
255                }
256            }
257        }
258
259        /// Get all `PathData` by given configuration.
260        ///
261        /// # Side Effect
262        ///
263        /// This function may emit some IO Error events by `data_builder.emitter`.
264        fn scan_all_path_data(
265            data_builder: &'_ DataBuilder,
266            root: PathBuf,
267            is_recursive: bool,
268            follow_symlinks: bool,
269            // whether this is an initial scan, used only for events
270            is_initial: bool,
271        ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
272            log::trace!("rescanning {root:?}");
273            // WalkDir return only one entry if root is a file (not a folder),
274            // so we can use single logic to do the both file & dir's jobs.
275            //
276            // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
277            WalkDir::new(root)
278                .follow_links(follow_symlinks)
279                .max_depth(Self::dir_scan_depth(is_recursive))
280                .into_iter()
281                .filter_map(|entry_res| match entry_res {
282                    Ok(entry) => Some(entry),
283                    Err(err) => {
284                        log::warn!("walkdir error scanning {err:?}");
285                        if let Some(io_error) = err.io_error() {
286                            // clone an io::Error, so we have to create a new one.
287                            let new_io_error = io::Error::new(io_error.kind(), err.to_string());
288                            data_builder.emitter.emit_io_err(new_io_error, err.path());
289                        } else {
290                            let crate_err =
291                                crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
292                            data_builder.emitter.emit(Err(crate_err));
293                        }
294                        None
295                    }
296                })
297                .filter_map(move |entry| match entry.metadata() {
298                    Ok(metadata) => {
299                        let path = entry.into_path();
300                        if is_initial {
301                            // emit initial scans
302                            if let Some(ref emitter) = data_builder.scan_emitter {
303                                emitter.borrow_mut().handle_event(Ok(path.clone()));
304                            }
305                        }
306                        let meta_path = MetaPath::from_parts_unchecked(path, metadata);
307                        let data_path = data_builder.build_path_data(&meta_path);
308
309                        Some((meta_path.into_path(), data_path))
310                    }
311                    Err(e) => {
312                        // emit event.
313                        let path = entry.into_path();
314                        data_builder.emitter.emit_io_err(e, Some(path));
315
316                        None
317                    }
318                })
319        }
320
/// Maximum walk depth handed to `WalkDir::max_depth`: `usize::MAX` for a
/// recursive watch, `1` otherwise.
fn dir_scan_depth(is_recursive: bool) -> usize {
    match is_recursive {
        true => usize::MAX,
        false => 1,
    }
}
328    }
329
/// Data remembered for a single path location.
///
/// See [`WatchData`] for more detail.
#[derive(Debug, Clone)]
struct PathData {
    /// Last modification time of the file, in whole seconds.
    mtime: i64,

    /// Hash of the file content. Only present when content comparison was
    /// requested, the path is a regular file and it was read successfully.
    hash: Option<u64>,

    /// Builder timestamp at the moment this entry was created.
    last_check: Instant,
}
345
346    impl PathData {
347        /// Create a new `PathData`.
348        fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
349            let metadata = meta_path.metadata();
350
351            PathData {
352                mtime: FileTime::from_last_modification_time(metadata).seconds(),
353                hash: data_builder
354                    .build_hasher
355                    .as_ref()
356                    .filter(|_| metadata.is_file())
357                    .and_then(|build_hasher| {
358                        Self::get_content_hash(build_hasher, meta_path.path()).ok()
359                    }),
360
361                last_check: data_builder.now,
362            }
363        }
364
/// Hash the complete content of the file at `path`.
///
/// The file is read in 512-byte chunks that are fed to a hasher created from
/// `build_hasher`. Interrupted reads are retried; every other IO error
/// (including a failure to open the file) is returned to the caller.
fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
    let mut hasher = build_hasher.build_hasher();
    let mut file = File::open(path)?;
    let mut chunk = [0; 512];

    loop {
        match file.read(&mut chunk) {
            // End of file: the digest is complete.
            Ok(0) => return Ok(hasher.finish()),
            Ok(len) => hasher.write(&chunk[..len]),
            // An interrupted read transferred nothing; simply try again.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
}
384
385        /// Get [`Event`] by compare two optional [`PathData`].
386        fn compare_to_event<P>(
387            path: P,
388            old: Option<&PathData>,
389            new: Option<&PathData>,
390        ) -> Option<Event>
391        where
392            P: Into<PathBuf>,
393        {
394            match (old, new) {
395                (Some(old), Some(new)) => {
396                    if new.mtime > old.mtime {
397                        Some(EventKind::Modify(ModifyKind::Metadata(
398                            MetadataKind::WriteTime,
399                        )))
400                    } else if new.hash != old.hash {
401                        Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
402                    } else {
403                        None
404                    }
405                }
406                (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
407                (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
408                (None, None) => None,
409            }
410            .map(|event_kind| Event::new(event_kind).add_path(path.into()))
411        }
412    }
413
414    /// Compose path and its metadata.
415    ///
416    /// This data structure designed for make sure path and its metadata can be
417    /// transferred in consistent way, and may avoid some duplicated
418    /// `fs::metadata()` function call in some situations.
419    #[derive(Debug)]
420    pub(super) struct MetaPath {
421        path: PathBuf,
422        metadata: Metadata,
423    }
424
425    impl MetaPath {
426        /// Create `MetaPath` by given parts.
427        ///
428        /// # Invariant
429        ///
430        /// User must make sure the input `metadata` are associated with `path`.
431        fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
432            Self { path, metadata }
433        }
434
435        fn path(&self) -> &Path {
436            &self.path
437        }
438
439        fn metadata(&self) -> &Metadata {
440            &self.metadata
441        }
442
443        fn into_path(self) -> PathBuf {
444            self.path
445        }
446    }
447
448    /// Thin wrapper for outer event handler, for easy to use.
449    struct EventEmitter(
450        // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
451        // Use `Box` to make sure EventEmitter is Sized.
452        Box<RefCell<dyn EventHandler>>,
453    );
454
455    impl EventEmitter {
456        fn new<F: EventHandler>(event_handler: F) -> Self {
457            Self(Box::new(RefCell::new(event_handler)))
458        }
459
460        /// Emit single event.
461        fn emit(&self, event: crate::Result<Event>) {
462            self.0.borrow_mut().handle_event(event);
463        }
464
465        /// Emit event.
466        fn emit_ok(&self, event: Event) {
467            self.emit(Ok(event))
468        }
469
470        /// Emit io error event.
471        fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
472        where
473            E: Into<io::Error>,
474            P: Into<PathBuf>,
475        {
476            let e = crate::Error::io(err.into());
477            if let Some(path) = path {
478                self.emit(Err(e.add_path(path.into())));
479            } else {
480                self.emit(Err(e));
481            }
482        }
483    }
484}
485
486/// Polling based `Watcher` implementation.
487///
488/// By default scans through all files and checks for changed entries based on their change date.
489/// Can also be changed to perform file content change checks.
490///
491/// See [Config] for more details.
492#[derive(Debug)]
493pub struct PollWatcher {
494    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
495    data_builder: Arc<Mutex<DataBuilder>>,
496    want_to_stop: Arc<AtomicBool>,
497    /// channel to the poll loop
498    /// currently used only for manual polling
499    message_channel: Sender<()>,
500    delay: Option<Duration>,
501    follow_sylinks: bool,
502}
503
504impl PollWatcher {
505    /// Create a new [`PollWatcher`], configured as needed.
506    pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
507        Self::with_opt::<_, ()>(event_handler, config, None)
508    }
509
510    /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
511    pub fn poll(&self) -> crate::Result<()> {
512        self.message_channel
513            .send(())
514            .map_err(|_| Error::generic("failed to send poll message"))?;
515        Ok(())
516    }
517
518    /// Create a new [`PollWatcher`] with an scan event handler.
519    ///
520    /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
521    pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
522        event_handler: F,
523        config: Config,
524        scan_callback: G,
525    ) -> crate::Result<PollWatcher> {
526        Self::with_opt(event_handler, config, Some(scan_callback))
527    }
528
529    /// create a new [`PollWatcher`] with all options.
530    fn with_opt<F: EventHandler, G: ScanEventHandler>(
531        event_handler: F,
532        config: Config,
533        scan_callback: Option<G>,
534    ) -> crate::Result<PollWatcher> {
535        let data_builder =
536            DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
537
538        let (tx, rx) = unbounded();
539
540        let poll_watcher = PollWatcher {
541            watches: Default::default(),
542            data_builder: Arc::new(Mutex::new(data_builder)),
543            want_to_stop: Arc::new(AtomicBool::new(false)),
544            delay: config.poll_interval(),
545            follow_sylinks: config.follow_symlinks(),
546            message_channel: tx,
547        };
548
549        poll_watcher.run(rx);
550
551        Ok(poll_watcher)
552    }
553
554    fn run(&self, rx: Receiver<()>) {
555        let watches = Arc::clone(&self.watches);
556        let data_builder = Arc::clone(&self.data_builder);
557        let want_to_stop = Arc::clone(&self.want_to_stop);
558        let delay = self.delay;
559
560        let _ = thread::Builder::new()
561            .name("notify-rs poll loop".to_string())
562            .spawn(move || {
563                loop {
564                    if want_to_stop.load(Ordering::SeqCst) {
565                        break;
566                    }
567
568                    // HINT: Make sure always lock in the same order to avoid deadlock.
569                    //
570                    // FIXME: inconsistent: some place mutex poison cause panic,
571                    // some place just ignore.
572                    if let (Ok(mut watches), Ok(mut data_builder)) =
573                        (watches.lock(), data_builder.lock())
574                    {
575                        data_builder.update_timestamp();
576
577                        let vals = watches.values_mut();
578                        for watch_data in vals {
579                            watch_data.rescan(&mut data_builder);
580                        }
581                    }
582                    // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
583                    if let Some(delay) = delay {
584                        let _ = rx.recv_timeout(delay);
585                    } else {
586                        let _ = rx.recv();
587                    }
588                }
589            });
590    }
591
592    /// Watch a path location.
593    ///
594    /// QUESTION: this function never return an Error, is it as intend?
595    /// Please also consider the IO Error event problem.
596    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
597        // HINT: Make sure always lock in the same order to avoid deadlock.
598        //
599        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
600        if let (Ok(mut watches), Ok(mut data_builder)) =
601            (self.watches.lock(), self.data_builder.lock())
602        {
603            data_builder.update_timestamp();
604
605            let watch_data = data_builder.build_watch_data(
606                path.to_path_buf(),
607                recursive_mode.is_recursive(),
608                self.follow_sylinks,
609            );
610
611            // if create watch_data successful, add it to watching list.
612            if let Some(watch_data) = watch_data {
613                watches.insert(path.to_path_buf(), watch_data);
614            }
615        }
616    }
617
618    /// Unwatch a path.
619    ///
620    /// Return `Err(_)` if given path has't be monitored.
621    fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
622        // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
623        self.watches
624            .lock()
625            .unwrap()
626            .remove(path)
627            .map(|_| ())
628            .ok_or_else(crate::Error::watch_not_found)
629    }
630}
631
632impl Watcher for PollWatcher {
633    /// Create a new [`PollWatcher`].
634    fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
635        Self::new(event_handler, config)
636    }
637
638    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
639        self.watch_inner(path, recursive_mode);
640
641        Ok(())
642    }
643
644    fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
645        self.unwatch_inner(path)
646    }
647
648    fn kind() -> crate::WatcherKind {
649        crate::WatcherKind::PollWatcher
650    }
651}
652
653impl Drop for PollWatcher {
654    fn drop(&mut self) {
655        self.want_to_stop.store(true, Ordering::Relaxed);
656    }
657}
658
/// Compile-time guard: `PollWatcher` must stay `Send + Sync`.
#[test]
fn poll_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<PollWatcher>();
}