Skip to content

Commit

Permalink
filewatch: if the Start() fails, clean up immediately
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Santos <nick.santos@docker.com>
  • Loading branch information
nicks committed Jul 8, 2022
1 parent 6549abe commit 523ac6d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 12 deletions.
10 changes: 7 additions & 3 deletions internal/controllers/core/filewatch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func (c *Controller) addOrReplace(ctx context.Context, name types.NamespacedName
status.Error = fmt.Sprintf("filewatch init: %v", err)
} else if err := notify.Start(); err != nil {
status.Error = fmt.Sprintf("filewatch init: %v", err)

// Close the notify immediately, but don't add it to the watcher object. The
// watcher object is still needed to handle backoff.
_ = notify.Close()
} else {
startFileChangeLoop = true
}
Expand All @@ -211,12 +215,12 @@ func (c *Controller) addOrReplace(ctx context.Context, name types.NamespacedName
existing.cleanupWatch(ctx)
}

ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel

if startFileChangeLoop {
w.notify = notify
status.MonitorStartTime = apis.NowMicro()
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel

go c.dispatchFileChangesLoop(ctx, w)
}

Expand Down
25 changes: 25 additions & 0 deletions internal/controllers/core/filewatch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,28 @@ func TestCreateSubError(t *testing.T) {
f.MustGet(key, &fw)
assert.Contains(t, fw.Status.Error, "filewatch init: Unusual watcher error")
}

func TestStartSubError(t *testing.T) {
f := newFixture(t)
maker := f.controller.fsWatcherMaker
var ffw *fsevent.FakeWatcher
f.controller.fsWatcherMaker = fsevent.WatcherMaker(func(paths []string, ignore watch.PathMatcher, l logger.Logger) (watch.Notify, error) {
w, err := maker(paths, ignore, l)
ffw = w.(*fsevent.FakeWatcher)
ffw.StartErr = fmt.Errorf("Unusual start error")
return w, err
})
key, _ := f.CreateSimpleFileWatch()

var fw filewatches.FileWatch
f.MustGet(key, &fw)
assert.Contains(t, fw.Status.Error, "filewatch init: Unusual start error")
assert.False(t, ffw.Running)

fw.Spec.WatchedPaths = []string{f.tmpdir.JoinPath("d")}
f.Update(&fw)

f.MustGet(key, &fw)
assert.Contains(t, fw.Status.Error, "filewatch init: Unusual start error")
assert.False(t, ffw.Running)
}
31 changes: 24 additions & 7 deletions internal/controllers/core/filewatch/fsevent/fake_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ func (w *FakeMultiWatcher) loop() {
for _, sub := range w.getSubs() {
close(sub)
}
}()

defer func() {
for _, sub := range w.getSubErrors() {
close(sub)
}
Expand All @@ -75,7 +73,7 @@ func (w *FakeMultiWatcher) loop() {
}
w.mu.Lock()
for _, watcher := range w.watchers {
if watcher.matches(e.Path()) {
if watcher.Running && watcher.matches(e.Path()) {
watcher.inboundCh <- e
}
}
Expand All @@ -95,11 +93,15 @@ type FakeWatcher struct {
inboundCh chan watch.FileEvent
outboundCh chan watch.FileEvent
errorCh chan error
closeCh chan bool

eventCount uint64

paths []string
ignore watch.PathMatcher

Running bool
StartErr error
}

func NewFakeWatcher(inboundCh chan watch.FileEvent, errorCh chan error, paths []string, ignore watch.PathMatcher) *FakeWatcher {
Expand All @@ -113,6 +115,7 @@ func NewFakeWatcher(inboundCh chan watch.FileEvent, errorCh chan error, paths []
errorCh: errorCh,
paths: paths,
ignore: ignore,
closeCh: make(chan bool),
}
}

Expand All @@ -131,11 +134,17 @@ func (w *FakeWatcher) matches(path string) bool {
}

func (w *FakeWatcher) Start() error {
w.Running = true
go w.loop()
if w.StartErr != nil {
return w.StartErr
}
return nil
}

func (w *FakeWatcher) Close() error {
close(w.closeCh)
<-w.outboundCh
return nil
}

Expand All @@ -156,15 +165,23 @@ func (w *FakeWatcher) QueuedCount() int {
}

func (w *FakeWatcher) loop() {
defer func() {
w.Running = false
close(w.outboundCh)
}()

var q []watch.FileEvent
for {
if len(q) == 0 {
e, ok := <-w.inboundCh
if !ok {
close(w.outboundCh)
select {
case e, ok := <-w.inboundCh:
if !ok {
return
}
q = append(q, e)
case <-w.closeCh:
return
}
q = append(q, e)
} else {
e := q[0]
w.outboundCh <- e
Expand Down
7 changes: 5 additions & 2 deletions internal/controllers/core/filewatch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ func (w *watcher) cleanupWatch(ctx context.Context) {
if w.done {
return
}
if err := w.notify.Close(); err != nil {
logger.Get(ctx).Debugf("Failed to close notifier for %q: %v", w.name.String(), err)

if w.notify != nil {
if err := w.notify.Close(); err != nil {
logger.Get(ctx).Debugf("Failed to close notifier for %q: %v", w.name.String(), err)
}
}

w.restartBackoff *= 2
Expand Down

0 comments on commit 523ac6d

Please sign in to comment.