1use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex,
13 },
14 thread,
15 time::Duration,
16};
17
18pub type ScanEvent = crate::Result<PathBuf>;
20
21pub trait ScanEventHandler: Send + 'static {
26 fn handle_event(&mut self, event: ScanEvent);
28}
29
30impl<F> ScanEventHandler for F
31where
32 F: FnMut(ScanEvent) + Send + 'static,
33{
34 fn handle_event(&mut self, event: ScanEvent) {
35 (self)(event);
36 }
37}
38
39#[cfg(feature = "crossbeam-channel")]
40impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
41 fn handle_event(&mut self, event: ScanEvent) {
42 let _ = self.send(event);
43 }
44}
45
46impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
47 fn handle_event(&mut self, event: ScanEvent) {
48 let _ = self.send(event);
49 }
50}
51
52impl ScanEventHandler for () {
53 fn handle_event(&mut self, _event: ScanEvent) {}
54}
55
56use data::{DataBuilder, WatchData};
57mod data {
58 use crate::{
59 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
60 EventHandler,
61 };
62 use filetime::FileTime;
63 use std::{
64 cell::RefCell,
65 collections::{hash_map::RandomState, HashMap},
66 fmt::{self, Debug},
67 fs::{self, File, Metadata},
68 hash::{BuildHasher, Hasher},
69 io::{self, Read},
70 path::{Path, PathBuf},
71 time::Instant,
72 };
73 use walkdir::WalkDir;
74
75 use super::ScanEventHandler;
76
77 pub(super) struct DataBuilder {
79 emitter: EventEmitter,
80 scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
81
82 build_hasher: Option<RandomState>,
85
86 now: Instant,
88 }
89
90 impl DataBuilder {
91 pub(super) fn new<F, G>(
92 event_handler: F,
93 compare_content: bool,
94 scan_emitter: Option<G>,
95 ) -> Self
96 where
97 F: EventHandler,
98 G: ScanEventHandler,
99 {
100 let scan_emitter = match scan_emitter {
101 None => None,
102 Some(v) => {
103 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
105 Box::new(RefCell::new(v));
106 Some(intermediate)
107 }
108 };
109 Self {
110 emitter: EventEmitter::new(event_handler),
111 scan_emitter,
112 build_hasher: compare_content.then(RandomState::default),
113 now: Instant::now(),
114 }
115 }
116
117 pub(super) fn update_timestamp(&mut self) {
119 self.now = Instant::now();
120 }
121
122 pub(super) fn build_watch_data(
127 &self,
128 root: PathBuf,
129 is_recursive: bool,
130 follow_symlinks: bool,
131 ) -> Option<WatchData> {
132 WatchData::new(self, root, is_recursive, follow_symlinks)
133 }
134
135 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
137 PathData::new(self, meta_path)
138 }
139 }
140
141 impl Debug for DataBuilder {
142 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143 f.debug_struct("DataBuilder")
144 .field("build_hasher", &self.build_hasher)
145 .field("now", &self.now)
146 .finish()
147 }
148 }
149
150 #[derive(Debug)]
151 pub(super) struct WatchData {
152 root: PathBuf,
154 is_recursive: bool,
155 follow_symlinks: bool,
156
157 all_path_data: HashMap<PathBuf, PathData>,
159 }
160
161 impl WatchData {
162 fn new(
168 data_builder: &DataBuilder,
169 root: PathBuf,
170 is_recursive: bool,
171 follow_symlinks: bool,
172 ) -> Option<Self> {
173 if let Err(e) = fs::metadata(&root) {
192 data_builder.emitter.emit_io_err(e, Some(&root));
193 return None;
194 }
195
196 let all_path_data = Self::scan_all_path_data(
197 data_builder,
198 root.clone(),
199 is_recursive,
200 follow_symlinks,
201 true,
202 )
203 .collect();
204
205 Some(Self {
206 root,
207 is_recursive,
208 follow_symlinks,
209 all_path_data,
210 })
211 }
212
213 pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
219 for (path, new_path_data) in Self::scan_all_path_data(
221 data_builder,
222 self.root.clone(),
223 self.is_recursive,
224 self.follow_symlinks,
225 false,
226 ) {
227 let old_path_data = self
228 .all_path_data
229 .insert(path.clone(), new_path_data.clone());
230
231 let event =
233 PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
234 if let Some(event) = event {
235 data_builder.emitter.emit_ok(event);
236 }
237 }
238
239 let mut disappeared_paths = Vec::new();
241 for (path, path_data) in self.all_path_data.iter() {
242 if path_data.last_check < data_builder.now {
243 disappeared_paths.push(path.clone());
244 }
245 }
246
247 for path in disappeared_paths {
249 let old_path_data = self.all_path_data.remove(&path);
250
251 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
253 if let Some(event) = event {
254 data_builder.emitter.emit_ok(event);
255 }
256 }
257 }
258
259 fn scan_all_path_data(
265 data_builder: &'_ DataBuilder,
266 root: PathBuf,
267 is_recursive: bool,
268 follow_symlinks: bool,
269 is_initial: bool,
271 ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
272 log::trace!("rescanning {root:?}");
273 WalkDir::new(root)
278 .follow_links(follow_symlinks)
279 .max_depth(Self::dir_scan_depth(is_recursive))
280 .into_iter()
281 .filter_map(|entry_res| match entry_res {
282 Ok(entry) => Some(entry),
283 Err(err) => {
284 log::warn!("walkdir error scanning {err:?}");
285 if let Some(io_error) = err.io_error() {
286 let new_io_error = io::Error::new(io_error.kind(), err.to_string());
288 data_builder.emitter.emit_io_err(new_io_error, err.path());
289 } else {
290 let crate_err =
291 crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
292 data_builder.emitter.emit(Err(crate_err));
293 }
294 None
295 }
296 })
297 .filter_map(move |entry| match entry.metadata() {
298 Ok(metadata) => {
299 let path = entry.into_path();
300 if is_initial {
301 if let Some(ref emitter) = data_builder.scan_emitter {
303 emitter.borrow_mut().handle_event(Ok(path.clone()));
304 }
305 }
306 let meta_path = MetaPath::from_parts_unchecked(path, metadata);
307 let data_path = data_builder.build_path_data(&meta_path);
308
309 Some((meta_path.into_path(), data_path))
310 }
311 Err(e) => {
312 let path = entry.into_path();
314 data_builder.emitter.emit_io_err(e, Some(path));
315
316 None
317 }
318 })
319 }
320
321 fn dir_scan_depth(is_recursive: bool) -> usize {
322 if is_recursive {
323 usize::MAX
324 } else {
325 1
326 }
327 }
328 }
329
330 #[derive(Debug, Clone)]
334 struct PathData {
335 mtime: i64,
337
338 hash: Option<u64>,
341
342 last_check: Instant,
344 }
345
346 impl PathData {
347 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
349 let metadata = meta_path.metadata();
350
351 PathData {
352 mtime: FileTime::from_last_modification_time(metadata).seconds(),
353 hash: data_builder
354 .build_hasher
355 .as_ref()
356 .filter(|_| metadata.is_file())
357 .and_then(|build_hasher| {
358 Self::get_content_hash(build_hasher, meta_path.path()).ok()
359 }),
360
361 last_check: data_builder.now,
362 }
363 }
364
365 fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
367 let mut hasher = build_hasher.build_hasher();
368 let mut file = File::open(path)?;
369 let mut buf = [0; 512];
370
371 loop {
372 let n = match file.read(&mut buf) {
373 Ok(0) => break,
374 Ok(len) => len,
375 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
376 Err(e) => return Err(e),
377 };
378
379 hasher.write(&buf[..n]);
380 }
381
382 Ok(hasher.finish())
383 }
384
385 fn compare_to_event<P>(
387 path: P,
388 old: Option<&PathData>,
389 new: Option<&PathData>,
390 ) -> Option<Event>
391 where
392 P: Into<PathBuf>,
393 {
394 match (old, new) {
395 (Some(old), Some(new)) => {
396 if new.mtime > old.mtime {
397 Some(EventKind::Modify(ModifyKind::Metadata(
398 MetadataKind::WriteTime,
399 )))
400 } else if new.hash != old.hash {
401 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
402 } else {
403 None
404 }
405 }
406 (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
407 (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
408 (None, None) => None,
409 }
410 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
411 }
412 }
413
414 #[derive(Debug)]
420 pub(super) struct MetaPath {
421 path: PathBuf,
422 metadata: Metadata,
423 }
424
425 impl MetaPath {
426 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
432 Self { path, metadata }
433 }
434
435 fn path(&self) -> &Path {
436 &self.path
437 }
438
439 fn metadata(&self) -> &Metadata {
440 &self.metadata
441 }
442
443 fn into_path(self) -> PathBuf {
444 self.path
445 }
446 }
447
448 struct EventEmitter(
450 Box<RefCell<dyn EventHandler>>,
453 );
454
455 impl EventEmitter {
456 fn new<F: EventHandler>(event_handler: F) -> Self {
457 Self(Box::new(RefCell::new(event_handler)))
458 }
459
460 fn emit(&self, event: crate::Result<Event>) {
462 self.0.borrow_mut().handle_event(event);
463 }
464
465 fn emit_ok(&self, event: Event) {
467 self.emit(Ok(event))
468 }
469
470 fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
472 where
473 E: Into<io::Error>,
474 P: Into<PathBuf>,
475 {
476 let e = crate::Error::io(err.into());
477 if let Some(path) = path {
478 self.emit(Err(e.add_path(path.into())));
479 } else {
480 self.emit(Err(e));
481 }
482 }
483 }
484}
485
486#[derive(Debug)]
493pub struct PollWatcher {
494 watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
495 data_builder: Arc<Mutex<DataBuilder>>,
496 want_to_stop: Arc<AtomicBool>,
497 message_channel: Sender<()>,
500 delay: Option<Duration>,
501 follow_sylinks: bool,
502}
503
504impl PollWatcher {
505 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
507 Self::with_opt::<_, ()>(event_handler, config, None)
508 }
509
510 pub fn poll(&self) -> crate::Result<()> {
512 self.message_channel
513 .send(())
514 .map_err(|_| Error::generic("failed to send poll message"))?;
515 Ok(())
516 }
517
518 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
522 event_handler: F,
523 config: Config,
524 scan_callback: G,
525 ) -> crate::Result<PollWatcher> {
526 Self::with_opt(event_handler, config, Some(scan_callback))
527 }
528
529 fn with_opt<F: EventHandler, G: ScanEventHandler>(
531 event_handler: F,
532 config: Config,
533 scan_callback: Option<G>,
534 ) -> crate::Result<PollWatcher> {
535 let data_builder =
536 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
537
538 let (tx, rx) = unbounded();
539
540 let poll_watcher = PollWatcher {
541 watches: Default::default(),
542 data_builder: Arc::new(Mutex::new(data_builder)),
543 want_to_stop: Arc::new(AtomicBool::new(false)),
544 delay: config.poll_interval(),
545 follow_sylinks: config.follow_symlinks(),
546 message_channel: tx,
547 };
548
549 poll_watcher.run(rx);
550
551 Ok(poll_watcher)
552 }
553
554 fn run(&self, rx: Receiver<()>) {
555 let watches = Arc::clone(&self.watches);
556 let data_builder = Arc::clone(&self.data_builder);
557 let want_to_stop = Arc::clone(&self.want_to_stop);
558 let delay = self.delay;
559
560 let _ = thread::Builder::new()
561 .name("notify-rs poll loop".to_string())
562 .spawn(move || {
563 loop {
564 if want_to_stop.load(Ordering::SeqCst) {
565 break;
566 }
567
568 if let (Ok(mut watches), Ok(mut data_builder)) =
573 (watches.lock(), data_builder.lock())
574 {
575 data_builder.update_timestamp();
576
577 let vals = watches.values_mut();
578 for watch_data in vals {
579 watch_data.rescan(&mut data_builder);
580 }
581 }
582 if let Some(delay) = delay {
584 let _ = rx.recv_timeout(delay);
585 } else {
586 let _ = rx.recv();
587 }
588 }
589 });
590 }
591
592 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
597 if let (Ok(mut watches), Ok(mut data_builder)) =
601 (self.watches.lock(), self.data_builder.lock())
602 {
603 data_builder.update_timestamp();
604
605 let watch_data = data_builder.build_watch_data(
606 path.to_path_buf(),
607 recursive_mode.is_recursive(),
608 self.follow_sylinks,
609 );
610
611 if let Some(watch_data) = watch_data {
613 watches.insert(path.to_path_buf(), watch_data);
614 }
615 }
616 }
617
618 fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
622 self.watches
624 .lock()
625 .unwrap()
626 .remove(path)
627 .map(|_| ())
628 .ok_or_else(crate::Error::watch_not_found)
629 }
630}
631
632impl Watcher for PollWatcher {
633 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
635 Self::new(event_handler, config)
636 }
637
638 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
639 self.watch_inner(path, recursive_mode);
640
641 Ok(())
642 }
643
644 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
645 self.unwatch_inner(path)
646 }
647
648 fn kind() -> crate::WatcherKind {
649 crate::WatcherKind::PollWatcher
650 }
651}
652
653impl Drop for PollWatcher {
654 fn drop(&mut self) {
655 self.want_to_stop.store(true, Ordering::Relaxed);
656 }
657}
658
659#[test]
660fn poll_watcher_is_send_and_sync() {
661 fn check<T: Send + Sync>() {}
662 check::<PollWatcher>();
663}