Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen01 committed Dec 19, 2023
1 parent 7e808b4 commit f695a37
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -101,14 +100,14 @@ public class ECReconstructionCoordinator implements Closeable {
LoggerFactory.getLogger(ECReconstructionCoordinator.class);

private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 3;
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;

private final ECContainerOperationClient containerOperationClient;

private final ByteBufferPool byteBufferPool;

private final ExecutorService ecReconstructReadExecutor;
private final ExecutorService ecReconstructWriteExecutor;
private volatile ExecutorService ecReconstructWriteExecutor;
private final BlockInputStreamFactory blockInputStreamFactory;
private final TokenHelper tokenHelper;
private final ECReconstructionMetrics metrics;
Expand All @@ -124,17 +123,16 @@ public ECReconstructionCoordinator(
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
.build();
this.ecReconstructReadExecutor =
new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
conf.getObject(OzoneClientConfig.class)
.getEcReconstructStripeReadPoolLimit(),
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ec-reconstruct-reader-TID-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy());
this.ecReconstructWriteExecutor =
new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
Expand All @@ -143,7 +141,9 @@ public ECReconstructionCoordinator(
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "ec-reconstruct-write-TID-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy());
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
Expand Down Expand Up @@ -472,6 +472,10 @@ public void close() throws IOException {
if (containerOperationClient != null) {
containerOperationClient.close();
}
if (ecReconstructWriteExecutor != null) {
ecReconstructWriteExecutor.shutdownNow();
ecReconstructWriteExecutor = null;
}
}

private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
Expand Down

0 comments on commit f695a37

Please sign in to comment.