Merge pull request #11 from mickael-carl/go-cloud
Support all major cloud providers for blob storage
mickael-carl authored May 22, 2019
2 parents 876afd8 + df08771 commit f0496ad
Showing 7 changed files with 250 additions and 140 deletions.
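In short, this change replaces the S3-specific backend with one built on the Go CDK (gocloud.dev) blob API, so the same BlobAccess implementation can sit on top of S3, GCS, Azure Blob Storage, a local directory, or an in-memory bucket. The new "url" configuration option passes the configured string straight to blob.OpenBucket, which dispatches on the URL scheme that each imported driver registers. A minimal sketch of that mechanism, assuming a recent gocloud.dev release; the directory and bucket names below are placeholders, not part of this commit:

    package main

    import (
        "context"
        "io/ioutil"
        "log"

        "gocloud.dev/blob"
        // Each driver registers its URL scheme with blob.OpenBucket in init();
        // s3blob, gcsblob and azureblob do the same for s3://, gs:// and azblob://.
        _ "gocloud.dev/blob/fileblob"
        _ "gocloud.dev/blob/memblob"
    )

    func main() {
        ctx := context.Background()

        // Placeholder directory for the file:// driver.
        dir, err := ioutil.TempDir("", "blobs")
        if err != nil {
            log.Fatal(err)
        }

        for _, url := range []string{"mem://", "file://" + dir} {
            bucket, err := blob.OpenBucket(ctx, url)
            if err != nil {
                log.Fatalf("%s: %v", url, err)
            }
            log.Printf("opened bucket via %s", url)
            bucket.Close()
        }
    }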
48 changes: 48 additions & 0 deletions go_dependencies.bzl
@@ -121,6 +121,12 @@ def bb_storage_go_dependencies():
importpath = "contrib.go.opencensus.io/exporter/jaeger",
)

go_repository(
name = "dev_gocloud",
commit = "a68836e8e108ad55d26e8e2d21579028090c8aa5",
importpath = "gocloud.dev",
)

go_repository(
name = "org_golang_google_api",
importpath = "google.golang.org/api",
@@ -151,10 +157,52 @@ def bb_storage_go_dependencies():
strip_prefix = "opencensus-go-0.21.0",
)

go_repository(
name = "com_google_cloud_go",
commit = "09ad026a62f0561b7f7e276569eda11a6afc9773",
importpath = "cloud.google.com/go",
)

go_repository(
name = "org_golang_x_xerrors",
commit = "385005612d73f6925de56cb1886917aeaf90e3c5",
importpath = "golang.org/x/xerrors",
)

go_repository(
name = "com_github_hashicorp_golang_lru",
importpath = "github.com/hashicorp/golang-lru",
urls = ["https://github.com/hashicorp/golang-lru/archive/v0.5.1.tar.gz"],
sha256 = "3bf57512af746dc0338651ba1c35c65fe907ff214ccb22d679539f7ea791511e",
strip_prefix = "golang-lru-0.5.1",
)

go_repository(
name = "com_github_googleapis_gax_go",
commit = "9e334198cafcf7b281a9673424d7b1c3a02ebd50",
importpath = "github.com/googleapis/gax-go",
)

go_repository(
name = "org_golang_x_oauth2",
commit = "9f3314589c9a9136388751d9adae6b0ed400978a",
importpath = "golang.org/x/oauth2",
)

go_repository(
name = "com_github_google_wire",
commit = "2183ee4806cf1878e136fea26f06f9abef9375b6",
importpath = "github.com/google/wire",
)

go_repository(
name = "com_github_azure_azure_pipeline_go",
commit = "55fedc85a614dcd0e942a66f302ae3efb83d563c",
importpath = "github.com/Azure/azure-pipeline-go",
)

go_repository(
name = "com_github_azure_azure_storage_blob_go",
commit = "8a1deeeabe0a24f918d29630ede0da2a1c8f3b2f",
importpath = "github.com/Azure/azure-storage-blob-go",
)
8 changes: 3 additions & 5 deletions pkg/blobstore/BUILD.bazel
@@ -5,28 +5,26 @@ go_library(
    srcs = [
        "action_cache_blob_access.go",
        "blob_access.go",
        "cloud_blob_access.go",
        "content_addressable_storage_blob_access.go",
        "error_blob_access.go",
        "merkle_blob_access.go",
        "metrics_blob_access.go",
        "redis_blob_access.go",
        "remote_blob_access.go",
        "s3_blob_access.go",
        "size_distinguishing_blob_access.go",
    ],
    importpath = "github.com/buildbarn/bb-storage/pkg/blobstore",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/util:go_default_library",
        "@com_github_aws_aws_sdk_go//aws:go_default_library",
        "@com_github_aws_aws_sdk_go//aws/awserr:go_default_library",
        "@com_github_aws_aws_sdk_go//service/s3:go_default_library",
        "@com_github_aws_aws_sdk_go//service/s3/s3manager:go_default_library",
        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
        "@com_github_go_redis_redis//:go_default_library",
        "@com_github_golang_protobuf//proto:go_default_library",
        "@com_github_google_uuid//:go_default_library",
        "@com_github_prometheus_client_golang//prometheus:go_default_library",
        "@dev_gocloud//blob:go_default_library",
        "@dev_gocloud//gcerrors:go_default_library",
        "@go_googleapis//google/bytestream:bytestream_go_proto",
        "@org_golang_google_grpc//:go_default_library",
        "@org_golang_google_grpc//codes:go_default_library",
70 changes: 70 additions & 0 deletions pkg/blobstore/cloud_blob_access.go
@@ -0,0 +1,70 @@
package blobstore

import (
    "context"
    "io"

    "github.com/buildbarn/bb-storage/pkg/util"

    "gocloud.dev/blob"
    "gocloud.dev/gcerrors"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

type cloudBlobAccess struct {
    bucket        *blob.Bucket
    keyPrefix     string
    blobKeyFormat util.DigestKeyFormat
}

func NewCloudBlobAccess(bucket *blob.Bucket, keyPrefix string, keyFormat util.DigestKeyFormat) *cloudBlobAccess {
    return &cloudBlobAccess{
        bucket:        bucket,
        keyPrefix:     keyPrefix,
        blobKeyFormat: keyFormat,
    }
}

func (ba *cloudBlobAccess) Get(ctx context.Context, digest *util.Digest) (int64, io.ReadCloser, error) {
    result, err := ba.bucket.NewReader(ctx, *ba.getKey(digest), nil)
    if err != nil {
        if gcerrors.Code(err) == gcerrors.NotFound {
            err = status.Errorf(codes.NotFound, err.Error())
        }
        return 0, nil, err
    }
    return result.Size(), result, err
}

func (ba *cloudBlobAccess) Put(ctx context.Context, digest *util.Digest, sizeBytes int64, r io.ReadCloser) error {
    w, err := ba.bucket.NewWriter(ctx, *ba.getKey(digest), nil)
    if err != nil {
        return err
    }
    defer w.Close()
    _, err = io.Copy(w, r)
    return err
}

func (ba *cloudBlobAccess) Delete(ctx context.Context, digest *util.Digest) error {
    return ba.bucket.Delete(ctx, *ba.getKey(digest))
}

func (ba *cloudBlobAccess) FindMissing(ctx context.Context, digests []*util.Digest) ([]*util.Digest, error) {
    var missing []*util.Digest
    for _, digest := range digests {
        if exists, err := ba.bucket.Exists(ctx, *ba.getKey(digest)); err != nil {
            return nil, err
        } else if !exists {
            missing = append(missing, digest)
        }
    }
    return missing, nil
}

func (ba *cloudBlobAccess) getKey(digest *util.Digest) *string {
    s := ba.keyPrefix + digest.GetKey(ba.blobKeyFormat)
    return &s
}
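As a rough usage note, the methods above map one-to-one onto gocloud.dev bucket operations; a write only becomes visible once the blob.Writer is closed, which is why Put defers w.Close(). A small self-contained sketch of those primitives against the in-memory driver, assuming a recent gocloud.dev release; the key below is an arbitrary placeholder, not a real digest key:

    package main

    import (
        "context"
        "fmt"
        "io"
        "io/ioutil"
        "log"
        "strings"

        "gocloud.dev/blob/memblob"
        "gocloud.dev/gcerrors"
    )

    func main() {
        ctx := context.Background()
        bucket := memblob.OpenBucket(nil) // any other driver yields the same *blob.Bucket

        // Write, as in cloudBlobAccess.Put: the data is committed when Close returns.
        w, err := bucket.NewWriter(ctx, "prefix/some-key", nil)
        if err != nil {
            log.Fatal(err)
        }
        if _, err := io.Copy(w, strings.NewReader("hello")); err != nil {
            log.Fatal(err)
        }
        if err := w.Close(); err != nil {
            log.Fatal(err)
        }

        // Read, as in cloudBlobAccess.Get.
        r, err := bucket.NewReader(ctx, "prefix/some-key", nil)
        if err != nil {
            log.Fatal(err)
        }
        body, _ := ioutil.ReadAll(r)
        fmt.Println(r.Size(), string(body))
        r.Close()

        // Missing keys surface as gcerrors.NotFound, which Get maps to codes.NotFound.
        _, err = bucket.NewReader(ctx, "prefix/missing", nil)
        fmt.Println(gcerrors.Code(err) == gcerrors.NotFound)

        // Exists backs FindMissing; Delete backs Delete.
        exists, _ := bucket.Exists(ctx, "prefix/some-key")
        fmt.Println(exists)
    }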
13 changes: 11 additions & 2 deletions pkg/blobstore/configuration/BUILD.bazel
@@ -15,13 +15,22 @@ go_library(
"@com_github_aws_aws_sdk_go//aws:go_default_library",
"@com_github_aws_aws_sdk_go//aws/credentials:go_default_library",
"@com_github_aws_aws_sdk_go//aws/session:go_default_library",
"@com_github_aws_aws_sdk_go//service/s3:go_default_library",
"@com_github_aws_aws_sdk_go//service/s3/s3manager:go_default_library",
"@com_github_azure_azure_pipeline_go//pipeline:go_default_library",
"@com_github_azure_azure_storage_blob_go//azblob:go_default_library",
"@com_github_go_redis_redis//:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_prometheus//:go_default_library",
"@com_google_cloud_go//storage:go_default_library",
"@dev_gocloud//blob:go_default_library",
"@dev_gocloud//blob/azureblob:go_default_library",
"@dev_gocloud//blob/fileblob:go_default_library",
"@dev_gocloud//blob/gcsblob:go_default_library",
"@dev_gocloud//blob/memblob:go_default_library",
"@dev_gocloud//blob/s3blob:go_default_library",
"@dev_gocloud//gcp:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_x_oauth2//google:go_default_library",
],
)
108 changes: 81 additions & 27 deletions pkg/blobstore/configuration/create_blob_access.go
@@ -1,16 +1,16 @@
package configuration

import (
    "context"
    "errors"
    "fmt"
    "io/ioutil"
    "os"

    "github.com/Azure/azure-storage-blob-go/azblob"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3manager"
    "github.com/buildbarn/bb-storage/pkg/blobstore"
    "github.com/buildbarn/bb-storage/pkg/blobstore/circular"
    "github.com/buildbarn/bb-storage/pkg/blobstore/sharding"
@@ -21,9 +21,20 @@ import (
"github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-prometheus"

"gocloud.dev/blob"
"gocloud.dev/blob/azureblob"
_ "gocloud.dev/blob/fileblob"
"gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/memblob"
"gocloud.dev/blob/s3blob"
"gocloud.dev/gcp"

"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"cloud.google.com/go/storage"
)

// CreateBlobAccessObjectsFromConfig creates a pair of BlobAccess
@@ -126,6 +137,74 @@ func createBlobAccess(config *pb.BlobAccessConfiguration, storageType string, di
                circular.NewBulkAllocatingStateStore(
                    stateStore,
                    backend.Circular.DataAllocationChunkSizeBytes)))
    case *pb.BlobAccessConfiguration_Cloud:
        backendType = "cloud"
        switch backendConfig := backend.Cloud.Config.(type) {
        case *pb.CloudBlobAccessConfiguration_Url:
            ctx := context.Background()
            bucket, err := blob.OpenBucket(ctx, backendConfig.Url)
            if err != nil {
                return nil, err
            }
            implementation = blobstore.NewCloudBlobAccess(bucket, backend.Cloud.KeyPrefix, digestKeyFormat)
        case *pb.CloudBlobAccessConfiguration_Azure:
            backendType = "azure"
            credential, err := azureblob.NewCredential(azureblob.AccountName(backendConfig.Azure.AccountName), azureblob.AccountKey(backendConfig.Azure.AccountKey))
            if err != nil {
                return nil, err
            }
            pipeline := azureblob.NewPipeline(credential, azblob.PipelineOptions{})
            ctx := context.Background()
            bucket, err := azureblob.OpenBucket(ctx, pipeline, azureblob.AccountName(backendConfig.Azure.AccountName), backendConfig.Azure.ContainerName, nil)
            if err != nil {
                return nil, err
            }
            implementation = blobstore.NewCloudBlobAccess(bucket, backend.Cloud.KeyPrefix, digestKeyFormat)
        case *pb.CloudBlobAccessConfiguration_Gcs:
            backendType = "gcs"
            var creds *google.Credentials
            var err error
            ctx := context.Background()
            if backendConfig.Gcs.Credentials != "" {
                creds, err = google.CredentialsFromJSON(ctx, []byte(backendConfig.Gcs.Credentials), storage.ScopeReadWrite)
            } else {
                creds, err = google.FindDefaultCredentials(ctx, storage.ScopeReadWrite)
            }
            if err != nil {
                return nil, err
            }
            client, err := gcp.NewHTTPClient(gcp.DefaultTransport(), gcp.CredentialsTokenSource(creds))
            if err != nil {
                return nil, err
            }
            bucket, err := gcsblob.OpenBucket(ctx, client, backendConfig.Gcs.Bucket, nil)
            if err != nil {
                return nil, err
            }
            implementation = blobstore.NewCloudBlobAccess(bucket, backend.Cloud.KeyPrefix, digestKeyFormat)
        case *pb.CloudBlobAccessConfiguration_S3:
            backendType = "s3"
            cfg := aws.Config{
                Endpoint:         &backendConfig.S3.Endpoint,
                Region:           &backendConfig.S3.Region,
                DisableSSL:       &backendConfig.S3.DisableSsl,
                S3ForcePathStyle: aws.Bool(true),
            }
            // If AccessKeyId isn't specified, allow AWS to search for credentials.
            // In AWS EC2, this search will include the instance IAM Role.
            if backendConfig.S3.AccessKeyId != "" {
                cfg.Credentials = credentials.NewStaticCredentials(backendConfig.S3.AccessKeyId, backendConfig.S3.SecretAccessKey, "")
            }
            session := session.New(&cfg)
            ctx := context.Background()
            bucket, err := s3blob.OpenBucket(ctx, session, backendConfig.S3.Bucket, nil)
            if err != nil {
                return nil, err
            }
            implementation = blobstore.NewCloudBlobAccess(bucket, backend.Cloud.KeyPrefix, digestKeyFormat)
        default:
            return nil, errors.New("Cloud configuration did not contain a backend")
        }
    case *pb.BlobAccessConfiguration_Error:
        backendType = "failing"
        implementation = blobstore.NewErrorBlobAccess(status.ErrorProto(backend.Error))
@@ -157,31 +236,6 @@ func createBlobAccess(config *pb.BlobAccessConfiguration, storageType string, di
    case *pb.BlobAccessConfiguration_Remote:
        backendType = "remote"
        implementation = blobstore.NewRemoteBlobAccess(backend.Remote.Address, storageType)
    case *pb.BlobAccessConfiguration_S3:
        backendType = "s3"
        cfg := aws.Config{
            Endpoint:         &backend.S3.Endpoint,
            Region:           &backend.S3.Region,
            DisableSSL:       &backend.S3.DisableSsl,
            S3ForcePathStyle: aws.Bool(true),
        }
        // If AccessKeyId isn't specified, allow AWS to search for credentials.
        // In AWS EC2, this search will include the instance IAM Role.
        if backend.S3.AccessKeyId != "" {
            cfg.Credentials = credentials.NewStaticCredentials(backend.S3.AccessKeyId, backend.S3.SecretAccessKey, "")
        }
        session := session.New(&cfg)
        s3 := s3.New(session)
        // Set the uploader concurrency to 1 to drastically reduce memory usage.
        // TODO(edsch): Maybe the concurrency can be left alone for this process?
        uploader := s3manager.NewUploader(session)
        uploader.Concurrency = 1
        implementation = blobstore.NewS3BlobAccess(
            s3,
            uploader,
            &backend.S3.Bucket,
            backend.S3.KeyPrefix,
            digestKeyFormat)
    case *pb.BlobAccessConfiguration_Sharding:
        backendType = "sharding"
        var backends []blobstore.BlobAccess
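One consequence of the rewrite above: the aws.Config knobs carried over from the old S3 backend (Endpoint, DisableSSL, S3ForcePathStyle) still allow pointing at S3-compatible servers, only the session now ends up wrapped in a gocloud bucket instead of an s3manager uploader. A hedged sketch of that wiring against a self-hosted endpoint such as MinIO; endpoint, region, bucket name and credentials are placeholders:

    package main

    import (
        "context"
        "log"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/credentials"
        "github.com/aws/aws-sdk-go/aws/session"
        "gocloud.dev/blob/s3blob"
    )

    func main() {
        ctx := context.Background()

        // Placeholder endpoint and credentials for an S3-compatible server.
        cfg := aws.Config{
            Endpoint:         aws.String("http://localhost:9000"),
            Region:           aws.String("us-east-1"),
            DisableSSL:       aws.Bool(true),
            S3ForcePathStyle: aws.Bool(true),
            Credentials:      credentials.NewStaticCredentials("ACCESS_KEY", "SECRET_KEY", ""),
        }
        sess, err := session.NewSession(&cfg)
        if err != nil {
            log.Fatal(err)
        }

        // Same call the configuration code makes; the result is an ordinary
        // *blob.Bucket that NewCloudBlobAccess can wrap.
        bucket, err := s3blob.OpenBucket(ctx, sess, "my-bucket", nil)
        if err != nil {
            log.Fatal(err)
        }
        defer bucket.Close()
        log.Println("bucket opened")
    }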
