Skip to content

Commit

Permalink
Handle ListBlobs returning less items than requested with cont token (#…
Browse files Browse the repository at this point in the history
…82)

Both in s3 and azure, it is possible that when we call listPrefix with limit=N, we get a result whose size is smaller than N but which still has a continuation token. This behaviour does not hurt when we are listing the fuse files under a directory. But when doing dir checks, i.e. 1) testing whether the given path is a directory, or 2) testing whether a given directory is empty, this can make goofys wrongly think a directory is empty or that a given prefix is not a directory.

Add a wrapper, listBlobsWrapper, in internal/dir.go that does this: if the backend returns fewer items than requested and has a continuation token, it will use the continuation token to fetch more items.
  • Loading branch information
dotslash authored and kahing committed Aug 15, 2021
1 parent e0dd66b commit 11a5478
Showing 1 changed file with 55 additions and 5 deletions.
60 changes: 55 additions & 5 deletions internal/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (dh *DirHandle) listObjects(prefix string) (resp *ListBlobsOutput, err erro
// is nothing left to list or the last listed entry has all characters > "/"
// Relevant test case: TestReadDirDash
func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutput, error) {
res, err := cloud.ListBlobs(param)
res, err := listBlobsWrapper(cloud, param)
if err != nil {
return nil, err
}
Expand All @@ -406,7 +406,7 @@ func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutpu
// Get the continuation token from the result.
ContinuationToken: res.NextContinuationToken,
}
nextRes, err := cloud.ListBlobs(nextReq)
nextRes, err := listBlobsWrapper(cloud, nextReq)
if err != nil {
return nil, err
}
Expand All @@ -426,6 +426,56 @@ func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutpu
return res, nil
}


// listBlobsWrapper calls cloud.ListBlobs and, when the caller asked for a
// specific number of entries via param.MaxKeys, keeps following the
// continuation token until that many entries (prefixes + items) have been
// collected or the backend reports no further continuation token.
//
// Why this exists: with both s3 and azure, a list call with limit=N may come
// back with fewer than N entries and still carry a continuation token. That is
// harmless when listing the files under a directory, but for dir checks
// (is this path a directory? is this directory empty?) a short page can make
// goofys wrongly conclude that a directory is empty or that a prefix is not a
// directory.
//
// When param.MaxKeys is nil or 0 the backend's response and error are returned
// untouched. Otherwise any error, from the first call or a follow-up call,
// yields (nil, err). When follow-up pages were fetched, the returned output is
// a fresh value holding the accumulated Prefixes and Items together with the
// NextContinuationToken, IsTruncated and RequestId of the last page fetched.
func listBlobsWrapper(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutput, error) {
	wanted := NilUint32(param.MaxKeys)
	out, err := cloud.ListBlobs(param)
	switch {
	case wanted == 0:
		// No limit was requested, so a "short" page cannot occur; pass the
		// backend's answer through as-is.
		return out, err
	case err != nil:
		return nil, err
	}

	have := uint32(len(out.Prefixes) + len(out.Items))
	// Stop as soon as we have enough entries, or the backend has nothing more
	// to hand out under this prefix.
	for have < wanted && out.NextContinuationToken != nil {
		page, pageErr := cloud.ListBlobs(&ListBlobsInput{
			Prefix:    param.Prefix,
			Delimiter: param.Delimiter,
			// Only ask for what is still missing.
			MaxKeys:           PUInt32(wanted - have),
			ContinuationToken: out.NextContinuationToken,
			// StartAfter is deliberately left unset for follow-up pages; only
			// the first request might carry it.
		})
		if pageErr != nil {
			return nil, pageErr
		}

		// Accumulate entries; paging metadata always reflects the latest page.
		out = &ListBlobsOutput{
			Prefixes:              append(out.Prefixes, page.Prefixes...),
			Items:                 append(out.Items, page.Items...),
			NextContinuationToken: page.NextContinuationToken,
			IsTruncated:           page.IsTruncated,
			RequestId:             page.RequestId,
		}
		have = uint32(len(out.Prefixes) + len(out.Items))
	}
	return out, nil
}


// LOCKS_REQUIRED(dh.mu)
// LOCKS_EXCLUDED(dh.inode.mu)
// LOCKS_EXCLUDED(dh.inode.fs)
Expand Down Expand Up @@ -972,7 +1022,7 @@ func (parent *Inode) isEmptyDir(fs *Goofys, name string) (isDir bool, err error)
cloud, key := parent.cloud()
key = appendChildName(key, name) + "/"

resp, err := cloud.ListBlobs(&ListBlobsInput{
resp, err := listBlobsWrapper(cloud, &ListBlobsInput{
Delimiter: aws.String("/"),
MaxKeys: PUInt32(2),
Prefix: &key,
Expand Down Expand Up @@ -1306,9 +1356,9 @@ func (parent *Inode) LookUpInodeDir(name string, c chan ListBlobsOutput, errc ch
cloud, key := parent.cloud()
key = appendChildName(key, name) + "/"

resp, err := cloud.ListBlobs(&ListBlobsInput{
resp, err := listBlobsWrapper(cloud, &ListBlobsInput{
Delimiter: aws.String("/"),
// Ideally one result should be sufficient. But when azure hierarchial
// Ideally one result should be sufficient. But when azure hierarchical
// namespaces are enabled, azblob returns "a" when we list blobs under "a/".
// In such cases we remove "a" from the result. So request for 2 blobs.
MaxKeys: PUInt32(2),
Expand Down

0 comments on commit 11a5478

Please sign in to comment.