Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-10384. RPC client Reusing thread resources. #6270

Merged
merged 2 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
return commitIndexMap;
}

void updateCommitInfoMap(long index, List<BUFFER> buffers) {
synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
.addAll(buffers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -182,8 +181,7 @@ public BlockOutputStream(
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
.getStreamBufferFlushSize());

// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
this.responseExecutor = blockOutputStreamResourceProvider.get();
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
Expand Down Expand Up @@ -657,7 +655,6 @@ public void cleanup(boolean invalidateClient) {
bufferList.clear();
}
bufferList = null;
responseExecutor.shutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ public class ECReconstructionCoordinator implements Closeable {

private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

// TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;

private final ECContainerOperationClient containerOperationClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -66,7 +64,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final int numParityBlks;
private final ByteBufferPool bufferPool;
private final RawErasureEncoder encoder;
private final ExecutorService flushExecutor;
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;

Expand Down Expand Up @@ -119,12 +116,13 @@ private ECKeyOutputStream(Builder builder) {
this.writeOffset = 0;
this.encoder = CodecUtil.createRawEncoderWithFallback(
builder.getReplicationConfig());
this.flushExecutor = Executors.newSingleThreadExecutor();
S3Auth s3Auth = builder.getS3CredentialsProvider().get();
ThreadLocal<S3Auth> s3CredentialsProvider =
builder.getS3CredentialsProvider();
flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> {
s3CredentialsProvider.set(s3Auth);
return flushStripeFromQueue();
});
this.flushCheckpoint = new AtomicLong(0);
this.atomicKeyCreation = builder.getAtomicKeyCreation();
}
Expand Down Expand Up @@ -495,7 +493,6 @@ public void close() throws IOException {
} catch (InterruptedException e) {
throw new IOException("Flushing thread was interrupted", e);
} finally {
flushExecutor.shutdownNow();
closeCurrentStreamEntry();
blockOutputStreamEntryPool.cleanup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ public class RpcClient implements ClientProtocol {
// for reconstruction.
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

// TODO: Adjusts to the appropriate value when the writeThreadPool is used.
private static final int WRITE_POOL_MIN_SIZE = 0;
private static final int WRITE_POOL_MIN_SIZE = 1;

private final ConfigurationSource conf;
private final OzoneManagerClientProtocol ozoneManagerClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ static void shutdown() throws IOException {
}
}

static void reInitClient() throws IOException {
ozClient = OzoneClientFactory.getRpcClient(conf);
store = ozClient.getObjectStore();
TestOzoneRpcClient.setOzClient(ozClient);
TestOzoneRpcClient.setStore(store);
}


@ParameterizedTest
@EnumSource
void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
Expand Down Expand Up @@ -770,9 +778,7 @@ void testGetKeyProvider() throws Exception {

KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
assertNotEquals(kp3, kpSpy);
// Restore ozClient and store
TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
TestOzoneRpcClient.setStore(ozClient.getObjectStore());
reInitClient();
}

private static RepeatedOmKeyInfo getMatchedKeyInfo(
Expand Down