Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #242 from twelho/syncstorage
Browse files Browse the repository at this point in the history
Documentation updates and clarifications for the New Storage implementation
  • Loading branch information
luxas authored Jul 29, 2019
2 parents fed5235 + e268ad5 commit 6a2257d
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/ignite/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package cmd

import (
"fmt"
"github.com/weaveworks/ignite/pkg/errutils"
"io"
"os"
"os/signal"
"sync"

"github.com/spf13/cobra"
"github.com/weaveworks/ignite/pkg/errutils"
"github.com/weaveworks/ignite/pkg/providers"
)

Expand Down
21 changes: 15 additions & 6 deletions pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/watch"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
"github.com/weaveworks/ignite/pkg/util"
"k8s.io/apimachinery/pkg/runtime/schema"
)

type UpdateStream chan update.Update

const updateBuffer = 4096 // How many updates to buffer
const updateBuffer = 4096 // How many updates to buffer, 4096 should be enough for even a high update frequency

// SyncStorage is a Storage implementation taking in multiple Storages and
// keeping them in sync. Any write operation executed on the SyncStorage
Expand All @@ -27,7 +28,7 @@ type SyncStorage struct {
storages []storage.Storage
eventStream watch.AssociatedEventStream
updateStream UpdateStream
monitor *watch.Monitor
monitor *util.Monitor
}

var _ storage.Storage = &SyncStorage{}
Expand All @@ -47,7 +48,7 @@ func NewSyncStorage(rwStorage storage.Storage, wStorages ...storage.Storage) sto
}

if ss.eventStream != nil {
ss.monitor = watch.RunMonitor(ss.monitorFunc)
ss.monitor = util.RunMonitor(ss.monitorFunc)
}

return ss
Expand Down Expand Up @@ -141,7 +142,8 @@ func (ss *SyncStorage) monitorFunc() {
case update.EventModify, update.EventCreate:
// First load the Object using the Storage given in the update,
// then set it using the client constructed above
if obj, err := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind()).Get(upd.APIType.GetUID()); err != nil {
updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind())
if obj, err := updClient.Get(upd.APIType.GetUID()); err != nil {
log.Errorf("Failed to get Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
} else if err = c.Dynamic(upd.APIType.GetKind()).Set(obj); err != nil {
Expand All @@ -156,8 +158,15 @@ func (ss *SyncStorage) monitorFunc() {
}
}

// Send the update to the listeners
ss.updateStream <- upd.Update
// Send the update to the listeners unless the channel is full,
// in which case issue a warning. The channel can hold as many
// updates as updateBuffer specifies.
select {
case ss.updateStream <- upd.Update:
log.Debugf("SyncStorage: sent update: %v", upd.Update)
default:
log.Warn("SyncStorage: failed to send update, channel full")
}
} else {
return
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/storage/watch/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/manifest/raw"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
"github.com/weaveworks/ignite/pkg/util"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/yaml"
)

// Storage is an interface for persisting and retrieving API objects to/from a backend
// One Storage instance handles all different Kinds of Objects
// WatchStorage is an extended Storage implementation, which provides a watcher
// for watching changes in the directory managed by the embedded Storage's RawStorage.
// If the RawStorage is a MappedRawStorage instance, it's mappings will automatically
// be updated by the WatchStorage. Update events are sent to the given event stream.
type WatchStorage interface {
// WatchStorage extends the Storage interface
storage.Storage
Expand All @@ -42,7 +45,7 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) {
}

if mapped, ok := s.RawStorage().(raw.MappedRawStorage); ok {
s.monitor = RunMonitor(func() {
s.monitor = util.RunMonitor(func() {
s.monitorFunc(mapped, files) // Offload the file registration to the goroutine
})
}
Expand All @@ -55,7 +58,7 @@ type GenericWatchStorage struct {
storage.Storage
watcher *watcher
events *AssociatedEventStream
monitor *Monitor
monitor *util.Monitor
}

var _ WatchStorage = &GenericWatchStorage{}
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/watch/update/associated.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"github.com/weaveworks/ignite/pkg/storage"
)

// AssociatedUpdate bundles together an Update and a Storage
// implementation. This is used by SyncStorage to query the
// correct Storage for the updated Object.
type AssociatedUpdate struct {
Update
Storage storage.Storage
Expand Down
9 changes: 1 addition & 8 deletions pkg/storage/watch/update/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package update

// Event is an enum describing a change in a file's state.
// Event is an enum describing a change in a file's/Object's state.
// Unknown state changes can be signaled with a zero value.
type Event uint8

Expand All @@ -22,10 +22,3 @@ func (e Event) String() string {

return "NONE"
}

// FileUpdate is used by watchers to
// signal the state change of a file.
type FileUpdate struct {
Event Event
Path string
}
8 changes: 8 additions & 0 deletions pkg/storage/watch/update/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package update

// FileUpdate is used by watchers to
// signal the state change of a file.
type FileUpdate struct {
Event Event
Path string
}
2 changes: 2 additions & 0 deletions pkg/storage/watch/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
)

// Update bundles an Event with an
// APIType for Storage retrieval.
type Update struct {
Event Event
APIType meta.Object
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
"github.com/weaveworks/ignite/pkg/util"
)

const eventBuffer = 4096 // How many events and updates we can buffer before watching is interrupted
Expand All @@ -24,13 +25,17 @@ type eventStream chan notify.EventInfo
type UpdateStream chan *update.FileUpdate
type watches []string

// watcher recursively monitors changes in files in the given directory
// and sends out events based on their state changes. Only files conforming
// to validSuffix are monitored. The watcher can be suspended for a single
// event at a time to eliminate updates by WatchStorage causing a loop.
type watcher struct {
dir string
events eventStream
updates UpdateStream
watches watches
suspendEvent update.Event
monitor *Monitor
monitor *util.Monitor
}

func (w *watcher) addWatch(path string) (err error) {
Expand Down Expand Up @@ -73,7 +78,7 @@ func newWatcher(dir string) (w *watcher, files []string, err error) {
if err = w.start(&files); err != nil {
notify.Stop(w.events)
} else {
w.monitor = RunMonitor(w.monitorFunc)
w.monitor = util.RunMonitor(w.monitorFunc)
}

return
Expand Down Expand Up @@ -173,6 +178,8 @@ func (w *watcher) close() {
w.monitor.Wait()
}

// This enables a one-time suspend of the given event,
// the watcher will skip the given event once
func (w *watcher) suspend(updateEvent update.Event) {
w.suspendEvent = updateEvent
}
Expand All @@ -185,6 +192,8 @@ func convertEvent(event notify.Event) update.Event {
return 0
}

// validSuffix is used to filter out all unsupported
// files based on the extensions in storage.Formats
func validSuffix(path string) bool {
for suffix := range storage.Formats {
if filepath.Ext(path) == suffix {
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/watch/monitor.go → pkg/util/monitor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package watch
package util

import "sync"

// Monitor is a convenience wrapper around
// starting a goroutine with a wait group,
// which can be used to wait for the
// goroutine to stop.
type Monitor struct {
wg *sync.WaitGroup
}
Expand Down

0 comments on commit 6a2257d

Please sign in to comment.