Skip to content

Commit c9d65aa

Browse files
committed
review comments part 2: move executor->abfsStore
1 parent a718cbd commit c9d65aa

File tree

4 files changed

+38
-28
lines changed

4 files changed

+38
-28
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,6 @@ public class AzureBlobFileSystem extends FileSystem
127127
private String clientCorrelationId;
128128
private TracingHeaderFormat tracingHeaderFormat;
129129
private Listener listener;
130-
private final ExecutorService contentSummaryExecutorService = Executors.newCachedThreadPool();
131-
// private final ExecutorService contentSummaryExecutorService = new ThreadPoolExecutor(
132-
// 0, 16, 5, TimeUnit.SECONDS, new SynchronousQueue<>());
133130

134131
@Override
135132
public void initialize(URI uri, Configuration configuration)
@@ -453,8 +450,7 @@ public ContentSummary getContentSummary(Path path) throws IOException {
453450
TracingContext tracingContext = new TracingContext(clientCorrelationId,
454451
fileSystemId, FSOperationType.GET_CONTENT_SUMMARY, true,
455452
tracingHeaderFormat, listener);
456-
return (new ContentSummaryProcessor(abfsStore,
457-
contentSummaryExecutorService)).getContentSummary(path,
453+
return (new ContentSummaryProcessor(abfsStore)).getContentSummary(path,
458454
tracingContext);
459455
} catch (InterruptedException e) {
460456
LOG.debug("Thread interrupted");

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import java.util.Set;
5252
import java.util.WeakHashMap;
5353
import java.util.concurrent.ExecutionException;
54+
import java.util.concurrent.ExecutorService;
55+
import java.util.concurrent.Executors;
5456

5557
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
5658
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -166,6 +168,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
166168
private final IdentityTransformerInterface identityTransformer;
167169
private final AbfsPerfTracker abfsPerfTracker;
168170
private final AbfsCounters abfsCounters;
171+
private final ExecutorService contentSummaryExecutorService = Executors.newCachedThreadPool();
169172

170173
/**
171174
* The set of directories where we should store files as append blobs.
@@ -1669,6 +1672,10 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) {
16691672
return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName);
16701673
}
16711674

1675+
public ExecutorService getContentSummaryExecutorService() {
1676+
return contentSummaryExecutorService;
1677+
}
1678+
16721679
private static class VersionedFileStatus extends FileStatus {
16731680
private final String version;
16741681

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,20 @@
2929
import java.util.concurrent.atomic.AtomicInteger;
3030
import java.util.concurrent.atomic.AtomicLong;
3131

32-
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
3534

3635
import org.apache.hadoop.fs.ContentSummary;
3736
import org.apache.hadoop.fs.FileStatus;
3837
import org.apache.hadoop.fs.Path;
38+
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
39+
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
3940

41+
/**
42+
* Class to carry out parallelized recursive listing on a given path to
43+
* collect directory and file count/size information, as part of the
44+
* implementation for the Filesystem method getContentSummary
45+
*/
4046
public class ContentSummaryProcessor {
4147
private static final int MAX_THREAD_COUNT = 16;
4248
private static final int POLL_TIMEOUT = 100;
@@ -56,33 +62,34 @@ public class ContentSummaryProcessor {
5662
* @param abfsStore Instance of AzureBlobFileSystemStore, used to make
5763
* listStatus calls to server
5864
*/
59-
public ContentSummaryProcessor(ListingSupport abfsStore,
60-
ExecutorService executorService) {
65+
public ContentSummaryProcessor(ListingSupport abfsStore) {
6166
this.abfsStore = abfsStore;
62-
this.executorService = executorService;
67+
this.executorService = ((AzureBlobFileSystemStore) abfsStore).getContentSummaryExecutorService();
6368
completionService = new ExecutorCompletionService<>(this.executorService);
6469
}
6570

66-
public ContentSummary getContentSummary(Path path, TracingContext tracingContext)
67-
throws IOException, ExecutionException, InterruptedException {
68-
try {
69-
processDirectoryTree(path, tracingContext);
70-
while (!queue.isEmpty() || numTasks.get() > 0) {
71-
try {
72-
completionService.take().get();
73-
} finally {
74-
numTasks.decrementAndGet();
75-
LOG.debug("FileStatus queue size = {}, number of submitted unfinished tasks = {}, active thread count = {}",
76-
queue.size(), numTasks, ((ThreadPoolExecutor) executorService).getActiveCount());
77-
}
71+
public ContentSummary getContentSummary(Path path,
72+
TracingContext tracingContext)
73+
throws IOException, ExecutionException, InterruptedException {
74+
75+
processDirectoryTree(path, tracingContext);
76+
while (!queue.isEmpty() || numTasks.get() > 0) {
77+
try {
78+
completionService.take().get();
79+
} finally {
80+
numTasks.decrementAndGet();
81+
LOG.debug(
82+
"FileStatus queue size = {}, number of submitted unfinished tasks"
83+
+ " = {}, active thread count = {}",
84+
queue.size(), numTasks,
85+
((ThreadPoolExecutor) executorService).getActiveCount());
7886
}
79-
} finally {
80-
executorService.shutdownNow();
81-
LOG.debug("Executor shutdown");
8287
}
88+
8389
LOG.debug("Processed content summary of subtree under given path");
84-
ContentSummary.Builder builder = new ContentSummary.Builder()
85-
.directoryCount(directoryCount.get()).fileCount(fileCount.get())
90+
ContentSummary.Builder builder =
91+
new ContentSummary.Builder().directoryCount(
92+
directoryCount.get()).fileCount(fileCount.get())
8693
.length(totalBytes.get()).spaceConsumed(totalBytes.get());
8794
return builder.build();
8895
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestGetContentSummary.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ private void populateDirWithFiles(Path directory, int numFiles)
208208
final List<Future<Void>> tasks = new ArrayList<>();
209209
ExecutorService es = Executors.newFixedThreadPool(10);
210210
for (int i = 0; i < numFiles; i++) {
211-
final Path fileName = new Path(directory + "/test" + i);
211+
final Path fileName = new Path(directory, String.format("test-%02d", i));
212212
tasks.add(es.submit(() -> {
213213
touch(fileName);
214214
return null;
@@ -218,7 +218,7 @@ private void populateDirWithFiles(Path directory, int numFiles)
218218
task.get();
219219
}
220220
try (FSDataOutputStream out = getFileSystem().append(
221-
new Path(directory + "/test0"))) {
221+
new Path(directory + "/test-00"))) {
222222
out.write(b);
223223
}
224224
es.shutdownNow();

0 commit comments

Comments
 (0)