Let BlobAccessDirectoryFetcher use util.VisitProtoBytesFields()
On the client side we can also easily run into the maximum message
size limit when accessing Tree objects through the virtual file
system. This change reimplements BlobAccessDirectoryFetcher so that
it streams Tree objects where possible.

A nice optimisation is that we can now slice Trees into individual
Directories without unmarshaling them. Only when the size of a
Directory matches that of the one being requested do we need to
duplicate and unmarshal it. This should speed up ingestion of Tree
objects significantly.
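
As a rough illustration of the slicing approach described above, the
sketch below walks a marshaled Tree's wire format with
google.golang.org/protobuf/encoding/protowire, hashes each children
field in place, and only copies out the one whose size and hash match
the requested digest. It is a simplified, self-contained sketch under
stated assumptions: the field-number constant, plain SHA-256 hashing,
and the sliceTree helper are illustrative, not the actual
implementation, which builds on util.VisitProtoBytesFields and
bb-storage's buffer and digest packages.

```go
// Simplified sketch only: slicing a marshaled REv2 Tree into its child
// Directory regions without unmarshaling them. Field numbers assume the
// REv2 Tree message layout (root = 1, children = 2).
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"

	"google.golang.org/protobuf/encoding/protowire"
)

const treeChildrenFieldNumber protowire.Number = 2

// blobSlice records where one child Directory lives inside the Tree blob.
type blobSlice struct {
	hash        string
	offsetBytes int64
	sizeBytes   int64
}

// sliceTree walks the Tree's wire format, hashing every child Directory in
// place. Only the child whose size and hash match the requested digest is
// copied out, so at most one Directory ever needs to be unmarshaled.
func sliceTree(tree []byte, requestedSizeBytes int64, requestedHash string) ([]blobSlice, []byte, error) {
	var slices []blobSlice
	var requested []byte
	for offset := 0; offset < len(tree); {
		num, typ, n := protowire.ConsumeTag(tree[offset:])
		if n < 0 {
			return nil, nil, protowire.ParseError(n)
		}
		offset += n
		if typ != protowire.BytesType {
			// Skip fields that are not length-delimited.
			if n = protowire.ConsumeFieldValue(num, typ, tree[offset:]); n < 0 {
				return nil, nil, protowire.ParseError(n)
			}
			offset += n
			continue
		}
		payload, n := protowire.ConsumeBytes(tree[offset:])
		if n < 0 {
			return nil, nil, protowire.ParseError(n)
		}
		if num == treeChildrenFieldNumber {
			sum := sha256.Sum256(payload)
			hash := hex.EncodeToString(sum[:])
			slices = append(slices, blobSlice{
				hash: hash,
				// The payload starts right after the tag and length prefix.
				offsetBytes: int64(offset + n - len(payload)),
				sizeBytes:   int64(len(payload)),
			})
			// Duplicate only the Directory that was actually requested.
			if requested == nil && int64(len(payload)) == requestedSizeBytes && hash == requestedHash {
				requested = append([]byte(nil), payload...)
			}
		}
		offset += n
	}
	return slices, requested, nil
}

func main() {
	// A real caller would pass a Tree blob read from the CAS and then
	// unmarshal only the returned requested bytes into a Directory.
	slices, requested, err := sliceTree(nil, 0, "")
	fmt.Println(len(slices), len(requested), err)
}
```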
EdSchouten committed Sep 29, 2022
1 parent c208e94 commit 85954ef
Showing 4 changed files with 123 additions and 82 deletions.
8 changes: 5 additions & 3 deletions cmd/bb_worker/main.go
@@ -90,14 +90,16 @@ func main() {
}
globalContentAddressableStorage = re_blobstore.NewExistencePreconditionBlobAccess(globalContentAddressableStorage)

// Cached read access for directory objects stored in the
// Cached read access for Directory objects stored in the
// Content Addressable Storage. All workers make use of the same
// cache, to increase the hit rate.
// cache, to increase the hit rate. This process does not read
// Tree objects.
directoryFetcher, err := cas.NewCachingDirectoryFetcherFromConfiguration(
configuration.DirectoryCache,
cas.NewBlobAccessDirectoryFetcher(
globalContentAddressableStorage,
int(configuration.MaximumMessageSizeBytes)))
/* maximumDirectorySizeBytes = */ int(configuration.MaximumMessageSizeBytes),
/* maximumTreeSizeBytes = */ 0))
if err != nil {
log.Fatal("Failed to create caching directory fetcher: ", err)
}
1 change: 1 addition & 0 deletions pkg/cas/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
"@com_github_buildbarn_bb_storage//pkg/util",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//encoding/protowire",
"@org_golang_google_protobuf//proto",
],
)
169 changes: 97 additions & 72 deletions pkg/cas/blob_access_directory_fetcher.go
@@ -1,8 +1,8 @@
package cas

import (
"bytes"
"context"
"io"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore"
@@ -13,47 +13,78 @@ import (

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protowire"
)

type blobAccessDirectoryFetcher struct {
blobAccess blobstore.BlobAccess
maximumMessageSizeBytes int
slicer treeBlobSlicer
blobAccess blobstore.BlobAccess
slicer treeBlobSlicer
maximumTreeSizeBytes int64
}

// NewBlobAccessDirectoryFetcher creates a DirectoryFetcher that reads
// Directory objects from a BlobAccess based store.
func NewBlobAccessDirectoryFetcher(blobAccess blobstore.BlobAccess, maximumMessageSizeBytes int) DirectoryFetcher {
func NewBlobAccessDirectoryFetcher(blobAccess blobstore.BlobAccess, maximumDirectorySizeBytes int, maximumTreeSizeBytes int64) DirectoryFetcher {
return &blobAccessDirectoryFetcher{
blobAccess: blobAccess,
slicer: treeBlobSlicer{
maximumMessageSizeBytes: maximumMessageSizeBytes,
maximumDirectorySizeBytes: maximumDirectorySizeBytes,
},
maximumTreeSizeBytes: maximumTreeSizeBytes,
}
}

func (df *blobAccessDirectoryFetcher) GetDirectory(ctx context.Context, directoryDigest digest.Digest) (*remoteexecution.Directory, error) {
m, err := df.blobAccess.Get(ctx, directoryDigest).ToProto(&remoteexecution.Directory{}, df.slicer.maximumMessageSizeBytes)
m, err := df.blobAccess.Get(ctx, directoryDigest).ToProto(&remoteexecution.Directory{}, df.slicer.maximumDirectorySizeBytes)
if err != nil {
return nil, err
}
return m.(*remoteexecution.Directory), nil
}

func (df *blobAccessDirectoryFetcher) GetTreeRootDirectory(ctx context.Context, treeDigest digest.Digest) (*remoteexecution.Directory, error) {
m, err := df.blobAccess.Get(ctx, treeDigest).ToProto(&remoteexecution.Tree{}, df.slicer.maximumMessageSizeBytes)
if err != nil {
if treeDigest.GetSizeBytes() > df.maximumTreeSizeBytes {
return nil, status.Errorf(codes.InvalidArgument, "Tree exceeds the maximum permitted size of %d bytes", df.maximumTreeSizeBytes)
}

r := df.blobAccess.Get(ctx, treeDigest).ToReader()
defer r.Close()

var rootDirectory *remoteexecution.Directory
if err := util.VisitProtoBytesFields(r, func(fieldNumber protowire.Number, offsetBytes, sizeBytes int64, fieldReader io.Reader) error {
if fieldNumber == blobstore.TreeRootFieldNumber {
if rootDirectory != nil {
return status.Error(codes.InvalidArgument, "Tree contains multiple root directories")
}
m, err := buffer.NewProtoBufferFromReader(
&remoteexecution.Directory{},
io.NopCloser(fieldReader),
buffer.UserProvided,
).ToProto(&remoteexecution.Directory{}, df.slicer.maximumDirectorySizeBytes)
if err != nil {
return err
}
rootDirectory = m.(*remoteexecution.Directory)
}
return nil
}); err != nil {
// Drain the remaining data; if reading the rest of the
// Tree fails, report that error instead.
if _, copyErr := io.Copy(io.Discard, r); copyErr != nil {
err = copyErr
}
return nil, err
}
tree := m.(*remoteexecution.Tree)
if tree.Root == nil {
if rootDirectory == nil {
return nil, status.Error(codes.InvalidArgument, "Tree does not contain a root directory")
}
return tree.Root, nil
return rootDirectory, nil
}

func (df *blobAccessDirectoryFetcher) GetTreeChildDirectory(ctx context.Context, treeDigest, childDigest digest.Digest) (*remoteexecution.Directory, error) {
m, err := df.blobAccess.GetFromComposite(ctx, treeDigest, childDigest, &df.slicer).ToProto(&remoteexecution.Directory{}, df.slicer.maximumMessageSizeBytes)
if treeDigest.GetSizeBytes() > df.maximumTreeSizeBytes {
return nil, status.Errorf(codes.InvalidArgument, "Tree exceeds the maximum permitted size of %d bytes", df.maximumTreeSizeBytes)
}

m, err := df.blobAccess.GetFromComposite(ctx, treeDigest, childDigest, &df.slicer).ToProto(&remoteexecution.Directory{}, df.slicer.maximumDirectorySizeBytes)
if err != nil {
return nil, err
}
@@ -66,76 +97,70 @@ func (df *blobAccessDirectoryFetcher) GetTreeChildDirectory(ctx context.Context,
// contents of the Tree just once, but to create entries in its index
// that refer to each of the Directories contained within.
type treeBlobSlicer struct {
maximumMessageSizeBytes int
maximumDirectorySizeBytes int
}

func (bs *treeBlobSlicer) Slice(b buffer.Buffer, requestedChildDigest digest.Digest) (buffer.Buffer, []slicing.BlobSlice) {
// Fetch the Tree object, both in binary and message form.
bData, bMessage := b.CloneCopy(bs.maximumMessageSizeBytes)
treeData, err := bData.ToByteSlice(bs.maximumMessageSizeBytes)
if err != nil {
bMessage.Discard()
return buffer.NewBufferFromError(err), nil
}
treeMessage, err := bMessage.ToProto(&remoteexecution.Tree{}, bs.maximumMessageSizeBytes)
if err != nil {
return buffer.NewBufferFromError(err), nil
}
tree := treeMessage.(*remoteexecution.Tree)
r := b.ToReader()
defer r.Close()

requestedSizeBytes := requestedChildDigest.GetSizeBytes()
digestFunction := requestedChildDigest.GetDigestFunction()
slices := make([]slicing.BlobSlice, 0, len(tree.Children))
treeDataOffset := 0
var slices []slicing.BlobSlice
var bRequested buffer.Buffer
for childIndex, child := range tree.Children {
bData := buffer.NewProtoBufferFromProto(child, buffer.UserProvided)
setRequested := bRequested == nil
if setRequested {
bData, bRequested = bData.CloneCopy(bs.maximumMessageSizeBytes)
}

childData, err := bData.ToByteSlice(bs.maximumMessageSizeBytes)
if err != nil {
bRequested.Discard()
return buffer.NewBufferFromError(util.StatusWrapfWithCode(err, codes.InvalidArgument, "Child directory at index %d", childIndex)), slices
if err := util.VisitProtoBytesFields(r, func(fieldNumber protowire.Number, offsetBytes, sizeBytes int64, fieldReader io.Reader) error {
if fieldNumber == blobstore.TreeChildrenFieldNumber {
var childDigest digest.Digest
if bRequested == nil && sizeBytes == requestedSizeBytes {
// This directory has the same size as
// the one that is requested, so we may
// need to return it. Duplicate it.
b1, b2 := buffer.NewProtoBufferFromReader(
&remoteexecution.Directory{},
io.NopCloser(fieldReader),
buffer.UserProvided,
).CloneCopy(bs.maximumDirectorySizeBytes)

childDigestGenerator := digestFunction.NewGenerator()
if err := b1.IntoWriter(childDigestGenerator); err != nil {
b2.Discard()
return err
}
childDigest = childDigestGenerator.Sum()

if childDigest == requestedChildDigest {
// Found the directory that was
// requested. Return it.
bRequested = b2
} else {
b2.Discard()
}
} else {
// The directory's size doesn't match,
// so we can compute its checksum
// without unmarshaling it.
childDigestGenerator := digestFunction.NewGenerator()
if _, err := io.Copy(childDigestGenerator, fieldReader); err != nil {
return err
}
childDigest = childDigestGenerator.Sum()
}
slices = append(slices, slicing.BlobSlice{
Digest: childDigest,
OffsetBytes: offsetBytes,
SizeBytes: sizeBytes,
})
}

// Obtain the region at which the Directory message is
// stored within the Tree message.
skipBytes := bytes.Index(treeData, childData)
if skipBytes < 0 {
return nil
}); err != nil {
if bRequested != nil {
bRequested.Discard()
return buffer.NewBufferFromError(util.StatusWrapfWithCode(err, codes.InvalidArgument, "Child directory at index %d is not in canonical form", childIndex)), slices
}
childOffsetBytes := treeDataOffset + skipBytes

// Assume that Directory objects in the marshaled Tree
// object are stored in the same order as the
// unmarshaled list. This permits bytes.Index() calls to
// run in linear amortized time.
treeData = treeData[skipBytes+len(childData):]
treeDataOffset += skipBytes + len(childData)

// Create a slice for the Directory.
childDigestGenerator := digestFunction.NewGenerator()
if _, err := childDigestGenerator.Write(childData); err != nil {
panic(err)
}
childDigest := childDigestGenerator.Sum()
slices = append(slices, slicing.BlobSlice{
Digest: childDigestGenerator.Sum(),
OffsetBytes: int64(childOffsetBytes),
SizeBytes: int64(len(childData)),
})

if setRequested && childDigest != requestedChildDigest {
// Current Directory is not the one that was
// requested by the caller.
bRequested.Discard()
bRequested = nil
// Drain the remaining data; if reading the rest of the
// Tree fails, report that error instead.
if _, copyErr := io.Copy(io.Discard, r); copyErr != nil {
err = copyErr
}
return buffer.NewBufferFromError(err), nil
}

if bRequested == nil {
bRequested = buffer.NewBufferFromError(status.Error(codes.InvalidArgument, "Requested child directory is not contained in the tree"))
}
27 changes: 20 additions & 7 deletions pkg/cas/blob_access_directory_fetcher_test.go
@@ -22,7 +22,7 @@ func TestBlobAccessDirectoryFetcherGetDirectory(t *testing.T) {
ctrl, ctx := gomock.WithContext(context.Background(), t)

blobAccess := mock.NewMockBlobAccess(ctrl)
directoryFetcher := cas.NewBlobAccessDirectoryFetcher(blobAccess, 1000)
directoryFetcher := cas.NewBlobAccessDirectoryFetcher(blobAccess, 1000, 10000)

t.Run("IOError", func(t *testing.T) {
// Failures reading the Directory object should be propagated.
@@ -74,14 +74,19 @@ func TestBlobAccessDirectoryFetcherGetTreeRootDirectory(t *testing.T) {
ctrl, ctx := gomock.WithContext(context.Background(), t)

blobAccess := mock.NewMockBlobAccess(ctrl)
directoryFetcher := cas.NewBlobAccessDirectoryFetcher(blobAccess, 1000)
directoryFetcher := cas.NewBlobAccessDirectoryFetcher(blobAccess, 1000, 10000)

t.Run("TooBig", func(t *testing.T) {
_, err := directoryFetcher.GetTreeRootDirectory(ctx, digest.MustNewDigest("example", "f5f634611dd11ccba54c7b9d9607c3c2", 100000))
testutil.RequireEqualStatus(t, status.Error(codes.InvalidArgument, "Tree exceeds the maximum permitted size of 10000 bytes"), err)
})

t.Run("IOError", func(t *testing.T) {
// Failures reading the Tree object should be propagated.
treeDigest := digest.MustNewDigest("example", "756b15c8f94b519e96135dcfde0e58c5", 50)

r := mock.NewMockFileReader(ctrl)
r.EXPECT().ReadAt(gomock.Any(), gomock.Any()).Return(0, status.Error(codes.Internal, "I/O error"))
r.EXPECT().ReadAt(gomock.Any(), gomock.Any()).Return(0, status.Error(codes.Internal, "I/O error")).AnyTimes()
r.EXPECT().Close()
blobAccess.EXPECT().Get(ctx, treeDigest).Return(buffer.NewValidatedBufferFromReaderAt(r, 100))

Expand All @@ -97,7 +102,7 @@ func TestBlobAccessDirectoryFetcherGetTreeRootDirectory(t *testing.T) {
blobAccess.EXPECT().Get(ctx, treeDigest).Return(buffer.NewValidatedBufferFromByteSlice([]byte("This is not a Tree object")))

_, err := directoryFetcher.GetTreeRootDirectory(ctx, treeDigest)
testutil.RequirePrefixedStatus(t, status.Error(codes.InvalidArgument, "Failed to unmarshal message: "), err)
testutil.RequireEqualStatus(t, status.Error(codes.InvalidArgument, "Field with number 10 at offset 0 has type 4, while 2 was expected"), err)
})

t.Run("MissingRootDirectory", func(t *testing.T) {
@@ -138,15 +143,23 @@ func TestBlobAccessDirectoryFetcherGetTreeChildDirectory(t *testing.T) {
ctrl, ctx := gomock.WithContext(context.Background(), t)

blobAccess := mock.NewMockBlobAccess(ctrl)
directoryFetcher := cas.NewBlobAccessDirectoryFetcher(blobAccess, 1000)
directoryFetcher := cas.NewBlobAccessDirectoryFetcher(blobAccess, 1000, 10000)

t.Run("TooBig", func(t *testing.T) {
_, err := directoryFetcher.GetTreeChildDirectory(
ctx,
digest.MustNewDigest("example", "5959bc9570aa7909a09163bb2201f4af", 100000),
digest.MustNewDigest("example", "2c09e7b2ad516c4cd9fc5c244ae08794", 100))
testutil.RequireEqualStatus(t, status.Error(codes.InvalidArgument, "Tree exceeds the maximum permitted size of 10000 bytes"), err)
})

t.Run("IOError", func(t *testing.T) {
// Failures reading the Tree object should be propagated.
treeDigest := digest.MustNewDigest("example", "40d8f0c70941162ee9dfacf8863d23f5", 100)
directoryDigest := digest.MustNewDigest("example", "756b15c8f94b519e96135dcfde0e58c5", 50)

r := mock.NewMockFileReader(ctrl)
r.EXPECT().ReadAt(gomock.Any(), gomock.Any()).Return(0, status.Error(codes.Internal, "I/O error"))
r.EXPECT().ReadAt(gomock.Any(), gomock.Any()).Return(0, status.Error(codes.Internal, "I/O error")).AnyTimes()
r.EXPECT().Close()
blobAccess.EXPECT().GetFromComposite(ctx, treeDigest, directoryDigest, gomock.Any()).
DoAndReturn(func(ctx context.Context, treeDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
@@ -181,7 +194,7 @@ func TestBlobAccessDirectoryFetcherGetTreeChildDirectory(t *testing.T) {
ctx,
treeDigest,
directoryDigest)
testutil.RequirePrefixedStatus(t, status.Error(codes.InvalidArgument, "Failed to unmarshal message: "), err)
testutil.RequireEqualStatus(t, status.Error(codes.InvalidArgument, "Field with number 10 at offset 0 has type 4, while 2 was expected"), err)
})

t.Run("ValidTree", func(t *testing.T) {
