Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Documentation updates and clarifications for the New Storage implementation #242

Merged
merged 8 commits into from
Jul 29, 2019
21 changes: 15 additions & 6 deletions pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"fmt"
"github.com/weaveworks/ignite/pkg/util"

log "github.com/sirupsen/logrus"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
Expand All @@ -14,7 +15,7 @@ import (

type UpdateStream chan update.Update

const updateBuffer = 4096 // How many updates to buffer
const updateBuffer = 4096 // How many updates to buffer, 4096 should be enough for even a high update frequency

// SyncStorage is a Storage implementation taking in multiple Storages and
// keeping them in sync. Any write operation executed on the SyncStorage
Expand All @@ -27,7 +28,7 @@ type SyncStorage struct {
storages []storage.Storage
eventStream watch.AssociatedEventStream
updateStream UpdateStream
monitor *watch.Monitor
monitor *util.Monitor
}

var _ storage.Storage = &SyncStorage{}
Expand All @@ -47,7 +48,7 @@ func NewSyncStorage(rwStorage storage.Storage, wStorages ...storage.Storage) sto
}

if ss.eventStream != nil {
ss.monitor = watch.RunMonitor(ss.monitorFunc)
ss.monitor = util.RunMonitor(ss.monitorFunc)
}

return ss
Expand Down Expand Up @@ -141,7 +142,8 @@ func (ss *SyncStorage) monitorFunc() {
case update.EventModify, update.EventCreate:
// First load the Object using the Storage given in the update,
// then set it using the client constructed above
if obj, err := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind()).Get(upd.APIType.GetUID()); err != nil {
updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind())
if obj, err := updClient.Get(upd.APIType.GetUID()); err != nil {
log.Errorf("Failed to get Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
} else if err = c.Dynamic(upd.APIType.GetKind()).Set(obj); err != nil {
Expand All @@ -156,8 +158,15 @@ func (ss *SyncStorage) monitorFunc() {
}
}

// Send the update to the listeners
ss.updateStream <- upd.Update
// Send the update to the listeners unless the channel is full,
// in which case issue a warning. The channel can hold as many
// updates as updateBuffer specifies.
select {
case ss.updateStream <- upd.Update:
log.Debugf("SyncStorage: sent update: %v", upd.Update)
default:
log.Warn("SyncStorage: failed to send update, channel full")
}
} else {
return
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/watch/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package watch

import (
"fmt"
"github.com/weaveworks/ignite/pkg/util"
"io/ioutil"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -42,7 +43,7 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) {
}

if mapped, ok := s.RawStorage().(raw.MappedRawStorage); ok {
s.monitor = RunMonitor(func() {
s.monitor = util.RunMonitor(func() {
s.monitorFunc(mapped, files) // Offload the file registration to the goroutine
})
}
Expand All @@ -55,7 +56,7 @@ type GenericWatchStorage struct {
storage.Storage
watcher *watcher
events *AssociatedEventStream
monitor *Monitor
monitor *util.Monitor
}

var _ WatchStorage = &GenericWatchStorage{}
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/watch/watch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watch

import (
"github.com/weaveworks/ignite/pkg/util"
"os"
"path/filepath"

Expand Down Expand Up @@ -30,7 +31,7 @@ type watcher struct {
updates UpdateStream
watches watches
suspendEvent update.Event
monitor *Monitor
monitor *util.Monitor
}

func (w *watcher) addWatch(path string) (err error) {
Expand Down Expand Up @@ -73,7 +74,7 @@ func newWatcher(dir string) (w *watcher, files []string, err error) {
if err = w.start(&files); err != nil {
notify.Stop(w.events)
} else {
w.monitor = RunMonitor(w.monitorFunc)
w.monitor = util.RunMonitor(w.monitorFunc)
}

return
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/watch/monitor.go → pkg/util/monitor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package watch
package util

import "sync"

// Monitor is a convenience wrapper around
// starting a goroutine with a wait group,
// which can be used to wait for the
// goroutine to stop.
type Monitor struct {
wg *sync.WaitGroup
}
Expand Down