Skip to content

Commit 66adc68

Browse files
authored
HADOOP-19572. [ABFS][BugFix] Empty Page Issue on Subsequent ListBlob call with NextMarker (#7698)
Contributed by Anuj Modi Reviewed by Anmol Asrani, Manish Bhatt, Manika Joshi Signed off by Anuj Mod<anujmodi@apache.org>
1 parent d491f0b commit 66adc68

File tree

11 files changed

+324
-98
lines changed

11 files changed

+324
-98
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
7777
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
7878
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
79-
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
8079
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
8180
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
8281
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -118,7 +117,6 @@
118117
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
119118
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
120119
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
121-
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
122120
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
123121
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
124122
import org.apache.hadoop.fs.impl.BackReference;
@@ -1298,13 +1296,8 @@ public String listStatus(final Path path, final String startFrom,
12981296
}
12991297
} while (shouldContinue);
13001298

1301-
if (listingClient instanceof AbfsBlobClient) {
1302-
fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
1303-
LOG.debug("ListBlob API returned a total of {} elements including duplicates."
1304-
+ "Number of unique Elements are {}", fileStatusList.size(), fileStatuses.size());
1305-
} else {
1306-
fileStatuses.addAll(fileStatusList);
1307-
}
1299+
fileStatuses.addAll(listingClient.postListProcessing(
1300+
relativePath, fileStatusList, tracingContext, uri));
13081301

13091302
return continuation;
13101303
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 45 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
import org.apache.commons.io.IOUtils;
5353
import org.apache.commons.lang3.StringUtils;
54+
import org.apache.hadoop.fs.FileStatus;
5455
import org.apache.hadoop.fs.FileSystem;
5556
import org.apache.hadoop.classification.VisibleForTesting;
5657
import org.apache.hadoop.fs.Path;
@@ -77,6 +78,7 @@
7778
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
7879
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
7980
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
81+
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
8082
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
8183

8284
import static java.net.HttpURLConnection.HTTP_CONFLICT;
@@ -348,13 +350,9 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
348350
*/
349351
@Override
350352
public ListResponseData listPath(final String relativePath, final boolean recursive,
351-
final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws IOException {
352-
return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, uri, true);
353-
}
354-
355-
public ListResponseData listPath(final String relativePath, final boolean recursive,
356-
final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri, boolean is404CheckRequired)
353+
final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri)
357354
throws AzureBlobFileSystemException {
355+
358356
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
359357

360358
AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@@ -400,37 +398,46 @@ public ListResponseData listPath(final String relativePath, final boolean recurs
400398
listResponseData.setOp(retryListOp);
401399
}
402400
}
401+
return listResponseData;
402+
}
403403

404-
if (isEmptyListResults(listResponseData) && is404CheckRequired) {
404+
/**
405+
* Post-processing of the list operation on Blob endpoint.
406+
* There are two client handing to be done on list output.
407+
* 1. Empty List returned on server could potentially mean path is a file.
408+
* 2. There can be duplicates returned from the server for explicit non-empty directory.
409+
* @param relativePath relative path to be listed.
410+
* @param fileStatuses list of file statuses returned from the server.
411+
* @param tracingContext tracing context to trace server calls.
412+
* @param uri URI to be used for path conversion.
413+
* @return rectified list of file statuses.
414+
* @throws AzureBlobFileSystemException if any failure occurs.
415+
*/
416+
@Override
417+
public List<FileStatus> postListProcessing(String relativePath, List<FileStatus> fileStatuses,
418+
TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException {
419+
List<FileStatus> rectifiedFileStatuses = new ArrayList<>();
420+
if (fileStatuses.isEmpty() && !ROOT_PATH.equals(relativePath)) {
405421
// If the list operation returns no paths, we need to check if the path is a file.
406422
// If it is a file, we need to return the file in the list.
423+
// If it is a directory or root path, we need to return an empty list.
407424
// If it is a non-existing path, we need to throw a FileNotFoundException.
408-
if (relativePath.equals(ROOT_PATH)) {
409-
// Root Always exists as directory. It can be an empty listing.
410-
return listResponseData;
411-
}
412425
AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
413426
BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
414-
LOG.debug("ListBlob attempted on a file path. Returning file status.");
415-
List<VersionedFileStatus> fileStatusList = new ArrayList<>();
427+
LOG.debug("ListStatus attempted on a file path {}. Returning file status.", relativePath);
416428
for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
417-
fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
429+
rectifiedFileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
418430
}
419-
AbfsRestOperation listOp = getAbfsRestOperation(
420-
AbfsRestOperationType.ListBlobs,
421-
HTTP_METHOD_GET,
422-
url,
423-
requestHeaders);
424-
listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
425-
listResponseData.setFileStatusList(fileStatusList);
426-
listResponseData.setContinuationToken(null);
427-
listResponseData.setRenamePendingJsonPaths(null);
428-
listResponseData.setOp(listOp);
429-
return listResponseData;
431+
} else {
432+
// Remove duplicates from the non-empty list output only.
433+
rectifiedFileStatuses.addAll(ListUtils.getUniqueListResult(fileStatuses));
434+
LOG.debug(
435+
"ListBlob API returned a total of {} elements including duplicates."
436+
+ "Number of unique Elements are {}", fileStatuses.size(),
437+
rectifiedFileStatuses.size());
430438
}
431-
return listResponseData;
439+
return rectifiedFileStatuses;
432440
}
433-
434441
/**
435442
* Filter the paths for which no rename redo operation is performed.
436443
* Update BlobListResultSchema path with filtered entries.
@@ -2013,6 +2020,8 @@ private static String decodeMetadataAttribute(String encoded)
20132020

20142021
/**
20152022
* Checks if the listing of the specified path is non-empty.
2023+
* Since listing is incomplete as long as continuation token is returned by server,
2024+
* we need to iterate until either we get one entry or continuation token becomes null.
20162025
*
20172026
* @param path The path to be listed.
20182027
* @param tracingContext The tracing context for tracking the operation.
@@ -2024,26 +2033,15 @@ public boolean isNonEmptyDirectory(String path,
20242033
TracingContext tracingContext) throws AzureBlobFileSystemException {
20252034
// This method is only called internally to determine state of a path
20262035
// and hence don't need identity transformation to happen.
2027-
ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null, false);
2028-
return !isEmptyListResults(listResponseData);
2029-
}
2030-
2031-
/**
2032-
* Check if the list call returned empty results without any continuation token.
2033-
* @param listResponseData The response of listing API from the server.
2034-
* @return True if empty results without continuation token.
2035-
*/
2036-
private boolean isEmptyListResults(ListResponseData listResponseData) {
2037-
AbfsHttpOperation result = listResponseData.getOp().getResult();
2038-
boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
2039-
result.getListResultSchema() != null && // Parsing of list response was successful
2040-
listResponseData.getFileStatusList().isEmpty() && listResponseData.getRenamePendingJsonPaths().isEmpty() &&// No paths were returned
2041-
StringUtils.isEmpty(listResponseData.getContinuationToken()); // No continuation token was returned
2042-
if (isEmptyList) {
2043-
LOG.debug("List call returned empty results without any continuation token.");
2044-
return true;
2045-
}
2046-
return false;
2036+
String continuationToken = null;
2037+
List<FileStatus> fileStatusList = new ArrayList<>();
2038+
// We need to loop on continuation token until we get an entry or continuation token becomes null.
2039+
do {
2040+
ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null);
2041+
fileStatusList.addAll(listResponseData.getFileStatusList());
2042+
continuationToken = listResponseData.getContinuationToken();
2043+
} while (StringUtils.isNotEmpty(continuationToken) && fileStatusList.isEmpty());
2044+
return !fileStatusList.isEmpty();
20472045
}
20482046

20492047
/**

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.commons.lang3.StringUtils;
5252
import org.apache.hadoop.classification.VisibleForTesting;
5353
import org.apache.hadoop.conf.Configuration;
54+
import org.apache.hadoop.fs.FileStatus;
5455
import org.apache.hadoop.fs.FileSystem;
5556
import org.apache.hadoop.fs.Path;
5657
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -164,6 +165,8 @@ public abstract class AbfsClient implements Closeable {
164165
private String clientProvidedEncryptionKey = null;
165166
private String clientProvidedEncryptionKeySHA = null;
166167
private final IdentityTransformerInterface identityTransformer;
168+
private final String userName;
169+
private String primaryUserGroup;
167170

168171
private final String accountName;
169172
private final AuthType authType;
@@ -305,6 +308,22 @@ private AbfsClient(final URL baseUrl,
305308
throw new IOException(e);
306309
}
307310
LOG.trace("IdentityTransformer init complete");
311+
312+
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
313+
this.userName = userGroupInformation.getShortUserName();
314+
LOG.trace("UGI init complete");
315+
if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
316+
try {
317+
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
318+
} catch (IOException ex) {
319+
LOG.error("Failed to get primary group for {}, using user name as primary group name", userName);
320+
this.primaryUserGroup = userName;
321+
}
322+
} else {
323+
//Provide a default group name
324+
this.primaryUserGroup = userName;
325+
}
326+
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
308327
}
309328

310329
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@@ -524,6 +543,18 @@ public abstract AbfsRestOperation setFilesystemProperties(Hashtable<String, Stri
524543
public abstract ListResponseData listPath(String relativePath, boolean recursive,
525544
int listMaxResults, String continuation, TracingContext tracingContext, URI uri) throws IOException;
526545

546+
/**
547+
* Post-processing of the list operation.
548+
* @param relativePath which is used to list the blobs.
549+
* @param fileStatuses list of file statuses to be processed.
550+
* @param tracingContext for tracing the server calls.
551+
* @param uri to be used for the path conversion.
552+
* @return list of file statuses to be returned.
553+
* @throws AzureBlobFileSystemException if rest operation fails.
554+
*/
555+
public abstract List<FileStatus> postListProcessing(String relativePath,
556+
List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException;
557+
527558
/**
528559
* Retrieves user-defined metadata on filesystem.
529560
* @param tracingContext for tracing the server calls.
@@ -1772,37 +1803,6 @@ protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationTy
17721803
return successOp;
17731804
}
17741805

1775-
/**
1776-
* Get the primary user group name.
1777-
* @return primary user group name
1778-
* @throws AzureBlobFileSystemException if unable to get the primary user group
1779-
*/
1780-
private String getPrimaryUserGroup() throws AzureBlobFileSystemException {
1781-
if (!getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) {
1782-
try {
1783-
return UserGroupInformation.getCurrentUser().getPrimaryGroupName();
1784-
} catch (IOException ex) {
1785-
LOG.error("Failed to get primary group for {}, using user name as primary group name",
1786-
getPrimaryUser());
1787-
}
1788-
}
1789-
//Provide a default group name
1790-
return getPrimaryUser();
1791-
}
1792-
1793-
/**
1794-
* Get the primary username.
1795-
* @return primary username
1796-
* @throws AzureBlobFileSystemException if unable to get the primary user
1797-
*/
1798-
private String getPrimaryUser() throws AzureBlobFileSystemException {
1799-
try {
1800-
return UserGroupInformation.getCurrentUser().getUserName();
1801-
} catch (IOException ex) {
1802-
throw new AbfsDriverException(ex);
1803-
}
1804-
}
1805-
18061806
/**
18071807
* Creates a VersionedFileStatus object from the ListResultEntrySchema.
18081808
* @param entry ListResultEntrySchema object.
@@ -1816,10 +1816,10 @@ protected VersionedFileStatus getVersionedFileStatusFromEntry(
18161816
String owner = null, group = null;
18171817
try{
18181818
if (identityTransformer != null) {
1819-
owner = identityTransformer.transformIdentityForGetRequest(
1820-
entry.owner(), true, getPrimaryUser());
1821-
group = identityTransformer.transformIdentityForGetRequest(
1822-
entry.group(), false, getPrimaryUserGroup());
1819+
owner = identityTransformer.transformIdentityForGetRequest(entry.owner(),
1820+
true, userName);
1821+
group = identityTransformer.transformIdentityForGetRequest(entry.group(),
1822+
false, primaryUserGroup);
18231823
}
18241824
} catch (IOException ex) {
18251825
LOG.error("Failed to get owner/group for path {}", entry.name(), ex);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.fasterxml.jackson.databind.ObjectMapper;
4646

4747
import org.apache.hadoop.classification.VisibleForTesting;
48+
import org.apache.hadoop.fs.FileStatus;
4849
import org.apache.hadoop.fs.FileSystem;
4950
import org.apache.hadoop.fs.Path;
5051
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -348,6 +349,21 @@ public ListResponseData listPath(final String relativePath,
348349
return listResponseData;
349350
}
350351

352+
/**
353+
* Non-functional implementation.
354+
* Client side handling to remove duplicates not needed in DFSClient.
355+
* @param relativePath on which listing was attempted.
356+
* @param fileStatuses result of listing operation.
357+
* @param tracingContext for tracing the server calls.
358+
* @param uri to be used for path conversion.
359+
* @return fileStatuses as it is without any processing.
360+
*/
361+
@Override
362+
public List<FileStatus> postListProcessing(String relativePath,
363+
List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri){
364+
return fileStatuses;
365+
}
366+
351367
/**
352368
* Get Rest Operation for API
353369
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create">

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,8 @@ public void testSetFileSystemProperties() throws Exception {
445445
//Test list and delete operation on implicit paths
446446
public void testListAndDeleteImplicitPaths() throws Exception {
447447
AzureBlobFileSystem fs = getFileSystem();
448-
AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient());
449448
assumeBlobServiceType();
449+
AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient());
450450

451451
Path file1 = new Path("/testDir/dir1/file1");
452452
Path file2 = new Path("/testDir/dir1/file2");
@@ -458,7 +458,7 @@ public void testListAndDeleteImplicitPaths() throws Exception {
458458

459459
AbfsRestOperation op = client.listPath(
460460
implicitDir.toString(), false, 2, null,
461-
getTestTracingContext(getFileSystem(), false), null, false).getOp();
461+
getTestTracingContext(getFileSystem(), false), null).getOp();
462462
List<? extends ListResultEntrySchema> list = op.getResult()
463463
.getListResultSchema()
464464
.paths();

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
316316
doCallRealMethod().when((AbfsBlobClient) mockClient)
317317
.listPath(Mockito.nullable(String.class), Mockito.anyBoolean(),
318318
Mockito.anyInt(), Mockito.nullable(String.class),
319-
Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class), Mockito.anyBoolean());
319+
Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class));
320320
doCallRealMethod().when((AbfsBlobClient) mockClient)
321321
.getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class),
322322
Mockito.nullable(ContextEncryptionAdapter.class), Mockito.anyBoolean());

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public void testListStatusIsCalledForImplicitPathOnBlobEndpoint() throws Excepti
259259
fs.getFileStatus(implicitPath);
260260

261261
Mockito.verify(abfsClient, Mockito.times(1)).getPathStatus(any(), eq(false), any(), any());
262-
Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any(), eq(false));
262+
Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any());
263263
}
264264

265265
/**

0 commit comments

Comments
 (0)