Skip to content

Commit

Permalink
Handle ListBlobs returning less items than requested with cont token
Browse files Browse the repository at this point in the history
Both in s3 and azure, it is possible that when we call listPrefix with limit=N, we might get a result whose size is smaller than N, but still has a continuation token. This behaviour does not hurt when we are listing the fuse files under a directory. But when doing dir checks i.e. 1) testing if the given path is a directory 2) if a given directory is empty, this can make goofys wrongly think a directory is empty or a given prefix is not a directory.

Add a wrapper in list.go, that does this: If the backend returns less items than requested and has a continuation token, it will use the continuation token to fetch more items.
  • Loading branch information
voyvodov committed Sep 18, 2024
1 parent 8107a99 commit 33d0a7b
Showing 1 changed file with 57 additions and 6 deletions.
63 changes: 57 additions & 6 deletions internal/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (dh *DirHandle) listObjects(prefix string) (resp *ListBlobsOutput, err erro
// is nothing left to list or the last listed entry has all characters > "/"
// Relavant test case: TestReadDirDash
func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutput, error) {
res, err := cloud.ListBlobs(param)
res, err := listBlobsWrapper(cloud, param)
if err != nil {
return nil, err
}
Expand All @@ -406,7 +406,7 @@ func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutpu
// Get the continuation token from the result.
ContinuationToken: res.NextContinuationToken,
}
nextRes, err := cloud.ListBlobs(nextReq)
nextRes, err := listBlobsWrapper(cloud, nextReq)
if err != nil {
return nil, err
}
Expand All @@ -426,6 +426,54 @@ func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutpu
return res, nil
}

// Both in s3 and azure, it is possible that when we call listPrefix with limit=N, we might get a
// result whose size is smaller than N, but still has a continuation token. This behaviour does not
// hurt when we are listing the fuse files under a directory. But when doing dir checks i.e.
// 1) testing if the given path is a directory 2) if a given directory is empty, this can make
// goofys wrongly think a directory is empty or a given prefix is not a directory.
//
// If the backend returns less items than requested and has a continuation token, this will use the
// continuation token to fetch more items.
func listBlobsWrapper(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutput, error) {
targetNumElements := NilUint32(param.MaxKeys)
ret, err := cloud.ListBlobs(param)
if targetNumElements == 0 {
// If MaxKeys is not specified (or is 0), we don't need any of the following special handling.
return ret, err
} else if err != nil {
return nil, err
}

for {
curNumElements := uint32(len(ret.Prefixes) + len(ret.Items))
if curNumElements >= targetNumElements {
break // We got all we want. Nothing else to do.
} else if ret.NextContinuationToken == nil {
break // We got all blobs under the prefix.
}

internalResp, err := cloud.ListBlobs(&ListBlobsInput{
Prefix: param.Prefix,
Delimiter: param.Delimiter,
MaxKeys: PUInt32(targetNumElements - curNumElements),
ContinuationToken: ret.NextContinuationToken,
// We will not set StartAfter for page requests. Only the first request might have it.
})
if err != nil {
return nil, err
}

ret = &ListBlobsOutput{
Prefixes: append(ret.Prefixes, internalResp.Prefixes...),
Items: append(ret.Items, internalResp.Items...),
NextContinuationToken: internalResp.NextContinuationToken,
IsTruncated: internalResp.IsTruncated,
RequestId: internalResp.RequestId,
}
}
return ret, nil
}

// LOCKS_REQUIRED(dh.mu)
// LOCKS_EXCLUDED(dh.inode.mu)
// LOCKS_EXCLUDED(dh.inode.fs)
Expand Down Expand Up @@ -972,7 +1020,7 @@ func (parent *Inode) isEmptyDir(fs *Goofys, name string) (isDir bool, err error)
cloud, key := parent.cloud()
key = appendChildName(key, name) + "/"

resp, err := cloud.ListBlobs(&ListBlobsInput{
resp, err := listBlobsWrapper(cloud, &ListBlobsInput{
Delimiter: aws.String("/"),
MaxKeys: PUInt32(2),
Prefix: &key,
Expand Down Expand Up @@ -1306,10 +1354,13 @@ func (parent *Inode) LookUpInodeDir(name string, c chan ListBlobsOutput, errc ch
cloud, key := parent.cloud()
key = appendChildName(key, name) + "/"

resp, err := cloud.ListBlobs(&ListBlobsInput{
resp, err := listBlobsWrapper(cloud, &ListBlobsInput{
Delimiter: aws.String("/"),
MaxKeys: PUInt32(1),
Prefix: &key,
// Ideally one result should be sufficient. But when azure hierarchical
// namespaces are enabled, azblob returns "a" when we list blobs under "a/".
// In such cases we remove "a" from the result. So request for 2 blobs.
MaxKeys: PUInt32(1),
Prefix: &key,
})

if err != nil {
Expand Down

0 comments on commit 33d0a7b

Please sign in to comment.