Skip to content

Commit

Permalink
add a BackupStore to pkg/persistence that supports prefixes
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <steve@heptio.com>
  • Loading branch information
skriss committed Sep 6, 2018
1 parent af64069 commit f0edf73
Show file tree
Hide file tree
Showing 28 changed files with 1,392 additions and 1,069 deletions.
10 changes: 5 additions & 5 deletions pkg/cloudprovider/aws/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (o *objectStore) Init(config map[string]string) error {
return nil
}

func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error {
func (o *objectStore) PutObject(bucket, key string, body io.Reader) error {
req := &s3manager.UploadInput{
Bucket: &bucket,
Key: &key,
Expand All @@ -134,7 +134,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error
return errors.Wrapf(err, "error putting object %s", key)
}

func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) {
func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
req := &s3.GetObjectInput{
Bucket: &bucket,
Key: &key,
Expand All @@ -148,9 +148,10 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error
return res.Body, nil
}

func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) {
func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
req := &s3.ListObjectsV2Input{
Bucket: &bucket,
Prefix: &prefix,
Delimiter: &delimiter,
}

Expand All @@ -161,7 +162,6 @@ func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]str
}
return !lastPage
})

if err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {
return ret, nil
}

func (o *objectStore) DeleteObject(bucket string, key string) error {
func (o *objectStore) DeleteObject(bucket, key string) error {
req := &s3.DeleteObjectInput{
Bucket: &bucket,
Key: &key,
Expand Down
16 changes: 5 additions & 11 deletions pkg/cloudprovider/azure/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (o *objectStore) Init(config map[string]string) error {
return nil
}

func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error {
func (o *objectStore) PutObject(bucket, key string, body io.Reader) error {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return err
Expand All @@ -133,7 +133,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error
return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil))
}

func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) {
func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return nil, err
Expand All @@ -152,13 +152,14 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error
return res, nil
}

func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) {
func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
container, err := getContainerReference(o.blobClient, bucket)
if err != nil {
return nil, err
}

params := storage.ListBlobsParameters{
Prefix: prefix,
Delimiter: delimiter,
}

Expand All @@ -167,14 +168,7 @@ func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]str
return nil, errors.WithStack(err)
}

// Azure returns prefixes inclusive of the last delimiter. We need to strip
// it.
ret := make([]string, 0, len(res.BlobPrefixes))
for _, prefix := range res.BlobPrefixes {
ret = append(ret, prefix[0:strings.LastIndex(prefix, delimiter)])
}

return ret, nil
return res.BlobPrefixes, nil
}

func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {
Expand Down
25 changes: 13 additions & 12 deletions pkg/cloudprovider/gcp/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"io"
"io/ioutil"
"os"
"strings"
"time"

"cloud.google.com/go/storage"
Expand Down Expand Up @@ -98,7 +97,7 @@ func (o *objectStore) Init(config map[string]string) error {
return nil
}

func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error {
func (o *objectStore) PutObject(bucket, key string, body io.Reader) error {
w := o.bucketWriter.getWriteCloser(bucket, key)

// The writer returned by NewWriter is asynchronous, so errors aren't guaranteed
Expand All @@ -114,7 +113,7 @@ func (o *objectStore) PutObject(bucket string, key string, body io.Reader) error
return closeErr
}

func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error) {
func (o *objectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
r, err := o.client.Bucket(bucket).Object(key).NewReader(context.Background())
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -123,28 +122,30 @@ func (o *objectStore) GetObject(bucket string, key string) (io.ReadCloser, error
return r, nil
}

func (o *objectStore) ListCommonPrefixes(bucket string, delimiter string) ([]string, error) {
func (o *objectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
q := &storage.Query{
Prefix: prefix,
Delimiter: delimiter,
}

var res []string

iter := o.client.Bucket(bucket).Objects(context.Background(), q)

var res []string
for {
obj, err := iter.Next()
if err == iterator.Done {
return res, nil
}
if err != nil {
if err != nil && err != iterator.Done {
return nil, errors.WithStack(err)
}
if err == iterator.Done {
break
}

if obj.Prefix != "" {
res = append(res, obj.Prefix[0:strings.LastIndex(obj.Prefix, delimiter)])
res = append(res, obj.Prefix)
}
}

return res, nil
}

func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {
Expand All @@ -169,7 +170,7 @@ func (o *objectStore) ListObjects(bucket, prefix string) ([]string, error) {
}
}

func (o *objectStore) DeleteObject(bucket string, key string) error {
func (o *objectStore) DeleteObject(bucket, key string) error {
return errors.Wrapf(o.client.Bucket(bucket).Object(key).Delete(context.Background()), "error deleting object %s", key)
}

Expand Down
168 changes: 168 additions & 0 deletions pkg/cloudprovider/in_memory_object_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright 2018 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloudprovider

import (
"bytes"
"errors"
"io"
"io/ioutil"
"strings"
"time"
)

type BucketData map[string][]byte

// InMemoryObjectStore is a simple implementation of the ObjectStore interface
// that stores its data in-memory/in-proc. This is mainly intended to be used
// as a test fake.
type InMemoryObjectStore struct {
Data map[string]BucketData
}

func NewInMemoryObjectStore(buckets ...string) *InMemoryObjectStore {
o := &InMemoryObjectStore{
Data: make(map[string]BucketData),
}

for _, bucket := range buckets {
o.Data[bucket] = make(map[string][]byte)
}

return o
}

//
// Interface Implementation
//

func (o *InMemoryObjectStore) Init(config map[string]string) error {
return nil
}

func (o *InMemoryObjectStore) PutObject(bucket, key string, body io.Reader) error {
bucketData, ok := o.Data[bucket]
if !ok {
return errors.New("bucket not found")
}

obj, err := ioutil.ReadAll(body)
if err != nil {
return err
}

bucketData[key] = obj

return nil
}

func (o *InMemoryObjectStore) GetObject(bucket, key string) (io.ReadCloser, error) {
bucketData, ok := o.Data[bucket]
if !ok {
return nil, errors.New("bucket not found")
}

obj, ok := bucketData[key]
if !ok {
return nil, errors.New("key not found")
}

return ioutil.NopCloser(bytes.NewReader(obj)), nil
}

func (o *InMemoryObjectStore) ListCommonPrefixes(bucket, prefix, delimiter string) ([]string, error) {
keys, err := o.ListObjects(bucket, prefix)
if err != nil {
return nil, err
}

// For each key, check if it has an instance of the delimiter *after* the prefix.
// If not, skip it; if so, return the prefix of the key up to/including the delimiter.

var prefixes []string
for _, key := range keys {
// everything after 'prefix'
afterPrefix := key[len(prefix):]

// index of the *start* of 'delimiter' in 'afterPrefix'
delimiterStart := strings.Index(afterPrefix, delimiter)
if delimiterStart == -1 {
continue
}

// return the prefix, plus everything after the prefix and before
// the delimiter, plus the delimiter
fullPrefix := prefix + afterPrefix[0:delimiterStart] + delimiter

prefixes = append(prefixes, fullPrefix)
}

return prefixes, nil
}

func (o *InMemoryObjectStore) ListObjects(bucket, prefix string) ([]string, error) {
bucketData, ok := o.Data[bucket]
if !ok {
return nil, errors.New("bucket not found")
}

var objs []string
for key := range bucketData {
if strings.HasPrefix(key, prefix) {
objs = append(objs, key)
}
}

return objs, nil
}

func (o *InMemoryObjectStore) DeleteObject(bucket, key string) error {
bucketData, ok := o.Data[bucket]
if !ok {
return errors.New("bucket not found")
}

delete(bucketData, key)

return nil
}

func (o *InMemoryObjectStore) CreateSignedURL(bucket, key string, ttl time.Duration) (string, error) {
bucketData, ok := o.Data[bucket]
if !ok {
return "", errors.New("bucket not found")
}

_, ok = bucketData[key]
if !ok {
return "", errors.New("key not found")
}

return "a-url", nil
}

//
// Test Helper Methods
//

func (o *InMemoryObjectStore) ClearBucket(bucket string) {
if _, ok := o.Data[bucket]; !ok {
return
}

o.Data[bucket] = make(map[string][]byte)
}
48 changes: 0 additions & 48 deletions pkg/cloudprovider/mocks/backup_lister.go

This file was deleted.

Loading

0 comments on commit f0edf73

Please sign in to comment.