Skip to content

Commit

Permalink
feat(os/gfsnotify): add recursive watching for created subfolders and…
Browse files Browse the repository at this point in the history
… sub-files under folders that already watched (#3830)
  • Loading branch information
gqcn authored Sep 30, 2024
1 parent 38622f9 commit 459c8ce
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 155 deletions.
5 changes: 3 additions & 2 deletions os/gfpool/gfpool_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (p *Pool) File() (*File, error) {
}
// It firstly checks using !p.init.Val() for performance purpose.
if !p.init.Val() && p.init.Cas(false, true) {
_, _ = gfsnotify.Add(f.path, func(event *gfsnotify.Event) {
var watchCallback = func(event *gfsnotify.Event) {
// If the file is removed or renamed, recreates the pool by increasing the pool id.
if event.IsRemove() || event.IsRename() {
// It drops the old pool.
Expand All @@ -110,7 +110,8 @@ func (p *Pool) File() (*File, error) {
// Whenever the pool id changes, the pool will be recreated.
p.id.Add(1)
}
}, false)
}
_, _ = gfsnotify.Add(f.path, watchCallback, gfsnotify.WatchOption{NoRecursive: true})
}
return f, nil
}
Expand Down
33 changes: 24 additions & 9 deletions os/gfsnotify/gfsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Callback struct {
Path string // Bound file path (absolute).
name string // Registered name for AddOnce.
elem *glist.Element // Element in the callbacks of watcher.
recursive bool // Is bound to path recursively or not.
recursive bool // Is bound to sub-path recursively or not.
}

// Event is the event produced by underlying fsnotify.
Expand All @@ -53,6 +53,15 @@ type Event struct {
Watcher *Watcher // Parent watcher.
}

// WatchOption holds the option for watching.
type WatchOption struct {
// NoRecursive explicitly specifies no recursive watching.
// Recursive watching will also watch all its current and following created subfolders and sub-files.
//
// Note that the recursive watching is enabled in default.
NoRecursive bool
}

// Op is the bits union for file operations.
type Op uint32

Expand All @@ -75,13 +84,15 @@ const (
var (
mu sync.Mutex // Mutex for concurrent safety of defaultWatcher.
defaultWatcher *Watcher // Default watcher.
callbackIdMap = gmap.NewIntAnyMap(true) // Id to callback mapping.
callbackIdMap = gmap.NewIntAnyMap(true) // Global callback id to callback function mapping.
callbackIdGenerator = gtype.NewInt() // Atomic id generator for callback.
)

// New creates and returns a new watcher.
// Note that the watcher number is limited by the file handle setting of the system.
// Eg: fs.inotify.max_user_instances system variable in linux systems.
// Example: fs.inotify.max_user_instances system variable in linux systems.
//
// In most case, you can use the default watcher for usage instead of creating one.
func New() (*Watcher, error) {
w := &Watcher{
cache: gcache.New(),
Expand All @@ -102,26 +113,30 @@ func New() (*Watcher, error) {
}

// Add monitors `path` using default watcher with callback function `callbackFunc`.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
func Add(path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func Add(path string, callbackFunc func(event *Event), option ...WatchOption) (callback *Callback, err error) {
w, err := getDefaultWatcher()
if err != nil {
return nil, err
}
return w.Add(path, callbackFunc, recursive...)
return w.Add(path, callbackFunc, option...)
}

// AddOnce monitors `path` using default watcher with callback function `callbackFunc` only once using unique name `name`.
// If AddOnce is called multiple times with the same `name` parameter, `path` is only added to monitor once. It returns error
// if it's called twice with the same `name`.
//
// If AddOnce is called multiple times with the same `name` parameter, `path` is only added to monitor once.
// It returns error if it's called twice with the same `name`.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively, which is true in default.
func AddOnce(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func AddOnce(name, path string, callbackFunc func(event *Event), option ...WatchOption) (callback *Callback, err error) {
w, err := getDefaultWatcher()
if err != nil {
return nil, err
}
return w.AddOnce(name, path, callbackFunc, recursive...)
return w.AddOnce(name, path, callbackFunc, option...)
}

// Remove removes all monitoring callbacks of given `path` from watcher recursively.
Expand Down
6 changes: 2 additions & 4 deletions os/gfsnotify/gfsnotify_filefunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,12 @@ func doFileScanDir(path string, pattern string, recursive ...bool) ([]string, er
file, err = os.Open(path)
)
if err != nil {
err = gerror.Wrapf(err, `os.Open failed for path "%s"`, path)
return nil, err
return nil, gerror.Wrapf(err, `os.Open failed for path "%s"`, path)
}
defer file.Close()
names, err := file.Readdirnames(-1)
if err != nil {
err = gerror.Wrapf(err, `read directory files failed for path "%s"`, path)
return nil, err
return nil, gerror.Wrapf(err, `read directory files failed for path "%s"`, path)
}
filePath := ""
for _, name := range names {
Expand Down
143 changes: 72 additions & 71 deletions os/gfsnotify/gfsnotify_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ import (
)

// Add monitors `path` with callback function `callbackFunc` to the watcher.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
// which is true in default.
func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
return w.AddOnce("", path, callbackFunc, recursive...)
func (w *Watcher) Add(
path string, callbackFunc func(event *Event), option ...WatchOption,
) (callback *Callback, err error) {
return w.AddOnce("", path, callbackFunc, option...)
}

// AddOnce monitors `path` with callback function `callbackFunc` only once using unique name
Expand All @@ -28,26 +32,40 @@ func (w *Watcher) Add(path string, callbackFunc func(event *Event), recursive ..
//
// It returns error if it's called twice with the same `name`.
//
// The parameter `path` can be either a file or a directory path.
// The optional parameter `recursive` specifies whether monitoring the `path` recursively,
// which is true in default.
func (w *Watcher) AddOnce(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func (w *Watcher) AddOnce(
name, path string, callbackFunc func(event *Event), option ...WatchOption,
) (callback *Callback, err error) {
var watchOption = w.getWatchOption(option...)
w.nameSet.AddIfNotExistFuncLock(name, func() bool {
// Firstly add the path to watcher.
callback, err = w.addWithCallbackFunc(name, path, callbackFunc, recursive...)
//
// A path can only be watched once; watching it more than once is a no-op and will
// not return an error.
callback, err = w.addWithCallbackFunc(
name, path, callbackFunc, option...,
)
if err != nil {
return false
}

// If it's recursive adding, it then adds all sub-folders to the monitor.
// NOTE:
// 1. It only recursively adds **folders** to the monitor, NOT files,
// because if the folders are monitored and their sub-files are also monitored.
// 2. It bounds no callbacks to the folders, because it will search the callbacks
// from its parent recursively if any event produced.
if fileIsDir(path) && (len(recursive) == 0 || recursive[0]) {
if fileIsDir(path) && !watchOption.NoRecursive {
for _, subPath := range fileAllDirs(path) {
if fileIsDir(subPath) {
if err = w.watcher.Add(subPath); err != nil {
err = gerror.Wrapf(err, `add watch failed for path "%s"`, subPath)
if watchAddErr := w.watcher.Add(subPath); watchAddErr != nil {
err = gerror.Wrapf(
err,
`add watch failed for path "%s", err: %s`,
subPath, watchAddErr.Error(),
)
} else {
intlog.Printf(context.TODO(), "watcher adds monitor for: %s", subPath)
}
Expand All @@ -62,25 +80,32 @@ func (w *Watcher) AddOnce(name, path string, callbackFunc func(event *Event), re
return
}

func (w *Watcher) getWatchOption(option ...WatchOption) WatchOption {
if len(option) > 0 {
return option[0]
}
return WatchOption{}
}

// addWithCallbackFunc adds the path to underlying monitor, creates and returns a callback object.
// Very note that if it calls multiple times with the same `path`, the latest one will overwrite the previous one.
func (w *Watcher) addWithCallbackFunc(name, path string, callbackFunc func(event *Event), recursive ...bool) (callback *Callback, err error) {
func (w *Watcher) addWithCallbackFunc(
name, path string, callbackFunc func(event *Event), option ...WatchOption,
) (callback *Callback, err error) {
var watchOption = w.getWatchOption(option...)
// Check and convert the given path to absolute path.
if t := fileRealPath(path); t == "" {
if realPath := fileRealPath(path); realPath == "" {
return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `"%s" does not exist`, path)
} else {
path = t
path = realPath
}
// Create callback object.
callback = &Callback{
Id: callbackIdGenerator.Add(1),
Func: callbackFunc,
Path: path,
name: name,
recursive: true,
}
if len(recursive) > 0 {
callback.recursive = recursive[0]
recursive: !watchOption.NoRecursive,
}
// Register the callback to watcher.
w.callbacks.LockFunc(func(m map[string]interface{}) {
Expand Down Expand Up @@ -113,74 +138,50 @@ func (w *Watcher) Close() {
w.events.Close()
}

// Remove removes monitor and all callbacks associated with the `path` recursively.
// Remove removes watching and all callbacks associated with the `path` recursively.
// Note that, it's recursive in default if given `path` is a directory.
func (w *Watcher) Remove(path string) error {
// Firstly remove the callbacks of the path.
if value := w.callbacks.Remove(path); value != nil {
list := value.(*glist.List)
for {
if item := list.PopFront(); item != nil {
callbackIdMap.Remove(item.(*Callback).Id)
} else {
break
}
}
}
// Secondly remove monitor of all sub-files which have no callbacks.
if subPaths, err := fileScanDir(path, "*", true); err == nil && len(subPaths) > 0 {
for _, subPath := range subPaths {
if w.checkPathCanBeRemoved(subPath) {
if internalErr := w.watcher.Remove(subPath); internalErr != nil {
intlog.Errorf(context.TODO(), `%+v`, internalErr)
}
}
var (
err error
subPaths []string
removedPaths = make([]string, 0)
)
removedPaths = append(removedPaths, path)
if fileIsDir(path) {
subPaths, err = fileScanDir(path, "*", true)
if err != nil {
return err
}
removedPaths = append(removedPaths, subPaths...)
}
// Lastly remove the monitor of the path from underlying monitor.
err := w.watcher.Remove(path)
if err != nil {
err = gerror.Wrapf(err, `remove watch failed for path "%s"`, path)
}
return err
}

// checkPathCanBeRemoved checks whether the given path have no callbacks bound.
func (w *Watcher) checkPathCanBeRemoved(path string) bool {
// Firstly check the callbacks in the watcher directly.
if v := w.callbacks.Get(path); v != nil {
return false
}
// Secondly check its parent whether has callbacks.
dirPath := fileDir(path)
if v := w.callbacks.Get(dirPath); v != nil {
for _, c := range v.(*glist.List).FrontAll() {
if c.(*Callback).recursive {
return false
}
}
return false
}
// Recursively check its parent.
parentDirPath := ""
for {
parentDirPath = fileDir(dirPath)
if parentDirPath == dirPath {
break
}
if v := w.callbacks.Get(parentDirPath); v != nil {
for _, c := range v.(*glist.List).FrontAll() {
if c.(*Callback).recursive {
return false
for _, removedPath := range removedPaths {
// remove the callbacks of the path.
if value := w.callbacks.Remove(removedPath); value != nil {
list := value.(*glist.List)
for {
if item := list.PopFront(); item != nil {
callbackIdMap.Remove(item.(*Callback).Id)
} else {
break
}
}
return false
}
dirPath = parentDirPath
// remove the monitor of the path from underlying monitor.
if watcherRemoveErr := w.watcher.Remove(removedPath); watcherRemoveErr != nil {
err = gerror.Wrapf(
err,
`remove watch failed for path "%s", err: %s`,
removedPath, watcherRemoveErr.Error(),
)
}
}
return true
return err
}

// RemoveCallback removes callback with given callback id from watcher.
//
// Note that, it auto removes the path watching if there's no callback bound on it.
func (w *Watcher) RemoveCallback(callbackId int) {
callback := (*Callback)(nil)
if r := callbackIdMap.Get(callbackId); r != nil {
Expand Down
Loading

0 comments on commit 459c8ce

Please sign in to comment.