Skip to content

Commit

Permalink
✨ Add internal/db/storage/blob
Browse files Browse the repository at this point in the history
Signed-off-by: Rintaro Okamura <rintaro.okamura@gmail.com>
  • Loading branch information
rinx authored and actions-user committed May 19, 2020
1 parent 36efe03 commit aa9f28e
Show file tree
Hide file tree
Showing 6 changed files with 451 additions and 0 deletions.
84 changes: 84 additions & 0 deletions internal/config/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package config providers configuration type and load configuration logic
package config

import "strings"

type BlobStorageType uint8

const (
S3 BlobStorageType = iota
)

func (bst BlobStorageType) String() string {
switch bst {
case S3:
return "s3"
}
return "unknown"
}

func AtoBST(bst string) BlobStorageType {
switch strings.ToLower(bst) {
case "s3":
return S3
}
return 0
}

type Blob struct {
// StorageType represents blob storaget type
StorageType string `json:"storage_type" yaml:"storage_type"`

// BucketURL represents bucket URL
BucketURL string `json:"bucket_url" yaml:"bucket_url"`

// S3 represents S3 config
S3 *S3Config `json:"s3" yaml:"s3"`
}

type S3Config struct {
Endpoint string `json:"endpoint" yaml:"endpoint"`
Region string `json:"region" yaml:"region"`
AccessKey string `json:"access_key" yaml:"access_key"`
SecretAccessKey string `json:"secret_access_key" yaml:"secret_access_key"`
Token string `json:"token" yaml:"token"`
}

func (b *Blob) Bind() *Blob {
b.StorageType = GetActualValue(b.StorageType)
b.BucketURL = GetActualValue(b.BucketURL)

if b.S3 != nil {
b.S3 = b.S3.Bind()
} else {
b.S3 = new(S3Config)
}

return b
}

func (s *S3Config) Bind() *S3Config {
s.Endpoint = GetActualValue(s.Endpoint)
s.Region = GetActualValue(s.Region)
s.AccessKey = GetActualValue(s.AccessKey)
s.SecretAccessKey = GetActualValue(s.SecretAccessKey)
s.Token = GetActualValue(s.Token)

return s
}
78 changes: 78 additions & 0 deletions internal/db/storage/blob/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package blob

import (
"context"
"io"
"net/url"
"reflect"

"github.com/vdaas/vald/internal/errors"

"gocloud.dev/blob"
)

type BucketURLOpener = blob.BucketURLOpener

type bucket struct {
opener BucketURLOpener
url string
bucket *blob.Bucket
}

type Bucket interface {
Open(ctx context.Context) error
Close() error
Reader(ctx context.Context, key string) (io.ReadCloser, error)
Writer(ctx context.Context, key string) (io.WriteCloser, error)
}

func NewBucket(opts ...Option) (Bucket, error) {
b := new(bucket)
for _, opt := range append(defaultOpts, opts...) {
if err := opt(b); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}

return b, nil
}

func (b *bucket) Open(ctx context.Context) (err error) {
url, err := url.Parse(b.url)
if err != nil {
return err
}
b.bucket, err = b.opener.OpenBucketURL(ctx, url)
return err
}

func (b *bucket) Close() error {
if b.bucket != nil {
return b.bucket.Close()
}
return nil
}

func (b *bucket) Reader(ctx context.Context, key string) (io.ReadCloser, error) {
return b.bucket.NewReader(ctx, key, nil)
}

func (b *bucket) Writer(ctx context.Context, key string) (io.WriteCloser, error) {
return b.bucket.NewWriter(ctx, key, nil)
}
129 changes: 129 additions & 0 deletions internal/db/storage/blob/blob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package blob

import (
"context"
"testing"

"github.com/vdaas/vald/internal/db/storage/blob/s3"
)

const (
endpoint = ""
region = ""
accessKey = ""
secretAccessKey = ""
bucketURL = ""
)

func TestS3Write(t *testing.T) {
bucket, err := NewBucket(
WithBucketURLOpener(
s3.NewS3Session(
s3.WithEndpoint(endpoint),
s3.WithRegion(region),
s3.WithAccessKey(accessKey),
s3.WithSecretAccessKey(secretAccessKey),
).URLOpener(),
),
WithBucketURL(bucketURL),
)
if err != nil {
t.Fatalf("bucket initialize failed: %s", err)
}

ctx := context.Background()

err = bucket.Open(ctx)
if err != nil {
t.Fatalf("bucket open failed: %s", err)
}

defer func() {
err = bucket.Close()
if err != nil {
t.Fatalf("bucket close failed: %s", err)
}
}()

w, err := bucket.Writer(ctx, "writer-test.txt")
if err != nil {
t.Fatalf("fetch writer failed: %s", err)
}
defer func() {
err = w.Close()
if err != nil {
t.Fatalf("writer close failed: %s", err)
}
}()

_, err = w.Write([]byte("Hello from blob world!"))
if err != nil {
t.Fatalf("write failed: %s", err)
}
}

func TestS3Read(t *testing.T) {
bucket, err := NewBucket(
WithBucketURLOpener(
s3.NewS3Session(
s3.WithEndpoint(endpoint),
s3.WithRegion(region),
s3.WithAccessKey(accessKey),
s3.WithSecretAccessKey(secretAccessKey),
).URLOpener(),
),
WithBucketURL(bucketURL),
)
if err != nil {
t.Fatalf("bucket initialize failed: %s", err)
}

ctx := context.Background()

err = bucket.Open(ctx)
if err != nil {
t.Fatalf("bucket open failed: %s", err)
}

defer func() {
err = bucket.Close()
if err != nil {
t.Fatalf("bucket close failed: %s", err)
}
}()

r, err := bucket.Reader(ctx, "writer-test.txt")
if err != nil {
t.Fatalf("fetch reader failed: %s", err)
}
defer func() {
err = r.Close()
if err != nil {
t.Fatalf("reader close failed: %s", err)
}
}()

rbuf := make([]byte, 16)
_, err = r.Read(rbuf)
if err != nil {
t.Fatalf("read failed: %s", err)
}

t.Logf("read: %s", string(rbuf))
}
37 changes: 37 additions & 0 deletions internal/db/storage/blob/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package blob

type Option func(b *bucket) error

var (
defaultOpts = []Option{}
)

func WithBucketURLOpener(bo BucketURLOpener) Option {
return func(b *bucket) error {
b.opener = bo
return nil
}
}

func WithBucketURL(url string) Option {
return func(b *bucket) error {
b.url = url
return nil
}
}
53 changes: 53 additions & 0 deletions internal/db/storage/blob/s3/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package s3

type Option func(s *s3session)

var (
defaultOpts = []Option{}
)

func WithEndpoint(ep string) Option {
return func(s *s3session) {
s.endpoint = ep
}
}

func WithRegion(rg string) Option {
return func(s *s3session) {
s.region = rg
}
}

func WithAccessKey(ak string) Option {
return func(s *s3session) {
s.accessKey = ak
}
}

func WithSecretAccessKey(sak string) Option {
return func(s *s3session) {
s.secretAccessKey = sak
}
}

func WithToken(tk string) Option {
return func(s *s3session) {
s.token = tk
}
}
Loading

0 comments on commit aa9f28e

Please sign in to comment.