diff --git a/pkg/gitops/gitops.go b/pkg/gitops/gitops.go
index d11222f59..8ff969b32 100644
--- a/pkg/gitops/gitops.go
+++ b/pkg/gitops/gitops.go
@@ -13,8 +13,8 @@ import (
 	"github.com/weaveworks/ignite/pkg/operations"
 	"github.com/weaveworks/ignite/pkg/storage/cache"
 	"github.com/weaveworks/ignite/pkg/storage/manifest"
-	"github.com/weaveworks/ignite/pkg/storage/watch/update"
 	"github.com/weaveworks/ignite/pkg/util"
+	"github.com/weaveworks/ignite/pkg/util/watcher"
 )
 
 var (
@@ -58,7 +58,7 @@ func RunLoop(url, branch string, paths []string) error {
 		}
 
 		var vm *api.VM
-		if upd.Event == update.EventDelete {
+		if upd.Event == watcher.EventDelete {
 			// As we know this VM was deleted, it wouldn't show up in a Get() call
 			// Construct a temporary VM object for passing to the delete function
 			vm = &api.VM{
@@ -86,15 +86,15 @@ func RunLoop(url, branch string, paths []string) error {
 
 		// TODO: Paralellization
 		switch upd.Event {
-		case update.EventCreate:
+		case watcher.EventCreate:
 			runHandle(func() error {
 				return handleCreate(vm)
 			})
-		case update.EventModify:
+		case watcher.EventModify:
 			runHandle(func() error {
 				return handleChange(vm)
 			})
-		case update.EventDelete:
+		case watcher.EventDelete:
 			runHandle(func() error {
 				// TODO: Temporary VM Object for removal
 				return handleDelete(vm)
diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go
index 7714837f3..ffcc5d99d 100644
--- a/pkg/storage/sync/storage.go
+++ b/pkg/storage/sync/storage.go
@@ -9,7 +9,8 @@ import (
 	"github.com/weaveworks/ignite/pkg/storage"
 	"github.com/weaveworks/ignite/pkg/storage/watch"
 	"github.com/weaveworks/ignite/pkg/storage/watch/update"
-	"github.com/weaveworks/ignite/pkg/util"
+	"github.com/weaveworks/ignite/pkg/util/sync"
+	"github.com/weaveworks/ignite/pkg/util/watcher"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 )
 
@@ -28,7 +29,7 @@ type SyncStorage struct {
 	storages     []storage.Storage
 	eventStream  watch.AssociatedEventStream
 	updateStream UpdateStream
-	monitor      *util.Monitor
+	monitor      *sync.Monitor
 }
 
 var _ storage.Storage = &SyncStorage{}
@@ -48,7 +49,7 @@ func NewSyncStorage(rwStorage storage.Storage, wStorages ...storage.Storage) sto
 	}
 
 	if ss.eventStream != nil {
-		ss.monitor = util.RunMonitor(ss.monitorFunc)
+		ss.monitor = sync.RunMonitor(ss.monitorFunc)
 	}
 
 	return ss
@@ -141,7 +142,7 @@ func (ss *SyncStorage) monitorFunc() {
 		log.Debugf("SyncStorage: Received update %v %t", upd, ok)
 		if ok {
 			switch upd.Event {
-			case update.EventModify, update.EventCreate:
+			case watcher.EventModify, watcher.EventCreate:
 				// First load the Object using the Storage given in the update,
 				// then set it using the client constructed above
 				updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind())
@@ -155,7 +156,7 @@ func (ss *SyncStorage) monitorFunc() {
 					log.Errorf("Failed to set Object with UID %q: %v", upd.APIType.GetUID(), err)
 					continue
 				}
-			case update.EventDelete:
+			case watcher.EventDelete:
 				// For deletion we use the generated "fake" APIType object
 				if err := c.Dynamic(upd.APIType.GetKind()).Delete(upd.APIType.GetUID()); err != nil {
 					log.Errorf("Failed to delete Object with UID %q: %v", upd.APIType.GetUID(), err)
diff --git a/pkg/storage/watch/batcher.go b/pkg/storage/watch/batcher.go
deleted file mode 100644
index 9612dc996..000000000
--- a/pkg/storage/watch/batcher.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package watch
-
-import (
-	"sync"
-	"time"
-
-	log "github.com/sirupsen/logrus"
-)
-
-func NewBatcher(duration time.Duration) *Batcher {
-	return &Batcher{
-		duration: duration,
-		flushCh:  make(chan struct{}),
-		syncMap:  &sync.Map{},
-	}
-}
-
-type Batcher struct {
-	duration time.Duration
-	timer    *time.Timer
-	flushCh  chan struct{}
-	syncMap  *sync.Map
-}
-
-func (b *Batcher) Load(key interface{}) (value interface{}, ok bool) {
-	return b.syncMap.Load(key)
-}
-
-func (b *Batcher) Store(key, value interface{}) {
-	// prevent the timer from firing as we're manipulating it now
-	b.cancelUnfiredTimer()
-	// store the key and the value as requested
-	log.Tracef("Batcher: Storing key %v and value %q, reset the timer.", key, value)
-	b.syncMap.Store(key, value)
-	// set the timer to fire after the duration, unless there's a new .Store call
-	b.dispatchAfterTimeout()
-}
-
-func (b *Batcher) Close() {
-	log.Trace("Batcher: Closing the batch channel")
-	close(b.flushCh)
-}
-
-func (b *Batcher) ProcessBatch(fn func(key, val interface{}) bool) bool {
-	if _, ok := <-b.flushCh; !ok {
-		// channel is closed
-		return false
-	}
-	log.Trace("Batcher: Received a flush for the batch. Dispatching it now.")
-	b.syncMap.Range(fn)
-	*b.syncMap = sync.Map{}
-	return true
-}
-
-func (b *Batcher) cancelUnfiredTimer() {
-	// If the timer already exists; stop it
-	if b.timer != nil {
-		log.Tracef("Batcher: Cancelled timer")
-		b.timer.Stop()
-		b.timer = nil
-	}
-}
-
-func (b *Batcher) dispatchAfterTimeout() {
-	b.timer = time.AfterFunc(b.duration, func() {
-		log.Tracef("Batcher: Dispatching a batch job")
-		b.flushCh <- struct{}{}
-	})
-}
diff --git a/pkg/storage/watch/storage.go b/pkg/storage/watch/storage.go
index 48867b61f..803980133 100644
--- a/pkg/storage/watch/storage.go
+++ b/pkg/storage/watch/storage.go
@@ -11,7 +11,8 @@ import (
 	"github.com/weaveworks/ignite/pkg/storage"
 	"github.com/weaveworks/ignite/pkg/storage/manifest/raw"
 	"github.com/weaveworks/ignite/pkg/storage/watch/update"
-	"github.com/weaveworks/ignite/pkg/util"
+	"github.com/weaveworks/ignite/pkg/util/sync"
+	"github.com/weaveworks/ignite/pkg/util/watcher"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"sigs.k8s.io/yaml"
@@ -40,12 +41,12 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) {
 	var err error
 	var files []string
-	if s.watcher, files, err = newWatcher(storage.RawStorage().Dir()); err != nil {
+	if s.watcher, files, err = watcher.NewFileWatcher(storage.RawStorage().Dir()); err != nil {
 		return nil, err
 	}
 
 	if mapped, ok := s.RawStorage().(raw.MappedRawStorage); ok {
-		s.monitor = util.RunMonitor(func() {
+		s.monitor = sync.RunMonitor(func() {
 			s.monitorFunc(mapped, files) // Offload the file registration to the goroutine
 		})
 	}
 
@@ -56,28 +57,28 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) {
 // GenericWatchStorage implements the WatchStorage interface
 type GenericWatchStorage struct {
 	storage.Storage
-	watcher *watcher
+	watcher *watcher.FileWatcher
 	events  *AssociatedEventStream
-	monitor *util.Monitor
+	monitor *sync.Monitor
 }
 
 var _ WatchStorage = &GenericWatchStorage{}
 
 // Suspend modify events during Set
 func (s *GenericWatchStorage) Set(gvk schema.GroupVersionKind, obj meta.Object) error {
-	s.watcher.suspend(update.EventModify)
+	s.watcher.Suspend(watcher.EventModify)
 	return s.Storage.Set(gvk, obj)
 }
 
 // Suspend modify events during Patch
 func (s *GenericWatchStorage) Patch(gvk schema.GroupVersionKind, uid meta.UID, patch []byte) error {
-	s.watcher.suspend(update.EventModify)
+	s.watcher.Suspend(watcher.EventModify)
 	return s.Storage.Patch(gvk, uid, patch)
 }
 
 // Suspend delete events during Delete
 func (s *GenericWatchStorage) Delete(gvk schema.GroupVersionKind, uid meta.UID) error {
-	s.watcher.suspend(update.EventDelete)
+	s.watcher.Suspend(watcher.EventDelete)
 	return s.Storage.Delete(gvk, uid)
 }
 
@@ -86,7 +87,7 @@ func (s *GenericWatchStorage) SetEventStream(eventStream AssociatedEventStream)
 }
 
 func (s *GenericWatchStorage) Close() {
-	s.watcher.close()
+	s.watcher.Close()
 	s.monitor.Wait()
 }
 
@@ -100,16 +101,16 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s
 		} else {
 			mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), file)
 			// Send the event to the events channel
-			s.sendEvent(update.EventModify, obj)
+			s.sendEvent(watcher.EventModify, obj)
 		}
 	}
 
 	for {
-		if event, ok := <-s.watcher.updates; ok {
+		if event, ok := <-s.watcher.GetFileUpdateStream(); ok {
 			var obj meta.Object
 			var err error
-			if event.Event == update.EventDelete {
+			if event.Event == watcher.EventDelete {
 				var key storage.Key
 				if key, err = mapped.GetMapping(event.Path); err != nil {
 					log.Warnf("Failed to retrieve data for %q: %v", event.Path, err)
@@ -131,8 +132,8 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s
 				continue
 			}
 
-			// This is based on the key's existence instead of update.EventCreate,
-			// as Objects can get updated (via update.EventModify) to be conformant
+			// This is based on the key's existence instead of watcher.EventCreate,
+			// as Objects can get updated (via watcher.EventModify) to be conformant
 			if _, err = mapped.GetMapping(event.Path); err != nil {
 				mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), event.Path)
 			}
@@ -146,7 +147,7 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s
 	}
 }
 
-func (s *GenericWatchStorage) sendEvent(event update.Event, obj meta.Object) {
+func (s *GenericWatchStorage) sendEvent(event watcher.Event, obj meta.Object) {
 	if s.events != nil {
 		*s.events <- update.AssociatedUpdate{
 			Update: update.Update{
diff --git a/pkg/storage/watch/update/associated.go b/pkg/storage/watch/update/associated.go
deleted file mode 100644
index 239db858d..000000000
--- a/pkg/storage/watch/update/associated.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package update
-
-import (
-	"github.com/weaveworks/ignite/pkg/storage"
-)
-
-// AssociatedUpdate bundles together an Update and a Storage
-// implementation. This is used by SyncStorage to query the
-// correct Storage for the updated Object.
-type AssociatedUpdate struct {
-	Update
-	Storage storage.Storage
-}
diff --git a/pkg/storage/watch/update/event_test.go b/pkg/storage/watch/update/event_test.go
deleted file mode 100644
index f323b9c6f..000000000
--- a/pkg/storage/watch/update/event_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package update
-
-import (
-	"reflect"
-	"sort"
-	"testing"
-)
-
-func TestEventSort(t *testing.T) {
-	e1 := Events{EventDelete, EventModify, EventCreate}
-	e2 := Events{EventCreate, EventDelete, EventModify}
-
-	sort.Sort(e1)
-	sort.Sort(e2)
-	if !reflect.DeepEqual(e1, e2) {
-		t.Errorf("events do not match: %v %v", e1, e2)
-	}
-}
diff --git a/pkg/storage/watch/update/file.go b/pkg/storage/watch/update/file.go
deleted file mode 100644
index 706bb975f..000000000
--- a/pkg/storage/watch/update/file.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package update
-
-// FileUpdate is used by watchers to
-// signal the state change of a file.
-type FileUpdate struct {
-	Event Event
-	Path  string
-}
diff --git a/pkg/storage/watch/update/update.go b/pkg/storage/watch/update/update.go
index 41e054c79..d65c47ee7 100644
--- a/pkg/storage/watch/update/update.go
+++ b/pkg/storage/watch/update/update.go
@@ -2,11 +2,21 @@ package update
 
 import (
 	meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
+	"github.com/weaveworks/ignite/pkg/storage"
+	"github.com/weaveworks/ignite/pkg/util/watcher"
 )
 
 // Update bundles an Event with an
 // APIType for Storage retrieval.
 type Update struct {
-	Event   Event
+	Event   watcher.Event
 	APIType meta.Object
 }
+
+// AssociatedUpdate bundles together an Update and a Storage
+// implementation. This is used by SyncStorage to query the
+// correct Storage for the updated Object.
+type AssociatedUpdate struct {
+	Update
+	Storage storage.Storage
+}
diff --git a/pkg/storage/watch/watch.go b/pkg/storage/watch/watch.go
deleted file mode 100644
index 934d1c24a..000000000
--- a/pkg/storage/watch/watch.go
+++ /dev/null
@@ -1,348 +0,0 @@
-package watch
-
-import (
-	"bytes"
-	"os"
-	"path/filepath"
-	"time"
-
-	"github.com/rjeczalik/notify"
-	log "github.com/sirupsen/logrus"
-	"github.com/weaveworks/ignite/pkg/storage"
-	"github.com/weaveworks/ignite/pkg/storage/watch/update"
-	"github.com/weaveworks/ignite/pkg/util"
-)
-
-const eventBuffer = 4096                 // How many events and updates we can buffer before watching is interrupted
-const dispatchDuration = 1 * time.Second // Duration to wait after last event before dispatching grouped inotify events
-var excludeDirs = []string{".git"}
-var listenEvents = []notify.Event{notify.InCreate, notify.InDelete, notify.InDeleteSelf, notify.InCloseWrite}
-
-var eventMap = map[notify.Event]update.Event{
-	notify.InCreate:     update.EventCreate,
-	notify.InDelete:     update.EventDelete,
-	notify.InCloseWrite: update.EventModify,
-}
-
-// combinedEvent describes multiple events that should be concatenated into a single event
-type combinedEvent struct {
-	input  []byte       // input is a slice of events to match (in bytes, it speeds up the comparison)
-	output update.Event // output is the resulting event that should be returned
-}
-
-// combinedEvents describes the event combinations to concatenate,
-// this is iterated in order, so the longest matches should be first
-var combinedEvents = []combinedEvent{
-	// DELETE + CREATE + MODIFY => MODIFY
-	{update.Events{update.EventDelete, update.EventCreate, update.EventModify}.Bytes(), update.EventModify},
-	// CREATE + MODIFY => CREATE
-	{update.Events{update.EventCreate, update.EventModify}.Bytes(), update.EventCreate},
-	// CREATE + DELETE => NONE
-	{update.Events{update.EventCreate, update.EventDelete}.Bytes(), update.EventNone},
-}
-
-// Suppress duplicate events registered in this map. E.g. directory deletion
-// fires two DELETE events, one for the parent and one for the deleted directory itself
-var suppressDuplicates = map[update.Event]bool{
-	update.EventCreate: true,
-	update.EventDelete: true,
-}
-
-type eventStream chan notify.EventInfo
-type UpdateStream chan *update.FileUpdate
-type watches []string
-
-// watcher recursively monitors changes in files in the given directory
-// and sends out events based on their state changes. Only files conforming
-// to validSuffix are monitored. The watcher can be suspended for a single
-// event at a time to eliminate updates by WatchStorage causing a loop.
-type watcher struct {
-	dir          string
-	events       eventStream
-	updates      UpdateStream
-	watches      watches
-	suspendEvent update.Event
-	monitor      *util.Monitor
-	dispatcher   *util.Monitor
-	// the batcher is used for properly sending many concurrent inotify events
-	// as a group, after a specified timeout. This fixes the issue of one single
-	// file operation being registered as many different inotify events
-	batcher *Batcher
-}
-
-func (w *watcher) addWatch(path string) (err error) {
-	log.Tracef("Watcher: Adding watch for %q", path)
-	if err = notify.Watch(path, w.events, listenEvents...); err == nil {
-		w.watches = append(w.watches, path)
-	}
-
-	return
-}
-
-func (w *watcher) hasWatch(path string) bool {
-	for _, watch := range w.watches {
-		if watch == path {
-			log.Tracef("Watcher: Watch found for %q", path)
-			return true
-		}
-	}
-
-	log.Tracef("Watcher: No watch found for %q", path)
-	return false
-}
-
-func (w *watcher) clear() {
-	log.Tracef("Watcher: Clearing all watches")
-	notify.Stop(w.events)
-	w.watches = w.watches[:0]
-}
-
-// newWatcher returns a list of files in the watched directory in
-// addition to the generated watcher, it can be used to populate
-// MappedRawStorage fileMappings
-func newWatcher(dir string) (w *watcher, files []string, err error) {
-	w = &watcher{
-		dir:     dir,
-		events:  make(eventStream, eventBuffer),
-		updates: make(UpdateStream, eventBuffer),
-		batcher: NewBatcher(dispatchDuration),
-	}
-
-	if err = w.start(&files); err != nil {
-		notify.Stop(w.events)
-	} else {
-		w.monitor = util.RunMonitor(w.monitorFunc)
-		w.dispatcher = util.RunMonitor(w.dispatchFunc)
-	}
-
-	return
-}
-
-// start discovers all subdirectories and adds paths to
-// notify before starting the monitoring goroutine
-func (w *watcher) start(files *[]string) error {
-	return filepath.Walk(w.dir,
-		func(path string, info os.FileInfo, err error) error {
-			if err != nil {
-				return err
-			}
-
-			if info.IsDir() {
-				for _, dir := range excludeDirs {
-					if info.Name() == dir {
-						return filepath.SkipDir // Skip excluded directories
-					}
-				}
-
-				return w.addWatch(path)
-			}
-
-			if files != nil {
-				// Only include files with a valid suffix
-				if validSuffix(info.Name()) {
-					*files = append(*files, path)
-				}
-			}
-
-			return nil
-		})
-}
-
-func (w *watcher) monitorFunc() {
-	log.Debug("Watcher: Monitoring thread started")
-	defer log.Debug("Watcher: Monitoring thread stopped")
-	defer close(w.updates) // Close the update stream after the watcher has stopped
-
-	for {
-		event, ok := <-w.events
-		if !ok {
-			return
-		}
-
-		updateEvent := convertEvent(event.Event())
-		if updateEvent == w.suspendEvent {
-			w.suspendEvent = 0
-			log.Debugf("Watcher: Skipping suspended event %s for path: %q", updateEvent, event.Path())
-			continue // Skip the suspended event
-		}
-
-		// Suppress successive duplicate events registered in suppressDuplicates
-		if suppressEvent(event.Path(), updateEvent) {
-			log.Debugf("Watcher: Skipping suppressed event %s for path: %q", updateEvent, event.Path())
-			continue // Skip the suppressed event
-		}
-
-		// Directory bypass for watcher registration
-		// The watcher registration/deletion needs to be as fast as
-		// possible, bypass the batcher when dealing with directories
-		if w.handleDirEvent(event.Path(), updateEvent) {
-			continue // The event path matched a directory, skip file processing for the event
-		}
-
-		// Get any events registered for the specific file, and append the specified event
-		var eventList update.Events
-		if val, ok := w.batcher.Load(event.Path()); ok {
-			eventList = val.(update.Events)
-		}
-
-		eventList = append(eventList, updateEvent)
-
-		// Register the event in the map, and dispatch all the events at once after the timeout
-		w.batcher.Store(event.Path(), eventList)
-		log.Debugf("Watcher: Registered inotify events %v for path %q", eventList, event.Path())
-	}
-}
-
-func (w *watcher) dispatchFunc() {
-	log.Debug("Watcher: Dispatch thread started")
-	defer log.Debug("Watcher: Dispatch thread stopped")
-
-	for {
-		// Wait until we have a batch dispatched to us
-		if !w.batcher.ProcessBatch(func(key, val interface{}) bool {
-			filePath := key.(string)
-
-			// Concatenate all known events, and dispatch them to be handled one by one
-			for _, event := range concatenateEvents(val.(update.Events)) {
-				w.handleEvent(filePath, event)
-			}
-
-			// Continue traversing the map
-			return true
-		}) {
-			return // The batcher is closed, stop processing
-		}
-
-		log.Debug("Watcher: Dispatched events batch and reset the events cache")
-	}
-}
-
-func (w *watcher) handleEvent(filePath string, event update.Event) {
-	switch event {
-	case update.EventCreate, update.EventDelete, update.EventModify: // Ignore EventNone
-		// only care about valid files
-		if !validSuffix(filePath) {
-			return
-		}
-
-		log.Debugf("Watcher: Sending update: %s -> %q", event, filePath)
-		w.updates <- &update.FileUpdate{
-			Event: event,
-			Path:  filePath,
-		}
-	}
-}
-
-func (w *watcher) handleDirEvent(filePath string, event update.Event) (dir bool) {
-	switch event {
-	case update.EventCreate:
-		fi, err := os.Stat(filePath)
-		if err != nil {
-			log.Errorf("Watcher: Failed to stat %q: %v", filePath, err)
-			return
-		}
-
-		if fi.IsDir() {
-			if err := w.addWatch(filePath); err != nil {
-				log.Errorf("Watcher: Failed to add %q: %v", filePath, err)
-			}
-
-			dir = true
-		}
-	case update.EventDelete:
-		if w.hasWatch(filePath) {
-			w.clear()
-			if err := w.start(nil); err != nil {
-				log.Errorf("Watcher: Failed to re-initialize watches for %q", w.dir)
-			}
-
-			dir = true
-		}
-	}
-
-	return
-}
-
-// TODO: This watcher doesn't handle multiple operations on the same file well
-// DELETE+CREATE+MODIFY => MODIFY
-// CREATE+MODIFY => CREATE
-// Fix this by caching the operations on the same file, and one second after all operations
-// have been "written"; go through the changes and interpret the combinations of events properly
-// This maybe will allow us to remove the "suspend" functionality? I don't know yet
-
-func (w *watcher) close() {
-	notify.Stop(w.events)
-	w.batcher.Close()
-	close(w.events) // Close the event stream
-	w.monitor.Wait()
-	w.dispatcher.Wait()
-}
-
-// This enables a one-time suspend of the given event,
-// the watcher will skip the given event once
-func (w *watcher) suspend(updateEvent update.Event) {
-	w.suspendEvent = updateEvent
-}
-
-func convertEvent(event notify.Event) update.Event {
-	if updateEvent, ok := eventMap[event]; ok {
-		return updateEvent
-	}
-
-	return update.EventNone
-}
-
-// validSuffix is used to filter out all unsupported
-// files based on the extensions in storage.Formats
-func validSuffix(path string) bool {
-	for suffix := range storage.Formats {
-		if filepath.Ext(path) == suffix {
-			return true
-		}
-	}
-
-	return false
-}
-
-// concatenateEvents takes in a slice of events and concatenates
-// all events possible based on combinedEvents
-func concatenateEvents(events update.Events) update.Events {
-	if len(events) < 2 {
-		return events // Quick return for 0 or 1 event
-	}
-
-	for _, combinedEvent := range combinedEvents {
-		if len(combinedEvent.input) > len(events) {
-			continue // The combined event's match is too long
-		}
-
-		// Test if the prefix of the given events matches combinedEvent.input
-		if bytes.Equal(events.Bytes()[:len(combinedEvent.input)], combinedEvent.input) {
-			// If so, replace combinedEvent.input prefix in events with combinedEvent.output and recurse
-			concatenated := append(update.Events{combinedEvent.output}, events[len(combinedEvent.input):]...)
-			log.Tracef("Watcher: Concatenated events: %v -> %v", events, concatenated)
-			return concatenateEvents(concatenated)
-		}
-	}
-
-	return events
-}
-
-var suppressCache struct {
-	event update.Event
-	path  string
-}
-
-// suppressEvent returns true it it's called twice
-// in a row with the same known event and path
-func suppressEvent(path string, event update.Event) (s bool) {
-	if _, ok := suppressDuplicates[event]; ok {
-		if suppressCache.event == event && suppressCache.path == path {
-			s = true
-		}
-	}
-
-	suppressCache.event = event
-	suppressCache.path = path
-	return
-}
diff --git a/pkg/storage/watch/watcher_test.go b/pkg/storage/watch/watcher_test.go
deleted file mode 100644
index 59f2546ca..000000000
--- a/pkg/storage/watch/watcher_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package watch
-
-import (
-	"github.com/weaveworks/ignite/pkg/storage/watch/update"
-	"reflect"
-	"testing"
-)
-
-var testEvents = []update.Events{
-	{
-		update.EventDelete,
-		update.EventCreate,
-		update.EventModify,
-	},
-	{
-		update.EventCreate,
-		update.EventModify,
-		update.EventDelete,
-	},
-	{
-		update.EventCreate,
-		update.EventModify,
-		update.EventDelete,
-		update.EventCreate,
-	},
-}
-
-var targets = []update.Events{
-	{
-		update.EventModify,
-	},
-	{
-		update.EventNone,
-	},
-	{
-		update.EventNone,
-		update.EventCreate,
-	},
-}
-
-func TestEventConcatenation(t *testing.T) {
-	for i, e := range testEvents {
-		result := concatenateEvents(e)
-		if !reflect.DeepEqual(result, targets[i]) {
-			t.Errorf("wrong concatenation result: %v != %v", result, targets[i])
-		}
-	}
-}
diff --git a/pkg/util/sync/batcher.go b/pkg/util/sync/batcher.go
new file mode 100644
index 000000000..d3855b254
--- /dev/null
+++ b/pkg/util/sync/batcher.go
@@ -0,0 +1,85 @@
+package sync
+
+import (
+	"sync"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+)
+
+// NewBatchWriter creates a new BatchWriter
+func NewBatchWriter(duration time.Duration) *BatchWriter {
+	return &BatchWriter{
+		duration: duration,
+		flushCh:  make(chan struct{}),
+		syncMap:  &sync.Map{},
+	}
+}
+
+// BatchWriter is a struct that wraps a concurrent sync.Map
+// and dispatches all writes to it at once, a specific
+// duration (e.g. 1s) after the last write was performed. This
+// allows for 100s of concurrent writes in milliseconds to the
+// same map in one sending goroutine; and one receiving goroutine
+// which can process the result after all the writes are done.
+type BatchWriter struct {
+	duration time.Duration
+	timer    *time.Timer
+	flushCh  chan struct{}
+	syncMap  *sync.Map
+}
+
+// Load reads the key from the map
+func (b *BatchWriter) Load(key interface{}) (value interface{}, ok bool) {
+	return b.syncMap.Load(key)
+}
+
+// Store writes the value for the specified key to the map
+// If no other .Store call is made during the specified duration,
+// flushCh is invoked and ProcessBatch unblocks in the other goroutine
+func (b *BatchWriter) Store(key, value interface{}) {
+	// prevent the timer from firing as we're manipulating it now
+	b.cancelUnfiredTimer()
+	// store the key and the value as requested
+	log.Tracef("BatchWriter: Storing key %v and value %q, reset the timer.", key, value)
+	b.syncMap.Store(key, value)
+	// set the timer to fire after the duration, unless there's a new .Store call
+	b.dispatchAfterTimeout()
+}
+
+// Close closes the underlying channel
+func (b *BatchWriter) Close() {
+	log.Trace("BatchWriter: Closing the batch channel")
+	close(b.flushCh)
+}
+
+// ProcessBatch is effectively a Range over the sync.Map, once a batch write is
+// released. This should be used in the receiving goroutine. The internal map is
+// reset after this call, so be sure to capture all the contents if needed. This
+// function returns false if Close() has been called.
+func (b *BatchWriter) ProcessBatch(fn func(key, val interface{}) bool) bool {
+	if _, ok := <-b.flushCh; !ok {
+		// channel is closed
+		return false
+	}
+	log.Trace("BatchWriter: Received a flush for the batch. Dispatching it now.")
+	b.syncMap.Range(fn)
+	*b.syncMap = sync.Map{}
+	return true
+}
+
+func (b *BatchWriter) cancelUnfiredTimer() {
+	// If the timer already exists; stop it
+	if b.timer != nil {
+		log.Tracef("BatchWriter: Cancelled timer")
+		b.timer.Stop()
+		b.timer = nil
+	}
+}
+
+func (b *BatchWriter) dispatchAfterTimeout() {
+	b.timer = time.AfterFunc(b.duration, func() {
+		log.Tracef("BatchWriter: Dispatching a batch job")
+		b.flushCh <- struct{}{}
+	})
+}
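For context, a brief sketch (not part of the diff) of how the relocated BatchWriter is meant to be driven, assuming only the pkg/util/sync API added above; the key, values, and timeout are arbitrary illustration:

package main

import (
	"fmt"
	"time"

	batch "github.com/weaveworks/ignite/pkg/util/sync"
)

func main() {
	b := batch.NewBatchWriter(500 * time.Millisecond)
	defer b.Close()

	// One sending goroutine: many rapid Store calls for the same key
	// collapse into a single entry for the batch.
	go func() {
		for i := 0; i < 100; i++ {
			b.Store("vm.yaml", fmt.Sprintf("op-%d", i))
		}
	}()

	// One receiving goroutine (here: main): ProcessBatch blocks until
	// 500ms have passed without a new Store call, then ranges the map.
	b.ProcessBatch(func(key, val interface{}) bool {
		fmt.Println(key, val) // prints: vm.yaml op-99
		return true
	})
}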
Dispatching it now.") + b.syncMap.Range(fn) + *b.syncMap = sync.Map{} + return true +} + +func (b *BatchWriter) cancelUnfiredTimer() { + // If the timer already exists; stop it + if b.timer != nil { + log.Tracef("BatchWriter: Cancelled timer") + b.timer.Stop() + b.timer = nil + } +} + +func (b *BatchWriter) dispatchAfterTimeout() { + b.timer = time.AfterFunc(b.duration, func() { + log.Tracef("BatchWriter: Dispatching a batch job") + b.flushCh <- struct{}{} + }) +} diff --git a/pkg/storage/watch/batcher_test.go b/pkg/util/sync/batcher_test.go similarity index 93% rename from pkg/storage/watch/batcher_test.go rename to pkg/util/sync/batcher_test.go index 81f149b4a..ff4e7f786 100644 --- a/pkg/storage/watch/batcher_test.go +++ b/pkg/util/sync/batcher_test.go @@ -1,4 +1,4 @@ -package watch +package sync import ( "fmt" @@ -14,9 +14,9 @@ type job struct { event string } -func TestBatcher(t *testing.T) { +func TestBatchWriter(t *testing.T) { ch := make(chan job) - b := NewBatcher(1 * time.Second) + b := NewBatchWriter(1 * time.Second) go func() { for i := 0; i < 10; i++ { fmt.Println(i) diff --git a/pkg/util/monitor.go b/pkg/util/sync/monitor.go similarity index 96% rename from pkg/util/monitor.go rename to pkg/util/sync/monitor.go index dd0149baa..f09c55ca4 100644 --- a/pkg/util/monitor.go +++ b/pkg/util/sync/monitor.go @@ -1,4 +1,4 @@ -package util +package sync import "sync" diff --git a/pkg/storage/watch/update/event.go b/pkg/util/watcher/event.go similarity index 85% rename from pkg/storage/watch/update/event.go rename to pkg/util/watcher/event.go index 7ac35161b..6090aa2b8 100644 --- a/pkg/storage/watch/update/event.go +++ b/pkg/util/watcher/event.go @@ -1,4 +1,4 @@ -package update +package watcher import ( "fmt" @@ -51,3 +51,10 @@ func (e Events) Bytes() []byte { return b } + +// FileUpdate is used by watchers to +// signal the state change of a file. +type FileUpdate struct { + Event Event + Path string +} diff --git a/pkg/util/watcher/filewatcher.go b/pkg/util/watcher/filewatcher.go new file mode 100644 index 000000000..d114d64ab --- /dev/null +++ b/pkg/util/watcher/filewatcher.go @@ -0,0 +1,372 @@ +package watcher + +import ( + "bytes" + "os" + "path/filepath" + "time" + + "github.com/rjeczalik/notify" + log "github.com/sirupsen/logrus" + "github.com/weaveworks/ignite/pkg/util/sync" +) + +const eventBuffer = 4096 // How many events and updates we can buffer before watching is interrupted +var listenEvents = []notify.Event{notify.InCreate, notify.InDelete, notify.InDeleteSelf, notify.InCloseWrite} + +var eventMap = map[notify.Event]Event{ + notify.InCreate: EventCreate, + notify.InDelete: EventDelete, + notify.InCloseWrite: EventModify, +} + +// combinedEvent describes multiple events that should be concatenated into a single event +type combinedEvent struct { + input []byte // input is a slice of events to match (in bytes, it speeds up the comparison) + output Event // output is the resulting event that should be returned +} + +// combinedEvents describes the event combinations to concatenate, +// this is iterated in order, so the longest matches should be first +var combinedEvents = []combinedEvent{ + // DELETE + CREATE + MODIFY => MODIFY + {Events{EventDelete, EventCreate, EventModify}.Bytes(), EventModify}, + // CREATE + MODIFY => CREATE + {Events{EventCreate, EventModify}.Bytes(), EventCreate}, + // CREATE + DELETE => NONE + {Events{EventCreate, EventDelete}.Bytes(), EventNone}, +} + +// Suppress duplicate events registered in this map. E.g. 
+// fires two DELETE events, one for the parent and one for the deleted directory itself
+var suppressDuplicates = map[Event]bool{
+	EventCreate: true,
+	EventDelete: true,
+}
+
+type eventStream chan notify.EventInfo
+type FileUpdateStream chan *FileUpdate
+type watches []string
+
+// Options specifies options for the FileWatcher
+type Options struct {
+	// ExcludeDirs specifies what directories to not watch
+	ExcludeDirs []string
+	// BatchTimeout specifies the duration to wait after last event before dispatching grouped inotify events
+	BatchTimeout time.Duration
+	// ValidExtensions specifies what file extensions to look at
+	ValidExtensions []string
+}
+
+// DefaultOptions returns the default options
+func DefaultOptions() Options {
+	return Options{
+		ExcludeDirs:     []string{".git"},
+		BatchTimeout:    1 * time.Second,
+		ValidExtensions: []string{".yaml", ".yml", ".json"},
+	}
+}
+
+// NewFileWatcher returns a list of files in the watched directory in
+// addition to the generated FileWatcher; it can be used to populate
+// MappedRawStorage fileMappings
+func NewFileWatcher(dir string) (w *FileWatcher, files []string, err error) {
+	return NewFileWatcherWithOptions(dir, DefaultOptions())
+}
+
+// NewFileWatcherWithOptions returns a list of files in the watched directory in
+// addition to the generated FileWatcher; it can be used to populate
+// MappedRawStorage fileMappings
+func NewFileWatcherWithOptions(dir string, opts Options) (w *FileWatcher, files []string, err error) {
+	w = &FileWatcher{
+		dir:     dir,
+		events:  make(eventStream, eventBuffer),
+		updates: make(FileUpdateStream, eventBuffer),
+		batcher: sync.NewBatchWriter(opts.BatchTimeout),
+		opts:    opts,
+	}
+
+	if err = w.start(&files); err != nil {
+		notify.Stop(w.events)
+	} else {
+		w.monitor = sync.RunMonitor(w.monitorFunc)
+		w.dispatcher = sync.RunMonitor(w.dispatchFunc)
+	}
+
+	return
+}
+
+// FileWatcher recursively monitors changes in files in the given directory
+// and sends out events based on their state changes. Only files conforming
+// to validSuffix are monitored. The FileWatcher can be suspended for a single
+// event at a time to eliminate updates by WatchStorage causing a loop.
+type FileWatcher struct {
+	dir          string
+	events       eventStream
+	updates      FileUpdateStream
+	watches      watches
+	suspendEvent Event
+	monitor      *sync.Monitor
+	dispatcher   *sync.Monitor
+	opts         Options
+	// the batcher is used for properly sending many concurrent inotify events
+	// as a group, after a specified timeout. This fixes the issue of one single
+	// file operation being registered as many different inotify events
+	batcher *sync.BatchWriter
+}
+
+func (w *FileWatcher) addWatch(path string) (err error) {
+	log.Tracef("FileWatcher: Adding watch for %q", path)
+	if err = notify.Watch(path, w.events, listenEvents...); err == nil {
+		w.watches = append(w.watches, path)
+	}
+
+	return
+}
+
+func (w *FileWatcher) hasWatch(path string) bool {
+	for _, watch := range w.watches {
+		if watch == path {
+			log.Tracef("FileWatcher: Watch found for %q", path)
+			return true
+		}
+	}
+
+	log.Tracef("FileWatcher: No watch found for %q", path)
+	return false
+}
+
+func (w *FileWatcher) clear() {
+	log.Tracef("FileWatcher: Clearing all watches")
+	notify.Stop(w.events)
+	w.watches = w.watches[:0]
+}
+
+// start discovers all subdirectories and adds paths to
+// notify before starting the monitoring goroutine
+func (w *FileWatcher) start(files *[]string) error {
+	return filepath.Walk(w.dir,
+		func(path string, info os.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+
+			if info.IsDir() {
+				for _, dir := range w.opts.ExcludeDirs {
+					if info.Name() == dir {
+						return filepath.SkipDir // Skip excluded directories
+					}
+				}
+
+				return w.addWatch(path)
+			}
+
+			if files != nil {
+				// Only include files with a valid suffix
+				if w.validSuffix(info.Name()) {
+					*files = append(*files, path)
+				}
+			}
+
+			return nil
+		})
+}
+
+func (w *FileWatcher) monitorFunc() {
+	log.Debug("FileWatcher: Monitoring thread started")
+	defer log.Debug("FileWatcher: Monitoring thread stopped")
+	defer close(w.updates) // Close the update stream after the FileWatcher has stopped
+
+	for {
+		event, ok := <-w.events
+		if !ok {
+			return
+		}
+
+		updateEvent := convertEvent(event.Event())
+		if updateEvent == w.suspendEvent {
+			w.suspendEvent = 0
+			log.Debugf("FileWatcher: Skipping suspended event %s for path: %q", updateEvent, event.Path())
+			continue // Skip the suspended event
+		}
+
+		// Suppress successive duplicate events registered in suppressDuplicates
+		if suppressEvent(event.Path(), updateEvent) {
+			log.Debugf("FileWatcher: Skipping suppressed event %s for path: %q", updateEvent, event.Path())
+			continue // Skip the suppressed event
+		}
+
+		// Directory bypass for FileWatcher registration
+		// The FileWatcher registration/deletion needs to be as fast as
+		// possible, bypass the batcher when dealing with directories
+		if w.handleDirEvent(event.Path(), updateEvent) {
+			continue // The event path matched a directory, skip file processing for the event
+		}
+
+		// Get any events registered for the specific file, and append the specified event
+		var eventList Events
+		if val, ok := w.batcher.Load(event.Path()); ok {
+			eventList = val.(Events)
+		}
+
+		eventList = append(eventList, updateEvent)
+
+		// Register the event in the map, and dispatch all the events at once after the timeout
+		w.batcher.Store(event.Path(), eventList)
+		log.Debugf("FileWatcher: Registered inotify events %v for path %q", eventList, event.Path())
+	}
+}
+
+func (w *FileWatcher) dispatchFunc() {
+	log.Debug("FileWatcher: Dispatch thread started")
+	defer log.Debug("FileWatcher: Dispatch thread stopped")
+
+	for {
+		// Wait until we have a batch dispatched to us
+		ok := w.batcher.ProcessBatch(func(key, val interface{}) bool {
+			filePath := key.(string)
+
+			// Concatenate all known events, and dispatch them to be handled one by one
+			for _, event := range concatenateEvents(val.(Events)) {
+				w.handleEvent(filePath, event)
+			}
+
+			// Continue traversing the map
+			return true
+		})
+		if !ok {
+			return // The BatchWriter channel is closed, stop processing
+		}
+
+		log.Debug("FileWatcher: Dispatched events batch and reset the events cache")
+	}
+}
+
+func (w *FileWatcher) handleEvent(filePath string, event Event) {
+	switch event {
+	case EventCreate, EventDelete, EventModify: // Ignore EventNone
+		// only care about valid files
+		if !w.validSuffix(filePath) {
+			return
+		}
+
+		log.Debugf("FileWatcher: Sending update: %s -> %q", event, filePath)
+		w.updates <- &FileUpdate{
+			Event: event,
+			Path:  filePath,
+		}
+	}
+}
+
+func (w *FileWatcher) handleDirEvent(filePath string, event Event) (dir bool) {
+	switch event {
+	case EventCreate:
+		fi, err := os.Stat(filePath)
+		if err != nil {
+			log.Errorf("FileWatcher: Failed to stat %q: %v", filePath, err)
+			return
+		}
+
+		if fi.IsDir() {
+			if err := w.addWatch(filePath); err != nil {
+				log.Errorf("FileWatcher: Failed to add %q: %v", filePath, err)
+			}
+
+			dir = true
+		}
+	case EventDelete:
+		if w.hasWatch(filePath) {
+			w.clear()
+			if err := w.start(nil); err != nil {
+				log.Errorf("FileWatcher: Failed to re-initialize watches for %q", w.dir)
+			}
+
+			dir = true
+		}
+	}
+
+	return
+}
+
+// GetFileUpdateStream gets the channel with FileUpdates
+func (w *FileWatcher) GetFileUpdateStream() FileUpdateStream {
+	return w.updates
+}
+
+// Close closes active underlying resources
+func (w *FileWatcher) Close() {
+	notify.Stop(w.events)
+	w.batcher.Close()
+	close(w.events) // Close the event stream
+	w.monitor.Wait()
+	w.dispatcher.Wait()
+}
+
+// Suspend enables a one-time suspend of the given event,
+// the FileWatcher will skip the given event once
+func (w *FileWatcher) Suspend(updateEvent Event) {
+	w.suspendEvent = updateEvent
+}
+
+// validSuffix is used to filter out all unsupported
+// files based on the extensions given
+func (w *FileWatcher) validSuffix(path string) bool {
+	for _, suffix := range w.opts.ValidExtensions {
+		if filepath.Ext(path) == suffix {
+			return true
+		}
+	}
+
+	return false
+}
+
+func convertEvent(event notify.Event) Event {
+	if updateEvent, ok := eventMap[event]; ok {
+		return updateEvent
+	}
+
+	return EventNone
+}
+
+// concatenateEvents takes in a slice of events and concatenates
+// all events possible based on combinedEvents
+func concatenateEvents(events Events) Events {
+	if len(events) < 2 {
+		return events // Quick return for 0 or 1 event
+	}
+
+	for _, combinedEvent := range combinedEvents {
+		if len(combinedEvent.input) > len(events) {
+			continue // The combined event's match is too long
+		}
+
+		// Test if the prefix of the given events matches combinedEvent.input
+		if bytes.Equal(events.Bytes()[:len(combinedEvent.input)], combinedEvent.input) {
+			// If so, replace combinedEvent.input prefix in events with combinedEvent.output and recurse
+			concatenated := append(Events{combinedEvent.output}, events[len(combinedEvent.input):]...)
+ log.Tracef("FileWatcher: Concatenated events: %v -> %v", events, concatenated) + return concatenateEvents(concatenated) + } + } + + return events +} + +var suppressCache struct { + event Event + path string +} + +// suppressEvent returns true it it's called twice +// in a row with the same known event and path +func suppressEvent(path string, event Event) (s bool) { + if _, ok := suppressDuplicates[event]; ok { + if suppressCache.event == event && suppressCache.path == path { + s = true + } + } + + suppressCache.event = event + suppressCache.path = path + return +} diff --git a/pkg/util/watcher/filewatcher_test.go b/pkg/util/watcher/filewatcher_test.go new file mode 100644 index 000000000..bd914f863 --- /dev/null +++ b/pkg/util/watcher/filewatcher_test.go @@ -0,0 +1,47 @@ +package watcher + +import ( + "reflect" + "testing" +) + +var testEvents = []Events{ + { + EventDelete, + EventCreate, + EventModify, + }, + { + EventCreate, + EventModify, + EventDelete, + }, + { + EventCreate, + EventModify, + EventDelete, + EventCreate, + }, +} + +var targets = []Events{ + { + EventModify, + }, + { + EventNone, + }, + { + EventNone, + EventCreate, + }, +} + +func TestEventConcatenation(t *testing.T) { + for i, e := range testEvents { + result := concatenateEvents(e) + if !reflect.DeepEqual(result, targets[i]) { + t.Errorf("wrong concatenation result: %v != %v", result, targets[i]) + } + } +}