Skip to content

Commit

Permalink
feat(config): support remote blob storage for service configuration (#…
Browse files Browse the repository at this point in the history
…2983)

* feat(config): support remote blob storage for service configuration

Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com>

* address PR feedback and improve test coverage

Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com>

---------

Signed-off-by: Roman Dmytrenko <rdmytrenko@gmail.com>
  • Loading branch information
erka authored Apr 14, 2024
1 parent 5148610 commit 91cc1b9
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 44 deletions.
24 changes: 24 additions & 0 deletions build/testing/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,30 @@ exit $?`,
}
}

{
container := container.Pipeline("flipt (remote config)")
minio := container.
From("quay.io/minio/minio:latest").
WithExposedPort(9009).
WithEnvVariable("MINIO_ROOT_USER", "user").
WithEnvVariable("MINIO_ROOT_PASSWORD", "password").
WithEnvVariable("MINIO_BROWSER", "off").
WithExec([]string{"server", "/data", "--address", ":9009", "--quiet"}).
AsService()

if _, err := assertExec(ctx,
container.WithServiceBinding("minio", minio).
WithEnvVariable("AWS_ACCESS_KEY_ID", "user").
WithEnvVariable("AWS_SECRET_ACCESS_KEY", "password"),
flipt("--config", "s3://mybucket/local.yml?region=minio&endpoint=http://minio:9009"),
fails,
stderr(contains(`NoSuchBucket`)),
stderr(contains(`loading configuration: open local.yml`)),
); err != nil {
return err
}
}

{
container := container.Pipeline("flipt import/export")

Expand Down
2 changes: 1 addition & 1 deletion cmd/flipt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func buildConfig() (*zap.Logger, *config.Config, error) {
// otherwise, use defaults
res, err := config.Load(path)
if err != nil {
return nil, nil, fmt.Errorf("loading configuration %w", err)
return nil, nil, fmt.Errorf("loading configuration: %w", err)
}

if !found {
Expand Down
57 changes: 54 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package config

import (
"context"
"encoding/json"
"fmt"
"io/fs"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"time"

"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"go.flipt.io/flipt/internal/storage/fs/object"
"gocloud.dev/blob"
"golang.org/x/exp/constraints"
)

Expand Down Expand Up @@ -86,9 +92,27 @@ func Load(path string) (*Result, error) {
cfg = Default()
} else {
cfg = &Config{}
v.SetConfigFile(path)
if err := v.ReadInConfig(); err != nil {
return nil, fmt.Errorf("loading configuration: %w", err)
file, err := getConfigFile(context.Background(), path)
if err != nil {
return nil, err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return nil, err
}

// reimplement logic from v.ReadInConfig()
v.SetConfigFile(stat.Name())
ext := filepath.Ext(stat.Name())
if len(ext) > 1 {
ext = ext[1:]
}
if !slices.Contains(viper.SupportedExts, ext) {
return nil, viper.UnsupportedConfigError(ext)
}
if err := v.ReadConfig(file); err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -183,6 +207,33 @@ func Load(path string) (*Result, error) {
return result, nil
}

// getConfigFile provides a file from different type of storage.
func getConfigFile(ctx context.Context, path string) (fs.File, error) {
u, err := url.Parse(path)
if err != nil {
return nil, err
}

if slices.Contains(object.SupportedSchemes(), u.Scheme) {
key := strings.TrimPrefix(u.Path, "/")
u.Path = ""
bucket, err := object.OpenBucket(ctx, u)
if err != nil {
return nil, err
}
defer bucket.Close()
bucket.SetIOFSCallback(func() (context.Context, *blob.ReaderOptions) { return ctx, nil })
return bucket.Open(key)
}

// assumes that the local file is used
file, err := os.Open(path)
if err != nil {
return nil, err
}
return file, nil
}

type defaulter interface {
setDefaults(v *viper.Viper) error
}
Expand Down
60 changes: 60 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package config

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
Expand All @@ -17,6 +19,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.flipt.io/flipt/internal/oci"
"gocloud.dev/blob"
"gocloud.dev/blob/memblob"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -1351,3 +1355,59 @@ func Test_mustBindEnv(t *testing.T) {
})
}
}

type mockURLOpener struct {
bucket *blob.Bucket
}

// OpenBucketURL opens a blob.Bucket based on u.
func (c *mockURLOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
for param := range u.Query() {
return nil, fmt.Errorf("open bucket %v: invalid query parameter %q", u, param)
}
return c.bucket, nil
}

func TestGetConfigFile(t *testing.T) {
blob.DefaultURLMux().RegisterBucket("mock", &mockURLOpener{
bucket: memblob.OpenBucket(nil),
})
configData := []byte("some config data")
ctx := context.Background()
b, err := blob.OpenBucket(ctx, "mock://mybucket")
require.NoError(t, err)
t.Cleanup(func() { b.Close() })
w, err := b.NewWriter(ctx, "config/local.yml", nil)
require.NoError(t, err)
_, err = w.Write(configData)
require.NoError(t, err)
err = w.Close()
require.NoError(t, err)
t.Run("successful", func(t *testing.T) {
f, err := getConfigFile(ctx, "mock://mybucket/config/local.yml")
require.NoError(t, err)
s, err := f.Stat()
require.NoError(t, err)
require.Equal(t, "local.yml", s.Name())

data, err := io.ReadAll(f)
require.NoError(t, err)
require.Equal(t, configData, data)
})

for _, tt := range []struct {
name string
path string
}{
{"unknown bucket", "mock://otherbucket/config.yml"},
{"unknown scheme", "unknown://otherbucket/config.yml"},
{"no bucket", "mock://"},
{"no key", "mock://mybucket"},
{"no data", ""},
} {
t.Run(tt.name, func(t *testing.T) {
_, err = getConfigFile(ctx, tt.path)
require.Error(t, err)
})
}
}
64 changes: 64 additions & 0 deletions internal/storage/fs/object/mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package object

import (
"context"
"fmt"
"net/url"

s3v2 "github.com/aws/aws-sdk-go-v2/service/s3"
gcaws "gocloud.dev/aws"
gcblob "gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob"
"gocloud.dev/blob/gcsblob"
"gocloud.dev/blob/s3blob"
)

// s3Schema is a custom scheme for gocloud blob which works with
// how we interact with s3 (supports interfacing with minio)
const (
s3Schema = "s3i"
googlecloudSchema = "googlecloud"
)

func init() {
gcblob.DefaultURLMux().RegisterBucket(s3Schema, new(urlSessionOpener))
}

type urlSessionOpener struct{}

func (o *urlSessionOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*gcblob.Bucket, error) {
cfg, err := gcaws.V2ConfigFromURLParams(ctx, u.Query())
if err != nil {
return nil, fmt.Errorf("open bucket %v: %w", u, err)
}
clientV2 := s3v2.NewFromConfig(cfg, func(o *s3v2.Options) {
o.UsePathStyle = true
})
return s3blob.OpenBucketV2(ctx, clientV2, u.Host, &s3blob.Options{})
}

// OpenBucket opens the bucket identified by the URL given.
//
// See the URLOpener documentation in driver subpackages for
// details on supported URL formats, and https://gocloud.dev/concepts/urls/
// for more information.
func OpenBucket(ctx context.Context, urlstr *url.URL) (*gcblob.Bucket, error) {
urlCopy := *urlstr
urlCopy.Scheme = remapScheme(urlstr.Scheme)
return gcblob.DefaultURLMux().OpenBucketURL(ctx, &urlCopy)
}

func remapScheme(scheme string) string {
switch scheme {
case s3blob.Scheme:
return s3Schema
case googlecloudSchema:
return gcsblob.Scheme
default:
return scheme
}
}

func SupportedSchemes() []string {
return append(gcblob.DefaultURLMux().BucketSchemes(), googlecloudSchema)
}
30 changes: 30 additions & 0 deletions internal/storage/fs/object/mux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package object

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestRemapScheme(t *testing.T) {
for _, tt := range []struct {
scheme string
expected string
}{
{"s3", "s3i"},
{"azblob", "azblob"},
{"googlecloud", "gs"},
} {
t.Run(tt.scheme, func(t *testing.T) {
assert.Equal(t, tt.expected, remapScheme(tt.scheme))
})
}
}

func TestSupportedSchemes(t *testing.T) {
for _, tt := range []string{"s3", "s3i", "azblob", "googlecloud", "gs"} {
t.Run(tt, func(t *testing.T) {
assert.Contains(t, SupportedSchemes(), tt)
})
}
}
26 changes: 0 additions & 26 deletions internal/storage/fs/object/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,22 @@ package object
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/url"
"strings"
"sync"
"time"

s3v2 "github.com/aws/aws-sdk-go-v2/service/s3"
"go.flipt.io/flipt/internal/containers"
"go.flipt.io/flipt/internal/storage"
storagefs "go.flipt.io/flipt/internal/storage/fs"
"go.uber.org/zap"
gcaws "gocloud.dev/aws"
gcblob "gocloud.dev/blob"
"gocloud.dev/blob/s3blob"
"gocloud.dev/gcerrors"
)

var _ storagefs.SnapshotStore = (*SnapshotStore)(nil)

// S3Schema is a custom scheme for gocloud blob which works with
// how we interact with s3 (supports interfacing with minio)
const S3Schema = "s3i"

func init() {
gcblob.DefaultURLMux().RegisterBucket(S3Schema, new(urlSessionOpener))
}

type urlSessionOpener struct{}

func (o *urlSessionOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*gcblob.Bucket, error) {
cfg, err := gcaws.V2ConfigFromURLParams(ctx, u.Query())
if err != nil {
return nil, fmt.Errorf("open bucket %v: %w", u, err)
}
clientV2 := s3v2.NewFromConfig(cfg, func(o *s3v2.Options) {
o.UsePathStyle = true
})
return s3blob.OpenBucketV2(ctx, clientV2, u.Host, &s3blob.Options{})
}

type SnapshotStore struct {
*storagefs.Poller

Expand Down
Loading

0 comments on commit 91cc1b9

Please sign in to comment.