Skip to content

Commit

Permalink
refactor: refactor sync providers (#291)
Browse files Browse the repository at this point in the history
## This PR

Revisits sync providers and refactor them.

- unify event handling
- packaging improvements
- removal of unused codes (cleanup)
- clean contracts between Runtime and Sync implementations 

With this change, we get a clear isolation between runtime and sync
providers (ex:- file, k8s, ....). For example, the runtime is not aware
of any sync implementation details such as events. It simply coordinates
sync impls, store (evaluator) and notifications. This should bring more
extensibility when adding future extensions.

Below is the overview of the internals and interactions,


![image](https://user-images.githubusercontent.com/8186721/213037032-316adb7e-e9ab-42e3-82f6-c3eaa2612ba3.png)

Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
  • Loading branch information
Kavindu-Dodan authored Jan 23, 2023
1 parent ce14db2 commit 46c5bf8
Show file tree
Hide file tree
Showing 18 changed files with 720 additions and 916 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ lint:
install-mockgen:
go install github.com/golang/mock/mockgen@v1.6.0
mockgen: install-mockgen
mockgen -source=pkg/sync/http_sync.go -destination=pkg/sync/mock/http.go -package=syncmock
mockgen -source=pkg/sync/http/http_sync.go -destination=pkg/sync/http/mock/http.go -package=syncmock
mockgen -source=pkg/eval/ievaluator.go -destination=pkg/eval/mock/ievaluator.go -package=evalmock
14 changes: 8 additions & 6 deletions pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"regexp"
"time"

"github.com/open-feature/flagd/pkg/sync/file"
httpSync "github.com/open-feature/flagd/pkg/sync/http"

"github.com/open-feature/flagd/pkg/eval"
"github.com/open-feature/flagd/pkg/logger"
"github.com/open-feature/flagd/pkg/service"
Expand All @@ -29,10 +32,9 @@ func init() {

func FromConfig(logger *logger.Logger, config Config) (*Runtime, error) {
rt := Runtime{
config: config,
Logger: logger.WithFields(zap.String("component", "runtime")),
syncNotifier: make(chan sync.INotify),
Evaluator: eval.NewJSONEvaluator(logger),
config: config,
Logger: logger.WithFields(zap.String("component", "runtime")),
Evaluator: eval.NewJSONEvaluator(logger),
}
if err := rt.setSyncImplFromConfig(logger); err != nil {
return nil, err
Expand Down Expand Up @@ -63,7 +65,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
for _, uri := range r.config.SyncURI {
switch uriB := []byte(uri); {
case regFile.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &sync.FilePathSync{
r.SyncImpl = append(r.SyncImpl, &file.Sync{
URI: regFile.ReplaceAllString(uri, ""),
Logger: logger.WithFields(
zap.String("component", "sync"),
Expand All @@ -83,7 +85,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error {
})
rtLogger.Debug(fmt.Sprintf("Using kubernetes sync-provider for %s", uri))
case regURL.Match(uriB):
r.SyncImpl = append(r.SyncImpl, &sync.HTTPSync{
r.SyncImpl = append(r.SyncImpl, &httpSync.Sync{
URI: uri,
BearerToken: r.config.SyncBearerToken,
Client: &http.Client{
Expand Down
94 changes: 56 additions & 38 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package runtime

import (
"context"
"fmt"
"errors"
"os"
"os/signal"
msync "sync"
"syscall"

"golang.org/x/sync/errgroup"

"github.com/open-feature/flagd/pkg/eval"
"github.com/open-feature/flagd/pkg/logger"
Expand All @@ -12,10 +17,9 @@ import (
)

type Runtime struct {
config Config
Service service.IService
SyncImpl []sync.ISync
syncNotifier chan sync.INotify
config Config
Service service.IService
SyncImpl []sync.ISync

mu msync.Mutex
Evaluator eval.IEvaluator
Expand All @@ -36,48 +40,63 @@ type Config struct {
CORS []string
}

func (r *Runtime) startSyncer(ctx context.Context, syncr sync.ISync) error {
if err := r.updateState(ctx, syncr); err != nil {
return err
func (r *Runtime) Start() error {
if r.Service == nil {
return errors.New("no Service set")
}
if len(r.SyncImpl) == 0 {
return errors.New("no SyncImplementation set")
}
if r.Evaluator == nil {
return errors.New("no Evaluator set")
}

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

go syncr.Notify(ctx, r.syncNotifier)

for {
select {
case <-ctx.Done():
return nil
case w := <-r.syncNotifier:
switch w.GetEvent().EventType {
case sync.DefaultEventTypeCreate:
r.Logger.Debug("New configuration created")
if err := r.updateState(ctx, syncr); err != nil {
r.Logger.Error(err.Error())
}
case sync.DefaultEventTypeModify:
r.Logger.Debug("Configuration modified")
if err := r.updateState(ctx, syncr); err != nil {
r.Logger.Error(err.Error())
}
case sync.DefaultEventTypeDelete:
r.Logger.Debug("Configuration deleted")
case sync.DefaultEventTypeReady:
r.Logger.Debug("Notifier ready")
g, gCtx := errgroup.WithContext(ctx)
dataSync := make(chan sync.DataSync)

// Initialize DataSync channel watcher
g.Go(func() error {
for {
select {
case data := <-dataSync:
r.updateWithNotify(data)
case <-gCtx.Done():
return nil
}
}
})

// Start sync providers
for _, s := range r.SyncImpl {
p := s
g.Go(func() error {
return p.Sync(gCtx, dataSync)
})
}
}

func (r *Runtime) updateState(ctx context.Context, syncr sync.ISync) error {
msg, err := syncr.Fetch(ctx)
if err != nil {
return fmt.Errorf("fetch: %w", err)
g.Go(func() error {
return r.Service.Serve(gCtx, r.Evaluator)
})

<-gCtx.Done()
if err := g.Wait(); err != nil {
return err
}
return nil
}

// updateWithNotify helps to update state and notify listeners
func (r *Runtime) updateWithNotify(data sync.DataSync) {
r.mu.Lock()
defer r.mu.Unlock()
notifications, err := r.Evaluator.SetState(syncr.Source(), msg)

notifications, err := r.Evaluator.SetState(data.Source, data.FlagData)
if err != nil {
return fmt.Errorf("set state: %w", err)
r.Logger.Error(err.Error())
return
}

r.Service.Notify(service.Notification{
Expand All @@ -86,5 +105,4 @@ func (r *Runtime) updateState(ctx context.Context, syncr sync.ISync) error {
"flags": notifications,
},
})
return nil
}
44 changes: 0 additions & 44 deletions pkg/runtime/start.go

This file was deleted.

138 changes: 138 additions & 0 deletions pkg/sync/file/filepath_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package file

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"

"github.com/open-feature/flagd/pkg/sync"

"gopkg.in/yaml.v3"

"github.com/fsnotify/fsnotify"
"github.com/open-feature/flagd/pkg/logger"
)

type Sync struct {
URI string
Logger *logger.Logger
ProviderArgs sync.ProviderArgs
// FileType indicates the file type e.g., json, yaml/yml etc.,
fileType string
}

//nolint:funlen
func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
fs.Logger.Info("Starting filepath sync notifier")
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
defer watcher.Close()

err = watcher.Add(fs.URI)
if err != nil {
return err
}

// file watcher is ready(and stable), fetch and emit the initial results
fetch, err := fs.fetch(ctx)
if err != nil {
return err
}

dataSync <- sync.DataSync{FlagData: fetch, Source: fs.URI}

fs.Logger.Info(fmt.Sprintf("Watching filepath: %s", fs.URI))
for {
select {
case event, ok := <-watcher.Events:
if !ok {
fs.Logger.Info("Filepath notifier closed")
return errors.New("filepath notifier closed")
}

fs.Logger.Info(fmt.Sprintf("Filepath event: %s %s", event.Name, event.Op.String()))

switch event.Op {
case fsnotify.Create:
fs.Logger.Debug("New configuration created")
msg, err := fs.fetch(ctx)
if err != nil {
fs.Logger.Error(fmt.Sprintf("Error fetching after Create notification: %s", err.Error()))
continue
}

dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI}
case fsnotify.Write:
fs.Logger.Debug("Configuration modified")
msg, err := fs.fetch(ctx)
if err != nil {
fs.Logger.Error(fmt.Sprintf("Error fetching after Write notification: %s", err.Error()))
continue
}

dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI}
case fsnotify.Remove:
// K8s exposes config maps as symlinks.
// Updates cause a remove event, we need to re-add the watcher in this case.
err = watcher.Add(fs.URI)
if err != nil {
fs.Logger.Error(fmt.Sprintf("Error restoring watcher, file may have been deleted: %s", err.Error()))
}
}
case err, ok := <-watcher.Errors:
if !ok {
return errors.New("watcher error")
}

fs.Logger.Error(err.Error())
case <-ctx.Done():
fs.Logger.Debug("Exiting file watcher")
return nil
}
}
}

func (fs *Sync) fetch(_ context.Context) (string, error) {
if fs.URI == "" {
return "", errors.New("no filepath string set")
}
if fs.fileType == "" {
uriSplit := strings.Split(fs.URI, ".")
fs.fileType = uriSplit[len(uriSplit)-1]
}
rawFile, err := os.ReadFile(fs.URI)
if err != nil {
return "", err
}

switch fs.fileType {
case "yaml", "yml":
return yamlToJSON(rawFile)
case "json":
return string(rawFile), nil
default:
return "", fmt.Errorf("filepath extension for URI '%s' is not supported", fs.URI)
}
}

// yamlToJSON is a generic helper function to convert
// yaml to json
func yamlToJSON(rawFile []byte) (string, error) {
var ms map[string]interface{}
// yaml.Unmarshal unmarshals to map[interface]interface{}
if err := yaml.Unmarshal(rawFile, &ms); err != nil {
return "", fmt.Errorf("unmarshal yaml: %w", err)
}

r, err := json.Marshal(ms)
if err != nil {
return "", fmt.Errorf("convert yaml to json: %w", err)
}

return string(r), err
}
Loading

0 comments on commit 46c5bf8

Please sign in to comment.