Skip to content

Commit e90a8af

Browse files
committed
HADOOP-19569. put operations from store to multipart, now StoreWriter
All upload operations are in MultipartIO service, which has been renamed and move to package org.apache.hadoop.fs.s3a.impl.write to match. For completeness deletion should also go into this class or an adjacent one on deletion. Pulled out multipart IO such that there are no back references from it to S3AStore -the final change is to define a store statistics class which it and other things can use to update stats. Executors in hadoop-common to - pick up shutdown of inner executor and shut themselves down. - semaphore executor to decrement counters in this process so that queue state is updated - semaphored delegating executor unit test in common This stops callers being able to submit work when the inner executor has shut down. WriteOperationHelper * make all calls through its callback interface, rather than given a ref to S3AFS. * Move WriteOperationHelper callbacks to S3Store layer, Multipart IO operations * move nearly all Multpart IO operationss out of s3afs and into a new mulitpart service interface and impl * Multipart service retrieved and invoked as appropriate * StoreImpl stores a map of ServiceName -> service. with a lookupService() method in S3AStore interface, it's possible to retrieve services through the API just by knowing their name and type * registering all current services this way StoreImpl to IllegalStateException on method invocation whene the service isn't running. Some methods are kept open as they do seem needed.
1 parent 6b75775 commit e90a8af

35 files changed

+1838
-1034
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
130130
slower than enqueueing. */
131131
final BlockingQueue<Runnable> workQueue =
132132
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
133+
final InnerExecutorRejection rejection = new InnerExecutorRejection();
133134
ThreadPoolExecutor eventProcessingExecutor =
134135
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
135136
workQueue, newDaemonThreadFactory(prefixName),
136-
new RejectedExecutionHandler() {
137-
@Override
138-
public void rejectedExecution(Runnable r,
139-
ThreadPoolExecutor executor) {
140-
// This is not expected to happen.
141-
LOG.error("Could not submit task to executor {}",
142-
executor.toString());
143-
}
144-
});
137+
rejection);
145138
eventProcessingExecutor.allowCoreThreadTimeOut(true);
146-
return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
147-
eventProcessingExecutor);
139+
final BlockingThreadPoolExecutorService service =
140+
new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
141+
eventProcessingExecutor);
142+
rejection.setDelegate((r, executor) -> {
143+
service.shutdown();
144+
});
145+
146+
return service;
148147
}
149148

150149
/**
@@ -164,4 +163,28 @@ public String toString() {
164163
.append('}');
165164
return sb.toString();
166165
}
166+
167+
private static class InnerExecutorRejection implements RejectedExecutionHandler {
168+
169+
private RejectedExecutionHandler delegate;
170+
171+
private RejectedExecutionHandler getDelegate() {
172+
return delegate;
173+
}
174+
175+
private void setDelegate(final RejectedExecutionHandler delegate) {
176+
this.delegate = delegate;
177+
}
178+
179+
@Override
180+
public void rejectedExecution(Runnable r,
181+
ThreadPoolExecutor executor) {
182+
// This is not expected to happen.
183+
LOG.error("Could not submit task to executor {}",
184+
executor.toString());
185+
if (getDelegate() != null) {
186+
delegate.rejectedExecution(r, executor);
187+
}
188+
}
189+
}
167190
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.ExecutionException;
3232
import java.util.concurrent.ExecutorService;
3333
import java.util.concurrent.Future;
34+
import java.util.concurrent.RejectedExecutionException;
3435
import java.util.concurrent.Semaphore;
3536
import java.util.concurrent.TimeUnit;
3637
import java.util.concurrent.TimeoutException;
@@ -127,6 +128,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
127128

128129
@Override
129130
public <T> Future<T> submit(Callable<T> task) {
131+
rejectWhenShutdown();
130132
try (DurationTracker ignored =
131133
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
132134
queueingPermits.acquire();
@@ -139,6 +141,7 @@ public <T> Future<T> submit(Callable<T> task) {
139141

140142
@Override
141143
public <T> Future<T> submit(Runnable task, T result) {
144+
rejectWhenShutdown();
142145
try (DurationTracker ignored =
143146
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
144147
queueingPermits.acquire();
@@ -151,6 +154,7 @@ public <T> Future<T> submit(Runnable task, T result) {
151154

152155
@Override
153156
public Future<?> submit(Runnable task) {
157+
rejectWhenShutdown();
154158
try (DurationTracker ignored =
155159
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
156160
queueingPermits.acquire();
@@ -163,6 +167,7 @@ public Future<?> submit(Runnable task) {
163167

164168
@Override
165169
public void execute(Runnable command) {
170+
rejectWhenShutdown();
166171
try (DurationTracker ignored =
167172
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
168173
queueingPermits.acquire();
@@ -208,6 +213,16 @@ public String toString() {
208213
return sb.toString();
209214
}
210215

216+
/**
217+
* Raise an exception if invoked when the executor is shut down.
218+
* @throws RejectedExecutionException if the executor is shut down.
219+
*/
220+
private void rejectWhenShutdown() throws RejectedExecutionException{
221+
if (isShutdown()) {
222+
throw new RejectedExecutionException("ExecutorService is shutdown");
223+
}
224+
}
225+
211226
/**
212227
* Releases a permit after the task is executed.
213228
*/
@@ -222,6 +237,7 @@ class RunnableWithPermitRelease implements Runnable {
222237
@Override
223238
public void run() {
224239
try {
240+
rejectWhenShutdown();
225241
delegatee.run();
226242
} finally {
227243
queueingPermits.release();
@@ -244,6 +260,7 @@ class CallableWithPermitRelease<T> implements Callable<T> {
244260
@Override
245261
public T call() throws Exception {
246262
try {
263+
rejectWhenShutdown();
247264
return delegatee.call();
248265
} finally {
249266
queueingPermits.release();
Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,34 +16,34 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hadoop.fs.s3a;
20-
21-
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
22-
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
23-
import org.apache.hadoop.util.StopWatch;
19+
package org.apache.hadoop.util;
2420

21+
import org.assertj.core.api.Assertions;
2522
import org.junit.jupiter.api.AfterAll;
23+
import org.junit.jupiter.api.BeforeEach;
2624
import org.junit.jupiter.api.Test;
27-
import org.junit.jupiter.api.Timeout;
2825
import org.slf4j.Logger;
2926
import org.slf4j.LoggerFactory;
3027

3128
import java.util.concurrent.Callable;
3229
import java.util.concurrent.CountDownLatch;
3330
import java.util.concurrent.ExecutorService;
3431
import java.util.concurrent.Future;
32+
import java.util.concurrent.RejectedExecutionException;
3533
import java.util.concurrent.TimeUnit;
34+
import java.util.stream.IntStream;
35+
36+
import org.apache.hadoop.test.AbstractHadoopTestBase;
3637

37-
import static org.junit.jupiter.api.Assertions.assertEquals;
38+
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3839

3940
/**
40-
* Basic test for S3A's blocking executor service.
41+
* Test for the blocking executor service.
4142
*/
42-
@Timeout(60)
43-
public class ITestBlockingThreadPoolExecutorService {
43+
public class TestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {
4444

4545
private static final Logger LOG = LoggerFactory.getLogger(
46-
ITestBlockingThreadPoolExecutorService.class);
46+
TestBlockingThreadPoolExecutorService.class);
4747

4848
private static final int NUM_ACTIVE_TASKS = 4;
4949
private static final int NUM_WAITING_TASKS = 2;
@@ -61,15 +61,15 @@ public static void afterClass() throws Exception {
6161
ensureDestroyed();
6262
}
6363

64+
6465
/**
6566
* Basic test of running one trivial task.
6667
*/
6768
@Test
6869
public void testSubmitCallable() throws Exception {
6970
ensureCreated();
7071
Future<Integer> f = tpe.submit(callableSleeper);
71-
Integer v = f.get();
72-
assertEquals(SOME_VALUE, v);
72+
Assertions.assertThat(f.get()).isEqualTo(SOME_VALUE);
7373
}
7474

7575
/**
@@ -90,9 +90,9 @@ public void testSubmitRunnable() throws Exception {
9090
protected void verifyQueueSize(ExecutorService executorService,
9191
int expectedQueueSize) {
9292
CountDownLatch latch = new CountDownLatch(1);
93-
for (int i = 0; i < expectedQueueSize; i++) {
94-
executorService.submit(new LatchedSleeper(latch));
95-
}
93+
IntStream.range(0, expectedQueueSize)
94+
.mapToObj(i -> new LatchedSleeper(latch))
95+
.forEach(executorService::submit);
9696
StopWatch stopWatch = new StopWatch().start();
9797
latch.countDown();
9898
executorService.submit(sleeper);
@@ -120,6 +120,27 @@ public void testChainedQueue() throws Throwable {
120120
verifyQueueSize(wrapper, size);
121121
}
122122

123+
@Test
124+
public void testShutdownQueueRejectsOperations() throws Throwable {
125+
ensureCreated();
126+
tpe.shutdown();
127+
try {
128+
Assertions.assertThat(tpe.isShutdown())
129+
.describedAs("%s should be shutdown", tpe)
130+
.isTrue();
131+
// runnable
132+
intercept(RejectedExecutionException.class, () ->
133+
tpe.submit(failToRun));
134+
// callable
135+
intercept(RejectedExecutionException.class, () ->
136+
tpe.submit(() -> 0));
137+
intercept(RejectedExecutionException.class, () ->
138+
tpe.execute(failToRun));
139+
} finally {
140+
tpe = null;
141+
}
142+
}
143+
123144
// Helper functions, etc.
124145

125146
private void assertDidBlock(StopWatch sw) {
@@ -132,27 +153,26 @@ private void assertDidBlock(StopWatch sw) {
132153
}
133154
}
134155

135-
private Runnable sleeper = new Runnable() {
136-
@Override
137-
public void run() {
138-
String name = Thread.currentThread().getName();
139-
try {
140-
Thread.sleep(TASK_SLEEP_MSEC);
141-
} catch (InterruptedException e) {
142-
LOG.info("Thread {} interrupted.", name);
143-
Thread.currentThread().interrupt();
144-
}
145-
}
156+
private Runnable failToRun = () -> {
157+
throw new RuntimeException("Failed to Run");
146158
};
147159

148-
private Callable<Integer> callableSleeper = new Callable<Integer>() {
149-
@Override
150-
public Integer call() throws Exception {
151-
sleeper.run();
152-
return SOME_VALUE;
160+
private Runnable sleeper = () -> {
161+
String name = Thread.currentThread().getName();
162+
try {
163+
Thread.sleep(TASK_SLEEP_MSEC);
164+
} catch (InterruptedException e) {
165+
LOG.info("Thread {} interrupted.", name);
166+
Thread.currentThread().interrupt();
153167
}
154168
};
155169

170+
private Callable<Integer> callableSleeper = () -> {
171+
sleeper.run();
172+
return SOME_VALUE;
173+
};
174+
175+
156176
private class LatchedSleeper implements Runnable {
157177
private final CountDownLatch latch;
158178

0 commit comments

Comments
 (0)