Skip to content

Commit 5f34271

Browse files
authored
HADOOP-17475. ABFS : add high performance listStatusIterator (#2548)
The ABFS connector now implements listStatusIterator() with asynchronous prefetching of the next page(s) of results. For listing large directories this can provide tangible speedups. If for any reason this needs to be disabled, set fs.azure.enable.abfslistiterator to false. Contributed by Bilahari T H.
1 parent c174141 commit 5f34271

File tree

9 files changed

+643
-10
lines changed

9 files changed

+643
-10
lines changed

hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,13 @@
7474
<Class name="org.apache.hadoop.fs.azure.FileMetadata" />
7575
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS" />
7676
</Match>
77+
78+
<!-- continuation is returned from an external http call. Keeping this
79+
outside the synchronized block, since synchronizing around that call is costly. -->
80+
<Match>
81+
<Class name="org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator" />
82+
<Field name="continuation" />
83+
<Bug pattern="IS2_INCONSISTENT_SYNC" />
84+
</Match>
85+
7786
</FindBugsFilter>

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ public class AbfsConfiguration{
275275
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
276276
private long sasTokenRenewPeriodForStreamsInSeconds;
277277

278+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
279+
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
280+
private boolean enableAbfsListIterator;
281+
278282
public AbfsConfiguration(final Configuration rawConfig, String accountName)
279283
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
280284
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
@@ -896,6 +900,10 @@ public int getMaxWriteRequestsToQueue() {
896900
return this.maxWriteRequestsToQueue;
897901
}
898902

903+
public boolean enableAbfsListIterator() {
904+
return this.enableAbfsListIterator;
905+
}
906+
899907
@VisibleForTesting
900908
void setReadBufferSize(int bufferSize) {
901909
this.readBufferSize = bufferSize;
@@ -961,4 +969,9 @@ public void setOptimizeFooterRead(boolean optimizeFooterRead) {
961969
this.optimizeFooterRead = optimizeFooterRead;
962970
}
963971

972+
@VisibleForTesting
973+
public void setEnableAbfsListIterator(boolean enableAbfsListIterator) {
974+
this.enableAbfsListIterator = enableAbfsListIterator;
975+
}
976+
964977
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.apache.commons.lang3.ArrayUtils;
4747
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
4848
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
49+
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
50+
import org.apache.hadoop.fs.RemoteIterator;
4951
import org.apache.hadoop.classification.InterfaceStability;
5052
import org.apache.hadoop.conf.Configuration;
5153
import org.apache.hadoop.fs.BlockLocation;
@@ -79,6 +81,7 @@
7981
import org.apache.hadoop.security.AccessControlException;
8082
import org.apache.hadoop.security.token.Token;
8183
import org.apache.hadoop.security.UserGroupInformation;
84+
import org.apache.hadoop.util.functional.RemoteIterators;
8285
import org.apache.hadoop.util.Progressable;
8386

8487
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
@@ -983,6 +986,19 @@ public boolean exists(Path f) throws IOException {
983986
return super.exists(f);
984987
}
985988

989+
@Override
990+
public RemoteIterator<FileStatus> listStatusIterator(Path path)
991+
throws IOException {
992+
LOG.debug("AzureBlobFileSystem.listStatusIterator path : {}", path);
993+
if (abfsStore.getAbfsConfiguration().enableAbfsListIterator()) {
994+
AbfsListStatusRemoteIterator abfsLsItr =
995+
new AbfsListStatusRemoteIterator(getFileStatus(path), abfsStore);
996+
return RemoteIterators.typeCastingRemoteIterator(abfsLsItr);
997+
} else {
998+
return super.listStatusIterator(path);
999+
}
1000+
}
1001+
9861002
private FileStatus tryGetFileStatus(final Path f) {
9871003
try {
9881004
return getFileStatus(f);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
103103
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
104104
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
105+
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
105106
import org.apache.hadoop.fs.azurebfs.utils.Base64;
106107
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
107108
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
@@ -131,7 +132,7 @@
131132
*/
132133
@InterfaceAudience.Public
133134
@InterfaceStability.Evolving
134-
public class AzureBlobFileSystemStore implements Closeable {
135+
public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
135136
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
136137

137138
private AbfsClient client;
@@ -838,6 +839,7 @@ public FileStatus getFileStatus(final Path path) throws IOException {
838839
* @param path The list path.
839840
* @return the entries in the path.
840841
* */
842+
@Override
841843
public FileStatus[] listStatus(final Path path) throws IOException {
842844
return listStatus(path, null);
843845
}
@@ -854,7 +856,17 @@ public FileStatus[] listStatus(final Path path) throws IOException {
854856
* @return the entries in the path start from "startFrom" in lexical order.
855857
* */
856858
@InterfaceStability.Unstable
859+
@Override
857860
public FileStatus[] listStatus(final Path path, final String startFrom) throws IOException {
861+
List<FileStatus> fileStatuses = new ArrayList<>();
862+
listStatus(path, startFrom, fileStatuses, true, null);
863+
return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
864+
}
865+
866+
@Override
867+
public String listStatus(final Path path, final String startFrom,
868+
List<FileStatus> fileStatuses, final boolean fetchAll,
869+
String continuation) throws IOException {
858870
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
859871
long countAggregate = 0;
860872
boolean shouldContinue = true;
@@ -865,16 +877,16 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I
865877
startFrom);
866878

867879
final String relativePath = getRelativePath(path);
868-
String continuation = null;
869880

870-
// generate continuation token if a valid startFrom is provided.
871-
if (startFrom != null && !startFrom.isEmpty()) {
872-
continuation = getIsNamespaceEnabled()
873-
? generateContinuationTokenForXns(startFrom)
874-
: generateContinuationTokenForNonXns(relativePath, startFrom);
881+
if (continuation == null || continuation.isEmpty()) {
882+
// generate continuation token if a valid startFrom is provided.
883+
if (startFrom != null && !startFrom.isEmpty()) {
884+
continuation = getIsNamespaceEnabled()
885+
? generateContinuationTokenForXns(startFrom)
886+
: generateContinuationTokenForNonXns(relativePath, startFrom);
887+
}
875888
}
876889

877-
ArrayList<FileStatus> fileStatuses = new ArrayList<>();
878890
do {
879891
try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
880892
AbfsRestOperation op = client.listPath(relativePath, false,
@@ -928,15 +940,16 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I
928940

929941
perfInfo.registerSuccess(true);
930942
countAggregate++;
931-
shouldContinue = continuation != null && !continuation.isEmpty();
943+
shouldContinue =
944+
fetchAll && continuation != null && !continuation.isEmpty();
932945

933946
if (!shouldContinue) {
934947
perfInfo.registerAggregates(startAggregate, countAggregate);
935948
}
936949
}
937950
} while (shouldContinue);
938951

939-
return fileStatuses.toArray(new FileStatus[fileStatuses.size()]);
952+
return continuation;
940953
}
941954

942955
// generate continuation token for xns account

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ public final class ConfigurationKeys {
130130
public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
131131
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
132132
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
133+
/** Setting this to true makes the driver use its own RemoteIterator implementation. */
134+
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
133135

134136
/** End point of ABFS account: {@value}. */
135137
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,7 @@ public final class FileSystemConfigurations {
101101
public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
102102
public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins
103103

104+
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
105+
104106
private FileSystemConfigurations() {}
105107
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
import java.io.IOException;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.Iterator;
25+
import java.util.List;
26+
import java.util.NoSuchElementException;
27+
import java.util.concurrent.ArrayBlockingQueue;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
30+
import javax.activation.UnsupportedDataTypeException;
31+
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import org.apache.hadoop.fs.FileStatus;
36+
import org.apache.hadoop.fs.RemoteIterator;
37+
38+
public class AbfsListStatusRemoteIterator
39+
implements RemoteIterator<FileStatus> {
40+
41+
private static final Logger LOG = LoggerFactory
42+
.getLogger(AbfsListStatusRemoteIterator.class);
43+
44+
private static final boolean FETCH_ALL_FALSE = false;
45+
private static final int MAX_QUEUE_SIZE = 10;
46+
private static final long POLL_WAIT_TIME_IN_MS = 250;
47+
48+
private final FileStatus fileStatus;
49+
private final ListingSupport listingSupport;
50+
private final ArrayBlockingQueue<Object> iteratorsQueue;
51+
52+
private volatile boolean isAsyncInProgress = false;
53+
private boolean isIterationComplete = false;
54+
private String continuation;
55+
private Iterator<FileStatus> currIterator;
56+
57+
public AbfsListStatusRemoteIterator(final FileStatus fileStatus,
58+
final ListingSupport listingSupport) {
59+
this.fileStatus = fileStatus;
60+
this.listingSupport = listingSupport;
61+
iteratorsQueue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
62+
currIterator = Collections.emptyIterator();
63+
fetchBatchesAsync();
64+
}
65+
66+
@Override
67+
public boolean hasNext() throws IOException {
68+
if (currIterator.hasNext()) {
69+
return true;
70+
}
71+
currIterator = getNextIterator();
72+
return currIterator.hasNext();
73+
}
74+
75+
@Override
76+
public FileStatus next() throws IOException {
77+
if (!this.hasNext()) {
78+
throw new NoSuchElementException();
79+
}
80+
return currIterator.next();
81+
}
82+
83+
private Iterator<FileStatus> getNextIterator() throws IOException {
84+
fetchBatchesAsync();
85+
try {
86+
Object obj = null;
87+
while (obj == null
88+
&& (!isIterationComplete || !iteratorsQueue.isEmpty())) {
89+
obj = iteratorsQueue.poll(POLL_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS);
90+
}
91+
if (obj == null) {
92+
return Collections.emptyIterator();
93+
} else if (obj instanceof Iterator) {
94+
return (Iterator<FileStatus>) obj;
95+
} else if (obj instanceof IOException) {
96+
throw (IOException) obj;
97+
} else {
98+
throw new UnsupportedDataTypeException();
99+
}
100+
} catch (InterruptedException e) {
101+
Thread.currentThread().interrupt();
102+
LOG.error("Thread got interrupted: {}", e);
103+
throw new IOException(e);
104+
}
105+
}
106+
107+
private void fetchBatchesAsync() {
108+
if (isAsyncInProgress || isIterationComplete) {
109+
return;
110+
}
111+
synchronized (this) {
112+
if (isAsyncInProgress || isIterationComplete) {
113+
return;
114+
}
115+
isAsyncInProgress = true;
116+
}
117+
CompletableFuture.runAsync(() -> asyncOp());
118+
}
119+
120+
private void asyncOp() {
121+
try {
122+
while (!isIterationComplete && iteratorsQueue.size() <= MAX_QUEUE_SIZE) {
123+
addNextBatchIteratorToQueue();
124+
}
125+
} catch (IOException ioe) {
126+
LOG.error("Fetching filestatuses failed", ioe);
127+
try {
128+
iteratorsQueue.put(ioe);
129+
} catch (InterruptedException interruptedException) {
130+
Thread.currentThread().interrupt();
131+
LOG.error("Thread got interrupted: {}", interruptedException);
132+
}
133+
} catch (InterruptedException e) {
134+
Thread.currentThread().interrupt();
135+
LOG.error("Thread got interrupted: {}", e);
136+
} finally {
137+
synchronized (this) {
138+
isAsyncInProgress = false;
139+
}
140+
}
141+
}
142+
143+
private void addNextBatchIteratorToQueue()
144+
throws IOException, InterruptedException {
145+
List<FileStatus> fileStatuses = new ArrayList<>();
146+
continuation = listingSupport
147+
.listStatus(fileStatus.getPath(), null, fileStatuses, FETCH_ALL_FALSE,
148+
continuation);
149+
if (!fileStatuses.isEmpty()) {
150+
iteratorsQueue.put(fileStatuses.iterator());
151+
}
152+
synchronized (this) {
153+
if (continuation == null || continuation.isEmpty()) {
154+
isIterationComplete = true;
155+
}
156+
}
157+
}
158+
159+
}

0 commit comments

Comments
 (0)