Skip to content

Commit

Permalink
Capture missed inotify events
Browse files Browse the repository at this point in the history
  • Loading branch information
jmshark committed Apr 24, 2023
1 parent 4f87e9c commit 4ca36fd
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 202 deletions.
250 changes: 71 additions & 179 deletions crates/common/tedge_utils/src/notify.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::hash::Hash;
use std::path::Path;
use std::path::PathBuf;

use notify::event::AccessKind;
use notify::event::AccessMode;
use notify::event::CreateKind;
use notify::event::DataChange;
use notify::event::ModifyKind;
use notify::event::RemoveKind;
use notify::Config;
Expand All @@ -16,6 +9,9 @@ use notify::INotifyWatcher;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use std::hash::Hash;
use std::path::Path;
use std::path::PathBuf;
use strum_macros::Display;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -50,14 +46,9 @@ pub enum NotifyStreamError {
DuplicateWatcher { mask: FsEvent, path: PathBuf },
}

type DirPath = PathBuf;
type MaybeFileName = Option<String>;
type Metadata = HashMap<DirPath, HashMap<MaybeFileName, HashSet<FsEvent>>>;

pub struct NotifyStream {
watcher: INotifyWatcher,
pub rx: Receiver<(PathBuf, FsEvent)>,
metadata: Metadata,
}

impl NotifyStream {
Expand All @@ -70,142 +61,75 @@ impl NotifyStream {
move |res: Result<notify::Event, notify::Error>| {
futures::executor::block_on(async {
if let Ok(notify_event) = res {
match notify_event.kind {
EventKind::Modify(ModifyKind::Data(DataChange::Any)) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::Modified)).await;
}
}

EventKind::Access(AccessKind::Close(AccessMode::Write)) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::Modified)).await;
}
}
EventKind::Create(CreateKind::File) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::FileCreated)).await;
}
}
EventKind::Create(CreateKind::Folder) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::DirectoryCreated)).await;
let event = match notify_event.kind {
EventKind::Access(access_kind) => match access_kind {
AccessKind::Close(access_mode) => match access_mode {
AccessMode::Any | AccessMode::Other | AccessMode::Write => {
Some(FsEvent::Modified)
}
AccessMode::Read | AccessMode::Execute => None,
},
AccessKind::Any | AccessKind::Other => Some(FsEvent::Modified),
AccessKind::Read | AccessKind::Open(_) => None,
},
EventKind::Create(create_kind) => match create_kind {
CreateKind::File | CreateKind::Any | CreateKind::Other => {
Some(FsEvent::FileCreated)
}
}
EventKind::Remove(RemoveKind::File) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::FileDeleted)).await;
}
}
EventKind::Remove(RemoveKind::Folder) => {
for path in notify_event.paths {
let _ = tx.send((path, FsEvent::DirectoryDeleted)).await;
CreateKind::Folder => Some(FsEvent::DirectoryCreated),
},
EventKind::Modify(modify_kind) => match modify_kind {
ModifyKind::Data(_)
| ModifyKind::Metadata(_)
| ModifyKind::Name(_)
| ModifyKind::Any
| ModifyKind::Other => Some(FsEvent::Modified),
},
EventKind::Remove(remove_kind) => match remove_kind {
RemoveKind::File | RemoveKind::Any | RemoveKind::Other => {
Some(FsEvent::FileDeleted)
}
RemoveKind::Folder => Some(FsEvent::DirectoryDeleted),
},
EventKind::Any | EventKind::Other => Some(FsEvent::Modified),
};

if let Some(event) = event {
for path in notify_event.paths {
let _ = tx.send((path, event)).await;
}
_other => {}
}
}
})
},
Config::default(),
)?;
Ok(Self {
watcher,
rx,
metadata: HashMap::new(),
})
Ok(Self { watcher, rx })
}

#[cfg(test)]
fn get_metadata(&self) -> &Metadata {
&self.metadata
}
fn get_metadata_as_mut(&mut self) -> &mut Metadata {
&mut self.metadata
}

pub fn add_watcher(
&mut self,
dir_path: &Path,
file: Option<String>,
events: &[FsEvent],
) -> Result<(), NotifyStreamError> {
/// Will return an error if you try to watch a file/directory which doesn't exist
pub fn add_watcher(&mut self, dir_path: &Path) -> Result<(), NotifyStreamError> {
self.watcher.watch(dir_path, RecursiveMode::Recursive)?;

// we add the information to the metadata
let maybe_file_name_entry = self
.get_metadata_as_mut()
.entry(dir_path.to_path_buf())
.or_insert_with(HashMap::new);

let file_event_entry = maybe_file_name_entry
.entry(file)
.or_insert_with(HashSet::new);
for event in events {
file_event_entry.insert(*event);
}
Ok(())
}
}

pub fn fs_notify_stream(
input: &[(&Path, Option<String>, &[FsEvent])],
) -> Result<NotifyStream, NotifyStreamError> {
pub fn fs_notify_stream(input: &[&Path]) -> Result<NotifyStream, NotifyStreamError> {
let mut fs_notify = NotifyStream::try_default()?;
for (dir_path, watch, flags) in input {
fs_notify.add_watcher(dir_path, watch.to_owned(), flags)?;
for dir_path in input {
fs_notify.add_watcher(dir_path)?;
}
Ok(fs_notify)
}

#[cfg(test)]
mod notify_tests {
use std::collections::HashMap;
use std::sync::Arc;

mod tests {
use super::*;
use maplit::hashmap;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tedge_test_utils::fs::TempTedgeDir;

use crate::notify::FsEvent;

use super::fs_notify_stream;
use super::NotifyStream;

/// This test:
/// Creates a duplicate watcher (same directory path, same file name, same event)
/// Adds a new event for the same directory path, same file name
/// Checks the duplicate event is not duplicated in the data structure
/// Checks the new event is in the data structure
#[test]
fn it_inserts_new_file_events_correctly() {
let ttd = TempTedgeDir::new();
let mut notify = NotifyStream::try_default().unwrap();
let maybe_file_name = Some("file_a".to_string());

notify
.add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileCreated])
.unwrap();
notify
.add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileCreated])
.unwrap();
notify
.add_watcher(ttd.path(), maybe_file_name.clone(), &[FsEvent::FileDeleted])
.unwrap();

let event_hashset = notify
.get_metadata()
.get(ttd.path())
.unwrap()
.get(&maybe_file_name)
.unwrap();

// assert no duplicate entry was created for the second insert and new event was added
// in total 2 events are expected: FileEvent::Created, FileEvent::Deleted
assert_eq!(event_hashset.len(), 2);
assert!(event_hashset.contains(&FsEvent::FileCreated));
assert!(event_hashset.contains(&FsEvent::FileDeleted));
}

async fn assert_rx_stream(
mut inputs: HashMap<String, Vec<FsEvent>>,
mut fs_notify: NotifyStream,
Expand Down Expand Up @@ -246,19 +170,7 @@ mod notify_tests {
String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified]
};

let stream = fs_notify_stream(&[
(
ttd.path(),
Some(String::from("file_a")),
&[FsEvent::FileCreated],
),
(
ttd.path(),
Some(String::from("file_b")),
&[FsEvent::FileCreated, FsEvent::Modified],
),
])
.unwrap();
let stream = fs_notify_stream(&[ttd.path(), ttd.path()]).unwrap();

let fs_notify_handler = tokio::task::spawn(async move {
assert_rx_stream(expected_events, stream).await;
Expand All @@ -278,29 +190,31 @@ mod notify_tests {
let ttd = Arc::new(TempTedgeDir::new());
let ttd_clone = ttd.clone();
let mut fs_notify = NotifyStream::try_default().unwrap();
fs_notify
.add_watcher(ttd.path(), None, &[FsEvent::FileCreated, FsEvent::Modified])
.unwrap();
fs_notify.add_watcher(ttd.path()).unwrap();

let expected_events = hashmap! {
String::from("file_a") => vec![FsEvent::FileCreated],
String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified],
String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted],
};

let file_system_handler = tokio::spawn(async move {
ttd_clone.dir("dir_a");
ttd_clone.file("file_a");
ttd_clone.file("file_b"); //.with_raw_content("yo");
ttd_clone.file("file_c").delete();
});
let assert_file_events = tokio::spawn(async move {
let expected_events = hashmap! {
String::from("file_a") => vec![FsEvent::FileCreated],
String::from("file_b") => vec![FsEvent::FileCreated, FsEvent::Modified],
String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted],
};

let fs_notify_handler = tokio::spawn(async move {
assert_rx_stream(expected_events, fs_notify).await;
let file_system_handler = async {
ttd_clone.dir("dir_a");
ttd_clone.file("file_a");
ttd_clone.file("file_b");
ttd_clone.file("file_c").delete();
};
let _ = tokio::join!(
assert_rx_stream(expected_events, fs_notify),
file_system_handler
);
});

fs_notify_handler.await.unwrap();
file_system_handler.await.unwrap();
tokio::time::timeout(Duration::from_secs(1), assert_file_events)
.await
.unwrap()
.unwrap();
}

#[tokio::test]
Expand All @@ -315,16 +229,7 @@ mod notify_tests {
String::from("file_c") => vec![FsEvent::FileCreated, FsEvent::FileDeleted]
};

let stream = fs_notify_stream(&[(
ttd.path(),
None,
&[
FsEvent::FileCreated,
FsEvent::Modified,
FsEvent::FileDeleted,
],
)])
.unwrap();
let stream = fs_notify_stream(&[ttd.path()]).unwrap();

let fs_notify_handler = tokio::task::spawn(async move {
assert_rx_stream(expected_events, stream).await;
Expand Down Expand Up @@ -359,21 +264,8 @@ mod notify_tests {
String::from("dir_d") => vec![FsEvent::DirectoryCreated],
};

let stream = fs_notify_stream(&[
(ttd_a.path(), None, &[FsEvent::FileCreated]),
(
ttd_b.path(),
None,
&[FsEvent::FileCreated, FsEvent::Modified],
),
(
ttd_c.path(),
None,
&[FsEvent::FileCreated, FsEvent::FileDeleted],
),
(ttd_d.path(), None, &[FsEvent::FileCreated]),
])
.unwrap();
let stream =
fs_notify_stream(&[ttd_a.path(), ttd_b.path(), ttd_c.path(), ttd_d.path()]).unwrap();

let fs_notify_handler = tokio::task::spawn(async move {
assert_rx_stream(expected_events, stream).await;
Expand Down
11 changes: 1 addition & 10 deletions crates/core/tedge_mapper/src/core/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,7 @@ impl Mapper {

async fn process_messages(mapper: &mut Mapper, ops_dir: Option<&Path>) -> Result<(), MapperError> {
if let Some(path) = ops_dir {
let mut fs_notification_stream = fs_notify_stream(&[(
path,
None,
&[
FsEvent::DirectoryCreated,
FsEvent::FileCreated,
FsEvent::FileDeleted,
FsEvent::Modified,
],
)])?;
let mut fs_notification_stream = fs_notify_stream(&[path])?;

// Send health status to confirm the mapper initialization is completed
send_health_status(&mut mapper.output, &mapper.mapper_name).await;
Expand Down
12 changes: 1 addition & 11 deletions crates/extensions/tedge_file_system_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,7 @@ impl Actor for FsWatchActor {
async fn run(&mut self) -> Result<(), RuntimeError> {
let mut fs_notify = NotifyStream::try_default().unwrap();
for (watch_path, _) in self.messages.get_watch_dirs().iter() {
fs_notify
.add_watcher(
watch_path,
None,
&[
FsEvent::Modified,
FsEvent::FileDeleted,
FsEvent::FileCreated,
],
)
.unwrap();
fs_notify.add_watcher(watch_path).unwrap();
}

loop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ Update configuration plugin config via cloud
Modify configuration plugin config via local filesystem modify inplace
[Documentation] Editing the c8y-configuration-plugin.toml locally requires the
... c8y-configuration-plugin service to be restarted
Skip msg="Fails due to https://github.com/thin-edge/thin-edge.io/issues/1896"
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1
Execute Command sed -i 's/CONFIG1/CONFIG3/g' /etc/tedge/c8y/c8y-configuration-plugin.toml
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG3
Expand Down Expand Up @@ -100,7 +99,6 @@ Update configuration plugin config via local filesystem move (different director
Update configuration plugin config via local filesystem move (same directory)
[Documentation] Repalacing the c8y-configuration-plugin.toml locally requires the
... c8y-configuration-plugin service to be restarted
Skip msg="Fails due to https://github.com/thin-edge/thin-edge.io/issues/1896"
Cumulocity.Should Support Configurations ${DEFAULT_CONFIG} /etc/tedge/tedge.toml system.toml CONFIG1
Transfer To Device ${CURDIR}/c8y-configuration-plugin-updated.toml /etc/tedge/c8y/
Execute Command mv /etc/tedge/c8y/c8y-configuration-plugin-updated.toml /etc/tedge/c8y/c8y-configuration-plugin.toml
Expand Down

0 comments on commit 4ca36fd

Please sign in to comment.