Skip to content

Commit

Permalink
Use the new util.AcquireSemaphore() function
Browse files Browse the repository at this point in the history
This function was added to bb-storage, but there are also some places in
bb-remote-execution where its use is relevant.
  • Loading branch information
EdSchouten committed Feb 22, 2024
1 parent ed4190a commit 08c2e43
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
43 changes: 23 additions & 20 deletions pkg/blobstore/batched_store_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,32 @@ func (ba *batchedStoreBlobAccess) flushLocked(ctx context.Context) {
}

// Upload the missing ones.
group, groupCtx := errgroup.WithContext(ctx)
for _, digest := range missing.Items() {
key := digest.GetKey(ba.blobKeyFormat)
if pendingPutOperation, ok := ba.pendingPutOperations[key]; ok {
if groupCtx.Err() != nil || ba.putSemaphore.Acquire(groupCtx, 1) != nil {
break
}
delete(ba.pendingPutOperations, key)
group.Go(func() error {
err := ba.BlobAccess.Put(groupCtx, pendingPutOperation.digest, pendingPutOperation.b)
ba.putSemaphore.Release(1)
if err != nil {
return util.StatusWrapf(err, "Failed to store previous blob %s", pendingPutOperation.digest)
if !missing.Empty() {
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
for _, digest := range missing.Items() {
key := digest.GetKey(ba.blobKeyFormat)
if pendingPutOperation, ok := ba.pendingPutOperations[key]; ok {
if err := util.AcquireSemaphore(groupCtx, ba.putSemaphore, 1); err != nil {
return err
}
delete(ba.pendingPutOperations, key)
group.Go(func() error {
err := ba.BlobAccess.Put(groupCtx, pendingPutOperation.digest, pendingPutOperation.b)
ba.putSemaphore.Release(1)
if err != nil {
return util.StatusWrapf(err, "Failed to store previous blob %s", pendingPutOperation.digest)
}
return nil
})
}
return nil
})
}
return nil
})
if err := group.Wait(); err != nil {
ba.flushError = err
}
}
if err := group.Wait(); err != nil {
ba.flushError = err
} else if err := util.StatusFromContext(ctx); err != nil {
ba.flushError = err
}
}

func (ba *batchedStoreBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/builder/prefetching_build_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ func (dp *directoryPrefetcher) prefetchRecursively(pathTrace *path.Trace, direct
// carry its weight just yet. We should revisit
// this once we support chunking/decomposition,
// as in that case it is insufficient.
if dp.context.Err() != nil || dp.fileReadSemaphore.Acquire(dp.context, 1) != nil {
return util.StatusFromContext(dp.context)
if err := util.AcquireSemaphore(dp.context, dp.fileReadSemaphore, 1); err != nil {
return err
}
dp.group.Go(func() error {
var b [1]byte
Expand Down

0 comments on commit 08c2e43

Please sign in to comment.