Skip to content

Commit

Permalink
blob: add URL schemes for gcsblob and s3blob (#605)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent authored Nov 2, 2018
1 parent 1ccb8e0 commit 2152f20
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 9 deletions.
71 changes: 68 additions & 3 deletions blob/gcsblob/gcsblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@

// Package gcsblob provides an implementation of blob that uses GCS.
//
// For blob.Open URLs, gcsblob registers for the "gs" protocol.
// The URL's Host is used as the bucket name.
// The following query options are supported:
// - cred_path: Sets path to the Google credentials file. If unset, default
// credentials are loaded.
// See https://cloud.google.com/docs/authentication/production.
// - access_id: Sets Options.GoogleAccessID.
// - private_key_path: Sets path to a private key, which is read and used
// to set Options.PrivateKey.
// Example URL: blob.Open("gs://mybucket")
//
// It exposes the following types for As:
// Bucket: *storage.Client
// ListObject: storage.ObjectAttrs
Expand All @@ -28,6 +39,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/url"
"sort"
"strings"
"time"
Expand All @@ -37,13 +49,57 @@ import (
"github.com/google/go-cloud/gcp"

"cloud.google.com/go/storage"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

const defaultPageSize = 1000

func init() {
blob.Register("gs", func(ctx context.Context, u *url.URL) (driver.Bucket, error) {
q := u.Query()
opts := &Options{}

if accessID := q["access_id"]; len(accessID) > 0 {
opts.GoogleAccessID = accessID[0]
}

if keyPath := q["private_key_path"]; len(keyPath) > 0 {
pk, err := ioutil.ReadFile(keyPath[0])
if err != nil {
return nil, fmt.Errorf("reading private key: %v", err)
}
opts.PrivateKey = pk
}

var creds *google.Credentials
if credPath := q["cred_path"]; len(credPath) == 0 {
var err error
creds, err = gcp.DefaultCredentials(ctx)
if err != nil {
return nil, err
}
} else {
jsonCreds, err := ioutil.ReadFile(credPath[0])
if err != nil {
return nil, fmt.Errorf("reading credentials: %v", err)
}
creds, err = google.CredentialsFromJSON(ctx, jsonCreds)
if err != nil {
return nil, fmt.Errorf("loading credentials: %v", err)
}
}

client, err := gcp.NewHTTPClient(gcp.DefaultTransport(), gcp.CredentialsTokenSource(creds))
if err != nil {
return nil, err
}
return openBucket(ctx, u.Host, client, opts)
})
}

// Options sets options for constructing a *blob.Bucket backed by GCS.
type Options struct {
// GoogleAccessID represents the authorizer for SignedURL.
Expand All @@ -62,8 +118,8 @@ type Options struct {
SignBytes func([]byte) ([]byte, error)
}

// OpenBucket returns a GCS Bucket that communicates using the given HTTP client.
func OpenBucket(ctx context.Context, bucketName string, client *gcp.HTTPClient, opts *Options) (*blob.Bucket, error) {
// openBucket returns a GCS Bucket that communicates using the given HTTP client.
func openBucket(ctx context.Context, bucketName string, client *gcp.HTTPClient, opts *Options) (driver.Bucket, error) {
if client == nil {
return nil, fmt.Errorf("OpenBucket requires an HTTP client")
}
Expand All @@ -74,7 +130,16 @@ func OpenBucket(ctx context.Context, bucketName string, client *gcp.HTTPClient,
if opts == nil {
opts = &Options{}
}
return blob.NewBucket(&bucket{name: bucketName, client: c, opts: opts}), nil
return &bucket{name: bucketName, client: c, opts: opts}, nil
}

// OpenBucket returns a GCS Bucket that communicates using the given HTTP client.
func OpenBucket(ctx context.Context, bucketName string, client *gcp.HTTPClient, opts *Options) (*blob.Bucket, error) {
drv, err := openBucket(ctx, bucketName, client, opts)
if err != nil {
return nil, err
}
return blob.NewBucket(drv), nil
}

// bucket represents a GCS bucket, which handles read, write and delete operations
Expand Down
45 changes: 40 additions & 5 deletions blob/s3blob/s3blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@

// Package s3blob provides an implementation of blob using S3.
//
// It exposes the following types for As:
// For blob.Open URLs, s3blob registers for the "s3" protocol.
// The URL's Host is used as the bucket name.
// The AWS session is created as described in
// https://docs.aws.amazon.com/sdk-for-go/api/aws/session/.
// The following query options are supported:
// - region: The AWS region for requests; sets aws.Config.Region.
// Example URL: blob.Open("s3://mybucket?region=us-east-1")
//
// s3blob exposes the following types for As:
// Bucket: *s3.S3
// ListObject: s3.Object for objects, s3.CommonPrefix for "directories".
// ListOptions.BeforeList: *s3.ListObjectsV2Input
Expand All @@ -30,6 +38,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net/url"
"sort"
"strconv"
"strings"
Expand All @@ -40,25 +49,51 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

const defaultPageSize = 1000

func init() {
blob.Register("s3", func(ctx context.Context, u *url.URL) (driver.Bucket, error) {
q := u.Query()
cfg := &aws.Config{}

if region := q["region"]; len(region) > 0 {
cfg.Region = aws.String(region[0])
}
sess, err := session.NewSession(cfg)
if err != nil {
return nil, err
}
return openBucket(ctx, u.Host, sess, nil)
})
}

// Options sets options for constructing a *blob.Bucket backed by fileblob.
type Options struct{}

// OpenBucket returns an S3 Bucket.
func OpenBucket(ctx context.Context, bucketName string, sess client.ConfigProvider, _ *Options) (*blob.Bucket, error) {
// openBucket returns an S3 Bucket.
func openBucket(ctx context.Context, bucketName string, sess client.ConfigProvider, _ *Options) (driver.Bucket, error) {
if sess == nil {
return nil, errors.New("sess must be provided to get bucket")
}
return blob.NewBucket(&bucket{
return &bucket{
name: bucketName,
sess: sess,
client: s3.New(sess),
}), nil
}, nil
}

// OpenBucket returns an S3 Bucket.
func OpenBucket(ctx context.Context, bucketName string, sess client.ConfigProvider, opts *Options) (*blob.Bucket, error) {
drv, err := openBucket(ctx, bucketName, sess, opts)
if err != nil {
return nil, err
}
return blob.NewBucket(drv), nil
}

var emptyBody = ioutil.NopCloser(strings.NewReader(""))
Expand Down
2 changes: 1 addition & 1 deletion blob/s3blob/s3blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (h *harness) HTTPClient() *http.Client {
}

func (h *harness) MakeDriver(ctx context.Context) (driver.Bucket, error) {
return &bucket{name: bucketName, sess: h.session, client: s3.New(h.session)}, nil
return openBucket(ctx, bucketName, h.session, nil)
}

func (h *harness) Close() {
Expand Down

0 comments on commit 2152f20

Please sign in to comment.