Let CompletenessCheckingBlobAccess use util.VisitProtoBytesFields()
Now that we can process REv2 Tree objects in a streaming fashion, let's
reimplement CompletenessCheckingBlobAccess to make use of it. What's
interesting is that we can now treat the root and child directories
identically, as it's just the field number that's different.
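
To make that concrete, here is a rough sketch of the traversal (not
part of this diff; the helper name visitTreeDirectories and the import
paths are assumptions, while util.VisitProtoBytesFields(), the Tree
field number constants and the buffer calls are the ones used by the
new checkCompleteness() below). Corruption handling is covered in the
next paragraph:

    package completenesschecking

    import (
      "context"
      "io"

      remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
      "github.com/buildbarn/bb-storage/pkg/blobstore"
      "github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
      "github.com/buildbarn/bb-storage/pkg/digest"
      "github.com/buildbarn/bb-storage/pkg/util"

      "google.golang.org/protobuf/encoding/protowire"
    )

    // visitTreeDirectories is a hypothetical helper that streams a Tree
    // object from the CAS and calls visit for every Directory it
    // contains. The root (Tree.root) and the children (Tree.children)
    // share one code path; only the protobuf field number tells them
    // apart.
    func visitTreeDirectories(
      ctx context.Context,
      cas blobstore.BlobAccess,
      treeDigest digest.Digest,
      maximumMessageSizeBytes int,
      visit func(*remoteexecution.Directory) error,
    ) error {
      r := cas.Get(ctx, treeDigest).ToReader()
      defer r.Close()
      return util.VisitProtoBytesFields(r, func(fieldNumber protowire.Number, offsetBytes, sizeBytes int64, fieldReader io.Reader) error {
        if fieldNumber == blobstore.TreeRootFieldNumber || fieldNumber == blobstore.TreeChildrenFieldNumber {
          // Each matching field holds one serialized Directory.
          directoryMessage, err := buffer.NewProtoBufferFromReader(
            &remoteexecution.Directory{},
            io.NopCloser(fieldReader),
            buffer.UserProvided,
          ).ToProto(&remoteexecution.Directory{}, maximumMessageSizeBytes)
          if err != nil {
            return err
          }
          return visit(directoryMessage.(*remoteexecution.Directory))
        }
        return nil
      })
    }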

What's slightly scary is that by streaming the Tree object, we're
processing the data speculatively. If it turns out the object read from
the CAS is corrupted, the results of any calls to FindMissing() made
in response to that Tree should be suppressed. Only when the Tree object
has been read in its entirety is it safe to return the original error
response.
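
As a minimal sketch of that rule (the helper name preferReadError is
made up for illustration; the real logic lives inline in
checkCompleteness() below), assuming r is the reader returned by
ToReader() and visitErr is the error produced while streaming:

    import "io"

    // preferReadError drains the remainder of the Tree so that checksum
    // validation runs over the complete object, and prefers a resulting
    // read error (e.g. a checksum mismatch reported by the CAS buffer)
    // over the error produced while visiting the Tree's fields.
    func preferReadError(r io.ReadCloser, visitErr error) error {
      if _, copyErr := io.Copy(io.Discard, r); copyErr != nil {
        visitErr = copyErr
      }
      r.Close()
      return visitErr
    }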

This commit introduces a small change to the configuration schema. If
you previously had this:

    backend: { completenessChecking: ... },

You will now need to write something along these lines:

    backend: { completenessChecking: {
      backend: ...,
      maximumTotalTreeSizeBytes: 64 * 1024 * 1024,
    } },
EdSchouten committed Sep 29, 2022
1 parent de33e3d commit 1b84fa8
Showing 6 changed files with 910 additions and 707 deletions.
2 changes: 2 additions & 0 deletions pkg/blobstore/completenesschecking/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//encoding/protowire",
],
)

@@ -31,5 +32,6 @@ go_test(
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
],
)
@@ -2,6 +2,7 @@ package completenesschecking

import (
"context"
"io"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore"
@@ -12,6 +13,7 @@ import (

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
)

// findMissingQueue is a helper for calling BlobAccess.FindMissing() in
@@ -57,20 +59,6 @@ func (q *findMissingQueue) add(blobDigest *remoteexecution.Digest) error {
return nil
}

// AddDirectory adds all digests contained with a directory to the list
// of digests pending to be checked for existence.
func (q *findMissingQueue) addDirectory(directory *remoteexecution.Directory) error {
if directory == nil {
return nil
}
for _, child := range directory.Files {
if err := q.add(child.Digest); err != nil {
return err
}
}
return nil
}

// Finalize by checking the last batch of digests for existence.
func (q *findMissingQueue) finalize() error {
missing, err := q.contentAddressableStorage.FindMissing(q.context, q.pending.Build())
@@ -88,6 +76,7 @@ type completenessCheckingBlobAccess struct {
contentAddressableStorage blobstore.BlobAccess
batchSize int
maximumMessageSizeBytes int
maximumTotalTreeSizeBytes int64
}

// NewCompletenessCheckingBlobAccess creates a wrapper around
@@ -104,12 +93,13 @@ type completenessCheckingBlobAccess struct {
// needs to be rebuilt. By calling it, Bazel indicates that all
// associated output files must remain present during the build for
// forward progress to be made.
func NewCompletenessCheckingBlobAccess(actionCache, contentAddressableStorage blobstore.BlobAccess, batchSize, maximumMessageSizeBytes int) blobstore.BlobAccess {
func NewCompletenessCheckingBlobAccess(actionCache, contentAddressableStorage blobstore.BlobAccess, batchSize, maximumMessageSizeBytes int, maximumTotalTreeSizeBytes int64) blobstore.BlobAccess {
return &completenessCheckingBlobAccess{
BlobAccess: actionCache,
contentAddressableStorage: contentAddressableStorage,
batchSize: batchSize,
maximumMessageSizeBytes: maximumMessageSizeBytes,
maximumTotalTreeSizeBytes: maximumTotalTreeSizeBytes,
}
}

@@ -147,27 +137,49 @@ func (ba *completenessCheckingBlobAccess) checkCompleteness(ctx context.Context,
// Iterate over all remoteexecution.Digest fields contained
// within output directories (remoteexecution.Tree objects)
// referenced by the ActionResult.
remainingTreeSizeBytes := ba.maximumTotalTreeSizeBytes
for _, outputDirectory := range actionResult.OutputDirectories {
treeDigest, err := findMissingQueue.deriveDigest(outputDirectory.TreeDigest)
if err != nil {
return err
}
treeMessage, err := ba.contentAddressableStorage.Get(ctx, treeDigest).ToProto(&remoteexecution.Tree{}, ba.maximumMessageSizeBytes)
if err != nil {
if status.Code(err) == codes.InvalidArgument {
return util.StatusWrapfWithCode(err, codes.NotFound, "Failed to fetch output directory %#v", outputDirectory.Path)
}
return util.StatusWrapf(err, "Failed to fetch output directory %#v", outputDirectory.Path)
}
tree := treeMessage.(*remoteexecution.Tree)
if err := findMissingQueue.addDirectory(tree.Root); err != nil {
return err
sizeBytes := treeDigest.GetSizeBytes()
if sizeBytes > remainingTreeSizeBytes {
return status.Errorf(codes.NotFound, "Combined size of all output directories exceeds maximum limit of %d bytes", ba.maximumTotalTreeSizeBytes)
}
for _, child := range tree.Children {
if err := findMissingQueue.addDirectory(child); err != nil {
return err
remainingTreeSizeBytes -= sizeBytes

r := ba.contentAddressableStorage.Get(ctx, treeDigest).ToReader()
if err := util.VisitProtoBytesFields(r, func(fieldNumber protowire.Number, offsetBytes, sizeBytes int64, fieldReader io.Reader) error {
if fieldNumber == blobstore.TreeRootFieldNumber || fieldNumber == blobstore.TreeChildrenFieldNumber {
directoryMessage, err := buffer.NewProtoBufferFromReader(
&remoteexecution.Directory{},
io.NopCloser(fieldReader),
buffer.UserProvided,
).ToProto(&remoteexecution.Directory{}, ba.maximumMessageSizeBytes)
if err != nil {
return err
}
directory := directoryMessage.(*remoteexecution.Directory)
for _, child := range directory.Files {
if err := findMissingQueue.add(child.Digest); err != nil {
return err
}
}
}
return nil
}); err != nil {
// Any errors generated above may be caused by
// data corruption on the Tree object. Force
// reading the Tree until completion, and prefer
// read errors over any errors generated above.
if _, copyErr := io.Copy(io.Discard, r); copyErr != nil {
err = copyErr
}
r.Close()
return util.StatusWrapf(err, "Output directory %#v", outputDirectory.Path)
}
r.Close()
}
return findMissingQueue.finalize()
}
@@ -2,6 +2,7 @@ package completenesschecking_test

import (
"context"
"io"
"testing"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
@@ -15,6 +16,7 @@ import (

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

func TestCompletenessCheckingBlobAccess(t *testing.T) {
@@ -25,8 +27,9 @@ func TestCompletenessCheckingBlobAccess(t *testing.T) {
completenessCheckingBlobAccess := completenesschecking.NewCompletenessCheckingBlobAccess(
actionCache,
contentAddressableStorage,
5,
1000)
/* batchSize = */ 5,
/* maximumMessageSizeBytes = */ 1000,
/* maximumTotalTreeSizeBytes = */ 10000)

actionDigest := digest.MustNewDigest("hello", "d41d8cd98f00b204e9800998ecf8427e", 123)

@@ -141,7 +144,7 @@ func TestCompletenessCheckingBlobAccess(t *testing.T) {
).Return(buffer.NewBufferFromError(status.Error(codes.Internal, "Hard disk has a case of the Mondays")))

_, err := completenessCheckingBlobAccess.Get(ctx, actionDigest).ToProto(&remoteexecution.ActionResult{}, 1000)
testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Failed to fetch output directory \"bazel-out/foo\": Hard disk has a case of the Mondays"), err)
testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Output directory \"bazel-out/foo\": Hard disk has a case of the Mondays"), err)
})

t.Run("GetTreeTooLarge", func(t *testing.T) {
@@ -164,15 +167,102 @@ func TestCompletenessCheckingBlobAccess(t *testing.T) {
},
},
buffer.BackendProvided(dataIntegrityCallback.Call)))
reader := mock.NewMockReadAtCloser(ctrl)
contentAddressableStorage.EXPECT().Get(

_, err := completenessCheckingBlobAccess.Get(ctx, actionDigest).ToProto(&remoteexecution.ActionResult{}, 1000)
testutil.RequireEqualStatus(t, status.Error(codes.NotFound, "Combined size of all output directories exceeds maximum limit of 10000 bytes"), err)
})

t.Run("GetTreeDataCorruption", func(t *testing.T) {
// Because Tree objects are processed in a streaming
// fashion, it may be the case that we call
// FindMissing() against the CAS, even though we later
// discover that the Tree object was corrupted.
//
// This means that even if FindMissing() reports objects
// as being absent, we cannot terminate immediately. We
// must process the Tree object in its entirety.
dataIntegrityCallback1 := mock.NewMockDataIntegrityCallback(ctrl)
dataIntegrityCallback1.EXPECT().Call(true)
actionCache.EXPECT().Get(ctx, actionDigest).Return(
buffer.NewProtoBufferFromProto(
&remoteexecution.ActionResult{
OutputDirectories: []*remoteexecution.OutputDirectory{
{
Path: "bazel-out/foo",
TreeDigest: &remoteexecution.Digest{
Hash: "8f0450aa5f4602d93968daba6f2e7611",
SizeBytes: 4000,
},
},
},
},
buffer.BackendProvided(dataIntegrityCallback1.Call)))

treeReader := mock.NewMockReadCloser(ctrl)
treeReader.EXPECT().Read(gomock.Any()).
DoAndReturn(func(p []byte) (int, error) {
treeData, err := proto.Marshal(&remoteexecution.Tree{
Root: &remoteexecution.Directory{
Files: []*remoteexecution.FileNode{
{
Digest: &remoteexecution.Digest{
Hash: "024ced29f1fdef2f644f34a071ade5be",
SizeBytes: 1,
},
},
{
Digest: &remoteexecution.Digest{
Hash: "8b3b146b1c4df062a2dc35168cbf4ce6",
SizeBytes: 2,
},
},
{
Digest: &remoteexecution.Digest{
Hash: "4a4a6ebb3f8b062653cb957cbdc047d9",
SizeBytes: 3,
},
},
{
Digest: &remoteexecution.Digest{
Hash: "69778ed3e4dcf4e0c40df49e4ca5bd37",
SizeBytes: 4,
},
},
{
Digest: &remoteexecution.Digest{
Hash: "ff7816e0353299e801a30e37aee1758c",
SizeBytes: 5,
},
},
},
},
})
require.NoError(t, err)
return copy(p, treeData), nil
})
treeReader.EXPECT().Read(gomock.Any()).
DoAndReturn(func(p []byte) (int, error) {
return copy(p, "Garbage"), io.EOF
})
treeReader.EXPECT().Close()
dataIntegrityCallback2 := mock.NewMockDataIntegrityCallback(ctrl)
dataIntegrityCallback2.EXPECT().Call(false)
treeDigest := digest.MustNewDigest("hello", "8f0450aa5f4602d93968daba6f2e7611", 4000)
contentAddressableStorage.EXPECT().Get(ctx, treeDigest).Return(
buffer.NewCASBufferFromReader(treeDigest, treeReader, buffer.BackendProvided(dataIntegrityCallback2.Call)))
contentAddressableStorage.EXPECT().FindMissing(
ctx,
digest.MustNewDigest("hello", "7ef23d85401d061552b188ae0a87d7f8", 1024*1024*1024),
).Return(buffer.NewValidatedBufferFromReaderAt(reader, 1024*1024*1024))
reader.EXPECT().Close()
digest.NewSetBuilder().
Add(treeDigest).
Add(digest.MustNewDigest("hello", "024ced29f1fdef2f644f34a071ade5be", 1)).
Add(digest.MustNewDigest("hello", "8b3b146b1c4df062a2dc35168cbf4ce6", 2)).
Add(digest.MustNewDigest("hello", "4a4a6ebb3f8b062653cb957cbdc047d9", 3)).
Add(digest.MustNewDigest("hello", "69778ed3e4dcf4e0c40df49e4ca5bd37", 4)).
Build(),
).Return(digest.MustNewDigest("hello", "4a4a6ebb3f8b062653cb957cbdc047d9", 3).ToSingletonSet(), nil)

_, err := completenessCheckingBlobAccess.Get(ctx, actionDigest).ToProto(&remoteexecution.ActionResult{}, 1000)
testutil.RequireEqualStatus(t, status.Error(codes.NotFound, "Failed to fetch output directory \"bazel-out/foo\": Buffer is 1073741824 bytes in size, while a maximum of 1000 bytes is permitted"), err)
testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Output directory \"bazel-out/foo\": Buffer is 210 bytes in size, while 4000 bytes were expected"), err)
})

t.Run("Success", func(t *testing.T) {
5 changes: 3 additions & 2 deletions pkg/blobstore/configuration/ac_blob_access_creator.go
@@ -85,7 +85,7 @@ func (bac *acBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAccess
if bac.contentAddressableStorage == nil {
return BlobAccessInfo{}, "", status.Error(codes.InvalidArgument, "Action Cache completeness checking can only be enabled if a Content Addressable Storage is configured")
}
base, err := nestedCreator.NewNestedBlobAccess(backend.CompletenessChecking, bac)
base, err := nestedCreator.NewNestedBlobAccess(backend.CompletenessChecking.Backend, bac)
if err != nil {
return BlobAccessInfo{}, "", err
}
@@ -94,7 +94,8 @@ func (bac *acBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAccess
base.BlobAccess,
bac.contentAddressableStorage.BlobAccess,
blobstore.RecommendedFindMissingDigestsCount,
bac.maximumMessageSizeBytes),
bac.maximumMessageSizeBytes,
backend.CompletenessChecking.MaximumTotalTreeSizeBytes),
DigestKeyFormat: base.DigestKeyFormat.Combine(bac.contentAddressableStorage.DigestKeyFormat),
}, "completeness_checking", nil
case *pb.BlobAccessConfiguration_Grpc:
