From f72dfb28396574cabbb9b22bf346af8a58982553 Mon Sep 17 00:00:00 2001 From: Dennis Marttinen Date: Mon, 29 Jul 2019 18:14:03 +0300 Subject: [PATCH 1/7] Improve SyncStorage client handling readability, explain updateBuffer --- pkg/storage/sync/storage.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index cf73a3d8d..f98766bed 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -14,7 +14,7 @@ import ( type UpdateStream chan update.Update -const updateBuffer = 4096 // How many updates to buffer +const updateBuffer = 4096 // How many updates to buffer, 4096 should be enough for even a high update frequency // SyncStorage is a Storage implementation taking in multiple Storages and // keeping them in sync. Any write operation executed on the SyncStorage @@ -141,7 +141,8 @@ func (ss *SyncStorage) monitorFunc() { case update.EventModify, update.EventCreate: // First load the Object using the Storage given in the update, // then set it using the client constructed above - if obj, err := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind()).Get(upd.APIType.GetUID()); err != nil { + updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind()) + if obj, err := updClient.Get(upd.APIType.GetUID()); err != nil { log.Errorf("Failed to get Object with UID %q: %v", upd.APIType.GetUID(), err) continue } else if err = c.Dynamic(upd.APIType.GetKind()).Set(obj); err != nil { From 065dd056547f4ca5970b574239621e3ec384d6aa Mon Sep 17 00:00:00 2001 From: Dennis Marttinen Date: Mon, 29 Jul 2019 18:22:24 +0300 Subject: [PATCH 2/7] SyncStorage: add logging to update sending, discard if channel full --- pkg/storage/sync/storage.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index f98766bed..5b79436e4 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -157,8 +157,15 @@ func (ss *SyncStorage) monitorFunc() { } } - // Send the update to the listeners - ss.updateStream <- upd.Update + // Send the update to the listeners unless the channel is full, + // in which case issue a warning. The channel can hold as many + // updates as updateBuffer specifies. + select { + case ss.updateStream <- upd.Update: + log.Debugf("SyncStorage: sent update: %v", upd.Update) + default: + log.Warn("SyncStorage: failed to send update, channel full") + } } else { return } From 5318b7d6cc90c868a1fe2f17052721b807dcc6f9 Mon Sep 17 00:00:00 2001 From: Dennis Marttinen Date: Mon, 29 Jul 2019 18:29:28 +0300 Subject: [PATCH 3/7] Move Monitor to util, add some documentation for it --- pkg/storage/sync/storage.go | 5 +++-- pkg/storage/watch/storage.go | 5 +++-- pkg/storage/watch/watch.go | 5 +++-- pkg/{storage/watch => util}/monitor.go | 6 +++++- 4 files changed, 14 insertions(+), 7 deletions(-) rename pkg/{storage/watch => util}/monitor.go (63%) diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index 5b79436e4..b904e4c0b 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -2,6 +2,7 @@ package sync import ( "fmt" + "github.com/weaveworks/ignite/pkg/util" log "github.com/sirupsen/logrus" meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1" @@ -27,7 +28,7 @@ type SyncStorage struct { storages []storage.Storage eventStream watch.AssociatedEventStream updateStream UpdateStream - monitor *watch.Monitor + monitor *util.Monitor } var _ storage.Storage = &SyncStorage{} @@ -47,7 +48,7 @@ func NewSyncStorage(rwStorage storage.Storage, wStorages ...storage.Storage) sto } if ss.eventStream != nil { - ss.monitor = watch.RunMonitor(ss.monitorFunc) + ss.monitor = util.RunMonitor(ss.monitorFunc) } return ss diff --git a/pkg/storage/watch/storage.go b/pkg/storage/watch/storage.go index 4f9d6bde1..8da1048bd 100644 --- a/pkg/storage/watch/storage.go +++ b/pkg/storage/watch/storage.go @@ -2,6 +2,7 @@ package watch import ( "fmt" + "github.com/weaveworks/ignite/pkg/util" "io/ioutil" log "github.com/sirupsen/logrus" @@ -42,7 +43,7 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) { } if mapped, ok := s.RawStorage().(raw.MappedRawStorage); ok { - s.monitor = RunMonitor(func() { + s.monitor = util.RunMonitor(func() { s.monitorFunc(mapped, files) // Offload the file registration to the goroutine }) } @@ -55,7 +56,7 @@ type GenericWatchStorage struct { storage.Storage watcher *watcher events *AssociatedEventStream - monitor *Monitor + monitor *util.Monitor } var _ WatchStorage = &GenericWatchStorage{} diff --git a/pkg/storage/watch/watch.go b/pkg/storage/watch/watch.go index e71a17313..4a8a63b1c 100644 --- a/pkg/storage/watch/watch.go +++ b/pkg/storage/watch/watch.go @@ -1,6 +1,7 @@ package watch import ( + "github.com/weaveworks/ignite/pkg/util" "os" "path/filepath" @@ -30,7 +31,7 @@ type watcher struct { updates UpdateStream watches watches suspendEvent update.Event - monitor *Monitor + monitor *util.Monitor } func (w *watcher) addWatch(path string) (err error) { @@ -73,7 +74,7 @@ func newWatcher(dir string) (w *watcher, files []string, err error) { if err = w.start(&files); err != nil { notify.Stop(w.events) } else { - w.monitor = RunMonitor(w.monitorFunc) + w.monitor = util.RunMonitor(w.monitorFunc) } return diff --git a/pkg/storage/watch/monitor.go b/pkg/util/monitor.go similarity index 63% rename from pkg/storage/watch/monitor.go rename to pkg/util/monitor.go index 8c92b8b7e..dd0149baa 100644 --- a/pkg/storage/watch/monitor.go +++ b/pkg/util/monitor.go @@ -1,7 +1,11 @@ -package watch +package util import "sync" +// Monitor is a convenience wrapper around +// starting a goroutine with a wait group, +// which can be used to wait for the +// goroutine to stop. type Monitor struct { wg *sync.WaitGroup } From 573e1f263580e303eb0d887c3dec2dc864aa843c Mon Sep 17 00:00:00 2001 From: Dennis Marttinen Date: Mon, 29 Jul 2019 18:58:32 +0300 Subject: [PATCH 4/7] Fix WatchStorage godoc, document AssociatedUpdate --- pkg/storage/watch/storage.go | 6 ++++-- pkg/storage/watch/update/associated.go | 3 +++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/storage/watch/storage.go b/pkg/storage/watch/storage.go index 8da1048bd..b20817fa3 100644 --- a/pkg/storage/watch/storage.go +++ b/pkg/storage/watch/storage.go @@ -17,8 +17,10 @@ import ( "sigs.k8s.io/yaml" ) -// Storage is an interface for persisting and retrieving API objects to/from a backend -// One Storage instance handles all different Kinds of Objects +// WatchStorage is an extended Storage implementation, which provides a watcher +// for watching changes in the directory managed by the embedded Storage's RawStorage. +// If the RawStorage is a MappedRawStorage instance, it's mappings will automatically +// be updated by the WatchStorage. Update events are sent to the given event stream. type WatchStorage interface { // WatchStorage extends the Storage interface storage.Storage diff --git a/pkg/storage/watch/update/associated.go b/pkg/storage/watch/update/associated.go index 1df600cae..239db858d 100644 --- a/pkg/storage/watch/update/associated.go +++ b/pkg/storage/watch/update/associated.go @@ -4,6 +4,9 @@ import ( "github.com/weaveworks/ignite/pkg/storage" ) +// AssociatedUpdate bundles together an Update and a Storage +// implementation. This is used by SyncStorage to query the +// correct Storage for the updated Object. type AssociatedUpdate struct { Update Storage storage.Storage From c1cb8055504a395f1810596ca6da28a62ac23a35 Mon Sep 17 00:00:00 2001 From: Dennis Marttinen Date: Mon, 29 Jul 2019 19:02:53 +0300 Subject: [PATCH 5/7] Document Update, move FileUpdate to its own file --- pkg/storage/watch/update/event.go | 9 +-------- pkg/storage/watch/update/file.go | 8 ++++++++ pkg/storage/watch/update/update.go | 2 ++ 3 files changed, 11 insertions(+), 8 deletions(-) create mode 100644 pkg/storage/watch/update/file.go diff --git a/pkg/storage/watch/update/event.go b/pkg/storage/watch/update/event.go index 0b8fc499a..9ac3842d2 100644 --- a/pkg/storage/watch/update/event.go +++ b/pkg/storage/watch/update/event.go @@ -1,6 +1,6 @@ package update -// Event is an enum describing a change in a file's state. +// Event is an enum describing a change in a file's/Object's state. // Unknown state changes can be signaled with a zero value. type Event uint8 @@ -22,10 +22,3 @@ func (e Event) String() string { return "NONE" } - -// FileUpdate is used by watchers to -// signal the state change of a file. -type FileUpdate struct { - Event Event - Path string -} diff --git a/pkg/storage/watch/update/file.go b/pkg/storage/watch/update/file.go new file mode 100644 index 000000000..706bb975f --- /dev/null +++ b/pkg/storage/watch/update/file.go @@ -0,0 +1,8 @@ +package update + +// FileUpdate is used by watchers to +// signal the state change of a file. +type FileUpdate struct { + Event Event + Path string +} diff --git a/pkg/storage/watch/update/update.go b/pkg/storage/watch/update/update.go index 48a9890de..41e054c79 100644 --- a/pkg/storage/watch/update/update.go +++ b/pkg/storage/watch/update/update.go @@ -4,6 +4,8 @@ import ( meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1" ) +// Update bundles an Event with an +// APIType for Storage retrieval. type Update struct { Event Event APIType meta.Object From 259fe61ffa75628658acf5e2d0e0308cadc50507 Mon Sep 17 00:00:00 2001 From: Dennis Marttinen Date: Mon, 29 Jul 2019 19:16:17 +0300 Subject: [PATCH 6/7] Add godoc comments to watcher --- pkg/storage/watch/watch.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/storage/watch/watch.go b/pkg/storage/watch/watch.go index 4a8a63b1c..7aa2957d8 100644 --- a/pkg/storage/watch/watch.go +++ b/pkg/storage/watch/watch.go @@ -25,6 +25,10 @@ type eventStream chan notify.EventInfo type UpdateStream chan *update.FileUpdate type watches []string +// watcher recursively monitors changes in files in the given directory +// and sends out events based on their state changes. Only files conforming +// to validSuffix are monitored. The watcher can be suspended for a single +// event at a time to eliminate updates by WatchStorage causing a loop. type watcher struct { dir string events eventStream @@ -174,6 +178,8 @@ func (w *watcher) close() { w.monitor.Wait() } +// This enables a one-time suspend of the given event, +// the watcher will skip the given event once func (w *watcher) suspend(updateEvent update.Event) { w.suspendEvent = updateEvent } @@ -186,6 +192,8 @@ func convertEvent(event notify.Event) update.Event { return 0 } +// validSuffix is used to filter out all unsupported +// files based on the extensions in storage.Formats func validSuffix(path string) bool { for suffix := range storage.Formats { if filepath.Ext(path) == suffix { From 6c6158ff7a9dd0028fb78af4c033b58e86c15a0d Mon Sep 17 00:00:00 2001 From: Dennis Marttinen Date: Mon, 29 Jul 2019 19:34:49 +0300 Subject: [PATCH 7/7] make tidy --- cmd/ignite/cmd/daemon.go | 2 +- pkg/storage/sync/storage.go | 2 +- pkg/storage/watch/storage.go | 2 +- pkg/storage/watch/watch.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/ignite/cmd/daemon.go b/cmd/ignite/cmd/daemon.go index 295a0130d..3d2add925 100644 --- a/cmd/ignite/cmd/daemon.go +++ b/cmd/ignite/cmd/daemon.go @@ -2,13 +2,13 @@ package cmd import ( "fmt" - "github.com/weaveworks/ignite/pkg/errutils" "io" "os" "os/signal" "sync" "github.com/spf13/cobra" + "github.com/weaveworks/ignite/pkg/errutils" "github.com/weaveworks/ignite/pkg/providers" ) diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index b904e4c0b..00860e62e 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -2,7 +2,6 @@ package sync import ( "fmt" - "github.com/weaveworks/ignite/pkg/util" log "github.com/sirupsen/logrus" meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1" @@ -10,6 +9,7 @@ import ( "github.com/weaveworks/ignite/pkg/storage" "github.com/weaveworks/ignite/pkg/storage/watch" "github.com/weaveworks/ignite/pkg/storage/watch/update" + "github.com/weaveworks/ignite/pkg/util" "k8s.io/apimachinery/pkg/runtime/schema" ) diff --git a/pkg/storage/watch/storage.go b/pkg/storage/watch/storage.go index b20817fa3..cb9fb57f5 100644 --- a/pkg/storage/watch/storage.go +++ b/pkg/storage/watch/storage.go @@ -2,7 +2,6 @@ package watch import ( "fmt" - "github.com/weaveworks/ignite/pkg/util" "io/ioutil" log "github.com/sirupsen/logrus" @@ -12,6 +11,7 @@ import ( "github.com/weaveworks/ignite/pkg/storage" "github.com/weaveworks/ignite/pkg/storage/manifest/raw" "github.com/weaveworks/ignite/pkg/storage/watch/update" + "github.com/weaveworks/ignite/pkg/util" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/yaml" diff --git a/pkg/storage/watch/watch.go b/pkg/storage/watch/watch.go index 7aa2957d8..ece5a8413 100644 --- a/pkg/storage/watch/watch.go +++ b/pkg/storage/watch/watch.go @@ -1,7 +1,6 @@ package watch import ( - "github.com/weaveworks/ignite/pkg/util" "os" "path/filepath" @@ -9,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/weaveworks/ignite/pkg/storage" "github.com/weaveworks/ignite/pkg/storage/watch/update" + "github.com/weaveworks/ignite/pkg/util" ) const eventBuffer = 4096 // How many events and updates we can buffer before watching is interrupted