Skip to content

Commit

Permalink
HDDS-10384. RPC client Reusing thread resources. (#6270)
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen01 authored Feb 26, 2024
1 parent df68290 commit 84c6e4d
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
return commitIndexMap;
}

void updateCommitInfoMap(long index, List<BUFFER> buffers) {
synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
.addAll(buffers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -182,8 +181,7 @@ public BlockOutputStream(
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
.getStreamBufferFlushSize());

// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
this.responseExecutor = blockOutputStreamResourceProvider.get();
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
Expand Down Expand Up @@ -657,7 +655,6 @@ public void cleanup(boolean invalidateClient) {
bufferList.clear();
}
bufferList = null;
responseExecutor.shutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public class ECReconstructionCoordinator implements Closeable {

private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

// TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;

private final ECContainerOperationClient containerOperationClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -66,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final int numParityBlks;
private final ByteBufferPool bufferPool;
private final RawErasureEncoder encoder;
private final ExecutorService flushExecutor;
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;

Expand Down Expand Up @@ -119,12 +116,13 @@ private ECKeyOutputStream(Builder builder) {
this.writeOffset = 0;
this.encoder = CodecUtil.createRawEncoderWithFallback(
builder.getReplicationConfig());
this.flushExecutor = Executors.newSingleThreadExecutor();
S3Auth s3Auth = builder.getS3CredentialsProvider().get();
ThreadLocal<S3Auth> s3CredentialsProvider =
builder.getS3CredentialsProvider();
flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> {
s3CredentialsProvider.set(s3Auth);
return flushStripeFromQueue();
});
this.flushCheckpoint = new AtomicLong(0);
this.atomicKeyCreation = builder.getAtomicKeyCreation();
}
Expand Down Expand Up @@ -495,7 +493,6 @@ public void close() throws IOException {
} catch (InterruptedException e) {
throw new IOException("Flushing thread was interrupted", e);
} finally {
flushExecutor.shutdownNow();
closeCurrentStreamEntry();
blockOutputStreamEntryPool.cleanup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ public class RpcClient implements ClientProtocol {
// for reconstruction.
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

// TODO: Adjusts to the appropriate value when the writeThreadPool is used.
private static final int WRITE_POOL_MIN_SIZE = 0;
private static final int WRITE_POOL_MIN_SIZE = 1;

private final ConfigurationSource conf;
private final OzoneManagerClientProtocol ozoneManagerClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ static void shutdown() throws IOException {
}
}

static void reInitClient() throws IOException {
ozClient = OzoneClientFactory.getRpcClient(conf);
store = ozClient.getObjectStore();
TestOzoneRpcClient.setOzClient(ozClient);
TestOzoneRpcClient.setStore(store);
}


@ParameterizedTest
@EnumSource
void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
Expand Down Expand Up @@ -770,9 +778,7 @@ void testGetKeyProvider() throws Exception {

KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
assertNotEquals(kp3, kpSpy);
// Restore ozClient and store
TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
TestOzoneRpcClient.setStore(ozClient.getObjectStore());
reInitClient();
}

private static RepeatedOmKeyInfo getMatchedKeyInfo(
Expand Down

0 comments on commit 84c6e4d

Please sign in to comment.