Skip to content

Commit

Permalink
HDDS-10383. Introduce a Provider for client-side thread resources pas…
Browse files Browse the repository at this point in the history
…sing (apache#6222)
  • Loading branch information
xichen01 authored Feb 22, 2024
1 parent 45d420a commit f0b75b7
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ public enum ChecksumCombineMode {
// 3 concurrent stripe read should be enough.
private int ecReconstructStripeReadPoolLimit = 10 * 3;

@Config(key = "ec.reconstruct.stripe.write.pool.limit",
defaultValue = "30",
description = "Thread pool max size for parallelly write" +
" available ec chunks to reconstruct the whole stripe.",
tags = ConfigTag.CLIENT)
private int ecReconstructStripeWritePoolLimit = 10 * 3;

@Config(key = "checksum.combine.mode",
defaultValue = "COMPOSITE_CRC",
description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
Expand Down Expand Up @@ -387,6 +394,14 @@ public int getEcReconstructStripeReadPoolLimit() {
return ecReconstructStripeReadPoolLimit;
}

public void setEcReconstructStripeWritePoolLimit(int poolLimit) {
this.ecReconstructStripeWritePoolLimit = poolLimit;
}

public int getEcReconstructStripeWritePoolLimit() {
return ecReconstructStripeWritePoolLimit;
}

public void setFsDefaultBucketLayout(String bucketLayout) {
if (!bucketLayout.isEmpty()) {
this.fsDefaultBucketLayout = bucketLayout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -145,7 +146,8 @@ public BlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
Expand Down Expand Up @@ -75,10 +77,11 @@ public ECBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/**
* An {@link OutputStream} used by the REST service in combination with the
Expand Down Expand Up @@ -65,8 +67,8 @@ public class RatisBlockOutputStream extends BlockOutputStream
/**
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param bufferPool pool of buffers
* @param blockID block ID
* @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
Expand All @@ -76,10 +78,11 @@ public RatisBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs);
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -108,7 +109,9 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
bufferPool,
config,
null,
ContainerClientMetrics.acquire(), streamBufferArgs);
ContainerClientMetrics.acquire(),
streamBufferArgs,
() -> newFixedThreadPool(10));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
Expand All @@ -50,6 +50,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -70,7 +71,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -101,12 +101,15 @@ public class ECReconstructionCoordinator implements Closeable {

private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

// TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;

private final ECContainerOperationClient containerOperationClient;

private final ByteBufferPool byteBufferPool;

private final ExecutorService ecReconstructExecutor;

private final ExecutorService ecReconstructReadExecutor;
private final MemoizedSupplier<ExecutorService> ecReconstructWriteExecutor;
private final BlockInputStreamFactory blockInputStreamFactory;
private final TokenHelper tokenHelper;
private final ContainerClientMetrics clientMetrics;
Expand All @@ -123,20 +126,18 @@ public ECReconstructionCoordinator(
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
.build();
ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
this.ecReconstructExecutor =
new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
this.ecReconstructReadExecutor = createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
threadNamePrefix + "ec-reconstruct-reader-TID-%d");
this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf(
() -> createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeWritePoolLimit(),
threadNamePrefix + "ec-reconstruct-writer-TID-%d"));
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, () -> ecReconstructExecutor);
.getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.clientMetrics = ContainerClientMetrics.acquire();
this.metrics = metrics;
Expand Down Expand Up @@ -232,7 +233,7 @@ private ECBlockOutputStream getECBlockOutputStream(
containerOperationClient.singleNodePipeline(datanodeDetails,
repConfig, replicaIndex),
BufferPool.empty(), ozoneClientConfig,
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, ecReconstructWriteExecutor);
}

@VisibleForTesting
Expand Down Expand Up @@ -272,7 +273,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
repConfig, blockLocationInfo, true,
this.containerOperationClient.getXceiverClientManager(), null,
this.blockInputStreamFactory, byteBufferPool,
this.ecReconstructExecutor)) {
this.ecReconstructReadExecutor)) {

ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
Expand Down Expand Up @@ -457,6 +458,9 @@ public void close() throws IOException {
if (containerOperationClient != null) {
containerOperationClient.close();
}
if (ecReconstructWriteExecutor.isInitialized()) {
ecReconstructWriteExecutor.get().shutdownNow();
}
}

private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
Expand Down Expand Up @@ -590,4 +594,12 @@ OptionalLong getTermOfLeaderSCM() {
.map(StateContext::getTermOfLeaderSCM)
.orElse(OptionalLong.empty());
}

private static ExecutorService createThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, String threadNameFormat) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.client.BlockID;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private final BufferPool bufferPool;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
private final Supplier<ExecutorService> executorServiceSupplier;

BlockOutputStreamEntry(Builder b) {
this.config = b.config;
Expand All @@ -78,6 +81,7 @@ public class BlockOutputStreamEntry extends OutputStream {
this.bufferPool = b.bufferPool;
this.clientMetrics = b.clientMetrics;
this.streamBufferArgs = b.streamBufferArgs;
this.executorServiceSupplier = b.executorServiceSupplier;
}

@Override
Expand All @@ -104,13 +108,18 @@ void checkStream() throws IOException {
*/
void createOutputStream() throws IOException {
outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs,
executorServiceSupplier);
}

ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}

Supplier<ExecutorService> getExecutorServiceSupplier() {
return executorServiceSupplier;
}

StreamBufferArgs getStreamBufferArgs() {
return streamBufferArgs;
}
Expand Down Expand Up @@ -357,6 +366,7 @@ public static class Builder {
private OzoneClientConfig config;
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;

public Pipeline getPipeline() {
return pipeline;
Expand Down Expand Up @@ -406,15 +416,22 @@ public Builder setToken(Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}

public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
}

public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
this.streamBufferArgs = streamBufferArgs;
return this;
}

public Builder setExecutorServiceSupplier(Supplier<ExecutorService> executorServiceSupplier) {
this.executorServiceSupplier = executorServiceSupplier;
return this;
}

public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final ExcludeList excludeList;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
private final Supplier<ExecutorService> executorServiceSupplier;

public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
this.config = b.getClientConfig();
Expand All @@ -109,6 +112,7 @@ public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
ByteStringConversion
.createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
this.clientMetrics = b.getClientMetrics();
this.executorServiceSupplier = b.getExecutorServiceSupplier();
}

ExcludeList createExcludeList() {
Expand Down Expand Up @@ -159,6 +163,7 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
.setToken(subKeyInfo.getToken())
.setClientMetrics(clientMetrics)
.setStreamBufferArgs(streamBufferArgs)
.setExecutorServiceSupplier(executorServiceSupplier)
.build();
}

Expand Down Expand Up @@ -229,6 +234,10 @@ StreamBufferArgs getStreamBufferArgs() {
return streamBufferArgs;
}

public Supplier<ExecutorService> getExecutorServiceSupplier() {
return executorServiceSupplier;
}

/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ void checkStream() throws IOException {
streams[i] =
new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1),
getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs());
getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs(),
getExecutorServiceSupplier());
}
blockOutputStreams = streams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
.setBufferPool(getBufferPool())
.setToken(subKeyInfo.getToken())
.setClientMetrics(getClientMetrics())
.setStreamBufferArgs(getStreamBufferArgs());
.setStreamBufferArgs(getStreamBufferArgs())
.setExecutorServiceSupplier(getExecutorServiceSupplier());
return b.build();
}

Expand Down
Loading

0 comments on commit f0b75b7

Please sign in to comment.