From 4ca36fd0d592f3c0e120be0b53bfca74d466be82 Mon Sep 17 00:00:00 2001 From: jmshark Date: Mon, 24 Apr 2023 14:51:01 +0000 Subject: [PATCH] Capture missed inotify events --- crates/common/tedge_utils/src/notify.rs | 250 +++++------------- crates/core/tedge_mapper/src/core/mapper.rs | 11 +- .../tedge_file_system_ext/src/lib.rs | 12 +- .../configuration_operation.robot | 2 - 4 files changed, 73 insertions(+), 202 deletions(-) diff --git a/crates/common/tedge_utils/src/notify.rs b/crates/common/tedge_utils/src/notify.rs index 526997b7f84..e03f7183e39 100644 --- a/crates/common/tedge_utils/src/notify.rs +++ b/crates/common/tedge_utils/src/notify.rs @@ -1,13 +1,6 @@ -use std::collections::HashMap; -use std::collections::HashSet; -use std::hash::Hash; -use std::path::Path; -use std::path::PathBuf; - use notify::event::AccessKind; use notify::event::AccessMode; use notify::event::CreateKind; -use notify::event::DataChange; use notify::event::ModifyKind; use notify::event::RemoveKind; use notify::Config; @@ -16,6 +9,9 @@ use notify::INotifyWatcher; use notify::RecommendedWatcher; use notify::RecursiveMode; use notify::Watcher; +use std::hash::Hash; +use std::path::Path; +use std::path::PathBuf; use strum_macros::Display; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::Receiver; @@ -50,14 +46,9 @@ pub enum NotifyStreamError { DuplicateWatcher { mask: FsEvent, path: PathBuf }, } -type DirPath = PathBuf; -type MaybeFileName = Option; -type Metadata = HashMap>>; - pub struct NotifyStream { watcher: INotifyWatcher, pub rx: Receiver<(PathBuf, FsEvent)>, - metadata: Metadata, } impl NotifyStream { @@ -70,142 +61,75 @@ impl NotifyStream { move |res: Result| { futures::executor::block_on(async { if let Ok(notify_event) = res { - match notify_event.kind { - EventKind::Modify(ModifyKind::Data(DataChange::Any)) => { - for path in notify_event.paths { - let _ = tx.send((path, FsEvent::Modified)).await; - } - } - - EventKind::Access(AccessKind::Close(AccessMode::Write)) => { - for path in notify_event.paths { - let _ = tx.send((path, FsEvent::Modified)).await; - } - } - EventKind::Create(CreateKind::File) => { - for path in notify_event.paths { - let _ = tx.send((path, FsEvent::FileCreated)).await; - } - } - EventKind::Create(CreateKind::Folder) => { - for path in notify_event.paths { - let _ = tx.send((path, FsEvent::DirectoryCreated)).await; + let event = match notify_event.kind { + EventKind::Access(access_kind) => match access_kind { + AccessKind::Close(access_mode) => match access_mode { + AccessMode::Any | AccessMode::Other | AccessMode::Write => { + Some(FsEvent::Modified) + } + AccessMode::Read | AccessMode::Execute => None, + }, + AccessKind::Any | AccessKind::Other => Some(FsEvent::Modified), + AccessKind::Read | AccessKind::Open(_) => None, + }, + EventKind::Create(create_kind) => match create_kind { + CreateKind::File | CreateKind::Any | CreateKind::Other => { + Some(FsEvent::FileCreated) } - } - EventKind::Remove(RemoveKind::File) => { - for path in notify_event.paths { - let _ = tx.send((path, FsEvent::FileDeleted)).await; - } - } - EventKind::Remove(RemoveKind::Folder) => { - for path in notify_event.paths { - let _ = tx.send((path, FsEvent::DirectoryDeleted)).await; + CreateKind::Folder => Some(FsEvent::DirectoryCreated), + }, + EventKind::Modify(modify_kind) => match modify_kind { + ModifyKind::Data(_) + | ModifyKind::Metadata(_) + | ModifyKind::Name(_) + | ModifyKind::Any + | ModifyKind::Other => Some(FsEvent::Modified), + }, + EventKind::Remove(remove_kind) => match remove_kind { + RemoveKind::File | RemoveKind::Any | RemoveKind::Other => { + Some(FsEvent::FileDeleted) } + RemoveKind::Folder => Some(FsEvent::DirectoryDeleted), + }, + EventKind::Any | EventKind::Other => Some(FsEvent::Modified), + }; + + if let Some(event) = event { + for path in notify_event.paths { + let _ = tx.send((path, event)).await; } - _other => {} } } }) }, Config::default(), )?; - Ok(Self { - watcher, - rx, - metadata: HashMap::new(), - }) + Ok(Self { watcher, rx }) } - #[cfg(test)] - fn get_metadata(&self) -> &Metadata { - &self.metadata - } - fn get_metadata_as_mut(&mut self) -> &mut Metadata { - &mut self.metadata - } - - pub fn add_watcher( - &mut self, - dir_path: &Path, - file: Option, - events: &[FsEvent], - ) -> Result<(), NotifyStreamError> { + /// Will return an error if you try to watch a file/directory which doesn't exist + pub fn add_watcher(&mut self, dir_path: &Path) -> Result<(), NotifyStreamError> { self.watcher.watch(dir_path, RecursiveMode::Recursive)?; - // we add the information to the metadata - let maybe_file_name_entry = self - .get_metadata_as_mut() - .entry(dir_path.to_path_buf()) - .or_insert_with(HashMap::new); - - let file_event_entry = maybe_file_name_entry - .entry(file) - .or_insert_with(HashSet::new); - for event in events { - file_event_entry.insert(*event); - } Ok(()) } } -pub fn fs_notify_stream( - input: &[(&Path, Option, &[FsEvent])], -) -> Result { +pub fn fs_notify_stream(input: &[&Path]) -> Result { let mut fs_notify = NotifyStream::try_default()?; - for (dir_path, watch, flags) in input { - fs_notify.add_watcher(dir_path, watch.to_owned(), flags)?; + for dir_path in input { + fs_notify.add_watcher(dir_path)?; } Ok(fs_notify) } #[cfg(test)] -mod notify_tests { - use std::collections::HashMap; - use std::sync::Arc; - +mod tests { + use super::*; use maplit::hashmap; + use std::{collections::HashMap, sync::Arc, time::Duration}; use tedge_test_utils::fs::TempTedgeDir; - use crate::notify::FsEvent; - - use super::fs_notify_stream; - use super::NotifyStream; - - /// This test: - /// Creates a duplicate watcher (same directory path, same file name, same event) - /// Adds a new event for the same directory path, same file name - /// Checks the duplicate event is not duplicated in the data structure - /// Checks the new event is in the data structure - #[test] - fn it_inserts_new_file_events_correctly() { - let ttd = TempTedgeDir::new(); - let mut notify = NotifyStream::try_default().unwrap(); - let maybe_file_name = Some("file_a".to_string()); - - notify - .add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileCreated]) - .unwrap(); - notify - .add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileCreated]) - .unwrap(); - notify - .add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileDeleted]) - .unwrap(); - - let event_hashset = notify - .get_metadata() - .get(ttd.path()) - .unwrap() - .get(&maybe_file_name) - .unwrap(); - - // assert no duplicate entry was created for the second insert and new event was added - // in total 2 events are expected: FileEvent::Created, FileEvent::Deleted - assert_eq!(event_hashset.len(), 2); - assert!(event_hashset.contains(&FsEvent::FileCreated)); - assert!(event_hashset.contains(&FsEvent::FileDeleted)); - } - async fn assert_rx_stream( mut inputs: HashMap>, mut fs_notify: NotifyStream, @@ -246,19 +170,7 @@ mod notify_tests { String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified] }; - let stream = fs_notify_stream(&[ - ( - ttd.path(), - Some(String::from("file_a")), - &[FsEvent::FileCreated], - ), - ( - ttd.path(), - Some(String::from("file_b")), - &[FsEvent::FileCreated, FsEvent::Modified], - ), - ]) - .unwrap(); + let stream = fs_notify_stream(&[ttd.path(), ttd.path()]).unwrap(); let fs_notify_handler = tokio::task::spawn(async move { assert_rx_stream(expected_events, stream).await; @@ -278,29 +190,31 @@ mod notify_tests { let ttd = Arc::new(TempTedgeDir::new()); let ttd_clone = ttd.clone(); let mut fs_notify = NotifyStream::try_default().unwrap(); - fs_notify - .add_watcher(ttd.path(), None, &[FsEvent::FileCreated, FsEvent::Modified]) - .unwrap(); + fs_notify.add_watcher(ttd.path()).unwrap(); - let expected_events = hashmap! { - String::from("file_a") => vec![FsEvent::FileCreated], - String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified], - String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted], - }; - - let file_system_handler = tokio::spawn(async move { - ttd_clone.dir("dir_a"); - ttd_clone.file("file_a"); - ttd_clone.file("file_b"); //.with_raw_content("yo"); - ttd_clone.file("file_c").delete(); - }); + let assert_file_events = tokio::spawn(async move { + let expected_events = hashmap! { + String::from("file_a") => vec![FsEvent::FileCreated], + String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified], + String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted], + }; - let fs_notify_handler = tokio::spawn(async move { - assert_rx_stream(expected_events, fs_notify).await; + let file_system_handler = async { + ttd_clone.dir("dir_a"); + ttd_clone.file("file_a"); + ttd_clone.file("file_b"); + ttd_clone.file("file_c").delete(); + }; + let _ = tokio::join!( + assert_rx_stream(expected_events, fs_notify), + file_system_handler + ); }); - fs_notify_handler.await.unwrap(); - file_system_handler.await.unwrap(); + tokio::time::timeout(Duration::from_secs(1), assert_file_events) + .await + .unwrap() + .unwrap(); } #[tokio::test] @@ -315,16 +229,7 @@ mod notify_tests { String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted] }; - let stream = fs_notify_stream(&[( - ttd.path(), - None, - &[ - FsEvent::FileCreated, - FsEvent::Modified, - FsEvent::FileDeleted, - ], - )]) - .unwrap(); + let stream = fs_notify_stream(&[ttd.path()]).unwrap(); let fs_notify_handler = tokio::task::spawn(async move { assert_rx_stream(expected_events, stream).await; @@ -359,21 +264,8 @@ mod notify_tests { String::from("dir_d") => vec![FsEvent::DirectoryCreated], }; - let stream = fs_notify_stream(&[ - (ttd_a.path(), None, &[FsEvent::FileCreated]), - ( - ttd_b.path(), - None, - &[FsEvent::FileCreated, FsEvent::Modified], - ), - ( - ttd_c.path(), - None, - &[FsEvent::FileCreated, FsEvent::FileDeleted], - ), - (ttd_d.path(), None, &[FsEvent::FileCreated]), - ]) - .unwrap(); + let stream = + fs_notify_stream(&[ttd_a.path(), ttd_b.path(), ttd_c.path(), ttd_d.path()]).unwrap(); let fs_notify_handler = tokio::task::spawn(async move { assert_rx_stream(expected_events, stream).await; diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index 51b59c73e3f..207756d8fa0 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -156,16 +156,7 @@ impl Mapper { async fn process_messages(mapper: &mut Mapper, ops_dir: Option<&Path>) -> Result<(), MapperError> { if let Some(path) = ops_dir { - let mut fs_notification_stream = fs_notify_stream(&[( - path, - None, - &[ - FsEvent::DirectoryCreated, - FsEvent::FileCreated, - FsEvent::FileDeleted, - FsEvent::Modified, - ], - )])?; + let mut fs_notification_stream = fs_notify_stream(&[path])?; // Send health status to confirm the mapper initialization is completed send_health_status(&mut mapper.output, &mapper.mapper_name).await; diff --git a/crates/extensions/tedge_file_system_ext/src/lib.rs b/crates/extensions/tedge_file_system_ext/src/lib.rs index b0a8e32e335..8603204ed01 100644 --- a/crates/extensions/tedge_file_system_ext/src/lib.rs +++ b/crates/extensions/tedge_file_system_ext/src/lib.rs @@ -123,17 +123,7 @@ impl Actor for FsWatchActor { async fn run(&mut self) -> Result<(), RuntimeError> { let mut fs_notify = NotifyStream::try_default().unwrap(); for (watch_path, _) in self.messages.get_watch_dirs().iter() { - fs_notify - .add_watcher( - watch_path, - None, - &[ - FsEvent::Modified, - FsEvent::FileDeleted, - FsEvent::FileCreated, - ], - ) - .unwrap(); + fs_notify.add_watcher(watch_path).unwrap(); } loop { diff --git a/tests/RobotFramework/tests/cumulocity/configuration/configuration_operation.robot b/tests/RobotFramework/tests/cumulocity/configuration/configuration_operation.robot index 92c8b3db71f..83b76c5f4d0 100644 --- a/tests/RobotFramework/tests/cumulocity/configuration/configuration_operation.robot +++ b/tests/RobotFramework/tests/cumulocity/configuration/configuration_operation.robot @@ -68,7 +68,6 @@ Update configuration plugin config via cloud Modify configuration plugin config via local filesystem modify inplace [Documentation] Editing the c8y-configuration-plugin.toml locally requires the ... c8y-configuration-plugin service to be restarted - Skip msg="Fails due to https://github.com/thin-edge/thin-edge.io/issues/1896" Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1 Execute Command sed -i 's/CONFIG1/CONFIG3/g' /etc/tedge/c8y/c8y-configuration-plugin.toml Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG3 @@ -100,7 +99,6 @@ Update configuration plugin config via local filesystem move (different director Update configuration plugin config via local filesystem move (same directory) [Documentation] Repalacing the c8y-configuration-plugin.toml locally requires the ... c8y-configuration-plugin service to be restarted - Skip msg="Fails due to https://github.com/thin-edge/thin-edge.io/issues/1896" Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1 Transfer To Device ${CURDIR}/c8y-configuration-plugin-updated.toml /etc/tedge/c8y/ Execute Command mv /etc/tedge/c8y/c8y-configuration-plugin-updated.toml /etc/tedge/c8y/c8y-configuration-plugin.toml