From 08c2e437c8e2872f128c847aca569382bd540297 Mon Sep 17 00:00:00 2001 From: Ed Schouten Date: Thu, 22 Feb 2024 09:19:08 +0100 Subject: [PATCH] Use the new util.AcquireSemaphore() function This function was added to bb-storage, but there are also some places in bb-remote-execution where its use is relevant. --- pkg/blobstore/batched_store_blob_access.go | 43 ++++++++++++---------- pkg/builder/prefetching_build_executor.go | 4 +- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/pkg/blobstore/batched_store_blob_access.go b/pkg/blobstore/batched_store_blob_access.go index 5207bace..e57f5c35 100644 --- a/pkg/blobstore/batched_store_blob_access.go +++ b/pkg/blobstore/batched_store_blob_access.go @@ -78,29 +78,32 @@ func (ba *batchedStoreBlobAccess) flushLocked(ctx context.Context) { } // Upload the missing ones. - group, groupCtx := errgroup.WithContext(ctx) - for _, digest := range missing.Items() { - key := digest.GetKey(ba.blobKeyFormat) - if pendingPutOperation, ok := ba.pendingPutOperations[key]; ok { - if groupCtx.Err() != nil || ba.putSemaphore.Acquire(groupCtx, 1) != nil { - break - } - delete(ba.pendingPutOperations, key) - group.Go(func() error { - err := ba.BlobAccess.Put(groupCtx, pendingPutOperation.digest, pendingPutOperation.b) - ba.putSemaphore.Release(1) - if err != nil { - return util.StatusWrapf(err, "Failed to store previous blob %s", pendingPutOperation.digest) + if !missing.Empty() { + group, groupCtx := errgroup.WithContext(ctx) + group.Go(func() error { + for _, digest := range missing.Items() { + key := digest.GetKey(ba.blobKeyFormat) + if pendingPutOperation, ok := ba.pendingPutOperations[key]; ok { + if err := util.AcquireSemaphore(groupCtx, ba.putSemaphore, 1); err != nil { + return err + } + delete(ba.pendingPutOperations, key) + group.Go(func() error { + err := ba.BlobAccess.Put(groupCtx, pendingPutOperation.digest, pendingPutOperation.b) + ba.putSemaphore.Release(1) + if err != nil { + return util.StatusWrapf(err, "Failed to store previous blob %s", pendingPutOperation.digest) + } + return nil + }) } - return nil - }) + } + return nil + }) + if err := group.Wait(); err != nil { + ba.flushError = err } } - if err := group.Wait(); err != nil { - ba.flushError = err - } else if err := util.StatusFromContext(ctx); err != nil { - ba.flushError = err - } } func (ba *batchedStoreBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { diff --git a/pkg/builder/prefetching_build_executor.go b/pkg/builder/prefetching_build_executor.go index b4f44508..e8bf2f90 100644 --- a/pkg/builder/prefetching_build_executor.go +++ b/pkg/builder/prefetching_build_executor.go @@ -240,8 +240,8 @@ func (dp *directoryPrefetcher) prefetchRecursively(pathTrace *path.Trace, direct // carry its weight just yet. We should revisit // this once we support chunking/decomposition, // as in that case it is insufficient. - if dp.context.Err() != nil || dp.fileReadSemaphore.Acquire(dp.context, 1) != nil { - return util.StatusFromContext(dp.context) + if err := util.AcquireSemaphore(dp.context, dp.fileReadSemaphore, 1); err != nil { + return err } dp.group.Go(func() error { var b [1]byte