Skip to content

Commit 310b4fd

Browse files
fjvelayaron2daixiang0beiwei30
authored
gcp bucket update (#1128)
* gcp bucket update create operation: support upload files in base64 * gcp bucket add get operation * gcp bucket add delete operation * gcp bucket add list operation * gcp bucket fix lint * gcp bucked add backward compatibility 'key' replace 'name' * gcp bucket fix lint * gcp bucket create operation return object url to download * gcp fix lint * gcp fix lint * gcp bucket fix lint * gcp bucket fix lint * gcp bucket fix error msg * rerun checks Co-authored-by: Yaron Schneider <yaronsc@microsoft.com> Co-authored-by: Long Dai <long0dai@foxmail.com> Co-authored-by: Ian Luo <ian.luo@gmail.com>
1 parent 0949537 commit 310b4fd

File tree

2 files changed

+434
-44
lines changed

2 files changed

+434
-44
lines changed

bindings/gcp/bucket/bucket.go

Lines changed: 223 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,36 @@ package bucket
77

88
import (
99
"context"
10+
b64 "encoding/base64"
1011
"encoding/json"
12+
"fmt"
13+
"io/ioutil"
14+
"net/url"
15+
"strconv"
1116

1217
"cloud.google.com/go/storage"
1318
"github.com/google/uuid"
19+
"google.golang.org/api/iterator"
1420
"google.golang.org/api/option"
1521

1622
"github.com/dapr/components-contrib/bindings"
1723
"github.com/dapr/kit/logger"
1824
)
1925

26+
const (
27+
objectURLBase = "https://storage.googleapis.com/%s/%s"
28+
metadataDecodeBase64 = "decodeBase64"
29+
metadataEncodeBase64 = "encodeBase64"
30+
31+
metadataKey = "key"
32+
maxResults = 1000
33+
34+
metadataKeyBC = "name"
35+
)
36+
2037
// GCPStorage allows saving data to GCP bucket storage.
2138
type GCPStorage struct {
22-
metadata gcpMetadata
39+
metadata *gcpMetadata
2340
client *storage.Client
2441
logger logger.Logger
2542
}
@@ -36,6 +53,18 @@ type gcpMetadata struct {
3653
TokenURI string `json:"token_uri"`
3754
AuthProviderCertURL string `json:"auth_provider_x509_cert_url"`
3855
ClientCertURL string `json:"client_x509_cert_url"`
56+
DecodeBase64 bool `json:"decodeBase64,string"`
57+
EncodeBase64 bool `json:"encodeBase64,string"`
58+
}
59+
60+
type listPayload struct {
61+
Prefix string `json:"prefix"`
62+
MaxResults int32 `json:"maxResults"`
63+
Delimiter string `json:"delimiter"`
64+
}
65+
66+
type createResponse struct {
67+
ObjectURL string `json:"objectURL"`
3968
}
4069

4170
// NewGCPStorage returns a new GCP storage instance.
@@ -45,62 +74,237 @@ func NewGCPStorage(logger logger.Logger) *GCPStorage {
4574

4675
// Init performs connection parsing.
4776
func (g *GCPStorage) Init(metadata bindings.Metadata) error {
48-
b, err := g.parseMetadata(metadata)
77+
m, b, err := g.parseMetadata(metadata)
4978
if err != nil {
5079
return err
5180
}
5281

53-
var gm gcpMetadata
54-
err = json.Unmarshal(b, &gm)
55-
if err != nil {
56-
return err
57-
}
5882
clientOptions := option.WithCredentialsJSON(b)
5983
ctx := context.Background()
6084
client, err := storage.NewClient(ctx, clientOptions)
6185
if err != nil {
6286
return err
6387
}
6488

65-
g.metadata = gm
89+
g.metadata = m
6690
g.client = client
6791

6892
return nil
6993
}
7094

71-
func (g *GCPStorage) parseMetadata(metadata bindings.Metadata) ([]byte, error) {
95+
func (g *GCPStorage) parseMetadata(metadata bindings.Metadata) (*gcpMetadata, []byte, error) {
7296
b, err := json.Marshal(metadata.Properties)
7397
if err != nil {
74-
return nil, err
98+
return nil, nil, err
7599
}
76100

77-
return b, nil
101+
var m gcpMetadata
102+
err = json.Unmarshal(b, &m)
103+
if err != nil {
104+
return nil, nil, err
105+
}
106+
107+
return &m, b, nil
78108
}
79109

80110
func (g *GCPStorage) Operations() []bindings.OperationKind {
81-
return []bindings.OperationKind{bindings.CreateOperation}
111+
return []bindings.OperationKind{
112+
bindings.CreateOperation,
113+
bindings.GetOperation,
114+
bindings.DeleteOperation,
115+
bindings.ListOperation,
116+
}
82117
}
83118

84119
func (g *GCPStorage) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
120+
req.Metadata = g.handleBackwardCompatibilityForMetadata(req.Metadata)
121+
122+
switch req.Operation {
123+
case bindings.CreateOperation:
124+
return g.create(req)
125+
case bindings.GetOperation:
126+
return g.get(req)
127+
case bindings.DeleteOperation:
128+
return g.delete(req)
129+
case bindings.ListOperation:
130+
return g.list(req)
131+
default:
132+
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
133+
}
134+
}
135+
136+
func (g *GCPStorage) create(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
137+
var err error
138+
metadata, err := g.metadata.mergeWithRequestMetadata(req)
139+
if err != nil {
140+
return nil, fmt.Errorf("gcp bucket binding error. error merge metadata : %w", err)
141+
}
142+
85143
var name string
86-
if val, ok := req.Metadata["name"]; ok && val != "" {
144+
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
87145
name = val
88146
} else {
89-
id, err := uuid.NewRandom()
90-
if err != nil {
91-
return nil, err
147+
name = uuid.New().String()
148+
g.logger.Debugf("key not found. generating name %s", name)
149+
}
150+
151+
d, err := strconv.Unquote(string(req.Data))
152+
if err == nil {
153+
req.Data = []byte(d)
154+
}
155+
156+
if metadata.DecodeBase64 {
157+
decoded, decodeError := b64.StdEncoding.DecodeString(string(req.Data))
158+
if decodeError != nil {
159+
return nil, fmt.Errorf("gcp bucket binding error. decode : %w", decodeError)
92160
}
93-
name = id.String()
161+
req.Data = decoded
94162
}
163+
95164
h := g.client.Bucket(g.metadata.Bucket).Object(name).NewWriter(context.Background())
96165
defer h.Close()
97-
if _, err := h.Write(req.Data); err != nil {
166+
if _, err = h.Write(req.Data); err != nil {
167+
return nil, fmt.Errorf("gcp bucket binding error. Uploading: %w", err)
168+
}
169+
170+
objectURL, err := url.Parse(fmt.Sprintf(objectURLBase, g.metadata.Bucket, name))
171+
if err != nil {
172+
return nil, fmt.Errorf("gcp bucket binding error. error building url response: %w", err)
173+
}
174+
175+
resp := createResponse{
176+
ObjectURL: objectURL.String(),
177+
}
178+
179+
b, err := json.Marshal(resp)
180+
if err != nil {
181+
return nil, fmt.Errorf("gcp binding error. error marshalling create response: %w", err)
182+
}
183+
184+
return &bindings.InvokeResponse{
185+
Data: b,
186+
}, nil
187+
}
188+
189+
func (g *GCPStorage) get(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
190+
metadata, err := g.metadata.mergeWithRequestMetadata(req)
191+
if err != nil {
192+
return nil, fmt.Errorf("gcp binding binding error. error merge metadata : %w", err)
193+
}
194+
195+
var key string
196+
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
197+
key = val
198+
} else {
199+
return nil, fmt.Errorf("gcp bucket binding error: can't read key value")
200+
}
201+
202+
rc, err := g.client.Bucket(g.metadata.Bucket).Object(key).NewReader(context.Background())
203+
if err != nil {
204+
return nil, fmt.Errorf("gcp bucketgcp bucket binding error: error downloading bucket object: %w", err)
205+
}
206+
defer rc.Close()
207+
208+
data, err := ioutil.ReadAll(rc)
209+
if err != nil {
210+
return nil, fmt.Errorf("gcp bucketgcp bucket binding error: ioutil.ReadAll: %v", err)
211+
}
212+
213+
if metadata.EncodeBase64 {
214+
encoded := b64.StdEncoding.EncodeToString(data)
215+
data = []byte(encoded)
216+
}
217+
218+
return &bindings.InvokeResponse{
219+
Data: data,
220+
Metadata: nil,
221+
}, nil
222+
}
223+
224+
func (g *GCPStorage) delete(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
225+
var key string
226+
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
227+
key = val
228+
} else {
229+
return nil, fmt.Errorf("gcp bucketgcp bucket binding error: can't read key value")
230+
}
231+
232+
object := g.client.Bucket(g.metadata.Bucket).Object(key)
233+
234+
err := object.Delete(context.Background())
235+
236+
return nil, err
237+
}
238+
239+
func (g *GCPStorage) list(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
240+
var payload listPayload
241+
err := json.Unmarshal(req.Data, &payload)
242+
if err != nil {
98243
return nil, err
99244
}
100245

101-
return nil, nil
246+
if payload.MaxResults == int32(0) {
247+
payload.MaxResults = maxResults
248+
}
249+
250+
input := &storage.Query{
251+
Prefix: payload.Prefix,
252+
Delimiter: payload.Delimiter,
253+
}
254+
255+
var result []storage.ObjectAttrs
256+
it := g.client.Bucket(g.metadata.Bucket).Objects(context.Background(), input)
257+
for {
258+
attrs, errIt := it.Next()
259+
if errIt == iterator.Done || len(result) == int(payload.MaxResults) {
260+
break
261+
}
262+
result = append(result, *attrs)
263+
}
264+
265+
jsonResponse, err := json.Marshal(result)
266+
if err != nil {
267+
return nil, fmt.Errorf("gcp bucketgcp bucket binding error. list operation. cannot marshal blobs to json: %w", err)
268+
}
269+
270+
return &bindings.InvokeResponse{
271+
Data: jsonResponse,
272+
}, nil
102273
}
103274

104275
func (g *GCPStorage) Close() error {
105276
return g.client.Close()
106277
}
278+
279+
// Helper to merge config and request metadata.
280+
func (metadata gcpMetadata) mergeWithRequestMetadata(req *bindings.InvokeRequest) (gcpMetadata, error) {
281+
merged := metadata
282+
283+
if val, ok := req.Metadata[metadataDecodeBase64]; ok && val != "" {
284+
valBool, err := strconv.ParseBool(val)
285+
if err != nil {
286+
return merged, err
287+
}
288+
merged.DecodeBase64 = valBool
289+
}
290+
291+
if val, ok := req.Metadata[metadataEncodeBase64]; ok && val != "" {
292+
valBool, err := strconv.ParseBool(val)
293+
if err != nil {
294+
return merged, err
295+
}
296+
merged.EncodeBase64 = valBool
297+
}
298+
299+
return merged, nil
300+
}
301+
302+
// Add backward compatibility. 'key' replace 'name'.
303+
func (g *GCPStorage) handleBackwardCompatibilityForMetadata(metadata map[string]string) map[string]string {
304+
if val, ok := metadata[metadataKeyBC]; ok && val != "" {
305+
metadata[metadataKey] = val
306+
delete(metadata, metadataKeyBC)
307+
}
308+
309+
return metadata
310+
}

0 commit comments

Comments
 (0)