Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 223 additions & 19 deletions bindings/gcp/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,36 @@ package bucket

import (
"context"
b64 "encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"strconv"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)

const (
objectURLBase = "https://storage.googleapis.com/%s/%s"
metadataDecodeBase64 = "decodeBase64"
metadataEncodeBase64 = "encodeBase64"

metadataKey = "key"
maxResults = 1000

metadataKeyBC = "name"
)

// GCPStorage allows saving data to GCP bucket storage.
type GCPStorage struct {
metadata gcpMetadata
metadata *gcpMetadata
client *storage.Client
logger logger.Logger
}
Expand All @@ -36,6 +53,18 @@ type gcpMetadata struct {
TokenURI string `json:"token_uri"`
AuthProviderCertURL string `json:"auth_provider_x509_cert_url"`
ClientCertURL string `json:"client_x509_cert_url"`
DecodeBase64 bool `json:"decodeBase64,string"`
EncodeBase64 bool `json:"encodeBase64,string"`
}

type listPayload struct {
Prefix string `json:"prefix"`
MaxResults int32 `json:"maxResults"`
Delimiter string `json:"delimiter"`
}

type createResponse struct {
ObjectURL string `json:"objectURL"`
}

// NewGCPStorage returns a new GCP storage instance.
Expand All @@ -45,62 +74,237 @@ func NewGCPStorage(logger logger.Logger) *GCPStorage {

// Init performs connection parsing.
func (g *GCPStorage) Init(metadata bindings.Metadata) error {
b, err := g.parseMetadata(metadata)
m, b, err := g.parseMetadata(metadata)
if err != nil {
return err
}

var gm gcpMetadata
err = json.Unmarshal(b, &gm)
if err != nil {
return err
}
clientOptions := option.WithCredentialsJSON(b)
ctx := context.Background()
client, err := storage.NewClient(ctx, clientOptions)
if err != nil {
return err
}

g.metadata = gm
g.metadata = m
g.client = client

return nil
}

func (g *GCPStorage) parseMetadata(metadata bindings.Metadata) ([]byte, error) {
func (g *GCPStorage) parseMetadata(metadata bindings.Metadata) (*gcpMetadata, []byte, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
return nil, nil, err
}

return b, nil
var m gcpMetadata
err = json.Unmarshal(b, &m)
if err != nil {
return nil, nil, err
}

return &m, b, nil
}

func (g *GCPStorage) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
return []bindings.OperationKind{
bindings.CreateOperation,
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
}
}

func (g *GCPStorage) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
req.Metadata = g.handleBackwardCompatibilityForMetadata(req.Metadata)

switch req.Operation {
case bindings.CreateOperation:
return g.create(req)
case bindings.GetOperation:
return g.get(req)
case bindings.DeleteOperation:
return g.delete(req)
case bindings.ListOperation:
return g.list(req)
default:
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
}
}

func (g *GCPStorage) create(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var err error
metadata, err := g.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("gcp bucket binding error. error merge metadata : %w", err)
}

var name string
if val, ok := req.Metadata["name"]; ok && val != "" {
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
name = val
} else {
id, err := uuid.NewRandom()
if err != nil {
return nil, err
name = uuid.New().String()
g.logger.Debugf("key not found. generating name %s", name)
}

d, err := strconv.Unquote(string(req.Data))
if err == nil {
req.Data = []byte(d)
}

if metadata.DecodeBase64 {
decoded, decodeError := b64.StdEncoding.DecodeString(string(req.Data))
if decodeError != nil {
return nil, fmt.Errorf("gcp bucket binding error. decode : %w", decodeError)
}
name = id.String()
req.Data = decoded
}

h := g.client.Bucket(g.metadata.Bucket).Object(name).NewWriter(context.Background())
defer h.Close()
if _, err := h.Write(req.Data); err != nil {
if _, err = h.Write(req.Data); err != nil {
return nil, fmt.Errorf("gcp bucket binding error. Uploading: %w", err)
}

objectURL, err := url.Parse(fmt.Sprintf(objectURLBase, g.metadata.Bucket, name))
if err != nil {
return nil, fmt.Errorf("gcp bucket binding error. error building url response: %w", err)
}

resp := createResponse{
ObjectURL: objectURL.String(),
}

b, err := json.Marshal(resp)
if err != nil {
return nil, fmt.Errorf("gcp binding error. error marshalling create response: %w", err)
}

return &bindings.InvokeResponse{
Data: b,
}, nil
}

func (g *GCPStorage) get(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
metadata, err := g.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("gcp binding binding error. error merge metadata : %w", err)
}

var key string
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
key = val
} else {
return nil, fmt.Errorf("gcp bucket binding error: can't read key value")
}

rc, err := g.client.Bucket(g.metadata.Bucket).Object(key).NewReader(context.Background())
if err != nil {
return nil, fmt.Errorf("gcp bucketgcp bucket binding error: error downloading bucket object: %w", err)
}
defer rc.Close()

data, err := ioutil.ReadAll(rc)
if err != nil {
return nil, fmt.Errorf("gcp bucketgcp bucket binding error: ioutil.ReadAll: %v", err)
}

if metadata.EncodeBase64 {
encoded := b64.StdEncoding.EncodeToString(data)
data = []byte(encoded)
}

return &bindings.InvokeResponse{
Data: data,
Metadata: nil,
}, nil
}

func (g *GCPStorage) delete(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var key string
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
key = val
} else {
return nil, fmt.Errorf("gcp bucketgcp bucket binding error: can't read key value")
}

object := g.client.Bucket(g.metadata.Bucket).Object(key)

err := object.Delete(context.Background())

return nil, err
}

func (g *GCPStorage) list(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload listPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}

return nil, nil
if payload.MaxResults == int32(0) {
payload.MaxResults = maxResults
}

input := &storage.Query{
Prefix: payload.Prefix,
Delimiter: payload.Delimiter,
}

var result []storage.ObjectAttrs
it := g.client.Bucket(g.metadata.Bucket).Objects(context.Background(), input)
for {
attrs, errIt := it.Next()
if errIt == iterator.Done || len(result) == int(payload.MaxResults) {
break
}
result = append(result, *attrs)
}

jsonResponse, err := json.Marshal(result)
if err != nil {
return nil, fmt.Errorf("gcp bucketgcp bucket binding error. list operation. cannot marshal blobs to json: %w", err)
}

return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

func (g *GCPStorage) Close() error {
return g.client.Close()
}

// Helper to merge config and request metadata.
func (metadata gcpMetadata) mergeWithRequestMetadata(req *bindings.InvokeRequest) (gcpMetadata, error) {
merged := metadata

if val, ok := req.Metadata[metadataDecodeBase64]; ok && val != "" {
valBool, err := strconv.ParseBool(val)
if err != nil {
return merged, err
}
merged.DecodeBase64 = valBool
}

if val, ok := req.Metadata[metadataEncodeBase64]; ok && val != "" {
valBool, err := strconv.ParseBool(val)
if err != nil {
return merged, err
}
merged.EncodeBase64 = valBool
}

return merged, nil
}

// Add backward compatibility. 'key' replace 'name'.
func (g *GCPStorage) handleBackwardCompatibilityForMetadata(metadata map[string]string) map[string]string {
if val, ok := metadata[metadataKeyBC]; ok && val != "" {
metadata[metadataKey] = val
delete(metadata, metadataKeyBC)
}

return metadata
}
Loading