Skip to content

Commit eb803ec

Browse files
mukund-thakursteveloughran
authored andcommitted
CDPD-13683 HADOOP-17023. Tune S3AFileSystem.listStatus() (apache#2257)
S3AFileSystem.listStatus() is optimized for invocations where the path supplied is a non-empty directory. The number of S3 requests is significantly reduced, saving time, money, and reducing the risk of S3 throttling. Contributed by Mukund Thakur. Change-Id: Ib4daf53dc74586390b379cc93000d61839c48a2e
1 parent 556d728 commit eb803ec

File tree

9 files changed

+612
-454
lines changed

9 files changed

+612
-454
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.amazonaws.services.s3.model.S3ObjectSummary;
2525
import com.google.common.annotations.VisibleForTesting;
2626

27+
import org.apache.commons.lang3.tuple.Triple;
2728
import org.apache.hadoop.classification.InterfaceAudience;
2829
import org.apache.hadoop.fs.FileStatus;
2930
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -102,6 +103,19 @@ ProvidedFileStatusIterator createProvidedFileStatusIterator(
102103
return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
103104
}
104105

106+
/**
107+
* Create a FileStatus iterator against a provided list of file status.
108+
* @param fileStatuses array of file status.
109+
* @return the file status iterator.
110+
*/
111+
@VisibleForTesting
112+
public static ProvidedFileStatusIterator toProvidedFileStatusIterator(
113+
S3AFileStatus[] fileStatuses) {
114+
return new ProvidedFileStatusIterator(fileStatuses,
115+
ACCEPT_ALL,
116+
Listing.ACCEPT_ALL_BUT_S3N);
117+
}
118+
105119
/**
106120
* Create a FileStatus iterator against a path, with a given list object
107121
* request.
@@ -250,7 +264,7 @@ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
250264
if (!forceNonAuthoritativeMS &&
251265
allowAuthoritative &&
252266
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
253-
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
267+
S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses(
254268
metadataStoreListFilesIterator, tombstones);
255269
cachedFilesIterator = createProvidedFileStatusIterator(
256270
statuses, ACCEPT_ALL, acceptor);
@@ -329,6 +343,56 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
329343
tombstones);
330344
}
331345

346+
/**
347+
* Calculate list of file statuses assuming path
348+
* to be a non-empty directory.
349+
* @param path input path.
350+
* @return Triple of file statuses, metaData, auth flag.
351+
* @throws IOException Any IO problems.
352+
*/
353+
public Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
354+
getFileStatusesAssumingNonEmptyDir(Path path)
355+
throws IOException {
356+
String key = pathToKey(path);
357+
List<S3AFileStatus> result;
358+
if (!key.isEmpty()) {
359+
key = key + '/';
360+
}
361+
362+
boolean allowAuthoritative = listingOperationCallbacks
363+
.allowAuthoritative(path);
364+
DirListingMetadata dirMeta =
365+
S3Guard.listChildrenWithTtl(
366+
getStoreContext().getMetadataStore(),
367+
path,
368+
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
369+
allowAuthoritative);
370+
// In auth mode return directly with auth flag.
371+
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
372+
ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator(
373+
S3Guard.dirMetaToStatuses(dirMeta),
374+
ACCEPT_ALL,
375+
Listing.ACCEPT_ALL_BUT_S3N);
376+
return Triple.of(mfsItr,
377+
dirMeta, Boolean.TRUE);
378+
}
379+
380+
S3ListRequest request = createListObjectsRequest(key, "/");
381+
LOG.debug("listStatus: doing listObjects for directory {}", key);
382+
383+
FileStatusListingIterator filesItr = createFileStatusListingIterator(
384+
path,
385+
request,
386+
ACCEPT_ALL,
387+
new Listing.AcceptAllButSelfAndS3nDirs(path));
388+
389+
// return the results obtained from s3.
390+
return Triple.of(
391+
filesItr,
392+
dirMeta,
393+
Boolean.FALSE);
394+
}
395+
332396
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
333397
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
334398
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Collections;
3636
import java.util.Date;
3737
import java.util.EnumSet;
38+
import java.util.HashSet;
3839
import java.util.List;
3940
import java.util.Map;
4041
import java.util.Optional;
@@ -182,6 +183,7 @@
182183
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
183184
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
184185
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
186+
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
185187
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
186188

187189
/**
@@ -2627,7 +2629,9 @@ void maybeCreateFakeParentDirectory(Path path)
26272629
*/
26282630
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
26292631
IOException {
2630-
return once("listStatus", f.toString(), () -> innerListStatus(f));
2632+
return once("listStatus",
2633+
f.toString(),
2634+
() -> iteratorToStatuses(innerListStatus(f), new HashSet<>()));
26312635
}
26322636

26332637
/**
@@ -2640,51 +2644,52 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
26402644
* @throws IOException due to an IO problem.
26412645
* @throws AmazonClientException on failures inside the AWS SDK
26422646
*/
2643-
private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
2644-
IOException, AmazonClientException {
2647+
private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
2648+
throws FileNotFoundException,
2649+
IOException, AmazonClientException {
26452650
Path path = qualify(f);
2646-
String key = pathToKey(path);
26472651
LOG.debug("List status for path: {}", path);
26482652
entryPoint(INVOCATION_LIST_STATUS);
26492653

2650-
List<S3AFileStatus> result;
2651-
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
2652-
StatusProbeEnum.ALL);
2653-
2654-
if (fileStatus.isDirectory()) {
2655-
if (!key.isEmpty()) {
2656-
key = key + '/';
2654+
Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
2655+
statusesAssumingNonEmptyDir = listing
2656+
.getFileStatusesAssumingNonEmptyDir(path);
2657+
2658+
if (!statusesAssumingNonEmptyDir.getLeft().hasNext() &&
2659+
statusesAssumingNonEmptyDir.getRight()) {
2660+
// We are sure that this is an empty directory in auth mode.
2661+
return statusesAssumingNonEmptyDir.getLeft();
2662+
} else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) {
2663+
// We may have an empty dir, or may have file or may have nothing.
2664+
// So we call innerGetFileStatus to get the status, this may throw
2665+
// FileNotFoundException if we have nothing.
2666+
// So We are guaranteed to have either a dir marker or a file.
2667+
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
2668+
StatusProbeEnum.ALL);
2669+
// If it is a file return directly.
2670+
if (fileStatus.isFile()) {
2671+
LOG.debug("Adding: rd (not a dir): {}", path);
2672+
S3AFileStatus[] stats = new S3AFileStatus[1];
2673+
stats[0] = fileStatus;
2674+
return listing.createProvidedFileStatusIterator(
2675+
stats,
2676+
ACCEPT_ALL,
2677+
Listing.ACCEPT_ALL_BUT_S3N);
26572678
}
2658-
2659-
boolean allowAuthoritative = allowAuthoritative(f);
2660-
DirListingMetadata dirMeta =
2661-
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
2662-
allowAuthoritative);
2663-
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
2664-
return S3Guard.dirMetaToStatuses(dirMeta);
2665-
}
2666-
2667-
S3ListRequest request = createListObjectsRequest(key, "/");
2668-
LOG.debug("listStatus: doing listObjects for directory {}", key);
2669-
2670-
Listing.FileStatusListingIterator files =
2671-
listing.createFileStatusListingIterator(path,
2672-
request,
2673-
ACCEPT_ALL,
2674-
new Listing.AcceptAllButSelfAndS3nDirs(path));
2675-
result = new ArrayList<>(files.getBatchSize());
2676-
while (files.hasNext()) {
2677-
result.add(files.next());
2678-
}
2679-
// merge the results. This will update the store as needed
2680-
return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
2681-
allowAuthoritative, ttlTimeProvider);
2682-
} else {
2683-
LOG.debug("Adding: rd (not a dir): {}", path);
2684-
S3AFileStatus[] stats = new S3AFileStatus[1];
2685-
stats[0]= fileStatus;
2686-
return stats;
26872679
}
2680+
// Here we have a directory which may or may not be empty.
2681+
// So we update the metastore and return.
2682+
return S3Guard.dirListingUnion(
2683+
metadataStore,
2684+
path,
2685+
statusesAssumingNonEmptyDir.getLeft(),
2686+
statusesAssumingNonEmptyDir.getMiddle(),
2687+
allowAuthoritative(path),
2688+
ttlTimeProvider, p ->
2689+
listing.createProvidedFileStatusIterator(
2690+
dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()),
2691+
ACCEPT_ALL,
2692+
Listing.ACCEPT_ALL_BUT_S3N));
26882693
}
26892694

26902695
/**
@@ -4472,7 +4477,7 @@ private RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
44724477
: null;
44734478
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
44744479
listing.createProvidedFileStatusIterator(
4475-
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
4480+
dirMetaToStatuses(meta), filter, acceptor);
44764481
return (allowAuthoritative && meta != null
44774482
&& meta.isAuthoritative())
44784483
? listing.createLocatedFileStatusIterator(

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.hadoop.classification.InterfaceAudience;
4343
import org.apache.hadoop.classification.InterfaceStability;
4444
import org.apache.hadoop.conf.Configuration;
45+
import org.apache.hadoop.fs.FileStatus;
4546
import org.apache.hadoop.fs.FileSystem;
4647
import org.apache.hadoop.fs.LocatedFileStatus;
4748
import org.apache.hadoop.fs.Path;
@@ -1416,6 +1417,30 @@ private static void initUserAgent(Configuration conf,
14161417
awsConf.setUserAgentPrefix(userAgent);
14171418
}
14181419

1420+
/**
1421+
* Convert the data of an iterator of {@link S3AFileStatus} to
1422+
* an array. Given tombstones are filtered out. If the iterator
1423+
* does return any item, an empty array is returned.
1424+
* @param iterator a non-null iterator
1425+
* @param tombstones
1426+
* @return a possibly-empty array of file status entries
1427+
* @throws IOException
1428+
*/
1429+
public static S3AFileStatus[] iteratorToStatuses(
1430+
RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
1431+
throws IOException {
1432+
List<FileStatus> statuses = new ArrayList<>();
1433+
1434+
while (iterator.hasNext()) {
1435+
S3AFileStatus status = iterator.next();
1436+
if (!tombstones.contains(status.getPath())) {
1437+
statuses.add(status);
1438+
}
1439+
}
1440+
1441+
return statuses.toArray(new S3AFileStatus[0]);
1442+
}
1443+
14191444
/**
14201445
* An interface for use in lambda-expressions working with
14211446
* directory tree listings.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -295,30 +295,6 @@ public static BulkOperationState initiateBulkWrite(
295295
}
296296
}
297297

298-
/**
299-
* Convert the data of an iterator of {@link S3AFileStatus} to
300-
* an array. Given tombstones are filtered out. If the iterator
301-
* does return any item, an empty array is returned.
302-
* @param iterator a non-null iterator
303-
* @param tombstones
304-
* @return a possibly-empty array of file status entries
305-
* @throws IOException
306-
*/
307-
public static S3AFileStatus[] iteratorToStatuses(
308-
RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
309-
throws IOException {
310-
List<FileStatus> statuses = new ArrayList<>();
311-
312-
while (iterator.hasNext()) {
313-
S3AFileStatus status = iterator.next();
314-
if (!tombstones.contains(status.getPath())) {
315-
statuses.add(status);
316-
}
317-
}
318-
319-
return statuses.toArray(new S3AFileStatus[0]);
320-
}
321-
322298
/**
323299
* Convert the data of a directory listing to an array of {@link FileStatus}
324300
* entries. Tombstones are filtered out at this point. If the listing is null
@@ -359,17 +335,22 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
359335
* @param dirMeta Directory listing from MetadataStore. May be null.
360336
* @param isAuthoritative State of authoritative mode
361337
* @param timeProvider Time provider to use when updating entries
338+
* @param toStatusItr function to convert array of file status to
339+
* RemoteIterator.
362340
* @return Final result of directory listing.
363341
* @throws IOException if metadata store update failed
364342
*/
365-
public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
366-
List<S3AFileStatus> backingStatuses, DirListingMetadata dirMeta,
367-
boolean isAuthoritative, ITtlTimeProvider timeProvider)
368-
throws IOException {
343+
public static RemoteIterator<S3AFileStatus> dirListingUnion(
344+
MetadataStore ms, Path path,
345+
RemoteIterator<S3AFileStatus> backingStatuses,
346+
DirListingMetadata dirMeta, boolean isAuthoritative,
347+
ITtlTimeProvider timeProvider,
348+
Function<S3AFileStatus[], RemoteIterator<S3AFileStatus>> toStatusItr)
349+
throws IOException {
369350

370351
// Fast-path for NullMetadataStore
371352
if (isNullMetadataStore(ms)) {
372-
return backingStatuses.toArray(new S3AFileStatus[backingStatuses.size()]);
353+
return backingStatuses;
373354
}
374355

375356
assertQualified(path);
@@ -410,7 +391,7 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
410391
}
411392
IOUtils.cleanupWithLogger(LOG, operationState);
412393

413-
return dirMetaToStatuses(dirMeta);
394+
return toStatusItr.apply(dirMetaToStatuses(dirMeta));
414395
}
415396

416397
/**
@@ -429,7 +410,7 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
429410
private static void authoritativeUnion(
430411
final MetadataStore ms,
431412
final Path path,
432-
final List<S3AFileStatus> backingStatuses,
413+
final RemoteIterator<S3AFileStatus> backingStatuses,
433414
final DirListingMetadata dirMeta,
434415
final ITtlTimeProvider timeProvider,
435416
final BulkOperationState operationState) throws IOException {
@@ -440,7 +421,8 @@ private static void authoritativeUnion(
440421
Set<Path> deleted = dirMeta.listTombstones();
441422
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
442423
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
443-
for (S3AFileStatus s : backingStatuses) {
424+
while (backingStatuses.hasNext()) {
425+
S3AFileStatus s = backingStatuses.next();
444426
final Path statusPath = s.getPath();
445427
if (deleted.contains(statusPath)) {
446428
continue;
@@ -493,16 +475,17 @@ private static void authoritativeUnion(
493475
private static void nonAuthoritativeUnion(
494476
final MetadataStore ms,
495477
final Path path,
496-
final List<S3AFileStatus> backingStatuses,
478+
final RemoteIterator<S3AFileStatus> backingStatuses,
497479
final DirListingMetadata dirMeta,
498480
final ITtlTimeProvider timeProvider,
499481
final BulkOperationState operationState) throws IOException {
500-
List<PathMetadata> entriesToAdd = new ArrayList<>(backingStatuses.size());
482+
List<PathMetadata> entriesToAdd = new ArrayList<>();
501483
Set<Path> deleted = dirMeta.listTombstones();
502484

503485
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
504486
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
505-
for (S3AFileStatus s : backingStatuses) {
487+
while (backingStatuses.hasNext()) {
488+
S3AFileStatus s = backingStatuses.next();
506489
final Path statusPath = s.getPath();
507490
if (deleted.contains(statusPath)) {
508491
continue;

0 commit comments

Comments
 (0)