Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(scanner): make the watcher a little easier to reason about #394

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/gonic/gonic.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,13 @@ func main() {

defer logJob("scan watcher")()

done := make(chan struct{})
errgrp.Go(func() error {
<-ctx.Done()
scannr.CancelWatch()
done <- struct{}{}
return nil
})
return scannr.ExecuteWatch()
return scannr.ExecuteWatch(done)
})

errgrp.Go(func() error {
Expand Down
126 changes: 56 additions & 70 deletions scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ type Scanner struct {
tagReader tagcommon.Reader
excludePattern *regexp.Regexp
scanning *int32
watcher *fsnotify.Watcher
watchMap map[string]string // maps watched dirs back to root music dir
watchDone chan bool
}

func New(musicDirs []string, db *db.DB, multiValueSettings map[Tag]MultiValueSetting, tagReader tagcommon.Reader, excludePattern string) *Scanner {
Expand All @@ -52,22 +49,12 @@ func New(musicDirs []string, db *db.DB, multiValueSettings map[Tag]MultiValueSet
tagReader: tagReader,
excludePattern: excludePatternRegExp,
scanning: new(int32),
watchMap: make(map[string]string),
watchDone: make(chan bool),
}
}

func (s *Scanner) IsScanning() bool {
return atomic.LoadInt32(s.scanning) == 1
}

func (s *Scanner) StartScanning() bool {
return atomic.CompareAndSwapInt32(s.scanning, 0, 1)
}

func (s *Scanner) StopScanning() {
defer atomic.StoreInt32(s.scanning, 0)
}
func (s *Scanner) IsScanning() bool { return atomic.LoadInt32(s.scanning) == 1 }
func (s *Scanner) StartScanning() bool { return atomic.CompareAndSwapInt32(s.scanning, 0, 1) }
func (s *Scanner) StopScanning() { defer atomic.StoreInt32(s.scanning, 0) }

type ScanOptions struct {
IsFull bool
Expand All @@ -94,7 +81,7 @@ func (s *Scanner) ScanAndClean(opts ScanOptions) (*Context, error) {

for _, dir := range s.musicDirs {
err := filepath.WalkDir(dir, func(absPath string, d fs.DirEntry, err error) error {
return s.scanCallback(c, dir, absPath, d, err)
return s.scanCallback(c, absPath, d, err)
})
if err != nil {
return nil, fmt.Errorf("walk: %w", err)
Expand All @@ -121,93 +108,82 @@ func (s *Scanner) ScanAndClean(opts ScanOptions) (*Context, error) {
return c, errors.Join(c.errs...)
}

func (s *Scanner) ExecuteWatch() error {
var err error
s.watcher, err = fsnotify.NewWatcher()
func (s *Scanner) ExecuteWatch(done <-chan struct{}) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("error creating watcher: %v\n", err)
return err
return fmt.Errorf("creating watcher: %w", err)
}
defer s.watcher.Close()
defer watcher.Close()

t := time.NewTimer(10 * time.Second)
if !t.Stop() {
<-t.C
}
const batchInterval = 10 * time.Second
batchT := time.NewTimer(batchInterval)
batchT.Stop()

for _, dir := range s.musicDirs {
err := filepath.WalkDir(dir, func(absPath string, d fs.DirEntry, err error) error {
return s.watchCallback(dir, absPath, d, err)
return watchCallback(watcher, absPath, d, err)
})
if err != nil {
log.Printf("error watching directory tree: %v\n", err)
continue
}
}

scanList := map[string]struct{}{}
batchSeen := map[string]struct{}{}
for {
select {
case <-t.C:
case <-batchT.C:
if !s.StartScanning() {
scanList = map[string]struct{}{}
break
}
for dirName := range scanList {
for absPath := range batchSeen {
c := &Context{
seenTracks: map[int]struct{}{},
seenAlbums: map[int]struct{}{},
isFull: false,
}
musicDirName := s.watchMap[dirName]
if musicDirName == "" {
musicDirName = s.watchMap[filepath.Dir(dirName)]
}
err = filepath.WalkDir(dirName, func(absPath string, d fs.DirEntry, err error) error {
return s.watchCallback(musicDirName, absPath, d, err)
err = filepath.WalkDir(absPath, func(absPath string, d fs.DirEntry, err error) error {
return watchCallback(watcher, absPath, d, err)
})
if err != nil {
log.Printf("error watching directory tree: %v\n", err)
continue
}
err = filepath.WalkDir(dirName, func(absPath string, d fs.DirEntry, err error) error {
return s.scanCallback(c, musicDirName, absPath, d, err)
err = filepath.WalkDir(absPath, func(absPath string, d fs.DirEntry, err error) error {
return s.scanCallback(c, absPath, d, err)
})
if err != nil {
log.Printf("error walking: %v", err)
continue
}
}
scanList = map[string]struct{}{}
s.StopScanning()
case event := <-s.watcher.Events:
var dirName string
clear(batchSeen)

case event := <-watcher.Events:
if event.Op&(fsnotify.Create|fsnotify.Write) == 0 {
break
}
if len(scanList) == 0 {
t.Reset(10 * time.Second)
}
fileInfo, err := os.Stat(event.Name)
if err != nil {
break
}
if fileInfo.IsDir() {
dirName = event.Name
batchSeen[event.Name] = struct{}{}
} else {
dirName = filepath.Dir(event.Name)
batchSeen[filepath.Dir(event.Name)] = struct{}{}
}
scanList[dirName] = struct{}{}
case err = <-s.watcher.Errors:
batchT.Reset(batchInterval)

case err = <-watcher.Errors:
log.Printf("error from watcher: %v\n", err)
case <-s.watchDone:

case <-done:
return nil
}
}
}

func (s *Scanner) CancelWatch() {
s.watchDone <- true
}

func (s *Scanner) watchCallback(dir string, absPath string, d fs.DirEntry, err error) error {
func watchCallback(watcher *fsnotify.Watcher, absPath string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
Expand All @@ -218,35 +194,31 @@ func (s *Scanner) watchCallback(dir string, absPath string, d fs.DirEntry, err e
eval, _ := filepath.EvalSymlinks(absPath)
return filepath.WalkDir(eval, func(subAbs string, d fs.DirEntry, err error) error {
subAbs = strings.Replace(subAbs, eval, absPath, 1)
return s.watchCallback(dir, subAbs, d, err)
return watchCallback(watcher, subAbs, d, err)
})
default:
return nil
}

if s.watchMap[absPath] == "" {
s.watchMap[absPath] = dir
err = s.watcher.Add(absPath)
if err := watcher.Add(absPath); err != nil {
return fmt.Errorf("add path to watcher: %w", err)
}
return err
return nil
}

func (s *Scanner) scanCallback(c *Context, dir string, absPath string, d fs.DirEntry, err error) error {
func (s *Scanner) scanCallback(c *Context, absPath string, d fs.DirEntry, err error) error {
if err != nil {
c.errs = append(c.errs, err)
return nil
}
if dir == absPath {
return nil
}

switch d.Type() {
case os.ModeDir:
case os.ModeSymlink:
eval, _ := filepath.EvalSymlinks(absPath)
return filepath.WalkDir(eval, func(subAbs string, d fs.DirEntry, err error) error {
subAbs = strings.Replace(subAbs, eval, absPath, 1)
return s.scanCallback(c, dir, subAbs, d, err)
return s.scanCallback(c, subAbs, d, err)
})
default:
return nil
Expand All @@ -260,7 +232,7 @@ func (s *Scanner) scanCallback(c *Context, dir string, absPath string, d fs.DirE
log.Printf("processing folder %q", absPath)

tx := s.db.Begin()
if err := s.scanDir(tx, c, dir, absPath); err != nil {
if err := s.scanDir(tx, c, absPath); err != nil {
c.errs = append(c.errs, fmt.Errorf("%q: %w", absPath, err))
tx.Rollback()
return nil
Expand All @@ -272,7 +244,12 @@ func (s *Scanner) scanCallback(c *Context, dir string, absPath string, d fs.DirE
return nil
}

func (s *Scanner) scanDir(tx *db.DB, c *Context, musicDir string, absPath string) error {
func (s *Scanner) scanDir(tx *db.DB, c *Context, absPath string) error {
musicDir, relPath := musicDirRelative(s.musicDirs, absPath)
if musicDir == absPath {
return nil
}

items, err := os.ReadDir(absPath)
if err != nil {
return err
Expand Down Expand Up @@ -300,7 +277,6 @@ func (s *Scanner) scanDir(tx *db.DB, c *Context, musicDir string, absPath string
}
}

relPath, _ := filepath.Rel(musicDir, absPath)
pdir, pbasename := filepath.Split(filepath.Dir(relPath))
var parent db.Album
if err := tx.Where("root_dir=? AND left_path=? AND right_path=?", musicDir, pdir, pbasename).Assign(db.Album{RootDir: musicDir, LeftPath: pdir, RightPath: pbasename}).FirstOrCreate(&parent).Error; err != nil {
Expand Down Expand Up @@ -701,3 +677,13 @@ func parseMulti(parser tagcommon.Info, setting MultiValueSetting, getMulti func(
}
return parts
}

func musicDirRelative(musicDirs []string, absPath string) (musicDir, relPath string) {
for _, musicDir := range musicDirs {
if strings.HasPrefix(absPath, musicDir) {
relPath, _ = filepath.Rel(musicDir, absPath)
return musicDir, relPath
}
}
return
}
17 changes: 17 additions & 0 deletions scanner/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,23 @@ func TestSymlinkedSubdiscs(t *testing.T) {
assert.NotZero(t, info.ModTime()) // track resolves
}

func TestSymlinkEscapesMusicDirs(t *testing.T) {
t.Parallel()
m := mockfs.NewWithDirs(t, []string{"scandir"})

require.NoError(t, os.MkdirAll(filepath.Join(m.TmpDir(), "otherdir", "artist", "album-test"), os.ModePerm))
require.NoError(t, os.Symlink(
filepath.Join(m.TmpDir(), "otherdir", "artist"),
filepath.Join(m.TmpDir(), "scandir", "artist"),
))

m.ScanAndClean()

var albums []*db.Album
require.NoError(t, m.DB().Find(&albums).Error)
require.Len(t, albums, 3)
}

func TestTagErrors(t *testing.T) {
t.Parallel()
m := mockfs.New(t)
Expand Down
Loading