Skip to content

Commit f320785

Browse files
committed
clean up
1 parent aa48086 commit f320785

File tree

2 files changed

+10
-50
lines changed

2 files changed

+10
-50
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ContentSummaryProcessor.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import java.util.concurrent.ExecutionException;
2424
import java.util.concurrent.ExecutorCompletionService;
2525
import java.util.concurrent.ExecutorService;
26-
import java.util.concurrent.ForkJoinPool;
2726
import java.util.concurrent.LinkedBlockingQueue;
28-
import java.util.concurrent.RecursiveAction;
2927
import java.util.concurrent.SynchronousQueue;
3028
import java.util.concurrent.ThreadPoolExecutor;
3129
import java.util.concurrent.TimeUnit;
@@ -69,7 +67,6 @@ public ContentSummaryProcessor(ListingSupport abfsStore) {
6967

7068
public ContentSummary getContentSummary(Path path)
7169
throws IOException, ExecutionException, InterruptedException {
72-
// return gcs_fjp(path);
7370
try {
7471
processDirectoryTree(path);
7572
while (!queue.isEmpty() || numTasks.get() > 0) {
@@ -92,46 +89,6 @@ public ContentSummary getContentSummary(Path path)
9289
return builder.build();
9390
}
9491

95-
class GCS extends RecursiveAction {
96-
Path path;
97-
GCS (Path path) {
98-
this.path = path;
99-
}
100-
@Override
101-
protected void compute() {
102-
try {
103-
for (FileStatus fileStatus1 : abfsStore.listStatus(path)) {
104-
if (fileStatus1.isDirectory()) {
105-
processDirectory();
106-
new GCS(fileStatus1.getPath()).fork();
107-
} else {
108-
processFile(fileStatus1);
109-
}
110-
}
111-
} catch (IOException e) {
112-
e.printStackTrace();
113-
}
114-
}
115-
}
116-
117-
public ContentSummary gcs_fjp (Path path)
118-
throws InterruptedException, IOException {
119-
ForkJoinPool forkJoinPool = new ForkJoinPool();
120-
for (FileStatus fileStatus : abfsStore.listStatus(path)) {
121-
if (fileStatus.isDirectory()) {
122-
processDirectory();
123-
forkJoinPool.invoke(new GCS(fileStatus.getPath()));
124-
} else {
125-
processFile(fileStatus);
126-
}
127-
}
128-
forkJoinPool.awaitTermination(1, TimeUnit.SECONDS);
129-
ContentSummary.Builder builder = new ContentSummary.Builder()
130-
.directoryCount(directoryCount.get()).fileCount(fileCount.get())
131-
.length(totalBytes.get()).spaceConsumed(totalBytes.get());
132-
return builder.build();
133-
}
134-
13592
/**
13693
* Calls listStatus on given path and populated fileStatus queue with
13794
* subdirectories. Is called by new tasks to process the complete subtree

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestGetContentSummary.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import java.util.ArrayList;
2323
import java.util.List;
2424
import java.util.Random;
25+
import java.util.concurrent.CompletionService;
2526
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.ExecutorCompletionService;
2628
import java.util.concurrent.ExecutorService;
2729
import java.util.concurrent.Executors;
2830
import java.util.concurrent.Future;
@@ -47,8 +49,8 @@ public class TestGetContentSummary extends AbstractAbfsIntegrationTest {
4749
private static final int TEST_BUFFER_SIZE = 20;
4850
private static final int FILES_PER_DIRECTORY = 2;
4951
private static final int MAX_THREADS = 16;
50-
private static final int NUM_FILES_FOR_LIST_MAX_TEST = 10;
51-
// DEFAULT_AZURE_LIST_MAX_RESULTS + 10;
52+
private static final int NUM_FILES_FOR_LIST_MAX_TEST =
53+
DEFAULT_AZURE_LIST_MAX_RESULTS + 10;
5254
private static final int NUM_CONCURRENT_CALLS = 8;
5355

5456
private final String[] directories = {"/testFolder",
@@ -156,17 +158,18 @@ public void testInvalidPath() throws Exception {
156158
@Test
157159
public void testConcurrentGetContentSummaryCalls()
158160
throws InterruptedException, ExecutionException, IOException {
161+
AzureBlobFileSystem fs = getFileSystem();
159162
ExecutorService executorService = new ThreadPoolExecutor(1, MAX_THREADS, 5,
160163
TimeUnit.SECONDS, new SynchronousQueue<>());
161-
ArrayList<Future<ContentSummary>> futures = new ArrayList<>();
164+
CompletionService<ContentSummary> completionService =
165+
new ExecutorCompletionService<>(executorService);
162166
createDirectoryStructure();
163167
for (int i = 0; i < NUM_CONCURRENT_CALLS; i++) {
164-
Future<ContentSummary> future = executorService.submit(
165-
() -> getFileSystem().getContentSummary(new Path("/testFolder")));
166-
futures.add(future);
168+
completionService.submit(() -> fs.getContentSummary(new Path(
169+
"/testFolder")));
167170
}
168171
for (int i = 0; i < NUM_CONCURRENT_CALLS; i++) {
169-
ContentSummary contentSummary = futures.get(i).get();
172+
ContentSummary contentSummary = completionService.take().get();
170173
verifyContentSummary(contentSummary, 7, 8 * FILES_PER_DIRECTORY,
171174
8 * TEST_BUFFER_SIZE);
172175
}

0 commit comments

Comments
 (0)