From 027ade2b6f1a9ac25e3f3650f46c117948ef3526 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Wed, 10 Feb 2021 14:15:01 +0200 Subject: [PATCH 01/15] Add Azure adapter --- block/azure/adapter.go | 430 ++++++++++++++++++++++++++++++++++++ block/azure/chunkwriting.go | 273 +++++++++++++++++++++++ block/azure/stats.go | 31 +++ block/factory/build.go | 28 +++ block/namespace.go | 5 + block/params/block.go | 6 + config/config.go | 6 + docs/assets/js/swagger.yml | 2 +- go.mod | 10 +- go.sum | 51 +++++ swagger.yml | 2 +- 11 files changed, 841 insertions(+), 3 deletions(-) create mode 100644 block/azure/adapter.go create mode 100644 block/azure/chunkwriting.go create mode 100644 block/azure/stats.go diff --git a/block/azure/adapter.go b/block/azure/adapter.go new file mode 100644 index 00000000000..5a074779306 --- /dev/null +++ b/block/azure/adapter.go @@ -0,0 +1,430 @@ +package azure + +import ( + "bufio" + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "sort" + "strconv" + "time" + + "github.com/Azure/azure-pipeline-go/pipeline" + + "github.com/Azure/azure-storage-blob-go/azblob" + + "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/logging" +) + +var ( + ErrNotImplemented = errors.New("not implemented") +) + +const ( + BlockstoreType = "azure" + sizeSuffix = "_size" + idSuffix = "_id" + _1MiB = 1024 * 1024 + defaultMaxRetryRequests = 20 +) + +type Adapter struct { + ctx context.Context + p pipeline.Pipeline + accountName string + configurations configurations +} + +type configurations struct { + retryReaderOptions azblob.RetryReaderOptions +} + +func NewAdapter(pipeline pipeline.Pipeline, accountName string, opts ...func(a *Adapter)) *Adapter { + a := &Adapter{ + ctx: context.Background(), + accountName: accountName, + p: pipeline, + configurations: configurations{retryReaderOptions: azblob.RetryReaderOptions{MaxRetryRequests: defaultMaxRetryRequests}}, + } + for _, opt := range opts { + opt(a) + } + return a +} + +func (a *Adapter) WithContext(ctx context.Context) block.Adapter { + return &Adapter{ + p: a.p, + accountName: a.accountName, + ctx: ctx, + } +} + +func (a *Adapter) log() logging.Logger { + return logging.FromContext(a.ctx) +} + +func resolveNamespace(obj block.ObjectPointer) (block.QualifiedKey, error) { + qualifiedKey, err := block.ResolveNamespace(obj.StorageNamespace, obj.Identifier) + if err != nil { + return qualifiedKey, err + } + if qualifiedKey.StorageType != block.StorageTypeAzure { + return qualifiedKey, block.ErrInvalidNamespace + } + return qualifiedKey, nil +} + +func resolveNamespacePrefix(lsOpts block.WalkOpts) (block.QualifiedPrefix, error) { + qualifiedPrefix, err := block.ResolveNamespacePrefix(lsOpts.StorageNamespace, lsOpts.Prefix) + if err != nil { + return qualifiedPrefix, err + } + if qualifiedPrefix.StorageType != block.StorageTypeAzure { + return qualifiedPrefix, block.ErrInvalidNamespace + } + return qualifiedPrefix, nil +} + +func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, inventoryURL string, shouldSort bool, prefixes []string) (block.Inventory, error) { + return nil, fmt.Errorf("inventory %w", ErrNotImplemented) +} + +func (a *Adapter) getContainerURL(containerName string) azblob.ContainerURL { + URL, _ := url.Parse( + fmt.Sprintf("https://%s.blob.core.windows.net/%s", a.accountName, containerName)) + + return azblob.NewContainerURL(*URL, a.p) +} + +func (a *Adapter) getBlobURL(containerName, fileName string) azblob.BlobURL { + containerURL := 
a.getContainerURL(containerName) + + return containerURL.NewBlobURL(fileName) +} + +func (a *Adapter) getBlockBlobURL(containerName, fileName string) azblob.BlockBlobURL { + containerURL := a.getContainerURL(containerName) + + return containerURL.NewBlockBlobURL(fileName) +} + +func (a *Adapter) getIDURL(containerName, fileName string) azblob.BlockBlobURL { + return a.getBlockBlobURL(containerName, fileName+idSuffix) +} + +func (a *Adapter) getSizeURL(containerName, fileName string) azblob.BlockBlobURL { + return a.getBlockBlobURL(containerName, fileName+sizeSuffix) +} + +func translatePutOpts(opts block.PutOpts) azblob.UploadStreamToBlockBlobOptions { + res := azblob.UploadStreamToBlockBlobOptions{} + if opts.StorageClass != nil { + res.BlobAccessTier = azblob.AccessTierType(*opts.StorageClass) + } + return res +} + +func (a *Adapter) Put(obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error { + var err error + defer reportMetrics("Put", time.Now(), &sizeBytes, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return err + } + blobURL := a.getBlockBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + + _, err = azblob.UploadStreamToBlockBlob(a.ctx, reader, blobURL, translatePutOpts(opts)) + return err +} + +func (a *Adapter) Get(obj block.ObjectPointer, _ int64) (io.ReadCloser, error) { + var err error + defer reportMetrics("get", time.Now(), nil, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return nil, err + } + blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + + keyOptions := azblob.ClientProvidedKeyOptions{} + downloadResponse, err := blobURL.Download(a.ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, keyOptions) + if err != nil { + return nil, err + } + // NOTE: automatically retries are performed if the connection fails + bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) + return bodyStream, err +} + +func (a *Adapter) Walk(walkOpt block.WalkOpts, walkFn block.WalkFunc) error { + var err error + defer reportMetrics("Walk", time.Now(), nil, &err) + + qualifiedPrefix, err := resolveNamespacePrefix(walkOpt) + if err != nil { + return err + } + + containerURL := a.getContainerURL(qualifiedPrefix.StorageNamespace) + + for marker := (azblob.Marker{}); marker.NotDone(); { + listBlob, err := containerURL.ListBlobsFlatSegment(a.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: qualifiedPrefix.Prefix}) + if err != nil { + return err + } + + marker = listBlob.NextMarker + for _, blobInfo := range listBlob.Segment.BlobItems { + if err := walkFn(blobInfo.Name); err != nil { + return err + } + } + } + return nil +} + +func (a *Adapter) Exists(obj block.ObjectPointer) (bool, error) { + var err error + defer reportMetrics("Exists", time.Now(), nil, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return false, err + } + + blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + _, err = blobURL.GetProperties(a.ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + if err.(azblob.StorageError).ServiceCode() == azblob.ServiceCodeBlobNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +func (a *Adapter) GetRange(obj block.ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error) { + var err error + defer reportMetrics("GetRange", time.Now(), nil, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return 
nil, err + } + + blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + + downloadResponse, err := blobURL.Download(a.ctx, startPosition, endPosition-startPosition+1, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, err + } + + bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) + return bodyStream, err +} + +func (a *Adapter) GetProperties(obj block.ObjectPointer) (block.Properties, error) { + var err error + defer reportMetrics("GetProperties", time.Now(), nil, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return block.Properties{}, err + } + + blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + props, err := blobURL.GetProperties(a.ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return block.Properties{}, err + } + storageClass := props.AccessTier() + return block.Properties{StorageClass: &storageClass}, nil +} + +func (a *Adapter) Remove(obj block.ObjectPointer) error { + var err error + defer reportMetrics("Remove", time.Now(), nil, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return err + } + + containerName := qualifiedKey.StorageNamespace + fileName := qualifiedKey.Key + blobURL := a.getBlobURL(containerName, fileName) + _, err = blobURL.Delete(a.ctx, "", azblob.BlobAccessConditions{}) + return err +} + +func (a *Adapter) Copy(sourceObj, destinationObj block.ObjectPointer) error { + var err error + defer reportMetrics("Copy", time.Now(), nil, &err) + + qualifiedDestinationKey, err := resolveNamespace(destinationObj) + if err != nil { + return err + } + qualifiedSourceKey, err := resolveNamespace(sourceObj) + if err != nil { + return err + } + + sourceURL := a.getBlobURL(qualifiedSourceKey.StorageNamespace, qualifiedSourceKey.Key) + destinationURL := a.getBlobURL(qualifiedDestinationKey.StorageNamespace, qualifiedDestinationKey.Key) + _, err = destinationURL.StartCopyFromURL(a.ctx, sourceURL.URL(), azblob.Metadata{}, azblob.ModifiedAccessConditions{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}) + return err +} + +func (a *Adapter) CreateMultiPartUpload(obj block.ObjectPointer, r *http.Request, opts block.CreateMultiPartUploadOpts) (string, error) { + var err error + defer reportMetrics("CreateMultiPartUpload", time.Now(), nil, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return "", err + } + + return qualifiedKey.Key, nil +} + +func (a *Adapter) UploadPart(obj block.ObjectPointer, size int64, reader io.Reader, uploadID string, partNumber int64) (string, error) { + var err error + defer reportMetrics("UploadPart", time.Now(), nil, &err) + + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return "", err + } + + containerName := qualifiedKey.StorageNamespace + objName := qualifiedKey.Key + + blobURL := a.getBlockBlobURL(containerName, objName) + blobIDsURL := a.getIDURL(containerName, objName) + blobSizesURL := a.getSizeURL(containerName, objName) + + return copyFromReader(a.ctx, reader, blobURL, blobIDsURL, blobSizesURL, azblob.UploadStreamToBlockBlobOptions{}) +} + +func (a *Adapter) AbortMultiPartUpload(_ block.ObjectPointer, _ string) error { + // Azure has no abort, in case of commit, uncommitted parts are erased, otherwise staged data is erased after 7 days + return nil +} + +func (a *Adapter) ValidateConfiguration(_ string) error { + return nil +} + +func (a *Adapter) BlockstoreType() string 
{ + return BlockstoreType +} + +func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID string, multipartList *block.MultipartUploadCompletion) (*string, int64, error) { + var err error + defer reportMetrics("CompleteMultiPartUpload", time.Now(), nil, &err) + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return nil, 0, err + } + containerName := qualifiedKey.StorageNamespace + _ = a.log().WithFields(logging.Fields{ + "upload_id": uploadID, + "qualified_ns": qualifiedKey.StorageNamespace, + "qualified_key": qualifiedKey.Key, + "key": obj.Identifier, + }) + + parts := multipartList.Part + sort.Slice(parts, func(i, j int) bool { + return *parts[i].PartNumber < *parts[j].PartNumber + }) + // extract staging blockIDs + metaBlockIDs := make([]string, 0) + for _, part := range multipartList.Part { + base64Etag := base64.StdEncoding.EncodeToString([]byte(*part.ETag)) + metaBlockIDs = append(metaBlockIDs, base64Etag) + } + + fileName := qualifiedKey.Key + stageBlockIDs, err := a.getMultipartIDs(containerName, fileName, metaBlockIDs) + + size, err := a.getMultipartSize(containerName, fileName, metaBlockIDs) + + blobURL := a.getBlockBlobURL(containerName, fileName) + + res, err := blobURL.CommitBlockList(a.ctx, stageBlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, 0, err + } + etag := string(res.ETag()) + etag = etag[1 : len(etag)-2] + // list bucket parts and validate request match + return &etag, int64(size), nil +} + +func (a *Adapter) getMultipartIDs(containerName, fileName string, base64BlockIDs []string) ([]string, error) { + blobURL := a.getIDURL(containerName, fileName) + _, err := blobURL.CommitBlockList(a.ctx, base64BlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, err + } + + downloadResponse, err := blobURL.Download(a.ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, err + } + bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) + scanner := bufio.NewScanner(bodyStream) + ids := make([]string, 0) + for scanner.Scan() { + id := scanner.Text() + ids = append(ids, id) + } + + // remove + _, err = blobURL.Delete(a.ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) + if err != nil { + a.log().WithError(err).Warn("Failed to delete multipart ids data file") + } + return ids, nil +} + +func (a *Adapter) getMultipartSize(containerName, fileName string, base64BlockIDs []string) (int, error) { + sizeURL := a.getSizeURL(containerName, fileName) + _, err := sizeURL.CommitBlockList(a.ctx, base64BlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, "", azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return 0, err + } + + downloadResponse, err := sizeURL.Download(a.ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return 0, err + } + bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) + scanner := bufio.NewScanner(bodyStream) + size := 0 + for scanner.Scan() { + s := scanner.Text() + stageSize, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + size += stageSize + } + + // remove + _, err = 
sizeURL.Delete(a.ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) + if err != nil { + a.log().WithError(err).Warn("Failed to delete multipart size data file") + } + return size, nil +} diff --git a/block/azure/chunkwriting.go b/block/azure/chunkwriting.go new file mode 100644 index 00000000000..4f41969255f --- /dev/null +++ b/block/azure/chunkwriting.go @@ -0,0 +1,273 @@ +package azure + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "io" + "strconv" + "strings" + "sync" + "sync/atomic" + + "github.com/treeverse/lakefs/block" + + "github.com/Azure/azure-storage-blob-go/azblob" + + guuid "github.com/google/uuid" +) + +// This code is taken from azblob chunkwriting.go +// The reason is that the original code commit the data at the end of the copy +// In order to support multipart upload we need to save the blockIDs instead of committing them +// And once complete multipart is called we commit all the blockIDs + +// blockWriter provides methods to upload blocks that represent a file to a server and commit them. +// This allows us to provide a local implementation that fakes the server for hermetic testing. +type blockWriter interface { + StageBlock(context.Context, string, io.ReadSeeker, azblob.LeaseAccessConditions, []byte, azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobStageBlockResponse, error) + CommitBlockList(context.Context, []string, azblob.BlobHTTPHeaders, azblob.Metadata, azblob.BlobAccessConditions, azblob.AccessTierType, azblob.BlobTagsMap, azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobCommitBlockListResponse, error) +} + +func defaults(u *azblob.UploadStreamToBlockBlobOptions) error { + if u.TransferManager != nil { + return nil + } + + if u.MaxBuffers == 0 { + u.MaxBuffers = 1 + } + + if u.BufferSize < _1MiB { + u.BufferSize = _1MiB + } + + var err error + u.TransferManager, err = azblob.NewStaticBuffer(u.BufferSize, u.MaxBuffers) + if err != nil { + return fmt.Errorf("bug: default transfer manager could not be created: %s", err) + } + return nil +} + +// copyFromReader copies a source io.Reader to blob storage using concurrent uploads. +// TODO(someone): The existing model provides a buffer size and buffer limit as limiting factors. The buffer size is probably +// useless other than needing to be above some number, as the network stack is going to hack up the buffer over some size. The +// max buffers is providing a cap on how much memory we use (by multiplying it times the buffer size) and how many go routines can upload +// at a time. I think having a single max memory dial would be more efficient. We can choose an internal buffer size that works +// well, 4 MiB or 8 MiB, and autoscale to as many goroutines within the memory limit. This gives a single dial to tweak and we can +// choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model). +// We can even provide a utility to dial this number in for customer networks to optimize their copies. 
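The blockWriter interface above is what lets this copier be exercised without a real Azure account, as its comment notes. A minimal in-memory sketch follows; the fakeBlockWriter type and everything in it are hypothetical illustrations and not part of this patch:

import (
	"context"
	"io"
	"io/ioutil"
	"sync"

	"github.com/Azure/azure-storage-blob-go/azblob"
)

// fakeBlockWriter is a hypothetical in-memory blockWriter for hermetic tests.
type fakeBlockWriter struct {
	mu        sync.Mutex
	staged    map[string][]byte // block ID -> staged block payload
	committed []string          // block IDs passed to CommitBlockList, in order
}

func (f *fakeBlockWriter) StageBlock(_ context.Context, id string, body io.ReadSeeker, _ azblob.LeaseAccessConditions, _ []byte, _ azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobStageBlockResponse, error) {
	data, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.staged == nil {
		f.staged = make(map[string][]byte)
	}
	f.staged[id] = data
	return &azblob.BlockBlobStageBlockResponse{}, nil
}

func (f *fakeBlockWriter) CommitBlockList(_ context.Context, ids []string, _ azblob.BlobHTTPHeaders, _ azblob.Metadata, _ azblob.BlobAccessConditions, _ azblob.AccessTierType, _ azblob.BlobTagsMap, _ azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobCommitBlockListResponse, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.committed = append(f.committed, ids...)
	return &azblob.BlockBlobCommitBlockListResponse{}, nil
}

A test could then drive the function below with three such fakes, for example copyFromReader(ctx, strings.NewReader("payload"), &fakeBlockWriter{}, &fakeBlockWriter{}, &fakeBlockWriter{}, azblob.UploadStreamToBlockBlobOptions{}), and assert on the staged blocks and committed IDs with no network access.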
+func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, toIDs blockWriter, toSizes blockWriter, o azblob.UploadStreamToBlockBlobOptions) (string, error) { + if err := defaults(&o); err != nil { + return "", err + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + cp := &copier{ + ctx: ctx, + cancel: cancel, + reader: block.NewHashingReader(from, block.HashFunctionMD5), + to: to, + toIDs: toIDs, + toSizes: toSizes, + id: newID(), + o: o, + errCh: make(chan error, 1), + } + + // Send all our chunks until we get an error. + var err error + for { + if err = cp.sendChunk(); err != nil { + break + } + } + // If the error is not EOF, then we have a problem. + if err != nil && !errors.Is(err, io.EOF) { + return "", err + } + + // Close out our upload. + if err := cp.close(); err != nil { + return "", err + } + + // This part was added + // Instead of committing in close ( what is originally done in azblob/chunkwriting.go) + // we stage The blockIDs and size of this copy operation in the relevant blockBlobURLs ( cp.toIDs, cp.toSizes) + // Later on, in complete multipart upload we commit the file blockBlobURLs with etags as the blockIDs + // Then by reading the files we get the relevant blockIDs and sizes + etag := "\"" + hex.EncodeToString(cp.reader.Md5.Sum(nil)) + "\"" + base64Etag := base64.StdEncoding.EncodeToString([]byte(etag)) + + // write to blockIDs + pd := strings.Join(cp.id.issued(), "\n") + "\n" + _, err = cp.toIDs.StageBlock(cp.ctx, base64Etag, strings.NewReader(pd), cp.o.AccessConditions.LeaseAccessConditions, nil, cp.o.ClientProvidedKeyOptions) + if err != nil { + return "", fmt.Errorf("failed staging part data: %w", err) + } + // write block sizes + sd := strconv.Itoa(int(cp.reader.CopiedSize)) + "\n" + _, err = cp.toSizes.StageBlock(cp.ctx, base64Etag, strings.NewReader(sd), cp.o.AccessConditions.LeaseAccessConditions, nil, cp.o.ClientProvidedKeyOptions) + if err != nil { + return "", fmt.Errorf("failed staging part data: %w", err) + } + return etag, nil +} + +// copier streams a file via chunks in parallel from a reader representing a file. +// Do not use directly, instead use copyFromReader(). +type copier struct { + // ctx holds the context of a copier. This is normally a faux pas to store a Context in a struct. In this case, + // the copier has the lifetime of a function call, so its fine. + ctx context.Context + cancel context.CancelFunc + + // o contains our options for uploading. + o azblob.UploadStreamToBlockBlobOptions + + // id provides the ids for each chunk. + id *id + + // reader is the source to be written to storage. + reader *block.HashingReader + // to is the location we are writing our chunks to. + to blockWriter + toIDs blockWriter + toSizes blockWriter + + // errCh is used to hold the first error from our concurrent writers. + errCh chan error + // wg provides a count of how many writers we are waiting to finish. + wg sync.WaitGroup + + // result holds the final result from blob storage after we have submitted all chunks. + result *azblob.BlockBlobCommitBlockListResponse +} + +type copierChunk struct { + buffer []byte + id string +} + +// getErr returns an error by priority. First, if a function set an error, it returns that error. Next, if the Context has an error +// it returns that error. Otherwise it is nil. getErr supports only returning an error once per copier. 
+func (c *copier) getErr() error { + select { + case err := <-c.errCh: + return err + default: + } + return c.ctx.Err() +} + +// sendChunk reads data from out internal reader, creates a chunk, and sends it to be written via a channel. +// sendChunk returns io.EOF when the reader returns an io.EOF or io.ErrUnexpectedEOF. +func (c *copier) sendChunk() error { + if err := c.getErr(); err != nil { + return err + } + + buffer := c.o.TransferManager.Get() + if len(buffer) == 0 { + return errors.New("TransferManager returned a 0 size buffer, this is a bug in the manager") + } + n, err := io.ReadFull(c.reader, buffer) + switch { + case err == nil && n == 0: + return nil + case err == nil: + id := c.id.next() + c.wg.Add(1) + c.o.TransferManager.Run( + func() { + defer c.wg.Done() + c.write(copierChunk{buffer: buffer[0:n], id: id}) + }, + ) + return nil + case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0: + return io.EOF + } + + if err == io.EOF || err == io.ErrUnexpectedEOF { + id := c.id.next() + c.wg.Add(1) + c.o.TransferManager.Run( + func() { + defer c.wg.Done() + c.write(copierChunk{buffer: buffer[0:n], id: id}) + }, + ) + return io.EOF + } + if err := c.getErr(); err != nil { + return err + } + return err +} + +// write uploads a chunk to blob storage. +func (c *copier) write(chunk copierChunk) { + defer c.o.TransferManager.Put(chunk.buffer) + + if err := c.ctx.Err(); err != nil { + return + } + _, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions) + if err != nil { + c.errCh <- fmt.Errorf("write error: %w", err) + return + } + return +} + +// close commits our blocks to blob storage and closes our writer. +func (c *copier) close() error { + c.wg.Wait() + + if err := c.getErr(); err != nil { + return err + } + + var err error + return err +} + +// id allows the creation of unique IDs based on UUID4 + an int32. This auto-increments. +type id struct { + u [64]byte + num uint32 + all []string +} + +// newID constructs a new id. +func newID() *id { + uu := guuid.New() + u := [64]byte{} + copy(u[:], uu[:]) + return &id{u: u} +} + +// next returns the next ID. +func (id *id) next() string { + defer atomic.AddUint32(&id.num, 1) + + binary.BigEndian.PutUint32((id.u[len(guuid.UUID{}):]), atomic.LoadUint32(&id.num)) + str := base64.StdEncoding.EncodeToString(id.u[:]) + id.all = append(id.all, str) + + return str +} + +// issued returns all ids that have been issued. This returned value shares the internal slice so it is not safe to modify the return. +// The value is only valid until the next time next() is called. 
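// Hypothetical illustration (values are not from the patch): every ID comes from
// the same 64-byte buffer — the UUID in the first 16 bytes, a big-endian counter
// in the next 4, zero padding after that — so IDs are unique within one copier
// and have constant length once base64-encoded:
//
//	ids := newID()
//	b1 := ids.next()  // base64(uuid || counter=0 || padding)
//	b2 := ids.next()  // base64(uuid || counter=1 || padding)
//	_ = ids.issued()  // []string{b1, b2}; shares the internal slice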
+func (id *id) issued() []string { + return id.all +} diff --git a/block/azure/stats.go b/block/azure/stats.go new file mode 100644 index 00000000000..448541573ab --- /dev/null +++ b/block/azure/stats.go @@ -0,0 +1,31 @@ +package azure + +import ( + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var durationHistograms = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "azure_operation_duration_seconds", + Help: "durations of outgoing azure operations", + }, + []string{"operation", "error"}) + +var requestSizeHistograms = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "azure_operation_size_bytes", + Help: "handled sizes of outgoing azure operations", + Buckets: prometheus.ExponentialBuckets(1, 10, 10), + }, []string{"operation", "error"}) + +func reportMetrics(operation string, start time.Time, sizeBytes *int64, err *error) { + isErrStr := strconv.FormatBool(*err != nil) + durationHistograms.WithLabelValues(operation, isErrStr).Observe(time.Since(start).Seconds()) + if sizeBytes != nil { + requestSizeHistograms.WithLabelValues(operation, isErrStr).Observe(float64(*sizeBytes)) + } +} diff --git a/block/factory/build.go b/block/factory/build.go index 538fc7b8ee7..193819b226a 100644 --- a/block/factory/build.go +++ b/block/factory/build.go @@ -5,6 +5,10 @@ import ( "errors" "fmt" + "github.com/Azure/azure-storage-blob-go/azblob" + + "github.com/treeverse/lakefs/block/azure" + "cloud.google.com/go/storage" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" @@ -54,6 +58,12 @@ func BuildBlockAdapter(c params.AdapterConfig) (block.Adapter, error) { return nil, err } return buildGSAdapter(p) + case azure.BlockstoreType: + p, err := c.GetBlockAdapterAzureParams() + if err != nil { + return nil, err + } + return buildAzureAdapter(p) default: return nil, fmt.Errorf("%w '%s' please choose one of %s", ErrInvalidBlockStoreType, blockstore, []string{local.BlockstoreType, s3a.BlockstoreType, mem.BlockstoreType, transient.BlockstoreType, gs.BlockstoreType}) @@ -108,3 +118,21 @@ func buildGSAdapter(params params.GS) (*gs.Adapter, error) { log.WithField("type", "gs").Info("initialized blockstore adapter") return adapter, nil } + +func buildAzureAdapter(params params.Azure) (*azure.Adapter, error) { + accountName := params.StorageAccount + accountKey := params.StorageAccessKey // TODO(Guys): check if I need to try and get from other places such as env var "AZURE_STORAGE_ACCOUNT" "AZURE_STORAGE_ACCESS_KEY" or any file like .aws/credentials style + if len(accountName) == 0 || len(accountKey) == 0 { + return nil, fmt.Errorf("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set") + } + + // Create a default request pipeline using your storage account name and account key. 
+ credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) + if err != nil { + return nil, fmt.Errorf("Invalid credentials with error: " + err.Error()) + } + p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) + a := azure.NewAdapter(p, accountName) + + return a, nil +} diff --git a/block/namespace.go b/block/namespace.go index 5f9614e3ba9..fac9c85a72f 100644 --- a/block/namespace.go +++ b/block/namespace.go @@ -15,6 +15,7 @@ const ( StorageTypeLocal StorageTypeS3 StorageTypeGS + StorageTypeAzure ) var ( @@ -44,6 +45,8 @@ func (qk QualifiedKey) Format() string { scheme = "gs" case StorageTypeS3: scheme = "s3" + case StorageTypeAzure: + scheme = "azure" default: panic("unknown storage type") } @@ -62,6 +65,8 @@ func GetStorageType(namespaceURL *url.URL) (StorageType, error) { return StorageTypeLocal, nil case "gs": return StorageTypeGS, nil + case "azure": + return StorageTypeAzure, nil default: return st, fmt.Errorf("%s: %w", namespaceURL.Scheme, ErrInvalidNamespace) } diff --git a/block/params/block.go b/block/params/block.go index ecb3e9f3c05..64aed4fa4ac 100644 --- a/block/params/block.go +++ b/block/params/block.go @@ -12,6 +12,7 @@ type AdapterConfig interface { GetBlockAdapterLocalParams() (Local, error) GetBlockAdapterS3Params() (S3, error) GetBlockAdapterGSParams() (GS, error) + GetBlockAdapterAzureParams() (Azure, error) } type Mem struct{} @@ -30,3 +31,8 @@ type GS struct { CredentialsFile string CredentialsJSON string } + +type Azure struct { + StorageAccount string + StorageAccessKey string +} diff --git a/config/config.go b/config/config.go index 6ef2dc1c48d..880b0ac6e4e 100644 --- a/config/config.go +++ b/config/config.go @@ -316,6 +316,12 @@ func (c *Config) GetBlockAdapterGSParams() (blockparams.GS, error) { CredentialsJSON: viper.GetString("blockstore.gs.credentials_json"), }, nil } +func (c *Config) GetBlockAdapterAzureParams() (blockparams.Azure, error) { + return blockparams.Azure{ + StorageAccount: viper.GetString("blockstore.azure.storage_account"), + StorageAccessKey: viper.GetString("blockstore.azure.storage_access_key"), + }, nil +} func (c *Config) GetAuthCacheConfig() authparams.ServiceCache { return authparams.ServiceCache{ diff --git a/docs/assets/js/swagger.yml b/docs/assets/js/swagger.yml index 4286cc993d7..eebb8634145 100644 --- a/docs/assets/js/swagger.yml +++ b/docs/assets/js/swagger.yml @@ -96,7 +96,7 @@ definitions: type: string description: "Filesystem URI to store the underlying data in (e.g. 
's3://my-bucket/some/path/')" example: "s3://example-bucket/" - pattern: '^(s3|gs|mem|local|transient)://.*$' + pattern: '^(s3|gs|azure|mem|local|transient)://.*$' default_branch: example: "master" type: string diff --git a/go.mod b/go.mod index f66e160519b..3ceb45e2332 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,12 @@ go 1.15 require ( cloud.google.com/go v0.74.0 cloud.google.com/go/storage v1.12.0 + github.com/Azure/azure-pipeline-go v0.2.3 + github.com/Azure/azure-sdk-for-go v51.0.0+incompatible // indirect + github.com/Azure/azure-storage-blob-go v0.13.0 + github.com/Azure/go-autorest/autorest/azure/auth v0.5.6 // indirect + github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect + github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Masterminds/squirrel v1.5.0 github.com/Microsoft/go-winio v0.4.16 // indirect github.com/apache/thrift v0.13.0 @@ -80,10 +86,12 @@ require ( github.com/xitongsys/parquet-go-source v0.0.0-20200805105948-52b27ba08556 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/exp v0.0.0-20201229011636-eab1b5eb1a03 // indirect + golang.org/x/mod v0.4.1 // indirect golang.org/x/net v0.0.0-20201224014010-6772e930b67b golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 - golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78 // indirect + golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 + golang.org/x/tools v0.1.0 // indirect gonum.org/v1/netlib v0.0.0-20200603212716-16abd5ac5bc7 // indirect google.golang.org/api v0.36.0 google.golang.org/protobuf v1.25.0 diff --git a/go.sum b/go.sum index 8fd676f74e4..78939a84cf1 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,39 @@ cloud.google.com/go/storage v1.12.0/go.mod h1:fFLk2dp2oAhDz8QFKwqrjdJvxSp/W2g7ni dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= +github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= +github.com/Azure/azure-sdk-for-go v0.2.0-beta h1:wYBqYNMWr0WL2lcEZi+dlK9n+N0wJ0Pjs4BKeOnDjfQ= +github.com/Azure/azure-sdk-for-go v51.0.0+incompatible h1:p7blnyJSjJqf5jflHbSGhIhEpXIgIFmYZNg5uwqweso= +github.com/Azure/azure-sdk-for-go v51.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc= +github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= +github.com/Azure/go-autorest v1.1.1 h1:4G9tVCqooRY3vDTB2bA1Z01PlSALtnUbji0AfzthUSs= +github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= +github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/Azure/go-autorest/autorest v0.11.17 h1:2zCdHwNgRH+St1J+ZMf66xI8aLr/5KMy+wWLH97zwYM= +github.com/Azure/go-autorest/autorest v0.11.17/go.mod 
h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw= +github.com/Azure/go-autorest/autorest/adal v0.9.2/go.mod h1:/3SMAM86bP6wC9Ev35peQDUeqFZBMH07vvUOmg4z/fE= +github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= +github.com/Azure/go-autorest/autorest/adal v0.9.10 h1:r6fZHMaHD8B6LDCn0o5vyBFHIHrM6Ywwx7mb49lPItI= +github.com/Azure/go-autorest/autorest/adal v0.9.10/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A= +github.com/Azure/go-autorest/autorest/azure/auth v0.5.6 h1:cgiBtUxatlt/e3qY6fQJioqbocWHr5osz259MomF5M0= +github.com/Azure/go-autorest/autorest/azure/auth v0.5.6/go.mod h1:nYlP+G+n8MhD5CjIi6W8nFTIJn/PnTHes5nUbK6BxD0= +github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 h1:dMOmEJfkLKW/7JsokJqkyoYSgmR08hi9KrhjZb+JALY= +github.com/Azure/go-autorest/autorest/azure/cli v0.4.2/go.mod h1:7qkJkT+j6b+hIpzMOwPChJhTqS8VbsqqgULzMNRugoM= +github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= +github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= +github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= +github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= +github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= +github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac= +github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= +github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE= +github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= +github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= +github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -259,6 +290,9 @@ github.com/dgryski/go-lttb v0.0.0-20180810165845-318fcdf10a77/go.mod h1:Va5MyIzk github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dhui/dktest v0.3.3 h1:DBuH/9GFaWbDRa42qsut/hbQu+srAQ0rPWnUoiGX7CA= github.com/dhui/dktest v0.3.3/go.mod h1:EML9sP4sqJELHn4jV7B0TY8oF6077nk83/tz7M56jcQ= +github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8= +github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= +github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE= github.com/dlmiddlecote/sqlstats v1.0.1 h1:IMOMNS5sZAi84198EAxxA27BVZ2i2h1ylvk+8p9nK7o= github.com/dlmiddlecote/sqlstats v1.0.1/go.mod h1:wnid52FfRm1P/Z/81xQ4pd8ayRzL9o7UWkyCNegbAQg= github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= @@ -297,6 +331,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= 
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= +github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -312,6 +348,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= +github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8 h1:DujepqpGd1hyOd7aW59XpK7Qymp8iy83xq74fLr21is= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-critic/go-critic v0.5.2 h1:3RJdgf6u4NZUumoP8nzbqiiNT8e1tC2Oc7jlgqre/IA= @@ -957,6 +994,8 @@ github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+v github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI= +github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -1180,6 +1219,7 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/progressbar/v3 v3.7.2 h1:0mjLacO6y9vSdcopQ8TEK8AsIWFJXTVU9eJDkoR/MkE= github.com/schollz/progressbar/v3 v3.7.2/go.mod h1:CG/f0JmacksUc6TkZToO7tVq4t03zIQSQUtTd7F9GR4= @@ -1419,6 +1459,7 @@ golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 h1:DZhuSZLsGlFL4CmhA8BcRA0mnthyA/nZ00AqCUo7vHg= golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= 
+golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9 h1:umElSU9WZirRdgu2yFHY0ayQkEnKiOC1TtM3fWXFnoU= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= @@ -1473,6 +1514,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0 h1:8pl+sMODzuvGJkmj2W4kZihvVb5mKm8pB/X44PIQHv8= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.1 h1:Kvvh58BN8Y9/lBi7hTekvtMpm07eUZ0ck5pRHpsMWrY= +golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1502,6 +1545,7 @@ golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -1589,6 +1633,7 @@ golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1629,6 +1674,10 @@ golang.org/x/sys v0.0.0-20201218084310-7d0127a74742 h1:+CBz4km/0KPU3RGTwARGh/noP golang.org/x/sys v0.0.0-20201218084310-7d0127a74742/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78 h1:nVuTkr9L6Bq62qpUqKo/RnZCFfzDBL0bYo6w9OJUqZY= golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1759,6 +1808,8 @@ golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/swagger.yml b/swagger.yml index 89b497dd7d0..e35fea937cf 100644 --- a/swagger.yml +++ b/swagger.yml @@ -96,7 +96,7 @@ definitions: type: string description: "Filesystem URI to store the underlying data in (e.g. 's3://my-bucket/some/path/')" example: "s3://example-bucket/" - pattern: '^(s3|gs|mem|local|transient)://.*$' + pattern: '^(s3|gs|azure|mem|local|transient)://.*$' default_branch: example: "master" type: string From 5d9802700f52ab02c457173bf240b5269566d215 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Thu, 11 Feb 2021 07:40:44 +0200 Subject: [PATCH 02/15] Fix failing downloads --- block/factory/build.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/block/factory/build.go b/block/factory/build.go index 193819b226a..260b9e84c5a 100644 --- a/block/factory/build.go +++ b/block/factory/build.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/Azure/azure-storage-blob-go/azblob" @@ -66,7 +67,7 @@ func BuildBlockAdapter(c params.AdapterConfig) (block.Adapter, error) { return buildAzureAdapter(p) default: return nil, fmt.Errorf("%w '%s' please choose one of %s", - ErrInvalidBlockStoreType, blockstore, []string{local.BlockstoreType, s3a.BlockstoreType, mem.BlockstoreType, transient.BlockstoreType, gs.BlockstoreType}) + ErrInvalidBlockStoreType, blockstore, []string{local.BlockstoreType, s3a.BlockstoreType, azure.BlockstoreType, mem.BlockstoreType, transient.BlockstoreType, gs.BlockstoreType}) } } @@ -131,7 +132,7 @@ func buildAzureAdapter(params params.Azure) (*azure.Adapter, error) { if err != nil { return nil, fmt.Errorf("Invalid credentials with error: " + err.Error()) } - p := azblob.NewPipeline(credential, azblob.PipelineOptions{}) + p := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: 10 * time.Minute}}) a := azure.NewAdapter(p, accountName) return a, nil From fc8176c273bd839173b6fd3a094744e70e897788 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Thu, 11 Feb 2021 07:41:46 +0200 Subject: [PATCH 03/15] Change namespace 
from azure to wasb --- block/namespace.go | 4 ++-- docs/assets/js/swagger.yml | 2 +- swagger.yml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/block/namespace.go b/block/namespace.go index fac9c85a72f..428785949a6 100644 --- a/block/namespace.go +++ b/block/namespace.go @@ -46,7 +46,7 @@ func (qk QualifiedKey) Format() string { case StorageTypeS3: scheme = "s3" case StorageTypeAzure: - scheme = "azure" + scheme = "wasb" default: panic("unknown storage type") } @@ -65,7 +65,7 @@ func GetStorageType(namespaceURL *url.URL) (StorageType, error) { return StorageTypeLocal, nil case "gs": return StorageTypeGS, nil - case "azure": + case "wasb": return StorageTypeAzure, nil default: return st, fmt.Errorf("%s: %w", namespaceURL.Scheme, ErrInvalidNamespace) diff --git a/docs/assets/js/swagger.yml b/docs/assets/js/swagger.yml index eebb8634145..5230b1e5169 100644 --- a/docs/assets/js/swagger.yml +++ b/docs/assets/js/swagger.yml @@ -96,7 +96,7 @@ definitions: type: string description: "Filesystem URI to store the underlying data in (e.g. 's3://my-bucket/some/path/')" example: "s3://example-bucket/" - pattern: '^(s3|gs|azure|mem|local|transient)://.*$' + pattern: '^(s3|gs|wasb|mem|local|transient)://.*$' default_branch: example: "master" type: string diff --git a/swagger.yml b/swagger.yml index c30bc2fbb56..5c9eebcb9b5 100644 --- a/swagger.yml +++ b/swagger.yml @@ -95,7 +95,7 @@ definitions: type: string description: "Filesystem URI to store the underlying data in (e.g. 's3://my-bucket/some/path/')" example: "s3://example-bucket/" - pattern: '^(s3|gs|azure|mem|local|transient)://.*$' + pattern: '^(s3|gs|wasb|mem|local|transient)://.*$' default_branch: example: "master" type: string From b622cc7237fceede283406c1014536572edbc302 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Thu, 11 Feb 2021 08:02:50 +0200 Subject: [PATCH 04/15] Add missing methods --- block/azure/adapter.go | 119 ++++++++++++++++++++++++++++++++--------- 1 file changed, 93 insertions(+), 26 deletions(-) diff --git a/block/azure/adapter.go b/block/azure/adapter.go index 5a074779306..1d7a7b13a8e 100644 --- a/block/azure/adapter.go +++ b/block/azure/adapter.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/base64" + "encoding/hex" "errors" "fmt" "io" @@ -11,8 +12,11 @@ import ( "net/url" "sort" "strconv" + "strings" "time" + guuid "github.com/google/uuid" + "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" @@ -26,11 +30,11 @@ var ( ) const ( - BlockstoreType = "azure" + BlockstoreType = "wasb" sizeSuffix = "_size" idSuffix = "_id" _1MiB = 1024 * 1024 - defaultMaxRetryRequests = 20 + defaultMaxRetryRequests = 0 ) type Adapter struct { @@ -132,7 +136,7 @@ func translatePutOpts(opts block.PutOpts) azblob.UploadStreamToBlockBlobOptions func (a *Adapter) Put(obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error { var err error - defer reportMetrics("Put", time.Now(), &sizeBytes, &err) + //defer reportMetrics("Put", time.Now(), &sizeBytes, &err) qualifiedKey, err := resolveNamespace(obj) if err != nil { @@ -146,8 +150,19 @@ func (a *Adapter) Put(obj block.ObjectPointer, sizeBytes int64, reader io.Reader func (a *Adapter) Get(obj block.ObjectPointer, _ int64) (io.ReadCloser, error) { var err error - defer reportMetrics("get", time.Now(), nil, &err) + defer reportMetrics("Get", time.Now(), nil, &err) + + return a.Download(obj, 0, azblob.CountToEnd) +} + +func (a *Adapter) GetRange(obj block.ObjectPointer, startPosition int64, endPosition 
int64) (io.ReadCloser, error) { + var err error + defer reportMetrics("GetRange", time.Now(), nil, &err) + + return a.Download(obj, startPosition, endPosition-startPosition+1) +} +func (a *Adapter) Download(obj block.ObjectPointer, offset, count int64) (io.ReadCloser, error) { qualifiedKey, err := resolveNamespace(obj) if err != nil { return nil, err @@ -155,13 +170,14 @@ func (a *Adapter) Get(obj block.ObjectPointer, _ int64) (io.ReadCloser, error) { blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) keyOptions := azblob.ClientProvidedKeyOptions{} - downloadResponse, err := blobURL.Download(a.ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, keyOptions) + downloadResponse, err := blobURL.Download(a.ctx, offset, count, azblob.BlobAccessConditions{}, false, keyOptions) + if err != nil { return nil, err } // NOTE: automatically retries are performed if the connection fails bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) - return bodyStream, err + return bodyStream, nil } func (a *Adapter) Walk(walkOpt block.WalkOpts, walkFn block.WalkFunc) error { @@ -211,26 +227,6 @@ func (a *Adapter) Exists(obj block.ObjectPointer) (bool, error) { return true, nil } -func (a *Adapter) GetRange(obj block.ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error) { - var err error - defer reportMetrics("GetRange", time.Now(), nil, &err) - - qualifiedKey, err := resolveNamespace(obj) - if err != nil { - return nil, err - } - - blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) - - downloadResponse, err := blobURL.Download(a.ctx, startPosition, endPosition-startPosition+1, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return nil, err - } - - bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) - return bodyStream, err -} - func (a *Adapter) GetProperties(obj block.ObjectPointer) (block.Properties, error) { var err error defer reportMetrics("GetProperties", time.Now(), nil, &err) @@ -315,6 +311,77 @@ func (a *Adapter) UploadPart(obj block.ObjectPointer, size int64, reader io.Read return copyFromReader(a.ctx, reader, blobURL, blobIDsURL, blobSizesURL, azblob.UploadStreamToBlockBlobOptions{}) } +func (a *Adapter) UploadCopyPart(sourceObj, destinationObj block.ObjectPointer, uploadID string, partNumber int64) (string, error) { + var err error + defer reportMetrics("UploadPart", time.Now(), nil, &err) + + return a.copyPartRange(sourceObj, destinationObj, 0, azblob.CountToEnd) +} + +func (a *Adapter) UploadCopyPartRange(sourceObj, destinationObj block.ObjectPointer, uploadID string, partNumber, startPosition, endPosition int64) (string, error) { + var err error + defer reportMetrics("UploadPart", time.Now(), nil, &err) + + return a.copyPartRange(sourceObj, destinationObj, startPosition, endPosition-startPosition+1) +} + +func generateRandomBlockID() string { + uu := guuid.New() + u := [64]byte{} + copy(u[:], uu[:]) + return base64.StdEncoding.EncodeToString(u[:]) +} + +func (a *Adapter) copyPartRange(sourceObj, destinationObj block.ObjectPointer, startPosition, count int64) (string, error) { + qualifiedSourceKey, err := resolveNamespace(sourceObj) + if err != nil { + return "", err + } + + qualifiedDestinationKey, err := resolveNamespace(destinationObj) + if err != nil { + return "", err + } + + containerName := qualifiedDestinationKey.StorageNamespace + objName := qualifiedDestinationKey.Key + + blobURL := a.getBlockBlobURL(containerName, objName) 
+ + sourceBlobURL := a.getBlockBlobURL(qualifiedSourceKey.StorageNamespace, qualifiedSourceKey.Key) + + base64BlockID := generateRandomBlockID() + _, err = blobURL.StageBlockFromURL(a.ctx, base64BlockID, sourceBlobURL.URL(), startPosition, count, azblob.LeaseAccessConditions{}, azblob.ModifiedAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", err + } + + // add size and id to etag + response, err := sourceBlobURL.GetProperties(a.ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", err + } + etag := "\"" + hex.EncodeToString(response.ContentMD5()) + "\"" + size := response.ContentLength() + base64Etag := base64.StdEncoding.EncodeToString([]byte(etag)) + // stage id data + blobIDsURL := a.getIDURL(containerName, objName) + _, err = blobIDsURL.StageBlock(a.ctx, base64Etag, strings.NewReader(base64BlockID+"\n"), azblob.LeaseAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", fmt.Errorf("failed staging part data: %w", err) + } + + // stage size data + sizeData := strconv.Itoa(int(size)) + "\n" + blobSizesURL := a.getSizeURL(containerName, objName) + _, err = blobSizesURL.StageBlock(a.ctx, base64Etag, strings.NewReader(sizeData), azblob.LeaseAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", fmt.Errorf("failed staging part data: %w", err) + } + + return etag, nil +} + func (a *Adapter) AbortMultiPartUpload(_ block.ObjectPointer, _ string) error { // Azure has no abort, in case of commit, uncommitted parts are erased, otherwise staged data is erased after 7 days return nil From 24f0b2de9d2c10747ee9805104d155b12a5d9373 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Thu, 11 Feb 2021 09:51:42 +0200 Subject: [PATCH 05/15] Minimum changes in copied file --- block/azure/adapter.go | 9 +++- block/azure/chunkwriting.go | 62 ++++++++------------------- block/azure/multipart_block_writer.go | 57 ++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 45 deletions(-) create mode 100644 block/azure/multipart_block_writer.go diff --git a/block/azure/adapter.go b/block/azure/adapter.go index 1d7a7b13a8e..db8b7ae4895 100644 --- a/block/azure/adapter.go +++ b/block/azure/adapter.go @@ -308,7 +308,14 @@ func (a *Adapter) UploadPart(obj block.ObjectPointer, size int64, reader io.Read blobIDsURL := a.getIDURL(containerName, objName) blobSizesURL := a.getSizeURL(containerName, objName) - return copyFromReader(a.ctx, reader, blobURL, blobIDsURL, blobSizesURL, azblob.UploadStreamToBlockBlobOptions{}) + hashReader := block.NewHashingReader(reader, block.HashFunctionMD5) + + multipartBlockWriter := NewMultipartBlockWriter(hashReader, blobURL, blobIDsURL, blobSizesURL) + _, err = copyFromReader(a.ctx, hashReader, multipartBlockWriter, azblob.UploadStreamToBlockBlobOptions{}) + if err != nil { + return "", err + } + return multipartBlockWriter.etag, nil } func (a *Adapter) UploadCopyPart(sourceObj, destinationObj block.ObjectPointer, uploadID string, partNumber int64) (string, error) { diff --git a/block/azure/chunkwriting.go b/block/azure/chunkwriting.go index 4f41969255f..0cb7bc4672b 100644 --- a/block/azure/chunkwriting.go +++ b/block/azure/chunkwriting.go @@ -5,17 +5,12 @@ import ( "context" "encoding/base64" "encoding/binary" - "encoding/hex" "errors" "fmt" "io" - "strconv" - "strings" "sync" "sync/atomic" - "github.com/treeverse/lakefs/block" - "github.com/Azure/azure-storage-blob-go/azblob" guuid "github.com/google/uuid" @@ -62,23 +57,22 @@ func 
defaults(u *azblob.UploadStreamToBlockBlobOptions) error { // well, 4 MiB or 8 MiB, and autoscale to as many goroutines within the memory limit. This gives a single dial to tweak and we can // choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model). // We can even provide a utility to dial this number in for customer networks to optimize their copies. -func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, toIDs blockWriter, toSizes blockWriter, o azblob.UploadStreamToBlockBlobOptions) (string, error) { +func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o azblob.UploadStreamToBlockBlobOptions) (*azblob.BlockBlobCommitBlockListResponse, error) { if err := defaults(&o); err != nil { - return "", err + return nil, err } ctx, cancel := context.WithCancel(ctx) defer cancel() + cp := &copier{ - ctx: ctx, - cancel: cancel, - reader: block.NewHashingReader(from, block.HashFunctionMD5), - to: to, - toIDs: toIDs, - toSizes: toSizes, - id: newID(), - o: o, - errCh: make(chan error, 1), + ctx: ctx, + cancel: cancel, + reader: from, + to: to, + id: newID(), + o: o, + errCh: make(chan error, 1), } // Send all our chunks until we get an error. @@ -90,35 +84,15 @@ func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, toIDs b } // If the error is not EOF, then we have a problem. if err != nil && !errors.Is(err, io.EOF) { - return "", err + return nil, err } // Close out our upload. if err := cp.close(); err != nil { - return "", err + return nil, err } - // This part was added - // Instead of committing in close ( what is originally done in azblob/chunkwriting.go) - // we stage The blockIDs and size of this copy operation in the relevant blockBlobURLs ( cp.toIDs, cp.toSizes) - // Later on, in complete multipart upload we commit the file blockBlobURLs with etags as the blockIDs - // Then by reading the files we get the relevant blockIDs and sizes - etag := "\"" + hex.EncodeToString(cp.reader.Md5.Sum(nil)) + "\"" - base64Etag := base64.StdEncoding.EncodeToString([]byte(etag)) - - // write to blockIDs - pd := strings.Join(cp.id.issued(), "\n") + "\n" - _, err = cp.toIDs.StageBlock(cp.ctx, base64Etag, strings.NewReader(pd), cp.o.AccessConditions.LeaseAccessConditions, nil, cp.o.ClientProvidedKeyOptions) - if err != nil { - return "", fmt.Errorf("failed staging part data: %w", err) - } - // write block sizes - sd := strconv.Itoa(int(cp.reader.CopiedSize)) + "\n" - _, err = cp.toSizes.StageBlock(cp.ctx, base64Etag, strings.NewReader(sd), cp.o.AccessConditions.LeaseAccessConditions, nil, cp.o.ClientProvidedKeyOptions) - if err != nil { - return "", fmt.Errorf("failed staging part data: %w", err) - } - return etag, nil + return cp.result, nil } // copier streams a file via chunks in parallel from a reader representing a file. @@ -136,11 +110,9 @@ type copier struct { id *id // reader is the source to be written to storage. - reader *block.HashingReader + reader io.Reader // to is the location we are writing our chunks to. - to blockWriter - toIDs blockWriter - toSizes blockWriter + to blockWriter // errCh is used to hold the first error from our concurrent writers. 
errCh chan error @@ -176,8 +148,9 @@ func (c *copier) sendChunk() error { buffer := c.o.TransferManager.Get() if len(buffer) == 0 { - return errors.New("TransferManager returned a 0 size buffer, this is a bug in the manager") + return fmt.Errorf("TransferManager returned a 0 size buffer, this is a bug in the manager") } + n, err := io.ReadFull(c.reader, buffer) switch { case err == nil && n == 0: @@ -237,6 +210,7 @@ func (c *copier) close() error { } var err error + c.result, err = c.to.CommitBlockList(c.ctx, c.id.issued(), c.o.BlobHTTPHeaders, c.o.Metadata, c.o.AccessConditions, c.o.BlobAccessTier, c.o.BlobTagsMap, c.o.ClientProvidedKeyOptions) return err } diff --git a/block/azure/multipart_block_writer.go b/block/azure/multipart_block_writer.go new file mode 100644 index 00000000000..deab2e602c4 --- /dev/null +++ b/block/azure/multipart_block_writer.go @@ -0,0 +1,57 @@ +package azure + +import ( + "context" + "encoding/base64" + "encoding/hex" + "fmt" + "io" + "strconv" + "strings" + + "github.com/treeverse/lakefs/block" + + "github.com/Azure/azure-storage-blob-go/azblob" +) + +type MultipartBlockWriter struct { + reader *block.HashingReader // the reader that would be passed to copyFromReader, this is needed in order to get size and md5 + // to is the location we are writing our chunks to. + to azblob.BlockBlobURL + toIDs azblob.BlockBlobURL + toSizes azblob.BlockBlobURL + etag string +} + +func NewMultipartBlockWriter(reader *block.HashingReader, to, toIDs, toSizes azblob.BlockBlobURL) *MultipartBlockWriter { + return &MultipartBlockWriter{ + reader: reader, + to: to, + toIDs: toIDs, + toSizes: toSizes, + } +} +func (m *MultipartBlockWriter) StageBlock(ctx context.Context, s string, seeker io.ReadSeeker, conditions azblob.LeaseAccessConditions, bytes []byte, options azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobStageBlockResponse, error) { + return m.to.StageBlock(ctx, s, seeker, conditions, bytes, options) +} + +func (m *MultipartBlockWriter) CommitBlockList(ctx context.Context, ids []string, headers azblob.BlobHTTPHeaders, metadata azblob.Metadata, conditions azblob.BlobAccessConditions, tierType azblob.AccessTierType, tagsMap azblob.BlobTagsMap, options azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobCommitBlockListResponse, error) { + + m.etag = "\"" + hex.EncodeToString(m.reader.Md5.Sum(nil)) + "\"" + base64Etag := base64.StdEncoding.EncodeToString([]byte(m.etag)) + + // write to blockIDs + pd := strings.Join(ids, "\n") + "\n" + _, err := m.toIDs.StageBlock(ctx, base64Etag, strings.NewReader(pd), conditions.LeaseAccessConditions, nil, options) + if err != nil { + return nil, fmt.Errorf("failed staging part data: %w", err) + } + // write block sizes + sd := strconv.Itoa(int(m.reader.CopiedSize)) + "\n" + _, err = m.toSizes.StageBlock(ctx, base64Etag, strings.NewReader(sd), conditions.LeaseAccessConditions, nil, options) + if err != nil { + return nil, fmt.Errorf("failed staging part data: %w", err) + } + + return &azblob.BlockBlobCommitBlockListResponse{}, err +} From 5cd83540284659aee65743f1dbd6d44fb3d55bf0 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Sun, 14 Feb 2021 17:28:51 +0200 Subject: [PATCH 06/15] Add azure to nessie --- .github/workflows/nessie.yaml | 25 +++++++++++++++++++++++++ nessie/ops/docker-compose.yaml | 2 ++ 2 files changed, 27 insertions(+) diff --git a/.github/workflows/nessie.yaml b/.github/workflows/nessie.yaml index a5856d93e4f..35284aa2314 100644 --- a/.github/workflows/nessie.yaml +++ b/.github/workflows/nessie.yaml @@ -208,6 +208,31 @@ jobs: 
if: ${{ failure() }} continue-on-error: true run: docker-compose -f nessie/ops/docker-compose.yaml logs --tail=1000 lakefs + - name: Run lakeFS Azure + env: + TAG: ${{ steps.version.outputs.tag }} + # Setting Account_ID as a secret as a way to avoid specifying it here + REPO: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com + LAKEFS_CATALOGER_TYPE: ${{ matrix.cataloger }} + LAKEFS_STATS_ENABLED: "false" + LAKEFS_BLOCKSTORE_TYPE: wasb + LAKEFS_GATEWAYS_S3_DOMAIN_NAME: s3.local.lakefs.io:8000 + DOCKER_REG: ${{ steps.login-ecr.outputs.registry }} + AWS_ACCESS_KEY_ID: "" + AWS_SECRET_ACCESS_KEY: "" + LAKEFS_AZURE_STORAGE_ACCOUNT: ${{ secrets.LAKEFS_AZURE_STORAGE_ACCOUNT }} + LAKEFS_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_AZURE_STORAGE_ACCESS_KEY }} + run: | + docker-compose -f nessie/ops/docker-compose.yaml down -v + docker-compose -f nessie/ops/docker-compose.yaml up --quiet-pull -d + - name: Run Nessie Azure + env: + NESSIE_STORAGE_NAMESPACE: wasb://nessie-system-testing/${{ github.run_number }}-${{ matrix.cataloger }} + run: go test -v ./nessie --system-tests + - name: lakeFS Logs on Azure failure + if: ${{ failure() }} + continue-on-error: true + run: docker-compose -f nessie/ops/docker-compose.yaml logs --tail=1000 lakefs - name: Publish coverage uses: codecov/codecov-action@v1 with: diff --git a/nessie/ops/docker-compose.yaml b/nessie/ops/docker-compose.yaml index 202b5e4f41b..12457aa5ada 100644 --- a/nessie/ops/docker-compose.yaml +++ b/nessie/ops/docker-compose.yaml @@ -18,6 +18,8 @@ services: - LAKEFS_BLOCKSTORE_GS_CREDENTIALS_JSON - LAKEFS_STATS_ENABLED - LAKEFS_CATALOGER_TYPE + - LAKEFS_AZURE_STORAGE_ACCOUNT + - LAKEFS_AZURE_STORAGE_ACCESS_KEY entrypoint: ["/app/wait-for", "postgres:5432", "--", "/app/lakefs", "run"] postgres: image: "postgres:11" From 9be22b7f5b30535b6c3c475f8f6998fc5860b77f Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Sun, 14 Feb 2021 17:48:36 +0200 Subject: [PATCH 07/15] Fix lint errors --- block/azure/adapter.go | 2 +- block/azure/chunkwriting.go | 12 ++++++------ block/azure/multipart_block_writer.go | 1 - block/factory/build.go | 10 ++++++---- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/block/azure/adapter.go b/block/azure/adapter.go index db8b7ae4895..6e53a97294c 100644 --- a/block/azure/adapter.go +++ b/block/azure/adapter.go @@ -136,7 +136,7 @@ func translatePutOpts(opts block.PutOpts) azblob.UploadStreamToBlockBlobOptions func (a *Adapter) Put(obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error { var err error - //defer reportMetrics("Put", time.Now(), &sizeBytes, &err) + defer reportMetrics("Put", time.Now(), &sizeBytes, &err) qualifiedKey, err := resolveNamespace(obj) if err != nil { diff --git a/block/azure/chunkwriting.go b/block/azure/chunkwriting.go index 0cb7bc4672b..0f5e24de16b 100644 --- a/block/azure/chunkwriting.go +++ b/block/azure/chunkwriting.go @@ -12,10 +12,11 @@ import ( "sync/atomic" "github.com/Azure/azure-storage-blob-go/azblob" - guuid "github.com/google/uuid" ) +var ErrEmptyBuffer = errors.New("TransferManager returned a 0 size buffer, this is a bug in the manager") + // This code is taken from azblob chunkwriting.go // The reason is that the original code commit the data at the end of the copy // In order to support multipart upload we need to save the blockIDs instead of committing them @@ -44,7 +45,7 @@ func defaults(u *azblob.UploadStreamToBlockBlobOptions) error { var err error u.TransferManager, err = azblob.NewStaticBuffer(u.BufferSize, u.MaxBuffers) if 
err != nil { - return fmt.Errorf("bug: default transfer manager could not be created: %s", err) + return fmt.Errorf("bug: default transfer manager could not be created: %w", err) } return nil } @@ -148,7 +149,7 @@ func (c *copier) sendChunk() error { buffer := c.o.TransferManager.Get() if len(buffer) == 0 { - return fmt.Errorf("TransferManager returned a 0 size buffer, this is a bug in the manager") + return ErrEmptyBuffer } n, err := io.ReadFull(c.reader, buffer) @@ -165,11 +166,11 @@ func (c *copier) sendChunk() error { }, ) return nil - case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0: + case err != nil && (errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)) && n == 0: return io.EOF } - if err == io.EOF || err == io.ErrUnexpectedEOF { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { id := c.id.next() c.wg.Add(1) c.o.TransferManager.Run( @@ -198,7 +199,6 @@ func (c *copier) write(chunk copierChunk) { c.errCh <- fmt.Errorf("write error: %w", err) return } - return } // close commits our blocks to blob storage and closes our writer. diff --git a/block/azure/multipart_block_writer.go b/block/azure/multipart_block_writer.go index deab2e602c4..3a896f62366 100644 --- a/block/azure/multipart_block_writer.go +++ b/block/azure/multipart_block_writer.go @@ -36,7 +36,6 @@ func (m *MultipartBlockWriter) StageBlock(ctx context.Context, s string, seeker } func (m *MultipartBlockWriter) CommitBlockList(ctx context.Context, ids []string, headers azblob.BlobHTTPHeaders, metadata azblob.Metadata, conditions azblob.BlobAccessConditions, tierType azblob.AccessTierType, tagsMap azblob.BlobTagsMap, options azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobCommitBlockListResponse, error) { - m.etag = "\"" + hex.EncodeToString(m.reader.Md5.Sum(nil)) + "\"" base64Etag := base64.StdEncoding.EncodeToString([]byte(m.etag)) diff --git a/block/factory/build.go b/block/factory/build.go index 260b9e84c5a..7e0dff73710 100644 --- a/block/factory/build.go +++ b/block/factory/build.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "os" "time" "github.com/Azure/azure-storage-blob-go/azblob" @@ -122,11 +123,12 @@ func buildGSAdapter(params params.GS) (*gs.Adapter, error) { func buildAzureAdapter(params params.Azure) (*azure.Adapter, error) { accountName := params.StorageAccount - accountKey := params.StorageAccessKey // TODO(Guys): check if I need to try and get from other places such as env var "AZURE_STORAGE_ACCOUNT" "AZURE_STORAGE_ACCESS_KEY" or any file like .aws/credentials style - if len(accountName) == 0 || len(accountKey) == 0 { - return nil, fmt.Errorf("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set") - } + accountKey := params.StorageAccessKey + if len(accountName) == 0 && len(accountKey) == 0 { + // fallback to Azure environment variables + accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") + } // Create a default request pipeline using your storage account name and account key. 
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { From 1ce074ba46711d06bf9b12d05d2ca17ee7817889 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Sun, 14 Feb 2021 18:41:45 +0200 Subject: [PATCH 08/15] Fix lint errors --- block/azure/adapter.go | 1 + block/factory/build.go | 15 +++++---------- block/params/block.go | 1 + config/config.go | 6 ++++++ 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/block/azure/adapter.go b/block/azure/adapter.go index 6e53a97294c..45f7d1a6f3b 100644 --- a/block/azure/adapter.go +++ b/block/azure/adapter.go @@ -281,6 +281,7 @@ func (a *Adapter) Copy(sourceObj, destinationObj block.ObjectPointer) error { } func (a *Adapter) CreateMultiPartUpload(obj block.ObjectPointer, r *http.Request, opts block.CreateMultiPartUploadOpts) (string, error) { + // Azure has no create multipart upload var err error defer reportMetrics("CreateMultiPartUpload", time.Now(), nil, &err) diff --git a/block/factory/build.go b/block/factory/build.go index 7e0dff73710..73b7e6c1a7d 100644 --- a/block/factory/build.go +++ b/block/factory/build.go @@ -5,17 +5,14 @@ import ( "errors" "fmt" "os" - "time" - - "github.com/Azure/azure-storage-blob-go/azblob" - - "github.com/treeverse/lakefs/block/azure" "cloud.google.com/go/storage" + "github.com/Azure/azure-storage-blob-go/azblob" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" log "github.com/sirupsen/logrus" "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/block/azure" "github.com/treeverse/lakefs/block/gs" "github.com/treeverse/lakefs/block/local" "github.com/treeverse/lakefs/block/mem" @@ -127,15 +124,13 @@ func buildAzureAdapter(params params.Azure) (*azure.Adapter, error) { if len(accountName) == 0 && len(accountKey) == 0 { // fallback to Azure environment variables accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") - } // Create a default request pipeline using your storage account name and account key. 
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { - return nil, fmt.Errorf("Invalid credentials with error: " + err.Error()) + return nil, fmt.Errorf("invalid credentials : %w", err) } - p := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: 10 * time.Minute}}) - a := azure.NewAdapter(p, accountName) + p := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: params.TryTimeout}}) - return a, nil + return azure.NewAdapter(p, accountName), nil } diff --git a/block/params/block.go b/block/params/block.go index 64aed4fa4ac..43bfe0c8fce 100644 --- a/block/params/block.go +++ b/block/params/block.go @@ -35,4 +35,5 @@ type GS struct { type Azure struct { StorageAccount string StorageAccessKey string + TryTimeout time.Duration } diff --git a/config/config.go b/config/config.go index ec58e0e5b4d..3288ba9d459 100644 --- a/config/config.go +++ b/config/config.go @@ -62,6 +62,8 @@ const ( MetaStoreType = "metastore.type" MetaStoreHiveURI = "metastore.hive.uri" MetastoreGlueCatalogID = "metastore.glue.catalog_id" + + DefaultAzureTryTimeout = 10 * time.Minute ) var ( @@ -104,6 +106,7 @@ const ( BlockstoreS3StreamingChunkSizeKey = "blockstore.s3.streaming_chunk_size" BlockstoreS3StreamingChunkTimeoutKey = "blockstore.s3.streaming_chunk_timeout" BlockstoreS3MaxRetriesKey = "blockstore.s3.max_retries" + BlockstoreAzureTryTimeout = "blockstore.azure.try_timout" CommittedLocalCacheSizeBytesKey = "committed.local_cache.size_bytes" CommittedLocalCacheDirKey = "committed.local_cache.dir" @@ -166,6 +169,8 @@ func setDefaults() { viper.SetDefault(StatsEnabledKey, DefaultStatsEnabled) viper.SetDefault(StatsAddressKey, DefaultStatsAddr) viper.SetDefault(StatsFlushIntervalKey, DefaultStatsFlushInterval) + + viper.SetDefault(BlockstoreAzureTryTimeout, DefaultAzureTryTimeout) } type Configurator interface { @@ -320,6 +325,7 @@ func (c *Config) GetBlockAdapterAzureParams() (blockparams.Azure, error) { return blockparams.Azure{ StorageAccount: viper.GetString("blockstore.azure.storage_account"), StorageAccessKey: viper.GetString("blockstore.azure.storage_access_key"), + TryTimeout: viper.GetDuration(BlockstoreAzureTryTimeout), }, nil } From 08759a1398e542fa524d84628ab7838bc8ecb0b7 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 16 Feb 2021 08:19:13 +0200 Subject: [PATCH 09/15] Code review changes --- block/azure/adapter.go | 255 ++++++-------------------- block/azure/chunkwriting.go | 32 ---- block/azure/multipart_block_writer.go | 150 ++++++++++++++- block/factory/build.go | 8 +- block/params/block.go | 1 + config/config.go | 14 +- 6 files changed, 212 insertions(+), 248 deletions(-) diff --git a/block/azure/adapter.go b/block/azure/adapter.go index 45f7d1a6f3b..bde59f25833 100644 --- a/block/azure/adapter.go +++ b/block/azure/adapter.go @@ -1,26 +1,16 @@ package azure import ( - "bufio" "context" - "encoding/base64" - "encoding/hex" "errors" "fmt" "io" "net/http" "net/url" - "sort" - "strconv" - "strings" "time" - guuid "github.com/google/uuid" - "github.com/Azure/azure-pipeline-go/pipeline" - "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/logging" ) @@ -34,25 +24,26 @@ const ( sizeSuffix = "_size" idSuffix = "_id" _1MiB = 1024 * 1024 + MaxBuffers = 1 defaultMaxRetryRequests = 0 ) type Adapter struct { ctx context.Context - p pipeline.Pipeline - accountName string + pipeline pipeline.Pipeline configurations configurations + 
serviceURL string } type configurations struct { retryReaderOptions azblob.RetryReaderOptions } -func NewAdapter(pipeline pipeline.Pipeline, accountName string, opts ...func(a *Adapter)) *Adapter { +func NewAdapter(pipeline pipeline.Pipeline, serviceURL string, opts ...func(a *Adapter)) *Adapter { a := &Adapter{ ctx: context.Background(), - accountName: accountName, - p: pipeline, + serviceURL: serviceURL, + pipeline: pipeline, configurations: configurations{retryReaderOptions: azblob.RetryReaderOptions{MaxRetryRequests: defaultMaxRetryRequests}}, } for _, opt := range opts { @@ -63,16 +54,12 @@ func NewAdapter(pipeline pipeline.Pipeline, accountName string, opts ...func(a * func (a *Adapter) WithContext(ctx context.Context) block.Adapter { return &Adapter{ - p: a.p, - accountName: a.accountName, - ctx: ctx, + pipeline: a.pipeline, + serviceURL: a.serviceURL, + ctx: ctx, } } -func (a *Adapter) log() logging.Logger { - return logging.FromContext(a.ctx) -} - func resolveNamespace(obj block.ObjectPointer) (block.QualifiedKey, error) { qualifiedKey, err := block.ResolveNamespace(obj.StorageNamespace, obj.Identifier) if err != nil { @@ -100,30 +87,11 @@ func (a *Adapter) GenerateInventory(ctx context.Context, logger logging.Logger, } func (a *Adapter) getContainerURL(containerName string) azblob.ContainerURL { - URL, _ := url.Parse( - fmt.Sprintf("https://%s.blob.core.windows.net/%s", a.accountName, containerName)) - - return azblob.NewContainerURL(*URL, a.p) -} - -func (a *Adapter) getBlobURL(containerName, fileName string) azblob.BlobURL { - containerURL := a.getContainerURL(containerName) - - return containerURL.NewBlobURL(fileName) -} - -func (a *Adapter) getBlockBlobURL(containerName, fileName string) azblob.BlockBlobURL { - containerURL := a.getContainerURL(containerName) - - return containerURL.NewBlockBlobURL(fileName) -} - -func (a *Adapter) getIDURL(containerName, fileName string) azblob.BlockBlobURL { - return a.getBlockBlobURL(containerName, fileName+idSuffix) -} - -func (a *Adapter) getSizeURL(containerName, fileName string) azblob.BlockBlobURL { - return a.getBlockBlobURL(containerName, fileName+sizeSuffix) + u, err := url.Parse(fmt.Sprintf("%s/%s", a.serviceURL, containerName)) + if err != nil { + panic(err) + } + return azblob.NewContainerURL(*u, a.pipeline) } func translatePutOpts(opts block.PutOpts) azblob.UploadStreamToBlockBlobOptions { @@ -142,7 +110,8 @@ func (a *Adapter) Put(obj block.ObjectPointer, sizeBytes int64, reader io.Reader if err != nil { return err } - blobURL := a.getBlockBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + container := a.getContainerURL(qualifiedKey.StorageNamespace) + blobURL := container.NewBlockBlobURL(qualifiedKey.Key) _, err = azblob.UploadStreamToBlockBlob(a.ctx, reader, blobURL, translatePutOpts(opts)) return err @@ -167,7 +136,8 @@ func (a *Adapter) Download(obj block.ObjectPointer, offset, count int64) (io.Rea if err != nil { return nil, err } - blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + container := a.getContainerURL(qualifiedKey.StorageNamespace) + blobURL := container.NewBlobURL(qualifiedKey.Key) keyOptions := azblob.ClientProvidedKeyOptions{} downloadResponse, err := blobURL.Download(a.ctx, offset, count, azblob.BlobAccessConditions{}, false, keyOptions) @@ -175,7 +145,6 @@ func (a *Adapter) Download(obj block.ObjectPointer, offset, count int64) (io.Rea if err != nil { return nil, err } - // NOTE: automatically retries are performed if the connection fails bodyStream := 
downloadResponse.Body(a.configurations.retryReaderOptions) return bodyStream, nil } @@ -216,12 +185,15 @@ func (a *Adapter) Exists(obj block.ObjectPointer) (bool, error) { return false, err } - blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + container := a.getContainerURL(qualifiedKey.StorageNamespace) + blobURL := container.NewBlobURL(qualifiedKey.Key) + _, err = blobURL.GetProperties(a.ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) - if err != nil { - if err.(azblob.StorageError).ServiceCode() == azblob.ServiceCodeBlobNotFound { - return false, nil - } + var storageErr azblob.StorageError + + if errors.As(err, &storageErr) && storageErr.ServiceCode() == azblob.ServiceCodeBlobNotFound { + return false, nil + } else if err != nil { return false, err } return true, nil @@ -236,7 +208,9 @@ func (a *Adapter) GetProperties(obj block.ObjectPointer) (block.Properties, erro return block.Properties{}, err } - blobURL := a.getBlobURL(qualifiedKey.StorageNamespace, qualifiedKey.Key) + container := a.getContainerURL(qualifiedKey.StorageNamespace) + blobURL := container.NewBlobURL(qualifiedKey.Key) + props, err := blobURL.GetProperties(a.ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) if err != nil { return block.Properties{}, err @@ -254,9 +228,9 @@ func (a *Adapter) Remove(obj block.ObjectPointer) error { return err } - containerName := qualifiedKey.StorageNamespace - fileName := qualifiedKey.Key - blobURL := a.getBlobURL(containerName, fileName) + container := a.getContainerURL(qualifiedKey.StorageNamespace) + blobURL := container.NewBlobURL(qualifiedKey.Key) + _, err = blobURL.Delete(a.ctx, "", azblob.BlobAccessConditions{}) return err } @@ -273,9 +247,11 @@ func (a *Adapter) Copy(sourceObj, destinationObj block.ObjectPointer) error { if err != nil { return err } + sourceContainer := a.getContainerURL(qualifiedSourceKey.StorageNamespace) + sourceURL := sourceContainer.NewBlobURL(qualifiedSourceKey.Key) - sourceURL := a.getBlobURL(qualifiedSourceKey.StorageNamespace, qualifiedSourceKey.Key) - destinationURL := a.getBlobURL(qualifiedDestinationKey.StorageNamespace, qualifiedDestinationKey.Key) + destinationContainer := a.getContainerURL(qualifiedDestinationKey.StorageNamespace) + destinationURL := destinationContainer.NewBlobURL(qualifiedDestinationKey.Key) _, err = destinationURL.StartCopyFromURL(a.ctx, sourceURL.URL(), azblob.Metadata{}, azblob.ModifiedAccessConditions{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}) return err } @@ -302,17 +278,18 @@ func (a *Adapter) UploadPart(obj block.ObjectPointer, size int64, reader io.Read return "", err } - containerName := qualifiedKey.StorageNamespace - objName := qualifiedKey.Key - - blobURL := a.getBlockBlobURL(containerName, objName) - blobIDsURL := a.getIDURL(containerName, objName) - blobSizesURL := a.getSizeURL(containerName, objName) - + container := a.getContainerURL(qualifiedKey.StorageNamespace) hashReader := block.NewHashingReader(reader, block.HashFunctionMD5) - multipartBlockWriter := NewMultipartBlockWriter(hashReader, blobURL, blobIDsURL, blobSizesURL) - _, err = copyFromReader(a.ctx, hashReader, multipartBlockWriter, azblob.UploadStreamToBlockBlobOptions{}) + transferManager, err := azblob.NewStaticBuffer(_1MiB, MaxBuffers) + if err != nil { + return "", err + } + defer transferManager.Close() + multipartBlockWriter := NewMultipartBlockWriter(hashReader, container, qualifiedKey.Key) + _, err = copyFromReader(a.ctx, hashReader, 
multipartBlockWriter, azblob.UploadStreamToBlockBlobOptions{ + TransferManager: transferManager, + }) if err != nil { return "", err } @@ -333,13 +310,6 @@ func (a *Adapter) UploadCopyPartRange(sourceObj, destinationObj block.ObjectPoin return a.copyPartRange(sourceObj, destinationObj, startPosition, endPosition-startPosition+1) } -func generateRandomBlockID() string { - uu := guuid.New() - u := [64]byte{} - copy(u[:], uu[:]) - return base64.StdEncoding.EncodeToString(u[:]) -} - func (a *Adapter) copyPartRange(sourceObj, destinationObj block.ObjectPointer, startPosition, count int64) (string, error) { qualifiedSourceKey, err := resolveNamespace(sourceObj) if err != nil { @@ -351,43 +321,11 @@ func (a *Adapter) copyPartRange(sourceObj, destinationObj block.ObjectPointer, s return "", err } - containerName := qualifiedDestinationKey.StorageNamespace - objName := qualifiedDestinationKey.Key - - blobURL := a.getBlockBlobURL(containerName, objName) + destinationContainer := a.getContainerURL(qualifiedDestinationKey.StorageNamespace) + sourceContainer := a.getContainerURL(qualifiedSourceKey.StorageNamespace) + sourceBlobURL := sourceContainer.NewBlockBlobURL(qualifiedSourceKey.Key) - sourceBlobURL := a.getBlockBlobURL(qualifiedSourceKey.StorageNamespace, qualifiedSourceKey.Key) - - base64BlockID := generateRandomBlockID() - _, err = blobURL.StageBlockFromURL(a.ctx, base64BlockID, sourceBlobURL.URL(), startPosition, count, azblob.LeaseAccessConditions{}, azblob.ModifiedAccessConditions{}, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return "", err - } - - // add size and id to etag - response, err := sourceBlobURL.GetProperties(a.ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return "", err - } - etag := "\"" + hex.EncodeToString(response.ContentMD5()) + "\"" - size := response.ContentLength() - base64Etag := base64.StdEncoding.EncodeToString([]byte(etag)) - // stage id data - blobIDsURL := a.getIDURL(containerName, objName) - _, err = blobIDsURL.StageBlock(a.ctx, base64Etag, strings.NewReader(base64BlockID+"\n"), azblob.LeaseAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return "", fmt.Errorf("failed staging part data: %w", err) - } - - // stage size data - sizeData := strconv.Itoa(int(size)) + "\n" - blobSizesURL := a.getSizeURL(containerName, objName) - _, err = blobSizesURL.StageBlock(a.ctx, base64Etag, strings.NewReader(sizeData), azblob.LeaseAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return "", fmt.Errorf("failed staging part data: %w", err) - } - - return etag, nil + return copyPartRange(a.ctx, destinationContainer, qualifiedDestinationKey.Key, sourceBlobURL, startPosition, count) } func (a *Adapter) AbortMultiPartUpload(_ block.ObjectPointer, _ string) error { @@ -410,96 +348,7 @@ func (a *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID stri if err != nil { return nil, 0, err } - containerName := qualifiedKey.StorageNamespace - _ = a.log().WithFields(logging.Fields{ - "upload_id": uploadID, - "qualified_ns": qualifiedKey.StorageNamespace, - "qualified_key": qualifiedKey.Key, - "key": obj.Identifier, - }) - - parts := multipartList.Part - sort.Slice(parts, func(i, j int) bool { - return *parts[i].PartNumber < *parts[j].PartNumber - }) - // extract staging blockIDs - metaBlockIDs := make([]string, 0) - for _, part := range multipartList.Part { - base64Etag := base64.StdEncoding.EncodeToString([]byte(*part.ETag)) - metaBlockIDs = append(metaBlockIDs, 
base64Etag) - } - - fileName := qualifiedKey.Key - stageBlockIDs, err := a.getMultipartIDs(containerName, fileName, metaBlockIDs) - - size, err := a.getMultipartSize(containerName, fileName, metaBlockIDs) - - blobURL := a.getBlockBlobURL(containerName, fileName) - - res, err := blobURL.CommitBlockList(a.ctx, stageBlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return nil, 0, err - } - etag := string(res.ETag()) - etag = etag[1 : len(etag)-2] - // list bucket parts and validate request match - return &etag, int64(size), nil -} - -func (a *Adapter) getMultipartIDs(containerName, fileName string, base64BlockIDs []string) ([]string, error) { - blobURL := a.getIDURL(containerName, fileName) - _, err := blobURL.CommitBlockList(a.ctx, base64BlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return nil, err - } - - downloadResponse, err := blobURL.Download(a.ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return nil, err - } - bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) - scanner := bufio.NewScanner(bodyStream) - ids := make([]string, 0) - for scanner.Scan() { - id := scanner.Text() - ids = append(ids, id) - } + containerURL := a.getContainerURL(qualifiedKey.StorageNamespace) - // remove - _, err = blobURL.Delete(a.ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) - if err != nil { - a.log().WithError(err).Warn("Failed to delete multipart ids data file") - } - return ids, nil -} - -func (a *Adapter) getMultipartSize(containerName, fileName string, base64BlockIDs []string) (int, error) { - sizeURL := a.getSizeURL(containerName, fileName) - _, err := sizeURL.CommitBlockList(a.ctx, base64BlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, "", azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return 0, err - } - - downloadResponse, err := sizeURL.Download(a.ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) - if err != nil { - return 0, err - } - bodyStream := downloadResponse.Body(a.configurations.retryReaderOptions) - scanner := bufio.NewScanner(bodyStream) - size := 0 - for scanner.Scan() { - s := scanner.Text() - stageSize, err := strconv.Atoi(s) - if err != nil { - return 0, err - } - size += stageSize - } - - // remove - _, err = sizeURL.Delete(a.ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) - if err != nil { - a.log().WithError(err).Warn("Failed to delete multipart size data file") - } - return size, nil + return CompleteMultipart(a.ctx, multipartList.Part, containerURL, qualifiedKey.Key, a.configurations.retryReaderOptions) } diff --git a/block/azure/chunkwriting.go b/block/azure/chunkwriting.go index 0f5e24de16b..9dc4141a68f 100644 --- a/block/azure/chunkwriting.go +++ b/block/azure/chunkwriting.go @@ -29,40 +29,8 @@ type blockWriter interface { CommitBlockList(context.Context, []string, azblob.BlobHTTPHeaders, azblob.Metadata, azblob.BlobAccessConditions, azblob.AccessTierType, azblob.BlobTagsMap, azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobCommitBlockListResponse, error) } -func defaults(u *azblob.UploadStreamToBlockBlobOptions) error { - if u.TransferManager != nil 
{ - return nil - } - - if u.MaxBuffers == 0 { - u.MaxBuffers = 1 - } - - if u.BufferSize < _1MiB { - u.BufferSize = _1MiB - } - - var err error - u.TransferManager, err = azblob.NewStaticBuffer(u.BufferSize, u.MaxBuffers) - if err != nil { - return fmt.Errorf("bug: default transfer manager could not be created: %w", err) - } - return nil -} - // copyFromReader copies a source io.Reader to blob storage using concurrent uploads. -// TODO(someone): The existing model provides a buffer size and buffer limit as limiting factors. The buffer size is probably -// useless other than needing to be above some number, as the network stack is going to hack up the buffer over some size. The -// max buffers is providing a cap on how much memory we use (by multiplying it times the buffer size) and how many go routines can upload -// at a time. I think having a single max memory dial would be more efficient. We can choose an internal buffer size that works -// well, 4 MiB or 8 MiB, and autoscale to as many goroutines within the memory limit. This gives a single dial to tweak and we can -// choose a max value for the memory setting based on internal transfers within Azure (which will give us the maximum throughput model). -// We can even provide a utility to dial this number in for customer networks to optimize their copies. func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o azblob.UploadStreamToBlockBlobOptions) (*azblob.BlockBlobCommitBlockListResponse, error) { - if err := defaults(&o); err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/block/azure/multipart_block_writer.go b/block/azure/multipart_block_writer.go index 3a896f62366..1b8ca6be499 100644 --- a/block/azure/multipart_block_writer.go +++ b/block/azure/multipart_block_writer.go @@ -1,17 +1,21 @@ package azure import ( + "bufio" "context" "encoding/base64" "encoding/hex" "fmt" "io" + "sort" "strconv" "strings" - "github.com/treeverse/lakefs/block" - "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/google/uuid" + "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/logging" ) type MultipartBlockWriter struct { @@ -23,14 +27,15 @@ type MultipartBlockWriter struct { etag string } -func NewMultipartBlockWriter(reader *block.HashingReader, to, toIDs, toSizes azblob.BlockBlobURL) *MultipartBlockWriter { +func NewMultipartBlockWriter(reader *block.HashingReader, containerURL azblob.ContainerURL, objName string) *MultipartBlockWriter { return &MultipartBlockWriter{ reader: reader, - to: to, - toIDs: toIDs, - toSizes: toSizes, + to: containerURL.NewBlockBlobURL(objName), + toIDs: containerURL.NewBlockBlobURL(objName + idSuffix), + toSizes: containerURL.NewBlockBlobURL(objName + sizeSuffix), } } + func (m *MultipartBlockWriter) StageBlock(ctx context.Context, s string, seeker io.ReadSeeker, conditions azblob.LeaseAccessConditions, bytes []byte, options azblob.ClientProvidedKeyOptions) (*azblob.BlockBlobStageBlockResponse, error) { return m.to.StageBlock(ctx, s, seeker, conditions, bytes, options) } @@ -54,3 +59,136 @@ func (m *MultipartBlockWriter) CommitBlockList(ctx context.Context, ids []string return &azblob.BlockBlobCommitBlockListResponse{}, err } + +func CompleteMultipart(ctx context.Context, parts []*s3.CompletedPart, container azblob.ContainerURL, objName string, retryOptions azblob.RetryReaderOptions) (*string, int64, error) { + sort.Slice(parts, func(i, j int) bool { + return *parts[i].PartNumber < 
*parts[j].PartNumber + }) + // extract staging blockIDs + metaBlockIDs := make([]string, len(parts)) + for i, part := range parts { + base64Etag := base64.StdEncoding.EncodeToString([]byte(*part.ETag)) + metaBlockIDs[i] = base64Etag + } + + stageBlockIDs, err := getMultipartIDs(ctx, container, objName, metaBlockIDs, retryOptions) + if err != nil { + return nil, 0, err + } + size, err := getMultipartSize(ctx, container, objName, metaBlockIDs, retryOptions) + if err != nil { + return nil, 0, err + } + blobURL := container.NewBlockBlobURL(objName) + + res, err := blobURL.CommitBlockList(ctx, stageBlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, 0, err + } + etag := string(res.ETag()) + return &etag, int64(size), nil +} + +func getMultipartIDs(ctx context.Context, container azblob.ContainerURL, objName string, base64BlockIDs []string, retryOptions azblob.RetryReaderOptions) ([]string, error) { + blobURL := container.NewBlockBlobURL(objName + idSuffix) + _, err := blobURL.CommitBlockList(ctx, base64BlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.AccessTierNone, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, err + } + + downloadResponse, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, err + } + bodyStream := downloadResponse.Body(retryOptions) + defer func() { + _ = bodyStream.Close() + }() + scanner := bufio.NewScanner(bodyStream) + ids := make([]string, 0) + for scanner.Scan() { + id := scanner.Text() + ids = append(ids, id) + } + + // remove + _, err = blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) + if err != nil { + logging.Default().WithContext(ctx).WithField("blob_url", blobURL.String()).WithError(err).Warn("Failed to delete multipart ids data file") + } + return ids, nil +} + +func getMultipartSize(ctx context.Context, container azblob.ContainerURL, objName string, base64BlockIDs []string, retryOptions azblob.RetryReaderOptions) (int, error) { + blobURL := container.NewBlockBlobURL(objName + sizeSuffix) + _, err := blobURL.CommitBlockList(ctx, base64BlockIDs, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}, "", azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return 0, err + } + + downloadResponse, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return 0, err + } + bodyStream := downloadResponse.Body(retryOptions) + defer func() { + _ = bodyStream.Close() + }() + scanner := bufio.NewScanner(bodyStream) + size := 0 + for scanner.Scan() { + s := scanner.Text() + stageSize, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + size += stageSize + } + + // remove + _, err = blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{}) + if err != nil { + logging.Default().WithContext(ctx).WithField("blob_url", blobURL.String()).WithError(err).Warn("Failed to delete multipart size data file") + } + return size, nil +} + +func copyPartRange(ctx context.Context, destinationContainer azblob.ContainerURL, destinationObjName string, sourceBlobURL azblob.BlockBlobURL, startPosition, count int64) (string, error) { + base64BlockID := 
generateRandomBlockID() + _, err := sourceBlobURL.StageBlockFromURL(ctx, base64BlockID, sourceBlobURL.URL(), startPosition, count, azblob.LeaseAccessConditions{}, azblob.ModifiedAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", err + } + + // add size and id to etag + response, err := sourceBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", err + } + etag := "\"" + hex.EncodeToString(response.ContentMD5()) + "\"" + size := response.ContentLength() + base64Etag := base64.StdEncoding.EncodeToString([]byte(etag)) + // stage id data + blobIDsURL := destinationContainer.NewBlockBlobURL(destinationObjName + idSuffix) + _, err = blobIDsURL.StageBlock(ctx, base64Etag, strings.NewReader(base64BlockID+"\n"), azblob.LeaseAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", fmt.Errorf("failed staging part data: %w", err) + } + + // stage size data + sizeData := strconv.Itoa(int(size)) + "\n" + blobSizesURL := destinationContainer.NewBlockBlobURL(destinationObjName + sizeSuffix) + _, err = blobSizesURL.StageBlock(ctx, base64Etag, strings.NewReader(sizeData), azblob.LeaseAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{}) + if err != nil { + return "", fmt.Errorf("failed staging part data: %w", err) + } + + return etag, nil +} + +func generateRandomBlockID() string { + uu := uuid.New() + u := [64]byte{} + copy(u[:], uu[:]) + return base64.StdEncoding.EncodeToString(u[:]) +} diff --git a/block/factory/build.go b/block/factory/build.go index 73b7e6c1a7d..21e587025c7 100644 --- a/block/factory/build.go +++ b/block/factory/build.go @@ -130,7 +130,11 @@ func buildAzureAdapter(params params.Azure) (*azure.Adapter, error) { if err != nil { return nil, fmt.Errorf("invalid credentials : %w", err) } - p := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: params.TryTimeout}}) + pipeline := azblob.NewPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: params.TryTimeout}}) + serviceURL := params.ServiceURL - return azure.NewAdapter(p, accountName), nil + if serviceURL == "" { + serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net", accountName) + } + return azure.NewAdapter(pipeline, serviceURL), nil } diff --git a/block/params/block.go b/block/params/block.go index 43bfe0c8fce..ce493740c94 100644 --- a/block/params/block.go +++ b/block/params/block.go @@ -35,5 +35,6 @@ type GS struct { type Azure struct { StorageAccount string StorageAccessKey string + ServiceURL string TryTimeout time.Duration } diff --git a/config/config.go b/config/config.go index 3288ba9d459..b3a33753ef1 100644 --- a/config/config.go +++ b/config/config.go @@ -106,8 +106,11 @@ const ( BlockstoreS3StreamingChunkSizeKey = "blockstore.s3.streaming_chunk_size" BlockstoreS3StreamingChunkTimeoutKey = "blockstore.s3.streaming_chunk_timeout" BlockstoreS3MaxRetriesKey = "blockstore.s3.max_retries" - BlockstoreAzureTryTimeout = "blockstore.azure.try_timout" + BlockstoreAzureTryTimeoutKey = "blockstore.azure.try_timeout" + BlockstoreAzureStorageAccountKey = "blockstore.azure.storage_account" + BlockstoreAzureStorageAccessKey = "blockstore.azure.storage_access_key" + BlockstoreAzureServiceURL = "blockstore.azure.service_url" CommittedLocalCacheSizeBytesKey = "committed.local_cache.size_bytes" CommittedLocalCacheDirKey = "committed.local_cache.dir" CommittedLocalCacheNumUploadersKey = 
"committed.local_cache.max_uploaders_per_writer" @@ -170,7 +173,7 @@ func setDefaults() { viper.SetDefault(StatsAddressKey, DefaultStatsAddr) viper.SetDefault(StatsFlushIntervalKey, DefaultStatsFlushInterval) - viper.SetDefault(BlockstoreAzureTryTimeout, DefaultAzureTryTimeout) + viper.SetDefault(BlockstoreAzureTryTimeoutKey, DefaultAzureTryTimeout) } type Configurator interface { @@ -323,9 +326,10 @@ func (c *Config) GetBlockAdapterGSParams() (blockparams.GS, error) { } func (c *Config) GetBlockAdapterAzureParams() (blockparams.Azure, error) { return blockparams.Azure{ - StorageAccount: viper.GetString("blockstore.azure.storage_account"), - StorageAccessKey: viper.GetString("blockstore.azure.storage_access_key"), - TryTimeout: viper.GetDuration(BlockstoreAzureTryTimeout), + StorageAccount: viper.GetString(BlockstoreAzureStorageAccountKey), + StorageAccessKey: viper.GetString(BlockstoreAzureStorageAccessKey), + ServiceURL: viper.GetString(BlockstoreAzureServiceURL), + TryTimeout: viper.GetDuration(BlockstoreAzureTryTimeoutKey), }, nil } From 0c56256c88cff072517475b5d408717ed80f7942 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 16 Feb 2021 10:17:25 +0200 Subject: [PATCH 10/15] Test nessie azure before s3 --- .github/workflows/nessie.yaml | 52 ++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/.github/workflows/nessie.yaml b/.github/workflows/nessie.yaml index 35284aa2314..449b594e52a 100644 --- a/.github/workflows/nessie.yaml +++ b/.github/workflows/nessie.yaml @@ -127,6 +127,33 @@ jobs: - name: Login to Amazon ECR id: login-ecr uses: aws-actions/amazon-ecr-login@v1 + + - name: Run lakeFS Azure + env: + TAG: ${{ steps.version.outputs.tag }} + # Setting Account_ID as a secret as a way to avoid specifying it here + REPO: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com + LAKEFS_CATALOGER_TYPE: ${{ matrix.cataloger }} + LAKEFS_STATS_ENABLED: "false" + LAKEFS_BLOCKSTORE_TYPE: wasb + LAKEFS_GATEWAYS_S3_DOMAIN_NAME: s3.local.lakefs.io:8000 + DOCKER_REG: ${{ steps.login-ecr.outputs.registry }} + AWS_ACCESS_KEY_ID: "" + AWS_SECRET_ACCESS_KEY: "" + LAKEFS_AZURE_STORAGE_ACCOUNT: ${{ secrets.LAKEFS_AZURE_STORAGE_ACCOUNT }} + LAKEFS_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_AZURE_STORAGE_ACCESS_KEY }} + run: | + docker-compose -f nessie/ops/docker-compose.yaml down -v + docker-compose -f nessie/ops/docker-compose.yaml up --quiet-pull -d + - name: Run Nessie Azure + env: + NESSIE_STORAGE_NAMESPACE: wasb://nessie-system-testing/${{ github.run_number }}-${{ matrix.cataloger }} + run: go test -v ./nessie --system-tests + - name: lakeFS Logs on Azure failure + if: ${{ failure() }} + continue-on-error: true + run: docker-compose -f nessie/ops/docker-compose.yaml logs --tail=1000 lakefs + - name: Run lakeFS S3 env: TAG: ${{ steps.version.outputs.tag }} @@ -208,31 +235,6 @@ jobs: if: ${{ failure() }} continue-on-error: true run: docker-compose -f nessie/ops/docker-compose.yaml logs --tail=1000 lakefs - - name: Run lakeFS Azure - env: - TAG: ${{ steps.version.outputs.tag }} - # Setting Account_ID as a secret as a way to avoid specifying it here - REPO: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com - LAKEFS_CATALOGER_TYPE: ${{ matrix.cataloger }} - LAKEFS_STATS_ENABLED: "false" - LAKEFS_BLOCKSTORE_TYPE: wasb - LAKEFS_GATEWAYS_S3_DOMAIN_NAME: s3.local.lakefs.io:8000 - DOCKER_REG: ${{ steps.login-ecr.outputs.registry }} - AWS_ACCESS_KEY_ID: "" - AWS_SECRET_ACCESS_KEY: "" - LAKEFS_AZURE_STORAGE_ACCOUNT: ${{ 
secrets.LAKEFS_AZURE_STORAGE_ACCOUNT }} - LAKEFS_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_AZURE_STORAGE_ACCESS_KEY }} - run: | - docker-compose -f nessie/ops/docker-compose.yaml down -v - docker-compose -f nessie/ops/docker-compose.yaml up --quiet-pull -d - - name: Run Nessie Azure - env: - NESSIE_STORAGE_NAMESPACE: wasb://nessie-system-testing/${{ github.run_number }}-${{ matrix.cataloger }} - run: go test -v ./nessie --system-tests - - name: lakeFS Logs on Azure failure - if: ${{ failure() }} - continue-on-error: true - run: docker-compose -f nessie/ops/docker-compose.yaml logs --tail=1000 lakefs - name: Publish coverage uses: codecov/codecov-action@v1 with: From 4f227f42aeb8baede92942af67d6c496fe03e429 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 16 Feb 2021 11:04:18 +0200 Subject: [PATCH 11/15] Test nessie with azure --- block/factory/build.go | 1 + 1 file changed, 1 insertion(+) diff --git a/block/factory/build.go b/block/factory/build.go index 21e587025c7..a8abd8464f0 100644 --- a/block/factory/build.go +++ b/block/factory/build.go @@ -125,6 +125,7 @@ func buildAzureAdapter(params params.Azure) (*azure.Adapter, error) { // fallback to Azure environment variables accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") } + logging.Default().WithField("account_name", accountName).Warn("Test to see account name from nessie") // Create a default request pipeline using your storage account name and account key. credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { From 73049ced5fcd2e52b5127e95990803839a75009c Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 16 Feb 2021 12:06:28 +0200 Subject: [PATCH 12/15] Fix azure nessie environment variables --- .github/workflows/nessie.yaml | 52 ++++++++++++++++------------------ nessie/ops/docker-compose.yaml | 4 +-- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/.github/workflows/nessie.yaml b/.github/workflows/nessie.yaml index 449b594e52a..9f71b7c7dc7 100644 --- a/.github/workflows/nessie.yaml +++ b/.github/workflows/nessie.yaml @@ -127,33 +127,6 @@ jobs: - name: Login to Amazon ECR id: login-ecr uses: aws-actions/amazon-ecr-login@v1 - - - name: Run lakeFS Azure - env: - TAG: ${{ steps.version.outputs.tag }} - # Setting Account_ID as a secret as a way to avoid specifying it here - REPO: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com - LAKEFS_CATALOGER_TYPE: ${{ matrix.cataloger }} - LAKEFS_STATS_ENABLED: "false" - LAKEFS_BLOCKSTORE_TYPE: wasb - LAKEFS_GATEWAYS_S3_DOMAIN_NAME: s3.local.lakefs.io:8000 - DOCKER_REG: ${{ steps.login-ecr.outputs.registry }} - AWS_ACCESS_KEY_ID: "" - AWS_SECRET_ACCESS_KEY: "" - LAKEFS_AZURE_STORAGE_ACCOUNT: ${{ secrets.LAKEFS_AZURE_STORAGE_ACCOUNT }} - LAKEFS_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_AZURE_STORAGE_ACCESS_KEY }} - run: | - docker-compose -f nessie/ops/docker-compose.yaml down -v - docker-compose -f nessie/ops/docker-compose.yaml up --quiet-pull -d - - name: Run Nessie Azure - env: - NESSIE_STORAGE_NAMESPACE: wasb://nessie-system-testing/${{ github.run_number }}-${{ matrix.cataloger }} - run: go test -v ./nessie --system-tests - - name: lakeFS Logs on Azure failure - if: ${{ failure() }} - continue-on-error: true - run: docker-compose -f nessie/ops/docker-compose.yaml logs --tail=1000 lakefs - - name: Run lakeFS S3 env: TAG: ${{ steps.version.outputs.tag }} @@ -235,6 +208,31 @@ jobs: if: ${{ failure() }} continue-on-error: true run: docker-compose -f 
nessie/ops/docker-compose.yaml logs --tail=1000 lakefs + - name: Run lakeFS Azure + env: + TAG: ${{ steps.version.outputs.tag }} + # Setting Account_ID as a secret as a way to avoid specifying it here + REPO: ${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com + LAKEFS_CATALOGER_TYPE: ${{ matrix.cataloger }} + LAKEFS_STATS_ENABLED: "false" + LAKEFS_BLOCKSTORE_TYPE: wasb + LAKEFS_GATEWAYS_S3_DOMAIN_NAME: s3.local.lakefs.io:8000 + DOCKER_REG: ${{ steps.login-ecr.outputs.registry }} + AWS_ACCESS_KEY_ID: "" + AWS_SECRET_ACCESS_KEY: "" + LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT }} + LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY }} + run: | + docker-compose -f nessie/ops/docker-compose.yaml down -v + docker-compose -f nessie/ops/docker-compose.yaml up --quiet-pull -d + - name: Run Nessie Azure + env: + NESSIE_STORAGE_NAMESPACE: wasb://nessie-system-testing/${{ github.run_number }}-${{ matrix.cataloger }} + run: go test -v ./nessie --system-tests + - name: lakeFS Logs on Azure failure + if: ${{ failure() }} + continue-on-error: true + run: docker-compose -f nessie/ops/docker-compose.yaml logs --tail=1000 lakefs - name: Publish coverage uses: codecov/codecov-action@v1 with: diff --git a/nessie/ops/docker-compose.yaml b/nessie/ops/docker-compose.yaml index 12457aa5ada..a7c7b1d8e7b 100644 --- a/nessie/ops/docker-compose.yaml +++ b/nessie/ops/docker-compose.yaml @@ -18,8 +18,8 @@ services: - LAKEFS_BLOCKSTORE_GS_CREDENTIALS_JSON - LAKEFS_STATS_ENABLED - LAKEFS_CATALOGER_TYPE - - LAKEFS_AZURE_STORAGE_ACCOUNT - - LAKEFS_AZURE_STORAGE_ACCESS_KEY + - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT + - LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY entrypoint: ["/app/wait-for", "postgres:5432", "--", "/app/lakefs", "run"] postgres: image: "postgres:11" From d670bfc1047ef2e72cddb7574462aee76c38e663 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 16 Feb 2021 13:23:45 +0200 Subject: [PATCH 13/15] Remove log for nessie azure check --- block/factory/build.go | 1 - 1 file changed, 1 deletion(-) diff --git a/block/factory/build.go b/block/factory/build.go index a8abd8464f0..21e587025c7 100644 --- a/block/factory/build.go +++ b/block/factory/build.go @@ -125,7 +125,6 @@ func buildAzureAdapter(params params.Azure) (*azure.Adapter, error) { // fallback to Azure environment variables accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY") } - logging.Default().WithField("account_name", accountName).Warn("Test to see account name from nessie") // Create a default request pipeline using your storage account name and account key. 
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { From 4fd7e4fcd562c4baad1b944288691f5c3088b9ea Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 16 Feb 2021 15:23:16 +0200 Subject: [PATCH 14/15] Fix copyFromReader panic --- block/azure/adapter.go | 16 ++++++++++++++-- block/azure/chunkwriting.go | 3 +-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/block/azure/adapter.go b/block/azure/adapter.go index bde59f25833..8c707dd9527 100644 --- a/block/azure/adapter.go +++ b/block/azure/adapter.go @@ -113,8 +113,20 @@ func (a *Adapter) Put(obj block.ObjectPointer, sizeBytes int64, reader io.Reader container := a.getContainerURL(qualifiedKey.StorageNamespace) blobURL := container.NewBlockBlobURL(qualifiedKey.Key) - _, err = azblob.UploadStreamToBlockBlob(a.ctx, reader, blobURL, translatePutOpts(opts)) - return err + // TODO(Guys): remove this work around once azure fixes panic issue and use azblob.UploadStreamToBlockBlob + transferManager, err := azblob.NewStaticBuffer(_1MiB, MaxBuffers) + if err != nil { + return err + } + uploadOpts := translatePutOpts(opts) + uploadOpts.TransferManager = transferManager + defer transferManager.Close() + resp, err := copyFromReader(a.ctx, reader, blobURL, uploadOpts) + if err != nil { + return err + } + _ = resp == nil // this is done in order to ignore "result 0 is never used" error ( copyFromReader is copied from azure, and we want to keep it with minimum changes) + return nil } func (a *Adapter) Get(obj block.ObjectPointer, _ int64) (io.ReadCloser, error) { diff --git a/block/azure/chunkwriting.go b/block/azure/chunkwriting.go index 9dc4141a68f..fcb52a3590a 100644 --- a/block/azure/chunkwriting.go +++ b/block/azure/chunkwriting.go @@ -51,6 +51,7 @@ func copyFromReader(ctx context.Context, from io.Reader, to blockWriter, o azblo break } } + cp.wg.Wait() // If the error is not EOF, then we have a problem. if err != nil && !errors.Is(err, io.EOF) { return nil, err @@ -171,8 +172,6 @@ func (c *copier) write(chunk copierChunk) { // close commits our blocks to blob storage and closes our writer. func (c *copier) close() error { - c.wg.Wait() - if err := c.getErr(); err != nil { return err } From 49526842d5d95c9952c637c4a4f2871853d857e0 Mon Sep 17 00:00:00 2001 From: guyhardonag Date: Tue, 16 Feb 2021 15:36:17 +0200 Subject: [PATCH 15/15] Decrease multipart test size --- nessie/multipart_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nessie/multipart_test.go b/nessie/multipart_test.go index 7cbff0fb4e8..f49761354ac 100644 --- a/nessie/multipart_test.go +++ b/nessie/multipart_test.go @@ -16,8 +16,8 @@ import ( ) const ( - multipartNumberOfParts = 20 - multipartPartSize = 6 * 1024 * 1024 + multipartNumberOfParts = 7 + multipartPartSize = 5 * 1024 * 1024 ) func TestMultipartUpload(t *testing.T) {
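For reference, the multipart bookkeeping introduced in these patches hinges on one convention: a part's ETag is the quoted hex MD5 of its payload, and the base64 encoding of that ETag is the block ID under which the part's block list and size are staged in the "<key>_id" and "<key>_size" helper blobs until CompleteMultiPartUpload commits and reads them back. A minimal standalone sketch of that encoding follows (hypothetical helper and names, not part of the patches; it only illustrates the convention used by MultipartBlockWriter and copyPartRange):

package main

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// partBlockID mirrors the convention in the adapter: the ETag is the quoted hex
// MD5 of the part payload, and base64(ETag) is the block ID used when staging the
// part's block list and size into the "<key>_id" / "<key>_size" helper blobs.
func partBlockID(payload []byte) (etag, blockID string) {
	sum := md5.Sum(payload)
	etag = `"` + hex.EncodeToString(sum[:]) + `"`
	blockID = base64.StdEncoding.EncodeToString([]byte(etag))
	return etag, blockID
}

func main() {
	etag, blockID := partBlockID([]byte("part-1 payload"))
	fmt.Println("etag:    ", etag)
	fmt.Println("block ID:", blockID)
}

Because the block ID is derived deterministically from the ETag, CompleteMultipart can take the S3-style completed-part list it receives, sort it by part number, and recover the staged block IDs and sizes for each part without any extra state.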