Skip to content

Commit c22d19f

Browse files
committed
HADOOP-16080. hadoop-aws does not work with hadoop-client-api. Contributed by Chao Sun (apache#2522)
1 parent 890f2da commit c22d19f

File tree

13 files changed

+61
-49
lines changed

13 files changed

+61
-49
lines changed

hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@
2828
import java.util.HashMap;
2929
import java.util.Set;
3030
import java.util.TreeSet;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.TimeUnit;
3233

3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
35-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
3636

3737
import org.apache.hadoop.classification.InterfaceAudience;
3838
import org.apache.hadoop.classification.InterfaceStability;
@@ -71,8 +71,8 @@ public class CosNFileSystem extends FileSystem {
7171
private String owner = "Unknown";
7272
private String group = "Unknown";
7373

74-
private ListeningExecutorService boundedIOThreadPool;
75-
private ListeningExecutorService boundedCopyThreadPool;
74+
private ExecutorService boundedIOThreadPool;
75+
private ExecutorService boundedCopyThreadPool;
7676

7777
public CosNFileSystem() {
7878
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
32-
3331
import org.apache.hadoop.classification.InterfaceAudience;
3432

3533
/**
@@ -105,8 +103,7 @@ public Thread newThread(Runnable r) {
105103

106104
private BlockingThreadPoolExecutorService(int permitCount,
107105
ThreadPoolExecutor eventProcessingExecutor) {
108-
super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
109-
permitCount, false);
106+
super(eventProcessingExecutor, permitCount, false);
110107
this.eventProcessingExecutor = eventProcessingExecutor;
111108
}
112109

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818

1919
package org.apache.hadoop.util;
2020

21-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingListeningExecutorService;
21+
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingExecutorService;
2222
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
23-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
24-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
2523

2624
import org.apache.hadoop.classification.InterfaceAudience;
2725
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -31,6 +29,7 @@
3129
import java.util.List;
3230
import java.util.concurrent.Callable;
3331
import java.util.concurrent.ExecutionException;
32+
import java.util.concurrent.ExecutorService;
3433
import java.util.concurrent.Future;
3534
import java.util.concurrent.Semaphore;
3635
import java.util.concurrent.TimeUnit;
@@ -55,10 +54,10 @@
5554
@SuppressWarnings("NullableProblems")
5655
@InterfaceAudience.Private
5756
public class SemaphoredDelegatingExecutor extends
58-
ForwardingListeningExecutorService {
57+
ForwardingExecutorService {
5958

6059
private final Semaphore queueingPermits;
61-
private final ListeningExecutorService executorDelegatee;
60+
private final ExecutorService executorDelegatee;
6261
private final int permitCount;
6362
private final DurationTrackerFactory trackerFactory;
6463

@@ -70,7 +69,7 @@ public class SemaphoredDelegatingExecutor extends
7069
* @param trackerFactory duration tracker factory.
7170
*/
7271
public SemaphoredDelegatingExecutor(
73-
ListeningExecutorService executorDelegatee,
72+
ExecutorService executorDelegatee,
7473
int permitCount,
7574
boolean fair,
7675
DurationTrackerFactory trackerFactory) {
@@ -89,14 +88,14 @@ public SemaphoredDelegatingExecutor(
8988
* @param fair should the semaphore be "fair"
9089
*/
9190
public SemaphoredDelegatingExecutor(
92-
ListeningExecutorService executorDelegatee,
91+
ExecutorService executorDelegatee,
9392
int permitCount,
9493
boolean fair) {
9594
this(executorDelegatee, permitCount, fair, null);
9695
}
9796

9897
@Override
99-
protected ListeningExecutorService delegate() {
98+
protected ExecutorService delegate() {
10099
return executorDelegatee;
101100
}
102101

@@ -127,7 +126,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
127126
}
128127

129128
@Override
130-
public <T> ListenableFuture<T> submit(Callable<T> task) {
129+
public <T> Future<T> submit(Callable<T> task) {
131130
try (DurationTracker ignored =
132131
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
133132
queueingPermits.acquire();
@@ -139,7 +138,7 @@ public <T> ListenableFuture<T> submit(Callable<T> task) {
139138
}
140139

141140
@Override
142-
public <T> ListenableFuture<T> submit(Runnable task, T result) {
141+
public <T> Future<T> submit(Runnable task, T result) {
143142
try (DurationTracker ignored =
144143
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
145144
queueingPermits.acquire();
@@ -151,7 +150,7 @@ public <T> ListenableFuture<T> submit(Runnable task, T result) {
151150
}
152151

153152
@Override
154-
public ListenableFuture<?> submit(Runnable task) {
153+
public Future<?> submit(Runnable task) {
155154
try (DurationTracker ignored =
156155
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
157156
queueingPermits.acquire();

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.Semaphore;
2828
import java.util.concurrent.TimeUnit;
2929

30+
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
3031
import org.apache.hadoop.conf.Configuration;
3132
import org.apache.hadoop.security.UserGroupInformation;
3233
import org.apache.hadoop.security.token.Token;
@@ -423,9 +424,10 @@ private void createFileSystems(final FileSystem.Cache cache, final int count)
423424
// only one instance can be created at a time.
424425
URI uri = new URI("blocking://a");
425426
ListeningExecutorService pool =
426-
BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
427+
MoreExecutors.listeningDecorator(
428+
BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
427429
10, TimeUnit.SECONDS,
428-
"creation-threads");
430+
"creation-threads"));
429431

430432
// submit a set of requests to create an FS instance.
431433
// the semaphore will block all but one, and that will block until

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.TimeUnit;
2929

30-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
3130
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
3231
import org.apache.commons.collections.CollectionUtils;
3332
import org.apache.commons.lang3.StringUtils;
@@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem {
7877
private int maxKeys;
7978
private int maxReadAheadPartNumber;
8079
private int maxConcurrentCopyTasksPerDir;
81-
private ListeningExecutorService boundedThreadPool;
82-
private ListeningExecutorService boundedCopyThreadPool;
80+
private ExecutorService boundedThreadPool;
81+
private ExecutorService boundedCopyThreadPool;
8382

8483
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
8584
@Override

hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,10 @@
8484
<Bug pattern="VO_VOLATILE_INCREMENT"/>
8585
</Match>
8686

87+
<!-- Ignore return value from this method call -->
88+
<Match>
89+
<Class name="org.apache.hadoop.fs.s3a.impl.StoreContext"/>
90+
<Method name="submit"/>
91+
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
92+
</Match>
8793
</FindBugsFilter>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Set;
4343
import java.util.Objects;
4444
import java.util.concurrent.CompletableFuture;
45+
import java.util.concurrent.ExecutorService;
4546
import java.util.concurrent.LinkedBlockingQueue;
4647
import java.util.concurrent.ThreadPoolExecutor;
4748
import java.util.concurrent.TimeUnit;
@@ -261,7 +262,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
261262
private long partSize;
262263
private boolean enableMultiObjectsDelete;
263264
private TransferManager transfers;
264-
private ListeningExecutorService boundedThreadPool;
265+
private ExecutorService boundedThreadPool;
265266
private ThreadPoolExecutor unboundedThreadPool;
266267
private int executorCapacity;
267268
private long multiPartThreshold;

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
2929
import com.amazonaws.services.s3.model.DeleteObjectsResult;
3030
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
31+
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
3334

@@ -207,7 +208,8 @@ public DeleteOperation(final StoreContext context,
207208
"page size out of range: %s", pageSize);
208209
this.pageSize = pageSize;
209210
metadataStore = context.getMetadataStore();
210-
executor = context.createThrottledExecutor(1);
211+
executor = MoreExecutors.listeningDecorator(
212+
context.createThrottledExecutor(1));
211213
}
212214

213215
public long getFilesDeleted() {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import java.net.URI;
2424
import java.util.concurrent.Callable;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ExecutorService;
2627

2728
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
2829

30+
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
2931
import org.apache.hadoop.classification.InterfaceAudience;
3032
import org.apache.hadoop.classification.InterfaceStability;
3133
import org.apache.hadoop.conf.Configuration;
@@ -127,7 +129,7 @@ public StoreContext(
127129
final Configuration configuration,
128130
final String username,
129131
final UserGroupInformation owner,
130-
final ListeningExecutorService executor,
132+
final ExecutorService executor,
131133
final int executorCapacity,
132134
final Invoker invoker,
133135
final S3AStatisticsContext instrumentation,
@@ -144,7 +146,7 @@ public StoreContext(
144146
this.configuration = configuration;
145147
this.username = username;
146148
this.owner = owner;
147-
this.executor = executor;
149+
this.executor = MoreExecutors.listeningDecorator(executor);
148150
this.executorCapacity = executorCapacity;
149151
this.invoker = invoker;
150152
this.instrumentation = instrumentation;
@@ -179,7 +181,7 @@ public String getUsername() {
179181
return username;
180182
}
181183

182-
public ListeningExecutorService getExecutor() {
184+
public ExecutorService getExecutor() {
183185
return executor;
184186
}
185187

@@ -310,7 +312,7 @@ public void incrementGauge(Statistic statistic, long count) {
310312
* @param capacity maximum capacity of this executor.
311313
* @return an executor for submitting work.
312314
*/
313-
public ListeningExecutorService createThrottledExecutor(int capacity) {
315+
public ExecutorService createThrottledExecutor(int capacity) {
314316
return new SemaphoredDelegatingExecutor(executor,
315317
capacity, true);
316318
}
@@ -320,7 +322,7 @@ public ListeningExecutorService createThrottledExecutor(int capacity) {
320322
* {@link #executorCapacity}.
321323
* @return a new executor for exclusive use by the caller.
322324
*/
323-
public ListeningExecutorService createThrottledExecutor() {
325+
public ExecutorService createThrottledExecutor() {
324326
return createThrottledExecutor(executorCapacity);
325327
}
326328

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
package org.apache.hadoop.fs.s3a.impl;
2020

2121
import java.net.URI;
22-
23-
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
22+
import java.util.concurrent.ExecutorService;
2423

2524
import org.apache.hadoop.conf.Configuration;
2625
import org.apache.hadoop.fs.s3a.Invoker;
@@ -46,7 +45,7 @@ public class StoreContextBuilder {
4645

4746
private UserGroupInformation owner;
4847

49-
private ListeningExecutorService executor;
48+
private ExecutorService executor;
5049

5150
private int executorCapacity;
5251

@@ -96,7 +95,7 @@ public StoreContextBuilder setOwner(final UserGroupInformation ugi) {
9695
}
9796

9897
public StoreContextBuilder setExecutor(
99-
final ListeningExecutorService ex) {
98+
final ExecutorService ex) {
10099
this.executor = ex;
101100
return this;
102101
}

0 commit comments

Comments
 (0)