
Commit f16470a

Merge branch 'apache:trunk' into HADOOP-19571

2 parents ad21951 + 66adc68

File tree: 13 files changed, +365 -110 lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 6 additions & 4 deletions

@@ -57,6 +57,7 @@
 import software.amazon.awssdk.transfer.s3.model.FileUpload;
 import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -287,10 +288,11 @@ public boolean inputStreamHasCapability(final String capability) {
    * Initialize dir allocator if not already initialized.
    */
   private void initLocalDirAllocator() {
-    String bufferDir = getConfig().get(BUFFER_DIR) != null
-        ? BUFFER_DIR
-        : HADOOP_TMP_DIR;
-    directoryAllocator = new LocalDirAllocator(bufferDir);
+    String key = BUFFER_DIR;
+    if (StringUtils.isEmpty(getConfig().getTrimmed(key))) {
+      key = HADOOP_TMP_DIR;
+    }
+    directoryAllocator = new LocalDirAllocator(key);
   }
 
   /** Acquire write capacity for rate limiting {@inheritDoc}. */
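
The change above is small but subtle: the old code only checked that fs.s3a.buffer.dir was non-null, so a value set to pure whitespace still won the ternary and the allocator was keyed on an option that resolves to no usable directory. Reading the trimmed value closes that gap. A minimal standalone sketch of the difference (not part of the commit; assumes hadoop-common and commons-lang3 on the classpath):

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

public class BufferDirFallbackDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.buffer.dir", "   ");   // whitespace-only value

    // Old logic: a blank value is non-null, so the buffer-dir key wins.
    String oldKey = conf.get("fs.s3a.buffer.dir") != null
        ? "fs.s3a.buffer.dir" : "hadoop.tmp.dir";

    // New logic: getTrimmed() reduces the blank value to an empty string,
    // which isEmpty() treats as unset, so the key falls back.
    String newKey = StringUtils.isEmpty(conf.getTrimmed("fs.s3a.buffer.dir"))
        ? "hadoop.tmp.dir" : "fs.s3a.buffer.dir";

    System.out.println("old key: " + oldKey + ", new key: " + newKey);
  }
}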

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java

Lines changed: 35 additions & 8 deletions

@@ -49,6 +49,7 @@
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
@@ -63,6 +64,7 @@
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.http.HttpStatus;
 
+import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -485,12 +487,29 @@ public void testCloseIdempotent() throws Throwable {
 
   @Test
   public void testDirectoryAllocatorDefval() throws Throwable {
+    removeAllocatorContexts();
     conf = new Configuration();
-    conf.unset(Constants.BUFFER_DIR);
-    fs = S3ATestUtils.createTestFileSystem(conf);
-    File tmp = createTemporaryFileForWriting();
-    assertTrue("not found: " + tmp, tmp.exists());
-    tmp.delete();
+    final String bucketName = getTestBucketName(conf);
+    final String blank = " ";
+    conf.set(Constants.BUFFER_DIR, blank);
+    conf.set(format("fs.s3a.bucket.%s.buffer.dir", bucketName), blank);
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      final Configuration fsConf = fs.getConf();
+      Assertions.assertThat(fsConf.get(Constants.BUFFER_DIR))
+          .describedAs("Config option %s", Constants.BUFFER_DIR)
+          .isEqualTo(blank);
+      File tmp = createTemporaryFileForWriting();
+      assertTrue("not found: " + tmp, tmp.exists());
+      tmp.delete();
+    } finally {
+      removeAllocatorContexts();
+    }
+  }
+
+  private static void removeAllocatorContexts() {
+    LocalDirAllocator.removeContext(BUFFER_DIR);
+    LocalDirAllocator.removeContext(HADOOP_TMP_DIR);
   }
 
   /**
@@ -504,13 +523,21 @@ private File createTemporaryFileForWriting() throws IOException {
 
   @Test
   public void testDirectoryAllocatorRR() throws Throwable {
+    removeAllocatorContexts();
     File dir1 = GenericTestUtils.getRandomizedTestDir();
     File dir2 = GenericTestUtils.getRandomizedTestDir();
     dir1.mkdirs();
     dir2.mkdirs();
     conf = new Configuration();
-    conf.set(Constants.BUFFER_DIR, dir1 + ", " + dir2);
+    final String bucketName = getTestBucketName(conf);
+    final String dirs = dir1 + ", " + dir2;
+    conf.set(Constants.BUFFER_DIR, dirs);
+    conf.set(format("fs.s3a.bucket.%s.buffer.dir", bucketName), dirs);
     fs = S3ATestUtils.createTestFileSystem(conf);
+    final Configuration fsConf = fs.getConf();
+    Assertions.assertThat(fsConf.get(Constants.BUFFER_DIR))
+        .describedAs("Config option %s", Constants.BUFFER_DIR)
+        .isEqualTo(dirs);
     File tmp1 = createTemporaryFileForWriting();
     tmp1.delete();
     File tmp2 = createTemporaryFileForWriting();
@@ -552,10 +579,10 @@ public S3AFileSystem run() throws Exception{
   private static <T> T getField(Object target, Class<T> fieldType,
       String fieldName) throws IllegalAccessException {
     Object obj = FieldUtils.readField(target, fieldName, true);
-    assertNotNull(String.format(
+    assertNotNull(format(
        "Could not read field named %s in object with class %s.", fieldName,
        target.getClass().getName()), obj);
-    assertTrue(String.format(
+    assertTrue(format(
        "Unexpected type found for field named %s, expected %s, actual %s.",
        fieldName, fieldType.getName(), obj.getClass().getName()),
        fieldType.isAssignableFrom(obj.getClass()));
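
The removeAllocatorContexts() helper matters because LocalDirAllocator keeps a static, per-config-key cache of allocator contexts, so buffer directories resolved by one test (or one bucket override) can leak into the next. A standalone sketch of that lifecycle (not part of the commit; uses the same hadoop-common APIs the test relies on, assuming a Hadoop version where LocalDirAllocator.removeContext is available):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;

public class AllocatorContextDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    conf.set("fs.s3a.buffer.dir", "/tmp/buffer-a");

    // Allocators built from the same config key share one cached context,
    // so directories chosen earlier can shadow a changed configuration.
    LocalDirAllocator allocator = new LocalDirAllocator("fs.s3a.buffer.dir");
    Path scratch = allocator.getLocalPathForWrite("part-0000", 1024, conf);
    System.out.println("allocated under: " + scratch);

    // Dropping the cached context forces the next allocator for this key
    // to re-read the configuration from scratch.
    LocalDirAllocator.removeContext("fs.s3a.buffer.dir");
  }
}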

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 2 additions & 9 deletions

@@ -76,7 +76,6 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -118,7 +117,6 @@
 import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
-import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.impl.BackReference;
@@ -1298,13 +1296,8 @@ public String listStatus(final Path path, final String startFrom,
       }
     } while (shouldContinue);
 
-    if (listingClient instanceof AbfsBlobClient) {
-      fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
-      LOG.debug("ListBlob API returned a total of {} elements including duplicates."
-          + "Number of unique Elements are {}", fileStatusList.size(), fileStatuses.size());
-    } else {
-      fileStatuses.addAll(fileStatusList);
-    }
+    fileStatuses.addAll(listingClient.postListProcessing(
+        relativePath, fileStatusList, tracingContext, uri));
 
     return continuation;
   }
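
The store-side change swaps an instanceof check on the listing client for a call to postListProcessing, an operation every client now implements (the abstract method is added to AbfsClient below): the DFS client can pass the listing through unchanged, while the Blob client dedupes entries and resolves the empty-listing-on-a-file case. A minimal sketch of that dispatch pattern with simplified stand-in types (illustrative only, not the commit's classes):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

abstract class ListingClient {
  // Default (DFS-style) behaviour: the server listing is already clean.
  List<String> postListProcessing(List<String> entries) {
    return entries;
  }
}

class BlobListingClient extends ListingClient {
  // The Blob endpoint may return duplicates for explicit non-empty
  // directories; dedupe while preserving order.
  @Override
  List<String> postListProcessing(List<String> entries) {
    return new ArrayList<>(new LinkedHashSet<>(entries));
  }
}

public class DispatchDemo {
  public static void main(String[] args) {
    ListingClient client = new BlobListingClient();
    // The caller no longer needs "client instanceof BlobListingClient".
    System.out.println(client.postListProcessing(List.of("a", "b", "a")));
    // prints [a, b]
  }
}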

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 45 additions & 47 deletions

@@ -51,6 +51,7 @@
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
@@ -77,6 +78,7 @@
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
@@ -348,13 +350,9 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext)
    */
   @Override
   public ListResponseData listPath(final String relativePath, final boolean recursive,
-      final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws IOException {
-    return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, uri, true);
-  }
-
-  public ListResponseData listPath(final String relativePath, final boolean recursive,
-      final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri, boolean is404CheckRequired)
+      final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri)
       throws AzureBlobFileSystemException {
+
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@@ -400,37 +398,46 @@ public ListResponseData listPath(final String relativePath, final boolean recursive,
         listResponseData.setOp(retryListOp);
       }
     }
+    return listResponseData;
+  }
 
-    if (isEmptyListResults(listResponseData) && is404CheckRequired) {
+  /**
+   * Post-processing of the list operation on Blob endpoint.
+   * There are two client handing to be done on list output.
+   * 1. Empty List returned on server could potentially mean path is a file.
+   * 2. There can be duplicates returned from the server for explicit non-empty directory.
+   * @param relativePath relative path to be listed.
+   * @param fileStatuses list of file statuses returned from the server.
+   * @param tracingContext tracing context to trace server calls.
+   * @param uri URI to be used for path conversion.
+   * @return rectified list of file statuses.
+   * @throws AzureBlobFileSystemException if any failure occurs.
+   */
+  @Override
+  public List<FileStatus> postListProcessing(String relativePath, List<FileStatus> fileStatuses,
+      TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException {
+    List<FileStatus> rectifiedFileStatuses = new ArrayList<>();
+    if (fileStatuses.isEmpty() && !ROOT_PATH.equals(relativePath)) {
       // If the list operation returns no paths, we need to check if the path is a file.
       // If it is a file, we need to return the file in the list.
+      // If it is a directory or root path, we need to return an empty list.
       // If it is a non-existing path, we need to throw a FileNotFoundException.
-      if (relativePath.equals(ROOT_PATH)) {
-        // Root Always exists as directory. It can be an empty listing.
-        return listResponseData;
-      }
       AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
       BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
-      LOG.debug("ListBlob attempted on a file path. Returning file status.");
-      List<VersionedFileStatus> fileStatusList = new ArrayList<>();
+      LOG.debug("ListStatus attempted on a file path {}. Returning file status.", relativePath);
      for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
-        fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
+        rectifiedFileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
      }
-      AbfsRestOperation listOp = getAbfsRestOperation(
-          AbfsRestOperationType.ListBlobs,
-          HTTP_METHOD_GET,
-          url,
-          requestHeaders);
-      listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
-      listResponseData.setFileStatusList(fileStatusList);
-      listResponseData.setContinuationToken(null);
-      listResponseData.setRenamePendingJsonPaths(null);
-      listResponseData.setOp(listOp);
-      return listResponseData;
+    } else {
+      // Remove duplicates from the non-empty list output only.
+      rectifiedFileStatuses.addAll(ListUtils.getUniqueListResult(fileStatuses));
+      LOG.debug(
+          "ListBlob API returned a total of {} elements including duplicates."
+              + "Number of unique Elements are {}", fileStatuses.size(),
+          rectifiedFileStatuses.size());
     }
-    return listResponseData;
+    return rectifiedFileStatuses;
  }
-
  /**
   * Filter the paths for which no rename redo operation is performed.
   * Update BlobListResultSchema path with filtered entries.
@@ -2013,6 +2020,8 @@ private static String decodeMetadataAttribute(String encoded)
 
  /**
   * Checks if the listing of the specified path is non-empty.
+  * Since listing is incomplete as long as continuation token is returned by server,
+  * we need to iterate until either we get one entry or continuation token becomes null.
   *
   * @param path The path to be listed.
   * @param tracingContext The tracing context for tracking the operation.
@@ -2024,26 +2033,15 @@ public boolean isNonEmptyDirectory(String path,
      TracingContext tracingContext) throws AzureBlobFileSystemException {
    // This method is only called internally to determine state of a path
    // and hence don't need identity transformation to happen.
-    ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null, false);
-    return !isEmptyListResults(listResponseData);
-  }
-
-  /**
-   * Check if the list call returned empty results without any continuation token.
-   * @param listResponseData The response of listing API from the server.
-   * @return True if empty results without continuation token.
-   */
-  private boolean isEmptyListResults(ListResponseData listResponseData) {
-    AbfsHttpOperation result = listResponseData.getOp().getResult();
-    boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
-        result.getListResultSchema() != null && // Parsing of list response was successful
-        listResponseData.getFileStatusList().isEmpty() && listResponseData.getRenamePendingJsonPaths().isEmpty() && // No paths were returned
-        StringUtils.isEmpty(listResponseData.getContinuationToken()); // No continuation token was returned
-    if (isEmptyList) {
-      LOG.debug("List call returned empty results without any continuation token.");
-      return true;
-    }
-    return false;
+    String continuationToken = null;
+    List<FileStatus> fileStatusList = new ArrayList<>();
+    // We need to loop on continuation token until we get an entry or continuation token becomes null.
+    do {
+      ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null);
+      fileStatusList.addAll(listResponseData.getFileStatusList());
+      continuationToken = listResponseData.getContinuationToken();
+    } while (StringUtils.isNotEmpty(continuationToken) && fileStatusList.isEmpty());
+    return !fileStatusList.isEmpty();
  }
 
  /**
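
Two things happen in this file: listPath loses its is404CheckRequired variant (the empty-listing handling moves into postListProcessing), and isNonEmptyDirectory now pages through results instead of trusting a single response, because a list call can legitimately return zero entries together with a continuation token. A self-contained sketch of that loop shape, where ListPage and fetchPage are hypothetical stand-ins for the real ListResponseData/listPath pair (not the commit's code):

import java.util.List;
import java.util.function.Function;

public class NonEmptyDirectoryDemo {

  static final class ListPage {
    final List<String> entries;
    final String continuationToken;  // null or empty once listing is complete
    ListPage(List<String> entries, String continuationToken) {
      this.entries = entries;
      this.continuationToken = continuationToken;
    }
  }

  // Keep fetching pages until an entry appears or tokens run out: a page
  // may be empty even though the server still has results to return.
  static boolean isNonEmpty(Function<String, ListPage> fetchPage) {
    String token = null;
    do {
      ListPage page = fetchPage.apply(token);
      if (!page.entries.isEmpty()) {
        return true;
      }
      token = page.continuationToken;
    } while (token != null && !token.isEmpty());
    return false;
  }

  public static void main(String[] args) {
    // First page is empty but carries a token; the second page has an entry.
    boolean nonEmpty = isNonEmpty(token ->
        token == null
            ? new ListPage(List.of(), "page-2")
            : new ListPage(List.of("file.txt"), null));
    System.out.println(nonEmpty);   // true
  }
}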

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 35 additions & 35 deletions

@@ -51,6 +51,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -164,6 +165,8 @@ public abstract class AbfsClient implements Closeable {
   private String clientProvidedEncryptionKey = null;
   private String clientProvidedEncryptionKeySHA = null;
   private final IdentityTransformerInterface identityTransformer;
+  private final String userName;
+  private String primaryUserGroup;
 
   private final String accountName;
   private final AuthType authType;
@@ -305,6 +308,22 @@ private AbfsClient(final URL baseUrl,
       throw new IOException(e);
     }
     LOG.trace("IdentityTransformer init complete");
+
+    UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
+    this.userName = userGroupInformation.getShortUserName();
+    LOG.trace("UGI init complete");
+    if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
+      try {
+        this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
+      } catch (IOException ex) {
+        LOG.error("Failed to get primary group for {}, using user name as primary group name", userName);
+        this.primaryUserGroup = userName;
+      }
+    } else {
+      //Provide a default group name
+      this.primaryUserGroup = userName;
+    }
+    LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
   }
 
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@@ -524,6 +543,18 @@ public abstract AbfsRestOperation setFilesystemProperties(Hashtable<String, Stri
   public abstract ListResponseData listPath(String relativePath, boolean recursive,
       int listMaxResults, String continuation, TracingContext tracingContext, URI uri) throws IOException;
 
+  /**
+   * Post-processing of the list operation.
+   * @param relativePath which is used to list the blobs.
+   * @param fileStatuses list of file statuses to be processed.
+   * @param tracingContext for tracing the server calls.
+   * @param uri to be used for the path conversion.
+   * @return list of file statuses to be returned.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  public abstract List<FileStatus> postListProcessing(String relativePath,
+      List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException;
+
   /**
    * Retrieves user-defined metadata on filesystem.
    * @param tracingContext for tracing the server calls.
@@ -1772,37 +1803,6 @@ protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationTy
     return successOp;
   }
 
-  /**
-   * Get the primary user group name.
-   * @return primary user group name
-   * @throws AzureBlobFileSystemException if unable to get the primary user group
-   */
-  private String getPrimaryUserGroup() throws AzureBlobFileSystemException {
-    if (!getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) {
-      try {
-        return UserGroupInformation.getCurrentUser().getPrimaryGroupName();
-      } catch (IOException ex) {
-        LOG.error("Failed to get primary group for {}, using user name as primary group name",
-            getPrimaryUser());
-      }
-    }
-    //Provide a default group name
-    return getPrimaryUser();
-  }
-
-  /**
-   * Get the primary username.
-   * @return primary username
-   * @throws AzureBlobFileSystemException if unable to get the primary user
-   */
-  private String getPrimaryUser() throws AzureBlobFileSystemException {
-    try {
-      return UserGroupInformation.getCurrentUser().getUserName();
-    } catch (IOException ex) {
-      throw new AbfsDriverException(ex);
-    }
-  }
-
   /**
    * Creates a VersionedFileStatus object from the ListResultEntrySchema.
    * @param entry ListResultEntrySchema object.
@@ -1816,10 +1816,10 @@ protected VersionedFileStatus getVersionedFileStatusFromEntry(
     String owner = null, group = null;
     try{
       if (identityTransformer != null) {
-        owner = identityTransformer.transformIdentityForGetRequest(
-            entry.owner(), true, getPrimaryUser());
-        group = identityTransformer.transformIdentityForGetRequest(
-            entry.group(), false, getPrimaryUserGroup());
+        owner = identityTransformer.transformIdentityForGetRequest(entry.owner(),
+            true, userName);
+        group = identityTransformer.transformIdentityForGetRequest(entry.group(),
+            false, primaryUserGroup);
       }
     } catch (IOException ex) {
       LOG.error("Failed to get owner/group for path {}", entry.name(), ex);
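
The net effect here is that the per-entry getPrimaryUser()/getPrimaryUserGroup() lookups are deleted and UserGroupInformation is consulted exactly once, when the client is constructed, so large listings no longer repeat UGI calls for every entry. A condensed standalone sketch of the constructor-time caching (not the commit's class; assumes hadoop-common's UserGroupInformation):

import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

public class CachedIdentity {
  private final String userName;
  private String primaryUserGroup;

  public CachedIdentity(boolean skipGroupLookup) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    // Resolved once here, then reused for every listing entry.
    this.userName = ugi.getShortUserName();
    if (!skipGroupLookup) {
      try {
        this.primaryUserGroup = ugi.getPrimaryGroupName();
      } catch (IOException ex) {
        // Mirror the commit: fall back to the user name as the group.
        this.primaryUserGroup = userName;
      }
    } else {
      this.primaryUserGroup = userName;
    }
  }

  public String userName() { return userName; }
  public String primaryUserGroup() { return primaryUserGroup; }
}

One trade-off to note: identities cached at construction will not reflect group-membership changes made during the client's lifetime, which the old per-call lookups would have picked up.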
