diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java index 0c5501c7922..957f761ccbc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java @@ -73,7 +73,7 @@ SortedMap> getCommitIndexMap() { return commitIndexMap; } - void updateCommitInfoMap(long index, List buffers) { + synchronized void updateCommitInfoMap(long index, List buffers) { commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()) .addAll(buffers); } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 5ff5da60989..5c0516d7bd4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -182,8 +181,7 @@ public BlockOutputStream( (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs .getStreamBufferFlushSize()); - // A single thread executor handle the responses of async requests - responseExecutor = Executors.newSingleThreadExecutor(); + this.responseExecutor = blockOutputStreamResourceProvider.get(); bufferList = null; totalDataFlushedLength = 0; writtenDataLength = 0; @@ -657,7 +655,6 @@ public void cleanup(boolean invalidateClient) { bufferList.clear(); } bufferList = null; - responseExecutor.shutdown(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index a45c1584484..90756bbc889 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -101,8 +101,7 @@ public class ECReconstructionCoordinator implements Closeable { private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; - // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used. - private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0; + private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5; private final ECContainerOperationClient containerOperationClient; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java index 878558073f7..0cb3973e041 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java @@ -43,8 +43,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -66,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream private final int numParityBlks; private final ByteBufferPool bufferPool; private final RawErasureEncoder encoder; - private final ExecutorService flushExecutor; private final Future flushFuture; private final AtomicLong flushCheckpoint; @@ -119,12 +116,13 @@ private ECKeyOutputStream(Builder builder) { this.writeOffset = 0; this.encoder = CodecUtil.createRawEncoderWithFallback( builder.getReplicationConfig()); - this.flushExecutor = Executors.newSingleThreadExecutor(); S3Auth s3Auth = builder.getS3CredentialsProvider().get(); ThreadLocal s3CredentialsProvider = builder.getS3CredentialsProvider(); - flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth)); - this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue); + this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> { + s3CredentialsProvider.set(s3Auth); + return flushStripeFromQueue(); + }); this.flushCheckpoint = new AtomicLong(0); this.atomicKeyCreation = builder.getAtomicKeyCreation(); } @@ -495,7 +493,6 @@ public void close() throws IOException { } catch (InterruptedException e) { throw new IOException("Flushing thread was interrupted", e); } finally { - flushExecutor.shutdownNow(); closeCurrentStreamEntry(); blockOutputStreamEntryPool.cleanup(); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 74b22e7ca4c..a6830ba9f77 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -196,8 +196,7 @@ public class RpcClient implements ClientProtocol { // for reconstruction. private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; - // TODO: Adjusts to the appropriate value when the writeThreadPool is used. - private static final int WRITE_POOL_MIN_SIZE = 0; + private static final int WRITE_POOL_MIN_SIZE = 1; private final ConfigurationSource conf; private final OzoneManagerClientProtocol ozoneManagerClient; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java index 29cf1bc5e11..44303ed2ff2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java @@ -213,6 +213,14 @@ static void shutdown() throws IOException { } } + static void reInitClient() throws IOException { + ozClient = OzoneClientFactory.getRpcClient(conf); + store = ozClient.getObjectStore(); + TestOzoneRpcClient.setOzClient(ozClient); + TestOzoneRpcClient.setStore(store); + } + + @ParameterizedTest @EnumSource void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception { @@ -770,9 +778,7 @@ void testGetKeyProvider() throws Exception { KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider(); assertNotEquals(kp3, kpSpy); - // Restore ozClient and store - TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf)); - TestOzoneRpcClient.setStore(ozClient.getObjectStore()); + reInitClient(); } private static RepeatedOmKeyInfo getMatchedKeyInfo(