Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/1896/file modifications #1928

Merged
merged 1 commit into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 74 additions & 179 deletions crates/common/tedge_utils/src/notify.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::hash::Hash;
use std::path::Path;
use std::path::PathBuf;

use notify::event::AccessKind;
use notify::event::AccessMode;
use notify::event::CreateKind;
use notify::event::DataChange;
use notify::event::ModifyKind;
use notify::event::RemoveKind;
use notify::Config;
Expand All @@ -16,6 +9,9 @@ use notify::INotifyWatcher;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use std::hash::Hash;
use std::path::Path;
use std::path::PathBuf;
use strum_macros::Display;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -50,14 +46,9 @@ pub enum NotifyStreamError {
DuplicateWatcher { mask: FsEvent, path: PathBuf },
}

type DirPath = PathBuf;
type MaybeFileName = Option<String>;
type Metadata = HashMap<DirPath, HashMap<MaybeFileName, HashSet<FsEvent>>>;

pub struct NotifyStream {
watcher: INotifyWatcher,
pub rx: Receiver<(PathBuf, FsEvent)>,
metadata: Metadata,
}

impl NotifyStream {
Expand All @@ -70,142 +61,77 @@ impl NotifyStream {
move |res: Result<notify::Event, notify::Error>| {
futures::executor::block_on(async {
if let Ok(notify_event) = res {
match notify_event.kind {
EventKind::Modify(ModifyKind::Data(DataChange::Any)) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::Modified)).await;
}
}

EventKind::Access(AccessKind::Close(AccessMode::Write)) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::Modified)).await;
}
}
EventKind::Create(CreateKind::File) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::FileCreated)).await;
let event = match notify_event.kind {
EventKind::Access(access_kind) => match access_kind {
AccessKind::Close(access_mode) => match access_mode {
AccessMode::Any | AccessMode::Other | AccessMode::Write => {
Some(FsEvent::Modified)
}
AccessMode::Read | AccessMode::Execute => None,
},
AccessKind::Any | AccessKind::Other => Some(FsEvent::Modified),
AccessKind::Read | AccessKind::Open(_) => None,
},
EventKind::Create(create_kind) => match create_kind {
CreateKind::File | CreateKind::Any | CreateKind::Other => {
Some(FsEvent::FileCreated)
}
}
EventKind::Create(CreateKind::Folder) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::DirectoryCreated)).await;
}
}
EventKind::Remove(RemoveKind::File) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::FileDeleted)).await;
CreateKind::Folder => Some(FsEvent::DirectoryCreated),
},
EventKind::Modify(modify_kind) => match modify_kind {
ModifyKind::Data(_)
| ModifyKind::Metadata(_)
| ModifyKind::Name(_)
| ModifyKind::Any
| ModifyKind::Other => Some(FsEvent::Modified),
},
EventKind::Remove(remove_kind) => match remove_kind {
RemoveKind::File | RemoveKind::Any | RemoveKind::Other => {
Some(FsEvent::FileDeleted)
}
RemoveKind::Folder => Some(FsEvent::DirectoryDeleted),
},
EventKind::Any | EventKind::Other => Some(FsEvent::Modified),
};

if let Some(event) = event {
for path in notify_event.paths {
let _ = tx.send((path, event)).await;
}
EventKind::Remove(RemoveKind::Folder) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::DirectoryDeleted)).await;
}
}
_other => {}
}
}
})
},
Config::default(),
)?;
Ok(Self {
watcher,
rx,
metadata: HashMap::new(),
})
Ok(Self { watcher, rx })
}

#[cfg(test)]
fn get_metadata(&self) -> &Metadata {
&self.metadata
}
fn get_metadata_as_mut(&mut self) -> &mut Metadata {
&mut self.metadata
}

pub fn add_watcher(
&mut self,
dir_path: &Path,
file: Option<String>,
events: &[FsEvent],
) -> Result<(), NotifyStreamError> {
/// Will return an error if you try to watch a file/directory which doesn't exist
pub fn add_watcher(&mut self, dir_path: &Path) -> Result<(), NotifyStreamError> {
Comment on lines -127 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the list of watch events is a nice simplification. The caller might receive more messages than expected but it far simpler to do the filtering there.

self.watcher.watch(dir_path, RecursiveMode::Recursive)?;

// we add the information to the metadata
let maybe_file_name_entry = self
.get_metadata_as_mut()
.entry(dir_path.to_path_buf())
.or_insert_with(HashMap::new);

let file_event_entry = maybe_file_name_entry
.entry(file)
.or_insert_with(HashSet::new);
for event in events {
file_event_entry.insert(*event);
}
Ok(())
}
}

pub fn fs_notify_stream(
input: &[(&Path, Option<String>, &[FsEvent])],
) -> Result<NotifyStream, NotifyStreamError> {
pub fn fs_notify_stream(input: &[&Path]) -> Result<NotifyStream, NotifyStreamError> {
let mut fs_notify = NotifyStream::try_default()?;
for (dir_path, watch, flags) in input {
fs_notify.add_watcher(dir_path, watch.to_owned(), flags)?;
for dir_path in input {
fs_notify.add_watcher(dir_path)?;
}
Ok(fs_notify)
}

#[cfg(test)]
mod notify_tests {
mod tests {
use super::*;
use maplit::hashmap;
use std::collections::HashMap;
use std::sync::Arc;

use maplit::hashmap;
use std::time::Duration;
use tedge_test_utils::fs::TempTedgeDir;

use crate::notify::FsEvent;

use super::fs_notify_stream;
use super::NotifyStream;

/// This test:
/// Creates a duplicate watcher (same directory path, same file name, same event)
/// Adds a new event for the same directory path, same file name
/// Checks the duplicate event is not duplicated in the data structure
/// Checks the new event is in the data structure
#[test]
fn it_inserts_new_file_events_correctly() {
let ttd = TempTedgeDir::new();
let mut notify = NotifyStream::try_default().unwrap();
let maybe_file_name = Some("file_a".to_string());

notify
.add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileCreated])
.unwrap();
notify
.add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileCreated])
.unwrap();
notify
.add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileDeleted])
.unwrap();

let event_hashset = notify
.get_metadata()
.get(ttd.path())
.unwrap()
.get(&maybe_file_name)
.unwrap();

// assert no duplicate entry was created for the second insert and new event was added
// in total 2 events are expected: FileEvent::Created, FileEvent::Deleted
assert_eq!(event_hashset.len(), 2);
assert!(event_hashset.contains(&FsEvent::FileCreated));
assert!(event_hashset.contains(&FsEvent::FileDeleted));
}

async fn assert_rx_stream(
mut inputs: HashMap<String, Vec<FsEvent>>,
mut fs_notify: NotifyStream,
Expand Down Expand Up @@ -246,19 +172,7 @@ mod notify_tests {
String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified]
};

let stream = fs_notify_stream(&[
(
ttd.path(),
Some(String::from("file_a")),
&[FsEvent::FileCreated],
),
(
ttd.path(),
Some(String::from("file_b")),
&[FsEvent::FileCreated, FsEvent::Modified],
),
])
.unwrap();
let stream = fs_notify_stream(&[ttd.path(), ttd.path()]).unwrap();

let fs_notify_handler = tokio::task::spawn(async move {
assert_rx_stream(expected_events, stream).await;
Expand All @@ -278,29 +192,32 @@ mod notify_tests {
let ttd = Arc::new(TempTedgeDir::new());
let ttd_clone = ttd.clone();
let mut fs_notify = NotifyStream::try_default().unwrap();
fs_notify
.add_watcher(ttd.path(), None, &[FsEvent::FileCreated, FsEvent::Modified])
.unwrap();

let expected_events = hashmap! {
String::from("file_a") => vec![FsEvent::FileCreated],
String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified],
String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted],
};

let file_system_handler = tokio::spawn(async move {
ttd_clone.dir("dir_a");
ttd_clone.file("file_a");
ttd_clone.file("file_b"); //.with_raw_content("yo");
ttd_clone.file("file_c").delete();
});
fs_notify.add_watcher(ttd.path()).unwrap();

let assert_file_events = tokio::spawn(async move {
let expected_events = hashmap! {
String::from("dir_a") => vec![FsEvent::DirectoryCreated],
String::from("file_a") => vec![FsEvent::FileCreated],
String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified],
String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted],
jmshark marked this conversation as resolved.
Show resolved Hide resolved
};

let fs_notify_handler = tokio::spawn(async move {
assert_rx_stream(expected_events, fs_notify).await;
let file_system_handler = async {
ttd_clone.dir("dir_a");
ttd_clone.file("file_a");
ttd_clone.file("file_b");
ttd_clone.file("file_c").delete();
};
let _ = tokio::join!(
assert_rx_stream(expected_events, fs_notify),
file_system_handler
);
});

fs_notify_handler.await.unwrap();
file_system_handler.await.unwrap();
tokio::time::timeout(Duration::from_secs(1), assert_file_events)
.await
.unwrap()
.unwrap();
}

#[tokio::test]
Expand All @@ -315,16 +232,7 @@ mod notify_tests {
String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted]
};

let stream = fs_notify_stream(&[(
ttd.path(),
None,
&[
FsEvent::FileCreated,
FsEvent::Modified,
FsEvent::FileDeleted,
],
)])
.unwrap();
let stream = fs_notify_stream(&[ttd.path()]).unwrap();

let fs_notify_handler = tokio::task::spawn(async move {
assert_rx_stream(expected_events, stream).await;
Expand Down Expand Up @@ -359,21 +267,8 @@ mod notify_tests {
String::from("dir_d") => vec![FsEvent::DirectoryCreated],
};

let stream = fs_notify_stream(&[
(ttd_a.path(), None, &[FsEvent::FileCreated]),
(
ttd_b.path(),
None,
&[FsEvent::FileCreated, FsEvent::Modified],
),
(
ttd_c.path(),
None,
&[FsEvent::FileCreated, FsEvent::FileDeleted],
),
(ttd_d.path(), None, &[FsEvent::FileCreated]),
])
.unwrap();
let stream =
fs_notify_stream(&[ttd_a.path(), ttd_b.path(), ttd_c.path(), ttd_d.path()]).unwrap();

let fs_notify_handler = tokio::task::spawn(async move {
assert_rx_stream(expected_events, stream).await;
Expand Down
11 changes: 1 addition & 10 deletions crates/core/tedge_mapper/src/core/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,7 @@ impl Mapper {

async fn process_messages(mapper: &mut Mapper, ops_dir: Option<&Path>) -> Result<(), MapperError> {
if let Some(path) = ops_dir {
let mut fs_notification_stream = fs_notify_stream(&[(
path,
None,
&[
FsEvent::DirectoryCreated,
FsEvent::FileCreated,
FsEvent::FileDeleted,
FsEvent::Modified,
],
)])?;
let mut fs_notification_stream = fs_notify_stream(&[path])?;

// Send health status to confirm the mapper initialization is completed
send_health_status(&mut mapper.output, &mapper.mapper_name).await;
Expand Down
12 changes: 1 addition & 11 deletions crates/extensions/tedge_file_system_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,7 @@ impl Actor for FsWatchActor {
async fn run(&mut self) -> Result<(), RuntimeError> {
let mut fs_notify = NotifyStream::try_default().unwrap();
for (watch_path, _) in self.messages.get_watch_dirs().iter() {
fs_notify
.add_watcher(
watch_path,
None,
&[
FsEvent::Modified,
FsEvent::FileDeleted,
FsEvent::FileCreated,
],
)
.unwrap();
fs_notify.add_watcher(watch_path).unwrap();
}

loop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ Update configuration plugin config via cloud
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1 Config@2.0.0

Modify configuration plugin config via local filesystem modify inplace
[Documentation] Editing the c8y-configuration-plugin.toml locally requires the
... c8y-configuration-plugin service to be restarted
Skip msg="Fails due to https://github.com/thin-edge/thin-edge.io/issues/1896"
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1
Execute Command sed -i 's/CONFIG1/CONFIG3/g' /etc/tedge/c8y/c8y-configuration-plugin.toml
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG3
Expand All @@ -96,7 +93,6 @@ Update configuration plugin config via local filesystem move (different director
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1 Config@2.0.0

Update configuration plugin config via local filesystem move (same directory)
Skip msg="Fails due to https://github.com/thin-edge/thin-edge.io/issues/1896"
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1
Transfer To Device ${CURDIR}/c8y-configuration-plugin-updated.toml /etc/tedge/c8y/
Execute Command mv /etc/tedge/c8y/c8y-configuration-plugin-updated.toml /etc/tedge/c8y/c8y-configuration-plugin.toml
Expand Down