Skip to content

Commit

Permalink
chore: refactoring component structure (#1044)
Browse files Browse the repository at this point in the history
This PR refactors the core package by restructuring the components
responsible for managing the subscriptions, as well as the creation of
sync sources. The following changes have been made:

- Renamed `sync-store` package to `subscriptions`. This has been done to
avoid confusion because we already have a `sync`, and a `store`.
package. Within the package, the `SyncStore`. has been renamed to
`subscriptions.Manager`, which should reflect its responsibility in a
better way. Also, the `syncHandler` has been renamed to `multiplexer`,
as this one is responsible for sending updates of a certain target to
all subscribers - `syncHandler` was a bit too generic in my opinion.
- Moved the `GetSyncSourceFromURI` method to a new package,
`sync/builder`, to remove the responsibility of building concrete sync
sources from the subscription manager
- Moved the logic of retrieving the K8s client config to the sync
builder. Previously, this was done in the `runtime` package by calling
the respective methods for the config retrieval provided by the
`sync/kubernetes` package and then handing that config back to the
initialization of the `K8sSync`. Note: This step can potentially be done
in a separate PR, if so desired.

---------

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
Co-authored-by: Giovanni Liva <giovanni.liva@dynatrace.com>
  • Loading branch information
bacherfl and thisthat authored Dec 7, 2023
1 parent ec5f778 commit 0c7f78a
Show file tree
Hide file tree
Showing 21 changed files with 1,035 additions and 757 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mockgen: install-mockgen
cd core; mockgen -source=pkg/sync/grpc/credentials/builder.go -destination=pkg/sync/grpc/credentials/mock/builder.go -package=credendialsmock
cd core; mockgen -source=pkg/eval/ievaluator.go -destination=pkg/eval/mock/ievaluator.go -package=evalmock
cd core; mockgen -source=pkg/service/middleware/interface.go -destination=pkg/service/middleware/mock/interface.go -package=middlewaremock
cd core; mockgen -source=pkg/sync/builder/syncbuilder.go -destination=pkg/sync/builder/mock/syncbuilder.go -package=middlewaremocksyncbuildermock
generate-docs:
cd flagd; go run ./cmd/doc/main.go

Expand Down
205 changes: 8 additions & 197 deletions core/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,22 @@ package runtime

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
msync "sync"
"time"

"github.com/open-feature/flagd/core/pkg/eval"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/service"
flageval "github.com/open-feature/flagd/core/pkg/service/flag-evaluation"
"github.com/open-feature/flagd/core/pkg/store"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/flagd/core/pkg/sync/file"
"github.com/open-feature/flagd/core/pkg/sync/grpc"
"github.com/open-feature/flagd/core/pkg/sync/grpc/credentials"
httpSync "github.com/open-feature/flagd/core/pkg/sync/http"
"github.com/open-feature/flagd/core/pkg/sync/kubernetes"
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
"github.com/open-feature/flagd/core/pkg/telemetry"
"github.com/robfig/cron"
"go.uber.org/zap"
)

// from_config is a collection of structures and parsers responsible for deriving flagd runtime

const (
syncProviderFile = "file"
syncProviderGrpc = "grpc"
syncProviderKubernetes = "kubernetes"
syncProviderHTTP = "http"
svcName = "flagd"
)

var (
regCrd *regexp.Regexp
regURL *regexp.Regexp
regGRPC *regexp.Regexp
regGRPCSecure *regexp.Regexp
regFile *regexp.Regexp
)

// SourceConfig is configuration option for flagd. This maps to startup parameter sources
type SourceConfig struct {
URI string `json:"uri"`
Provider string `json:"provider"`

BearerToken string `json:"bearerToken,omitempty"`
CertPath string `json:"certPath,omitempty"`
TLS bool `json:"tls,omitempty"`
ProviderID string `json:"providerID,omitempty"`
Selector string `json:"selector,omitempty"`
Interval uint32 `json:"interval,omitempty"`
}
const svcName = "flagd"

// Config is the configuration structure derived from startup arguments.
type Config struct {
Expand All @@ -67,18 +29,10 @@ type Config struct {
ServicePort uint16
ServiceSocketPath string

SyncProviders []SourceConfig
SyncProviders []sync.SourceConfig
CORS []string
}

func init() {
regCrd = regexp.MustCompile("^core.openfeature.dev/")
regURL = regexp.MustCompile("^https?://")
regGRPC = regexp.MustCompile("^" + grpc.Prefix)
regGRPCSecure = regexp.MustCompile("^" + grpc.PrefixSecure)
regFile = regexp.MustCompile("^file:")
}

// FromConfig builds a runtime from startup configurations
// nolint: funlen
func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime, error) {
Expand Down Expand Up @@ -176,155 +130,12 @@ func setupJSONEvaluator(logger *logger.Logger, s *store.Flags) *eval.JSONEvaluat
}

// syncProvidersFromConfig is a helper to build ISync implementations from SourceConfig
func syncProvidersFromConfig(logger *logger.Logger, sources []SourceConfig) ([]sync.ISync, error) {
syncImpls := []sync.ISync{}

for _, syncProvider := range sources {
switch syncProvider.Provider {
case syncProviderFile:
syncImpls = append(syncImpls, NewFile(syncProvider, logger))
logger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI))
case syncProviderKubernetes:
k, err := NewK8s(syncProvider.URI, logger)
if err != nil {
return nil, err
}
syncImpls = append(syncImpls, k)
logger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", syncProvider.URI))
case syncProviderHTTP:
syncImpls = append(syncImpls, NewHTTP(syncProvider, logger))
logger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI))
case syncProviderGrpc:
syncImpls = append(syncImpls, NewGRPC(syncProvider, logger))
logger.Debug(fmt.Sprintf("using grpc sync-provider for: %s", syncProvider.URI))

default:
return nil, fmt.Errorf("invalid sync provider: %s, must be one of with '%s', '%s', '%s' or '%s'",
syncProvider.Provider, syncProviderFile, syncProviderKubernetes, syncProviderHTTP, syncProviderKubernetes)
}
}
return syncImpls, nil
}

func NewGRPC(config SourceConfig, logger *logger.Logger) *grpc.Sync {
return &grpc.Sync{
URI: config.URI,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "grpc"),
),
CredentialBuilder: &credentials.CredentialBuilder{},
CertPath: config.CertPath,
ProviderID: config.ProviderID,
Secure: config.TLS,
Selector: config.Selector,
}
}

func NewHTTP(config SourceConfig, logger *logger.Logger) *httpSync.Sync {
// Default to 5 seconds
var interval uint32 = 5
if config.Interval != 0 {
interval = config.Interval
}

return &httpSync.Sync{
URI: config.URI,
Client: &http.Client{
Timeout: time.Second * 10,
},
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "remote"),
),
BearerToken: config.BearerToken,
Interval: interval,
Cron: cron.New(),
}
}

func NewK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) {
reader, dynamic, err := kubernetes.GetClients()
func syncProvidersFromConfig(logger *logger.Logger, sources []sync.SourceConfig) ([]sync.ISync, error) {
builder := syncbuilder.NewSyncBuilder()
syncs, err := builder.SyncsFromConfig(sources, logger)
if err != nil {
return nil, fmt.Errorf("error creating kubernetes clients: %w", err)
}
return kubernetes.NewK8sSync(
logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "kubernetes"),
),
regCrd.ReplaceAllString(uri, ""),
reader,
dynamic,
), nil
}

func NewFile(config SourceConfig, logger *logger.Logger) *file.Sync {
return &file.Sync{
URI: config.URI,
Logger: logger.WithFields(
zap.String("component", "sync"),
zap.String("sync", "filepath"),
),
Mux: &msync.RWMutex{},
}
}

// ParseSources parse a json formatted SourceConfig array string and performs validations on the content
func ParseSources(sourcesFlag string) ([]SourceConfig, error) {
syncProvidersParsed := []SourceConfig{}

if err := json.Unmarshal([]byte(sourcesFlag), &syncProvidersParsed); err != nil {
return syncProvidersParsed, fmt.Errorf("error parsing sync providers: %w", err)
}
for _, sp := range syncProvidersParsed {
if sp.URI == "" {
return syncProvidersParsed, errors.New("sync provider argument parse: uri is a required field")
}
if sp.Provider == "" {
return syncProvidersParsed, errors.New("sync provider argument parse: provider is a required field")
}
return nil, fmt.Errorf("could not create sync sources from config: %w", err)
}
return syncProvidersParsed, nil
}

// ParseSyncProviderURIs uri flag based sync sources to SourceConfig array. Replaces uri prefixes where necessary to
// derive SourceConfig
func ParseSyncProviderURIs(uris []string) ([]SourceConfig, error) {
syncProvidersParsed := []SourceConfig{}

for _, uri := range uris {
switch uriB := []byte(uri); {
case regFile.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regFile.ReplaceAllString(uri, ""),
Provider: syncProviderFile,
})
case regCrd.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regCrd.ReplaceAllString(uri, ""),
Provider: syncProviderKubernetes,
})
case regURL.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: uri,
Provider: syncProviderHTTP,
})
case regGRPC.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regGRPC.ReplaceAllString(uri, ""),
Provider: syncProviderGrpc,
})
case regGRPCSecure.Match(uriB):
syncProvidersParsed = append(syncProvidersParsed, SourceConfig{
URI: regGRPCSecure.ReplaceAllString(uri, ""),
Provider: syncProviderGrpc,
TLS: true,
})
default:
return syncProvidersParsed, fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', "+
"'http(s)://', 'grpc(s)://', or 'core.openfeature.dev'", uri)
}
}
return syncProvidersParsed, nil
return syncs, nil
}
Loading

0 comments on commit 0c7f78a

Please sign in to comment.