diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 9bdec27f534..c0221d07a55 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.token.Token;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.JavaUtils;
 
 /**
  * A BlockOutputStreamEntry manages the data writes into the DataNodes.
@@ -60,33 +61,28 @@ public class BlockOutputStreamEntry extends OutputStream {
   private long currentPosition;
   private final Token<OzoneBlockTokenIdentifier> token;
 
-  private BufferPool bufferPool;
-  private ContainerClientMetrics clientMetrics;
-  private StreamBufferArgs streamBufferArgs;
-
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  BlockOutputStreamEntry(
-      BlockID blockID, String key,
-      XceiverClientFactory xceiverClientManager,
-      Pipeline pipeline,
-      long length,
-      BufferPool bufferPool,
-      Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
-  ) {
-    this.config = config;
+  private final BufferPool bufferPool;
+  private final ContainerClientMetrics clientMetrics;
+  private final StreamBufferArgs streamBufferArgs;
+
+  BlockOutputStreamEntry(Builder b) {
+    this.config = b.config;
     this.outputStream = null;
-    this.blockID = blockID;
-    this.key = key;
-    this.xceiverClientManager = xceiverClientManager;
-    this.pipeline = pipeline;
-    this.token = token;
-    this.length = length;
+    this.blockID = b.blockID;
+    this.key = b.key;
+    this.xceiverClientManager = b.xceiverClientManager;
+    this.pipeline = b.pipeline;
+    this.token = b.token;
+    this.length = b.length;
     this.currentPosition = 0;
-    this.bufferPool = bufferPool;
-    this.clientMetrics = clientMetrics;
-    this.streamBufferArgs = streamBufferArgs;
+    this.bufferPool = b.bufferPool;
+    this.clientMetrics = b.clientMetrics;
+    this.streamBufferArgs = b.streamBufferArgs;
+  }
+
+  @Override
+  public String toString() {
+    return JavaUtils.getClassSimpleName(getClass()) + ":" + key + " " + blockID;
   }
 
   /**
@@ -362,6 +358,14 @@ public static class Builder {
     private ContainerClientMetrics clientMetrics;
     private StreamBufferArgs streamBufferArgs;
 
+    public Pipeline getPipeline() {
+      return pipeline;
+    }
+
+    public long getLength() {
+      return length;
+    }
+
     public Builder setBlockID(BlockID bID) {
       this.blockID = bID;
       return this;
     }
@@ -412,13 +416,7 @@ public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
     }
 
     public BlockOutputStreamEntry build() {
-      return new BlockOutputStreamEntry(blockID,
-          key,
-          xceiverClientManager,
-          pipeline,
-          length,
-          bufferPool,
-          token, config, clientMetrics, streamBufferArgs);
+      return new BlockOutputStreamEntry(this);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index d0f3b5728a8..4d6026f9259 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,7 +26,6 @@
 import java.util.ListIterator;
 import java.util.Map;
 
-import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -62,7 +61,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
   /**
    * List of stream entries that are used to write a block of data.
    */
-  private final List<BlockOutputStreamEntry> streamEntries;
+  private final List<BlockOutputStreamEntry> streamEntries = new ArrayList<>();
   private final OzoneClientConfig config;
   /**
    * The actual stream entry we are writing into. Note that a stream entry is
@@ -73,7 +72,6 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
   private final OzoneManagerProtocol omClient;
   private final OmKeyArgs keyArgs;
   private final XceiverClientFactory xceiverClientFactory;
-  private final String requestID;
   /**
    * A {@link BufferPool} shared between all
    * {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by
@@ -86,39 +84,31 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
   private final ContainerClientMetrics clientMetrics;
   private final StreamBufferArgs streamBufferArgs;
 
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public BlockOutputStreamEntryPool(
-      OzoneClientConfig config,
-      OzoneManagerProtocol omClient,
-      String requestId, ReplicationConfig replicationConfig,
-      String uploadID, int partNumber,
-      boolean isMultipart, OmKeyInfo info,
-      boolean unsafeByteBufferConversion,
-      XceiverClientFactory xceiverClientFactory, long openID,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
-  ) {
-    this.config = config;
-    this.xceiverClientFactory = xceiverClientFactory;
-    streamEntries = new ArrayList<>();
+  public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
+    this.config = b.getClientConfig();
+    this.xceiverClientFactory = b.getXceiverManager();
     currentStreamIndex = 0;
-    this.omClient = omClient;
+    this.omClient = b.getOmClient();
+    final OmKeyInfo info = b.getOpenHandler().getKeyInfo();
     this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
         .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
-        .setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
-        .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
-        .setMultipartUploadPartNumber(partNumber).build();
-    this.requestID = requestId;
-    this.openID = openID;
+        .setReplicationConfig(b.getReplicationConfig())
+        .setDataSize(info.getDataSize())
+        .setIsMultipartKey(b.isMultipartKey())
+        .setMultipartUploadID(b.getMultipartUploadID())
+        .setMultipartUploadPartNumber(b.getMultipartNumber())
+        .build();
+    this.openID = b.getOpenHandler().getId();
     this.excludeList = createExcludeList();
+    this.streamBufferArgs = b.getStreamBufferArgs();
     this.bufferPool = new BufferPool(streamBufferArgs.getStreamBufferSize(),
         (int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs
            .getStreamBufferSize()), ByteStringConversion
-            .createByteBufferConversion(unsafeByteBufferConversion));
-    this.clientMetrics = clientMetrics;
-    this.streamBufferArgs = streamBufferArgs;
+            .createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
+    this.clientMetrics = b.getClientMetrics();
   }
 
   ExcludeList createExcludeList() {
@@ -126,25 +116,6 @@ ExcludeList createExcludeList() {
         Clock.system(ZoneOffset.UTC));
   }
 
-  BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics,
-      OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) {
-    streamEntries = new ArrayList<>();
-    omClient = null;
-    keyArgs = null;
-    xceiverClientFactory = null;
-    config = clientConfig;
-    streamBufferArgs.setStreamBufferFlushDelay(false);
-    requestID = null;
-    int chunkSize = 0;
-    bufferPool = new BufferPool(chunkSize, 1);
-
-    currentStreamIndex = 0;
-    openID = -1;
-    excludeList = createExcludeList();
-    this.clientMetrics = clientMetrics;
-    this.streamBufferArgs = null;
-  }
-
   /**
    * When a key is opened, it is possible that there are some blocks already
    * allocated to it for this open session. In this case, to make use of these
@@ -156,10 +127,8 @@ ExcludeList createExcludeList() {
    *
    * @param version the set of blocks that are pre-allocated.
    * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
    */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
     // server may return any number of blocks, (0 to any)
     // only the blocks allocated in this open session (block createVersion
     // equals to open session version)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 07d0f46069c..7f6ce87d60c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -23,17 +23,10 @@
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,19 +68,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
   private int currentStreamIdx = 0;
   private long successfulBlkGrpAckedLen;
 
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  ECBlockOutputStreamEntry(BlockID blockID, String key,
-      XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
-      BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, ContainerClientMetrics clientMetrics,
-      StreamBufferArgs streamBufferArgs) {
-    super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
-        token, config, clientMetrics, streamBufferArgs);
-    assertInstanceOf(
-        pipeline.getReplicationConfig(), ECReplicationConfig.class);
-    this.replicationConfig =
-        (ECReplicationConfig) pipeline.getReplicationConfig();
-    this.length = replicationConfig.getData() * length;
+  ECBlockOutputStreamEntry(Builder b) {
+    super(b);
+    this.replicationConfig = assertInstanceOf(b.getPipeline().getReplicationConfig(), ECReplicationConfig.class);
+    this.length = replicationConfig.getData() * b.getLength();
   }
 
   @Override
@@ -433,82 +417,9 @@ public ByteString calculateChecksum() throws IOException {
   /**
    * Builder class for ChunkGroupOutputStreamEntry.
    * */
-  public static class Builder {
-    private BlockID blockID;
-    private String key;
-    private XceiverClientFactory xceiverClientManager;
-    private Pipeline pipeline;
-    private long length;
-    private BufferPool bufferPool;
-    private Token<OzoneBlockTokenIdentifier> token;
-    private OzoneClientConfig config;
-    private ContainerClientMetrics clientMetrics;
-    private StreamBufferArgs streamBufferArgs;
-
-    public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
-      this.blockID = bID;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setKey(String keys) {
-      this.key = keys;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setXceiverClientManager(
-        XceiverClientFactory
-            xClientManager) {
-      this.xceiverClientManager = xClientManager;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) {
-      this.pipeline = ppln;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setLength(long len) {
-      this.length = len;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
-      this.bufferPool = pool;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setConfig(
-        OzoneClientConfig clientConfig) {
-      this.config = clientConfig;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setToken(
-        Token<OzoneBlockTokenIdentifier> bToken) {
-      this.token = bToken;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setClientMetrics(
-        ContainerClientMetrics containerClientMetrics) {
-      this.clientMetrics = containerClientMetrics;
-      return this;
-    }
-
-    public ECBlockOutputStreamEntry.Builder setStreamBufferArgs(
-        StreamBufferArgs args) {
-      this.streamBufferArgs = args;
-      return this;
-    }
-
+  public static class Builder extends BlockOutputStreamEntry.Builder {
     public ECBlockOutputStreamEntry build() {
-      return new ECBlockOutputStreamEntry(blockID,
-          key,
-          xceiverClientManager,
-          pipeline,
-          length,
-          bufferPool,
-          token, config, clientMetrics, streamBufferArgs);
+      return new ECBlockOutputStreamEntry(this);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index e551605d842..e278097a495 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -17,19 +17,7 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-
-import java.time.Clock;
-import java.time.ZoneOffset;
 
 /**
  * {@link BlockOutputStreamEntryPool} is responsible to manage OM communication
@@ -44,37 +32,14 @@
  * @see ECBlockOutputStreamEntry
  */
 public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
-
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
-      OzoneManagerProtocol omClient,
-      String requestId,
-      ReplicationConfig replicationConfig,
-      String uploadID,
-      int partNumber,
-      boolean isMultipart,
-      OmKeyInfo info,
-      boolean unsafeByteBufferConversion,
-      XceiverClientFactory xceiverClientFactory,
-      long openID,
-      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs) {
-    super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
-        isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
-        openID, clientMetrics, streamBufferArgs);
-    assert replicationConfig instanceof ECReplicationConfig;
-  }
-
-  @Override
-  ExcludeList createExcludeList() {
-    return new ExcludeList(getConfig().getExcludeNodesExpiryTime(),
-        Clock.system(ZoneOffset.UTC));
+  public ECBlockOutputStreamEntryPool(ECKeyOutputStream.Builder builder) {
+    super(builder);
   }
 
   @Override
-  BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
-    return
-        new ECBlockOutputStreamEntry.Builder()
-            .setBlockID(subKeyInfo.getBlockID())
+  ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+    final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+    b.setBlockID(subKeyInfo.getBlockID())
         .setKey(getKeyName())
         .setXceiverClientManager(getXceiverClientFactory())
         .setPipeline(subKeyInfo.getPipeline())
@@ -83,8 +48,8 @@ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
         .setBufferPool(getBufferPool())
         .setToken(subKeyInfo.getToken())
         .setClientMetrics(getClientMetrics())
-        .setStreamBufferArgs(getStreamBufferArgs())
-        .build();
+        .setStreamBufferArgs(getStreamBufferArgs());
+    return b.build();
   }
 
   @Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index b5c36474ff9..878558073f7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -17,12 +17,28 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -35,30 +51,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.om.protocol.S3Auth;
-import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
-import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * ECKeyOutputStream handles the EC writes by writing the data into underlying
  * block output streams chunk by chunk.
@@ -100,22 +92,6 @@ private enum StripeWriteStatus {
   private long offset;
   // how much data has been ingested into the stream
   private long writeOffset;
-  private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
-
-  @VisibleForTesting
-  public List<BlockOutputStreamEntry> getStreamEntries() {
-    return blockOutputStreamEntryPool.getStreamEntries();
-  }
-
-  @VisibleForTesting
-  public XceiverClientFactory getXceiverClientFactory() {
-    return blockOutputStreamEntryPool.getXceiverClientFactory();
-  }
-
-  @VisibleForTesting
-  public List<OmKeyLocationInfo> getLocationInfoList() {
-    return blockOutputStreamEntryPool.getLocationInfoList();
-  }
 
   @VisibleForTesting
   public void insertFlushCheckpoint(long version) throws IOException {
@@ -128,8 +104,7 @@ public long getFlushCheckpoint() {
   }
 
   private ECKeyOutputStream(Builder builder) {
-    super(builder.getReplicationConfig(), builder.getClientMetrics(),
-        builder.getClientConfig(), builder.getStreamBufferArgs());
+    super(builder.getReplicationConfig(), new ECBlockOutputStreamEntryPool(builder));
     this.config = builder.getClientConfig();
     this.bufferPool = builder.getByteBufferPool();
     // For EC, cell/chunk size and buffer size can be same for now.
@@ -140,16 +115,6 @@ private ECKeyOutputStream(Builder builder) {
         ecChunkSize, numDataBlks, numParityBlks, bufferPool);
     chunkIndex = 0;
     ecStripeQueue = new ArrayBlockingQueue<>(config.getEcStripeQueueSize());
-    OmKeyInfo info = builder.getOpenHandler().getKeyInfo();
-    blockOutputStreamEntryPool =
-        new ECBlockOutputStreamEntryPool(config,
-            builder.getOmClient(), builder.getRequestID(),
-            builder.getReplicationConfig(),
-            builder.getMultipartUploadID(), builder.getMultipartNumber(),
-            builder.isMultipartKey(),
-            info, builder.isUnsafeByteBufferConversionEnabled(),
-            builder.getXceiverManager(), builder.getOpenHandler().getId(),
-            builder.getClientMetrics(), builder.getStreamBufferArgs());
     this.writeOffset = 0;
 
     this.encoder = CodecUtil.createRawEncoderWithFallback(
@@ -164,22 +129,9 @@ private ECKeyOutputStream(Builder builder) {
     this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
 
-  /**
-   * When a key is opened, it is possible that there are some blocks already
-   * allocated to it for this open session. In this case, to make use of these
-   * blocks, we need to add these blocks to stream entries. But, a key's version
-   * also includes blocks from previous versions, we need to avoid adding these
-   * old blocks to stream entries, because these old blocks should not be picked
-   * for write. To do this, the following method checks that, only those
-   * blocks created in this particular open version are added to stream entries.
-   *
-   * @param version the set of blocks that are pre-allocated.
-   * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
-   */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
-    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+  @Override
+  protected ECBlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
+    return (ECBlockOutputStreamEntryPool) super.getBlockOutputStreamEntryPool();
   }
 
   /**
@@ -218,6 +170,7 @@ private void rollbackAndReset(ECChunkBuffers stripe) throws IOException {
     final ByteBuffer[] dataBuffers = stripe.getDataBuffers();
     offset -= Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum();
 
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool();
     final ECBlockOutputStreamEntry failedStreamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
     failedStreamEntry.resetToFirstEntry();
@@ -256,8 +209,7 @@ private void logStreamError(List<ECBlockOutputStream> failedStreams,
 
   private StripeWriteStatus commitStripeWrite(ECChunkBuffers stripe)
       throws IOException {
-    ECBlockOutputStreamEntry streamEntry =
-        blockOutputStreamEntryPool.getCurrentStreamEntry();
+    final ECBlockOutputStreamEntry streamEntry = getBlockOutputStreamEntryPool().getCurrentStreamEntry();
     List<ECBlockOutputStream> failedStreams =
         streamEntry.streamsWithWriteFailure();
     if (!failedStreams.isEmpty()) {
@@ -297,6 +249,7 @@ private void excludePipelineAndFailedDN(Pipeline pipeline,
       List<ECBlockOutputStream> failedStreams) {
 
     // Exclude the failed pipeline
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool();
     blockOutputStreamEntryPool.getExcludeList().addPipeline(pipeline.getId());
 
     // If the failure is NOT caused by other reasons (e.g. container full),
@@ -362,6 +315,7 @@ private void generateParityCells() throws IOException {
   }
 
   private void writeDataCells(ECChunkBuffers stripe) throws IOException {
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool();
     blockOutputStreamEntryPool.allocateBlockIfNeeded();
     ByteBuffer[] dataCells = stripe.getDataBuffers();
     for (int i = 0; i < numDataBlks; i++) {
@@ -374,6 +328,7 @@ private void writeDataCells(ECChunkBuffers stripe) throws IOException {
 
   private void writeParityCells(ECChunkBuffers stripe) {
     // Move the stream entry cursor to parity block index
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool();
     blockOutputStreamEntryPool
         .getCurrentStreamEntry().forceToFirstParityBlock();
     ByteBuffer[] parityCells = stripe.getParityBuffers();
@@ -413,7 +368,7 @@ private void handleOutputStreamWrite(ByteBuffer buffer, boolean isParity) {
       // The len cannot be bigger than cell buffer size.
       assert buffer.limit() <= ecChunkSize : "The buffer size: " + buffer.limit()
          + " should not exceed EC chunk size: " + ecChunkSize;
-      writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(),
+      writeToOutputStream(getBlockOutputStreamEntryPool().getCurrentStreamEntry(),
          buffer.array(), buffer.limit(), 0, isParity);
     } catch (Exception e) {
       markStreamAsFailed(e);
@@ -449,8 +404,7 @@ private void handleException(BlockOutputStreamEntry streamEntry,
     Preconditions.checkNotNull(t);
     boolean containerExclusionException = checkIfContainerToExclude(t);
     if (containerExclusionException) {
-      blockOutputStreamEntryPool.getExcludeList()
-          .addPipeline(streamEntry.getPipeline().getId());
+      getBlockOutputStreamEntryPool().getExcludeList().addPipeline(streamEntry.getPipeline().getId());
     }
     markStreamAsFailed(exception);
   }
@@ -460,7 +414,7 @@ private void markStreamClosed() {
   }
 
   private void markStreamAsFailed(Exception e) {
-    blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e);
+    getBlockOutputStreamEntryPool().getCurrentStreamEntry().markFailed(e);
   }
 
   @Override
@@ -470,6 +424,7 @@ public void flush() {
   }
 
   private void closeCurrentStreamEntry() throws IOException {
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool();
     if (!blockOutputStreamEntryPool.isEmpty()) {
       while (true) {
         try {
@@ -503,6 +458,7 @@ public void close() throws IOException {
       return;
     }
     closed = true;
+    final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = getBlockOutputStreamEntryPool();
     try {
       if (!closing) {
         // If stripe buffer is not empty, encode and flush the stripe.
@@ -614,20 +570,6 @@ public static void padBufferToLimit(ByteBuffer buf, int limit) {
     buf.position(limit);
   }
 
-  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
-  }
-
-  @VisibleForTesting
-  public ExcludeList getExcludeList() {
-    return blockOutputStreamEntryPool.getExcludeList();
-  }
-
-  @Override
-  public Map<String, String> getMetadata() {
-    return this.blockOutputStreamEntryPool.getMetadata();
-  }
-
   /**
    * Builder class of ECKeyOutputStream.
   */
@@ -682,9 +624,8 @@ public ECKeyOutputStream build() {
    */
   private void checkNotClosed() throws IOException {
     if (closing || closed) {
-      throw new IOException(
-          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
-              + blockOutputStreamEntryPool.getKeyName());
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+          + getBlockOutputStreamEntryPool().getKeyName());
     }
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 8b128e9cd94..9ea17cf8b25 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -69,7 +69,6 @@
 public class KeyOutputStream extends OutputStream
     implements Syncable, KeyMetadataAware {
 
-  private OzoneClientConfig config;
   private final ReplicationConfig replication;
 
   /**
@@ -105,11 +104,8 @@ enum StreamAction {
    */
   private boolean atomicKeyCreation;
 
-  public KeyOutputStream(ReplicationConfig replicationConfig,
-      ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig,
-      StreamBufferArgs streamBufferArgs) {
+  public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
     this.replication = replicationConfig;
-    this.config = clientConfig;
     closed = false;
     this.retryPolicyMap = HddsClientUtils.getExceptionList()
         .stream()
         .collect(Collectors.toMap(Function.identity(),
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
     retryCount = 0;
     offset = 0;
-    blockOutputStreamEntryPool =
-        new BlockOutputStreamEntryPool(clientMetrics, clientConfig, streamBufferArgs);
+    this.blockOutputStreamEntryPool = blockOutputStreamEntryPool;
   }
 
-  @VisibleForTesting
-  public List<BlockOutputStreamEntry> getStreamEntries() {
-    return blockOutputStreamEntryPool.getStreamEntries();
+  protected BlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
+    return blockOutputStreamEntryPool;
   }
 
   @VisibleForTesting
-  public XceiverClientFactory getXceiverClientFactory() {
-    return blockOutputStreamEntryPool.getXceiverClientFactory();
+  public List<BlockOutputStreamEntry> getStreamEntries() {
+    return blockOutputStreamEntryPool.getStreamEntries();
   }
 
   @VisibleForTesting
@@ -146,39 +140,18 @@ public long getClientID() {
     return clientID;
   }
 
-  @SuppressWarnings({"parameternumber", "squid:S00107"})
-  public KeyOutputStream(
-      OzoneClientConfig config,
-      OpenKeySession handler,
-      XceiverClientFactory xceiverClientManager,
-      OzoneManagerProtocol omClient,
-      String requestId, ReplicationConfig replicationConfig,
-      String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion,
-      ContainerClientMetrics clientMetrics,
-      boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs
-  ) {
-    this.config = config;
-    this.replication = replicationConfig;
-    blockOutputStreamEntryPool =
-        new BlockOutputStreamEntryPool(
-            config,
-            omClient,
-            requestId, replicationConfig,
-            uploadID, partNumber,
-            isMultipart, handler.getKeyInfo(),
-            unsafeByteBufferConversion,
-            xceiverClientManager,
-            handler.getId(),
-            clientMetrics, streamBufferArgs);
+  public KeyOutputStream(Builder b) {
+    this.replication = b.replicationConfig;
+    this.blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(b);
+    final OzoneClientConfig config = b.getClientConfig();
     this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval());
     this.retryCount = 0;
     this.isException = false;
     this.writeOffset = 0;
-    this.clientID = handler.getId();
-    this.atomicKeyCreation = atomicKeyCreation;
-    this.streamBufferArgs = streamBufferArgs;
+    this.clientID = b.getOpenHandler().getId();
+    this.atomicKeyCreation = b.getAtomicKeyCreation();
+    this.streamBufferArgs = b.getStreamBufferArgs();
   }
 
   /**
@@ -192,10 +165,8 @@ public KeyOutputStream(
    *
    * @param version the set of blocks that are pre-allocated.
    * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
    */
-  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
+  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
     blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
   }
 
@@ -729,20 +700,7 @@ public boolean getAtomicKeyCreation() {
     }
 
     public KeyOutputStream build() {
-      return new KeyOutputStream(
-          clientConfig,
-          openHandler,
-          xceiverManager,
-          omClient,
-          requestID,
-          replicationConfig,
-          multipartUploadID,
-          multipartNumber,
-          isMultipartKey,
-          unsafeByteBufferConversion,
-          clientMetrics,
-          atomicKeyCreation,
-          streamBufferArgs);
+      return new KeyOutputStream(this);
     }
   }
 
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
index 7760e88e484..718e724e585 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
@@ -63,10 +63,10 @@ public class TestECBlockOutputStreamEntry {
     try (XceiverClientManager manager =
         new XceiverClientManager(new OzoneConfiguration())) {
       HashSet<XceiverClientSpi> clients = new HashSet<>();
-      ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
-          .setXceiverClientManager(manager)
-          .setPipeline(anECPipeline)
-          .build();
+      final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+      b.setXceiverClientManager(manager)
+          .setPipeline(anECPipeline);
+      final ECBlockOutputStreamEntry entry = b.build();
       for (int i = 0; i < nodes.size(); i++) {
         clients.add(
             manager.acquireClient(
@@ -101,10 +101,10 @@ public class TestECBlockOutputStreamEntry {
     try (XceiverClientManager manager =
         new XceiverClientManager(new OzoneConfiguration())) {
       HashSet<XceiverClientSpi> clients = new HashSet<>();
-      ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
-          .setXceiverClientManager(manager)
-          .setPipeline(anECPipeline)
-          .build();
+      final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+      b.setXceiverClientManager(manager)
+          .setPipeline(anECPipeline);
+      final ECBlockOutputStreamEntry entry = b.build();
       for (int i = 0; i < nodes.size(); i++) {
         clients.add(
             manager.acquireClient(
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 98351600290..0687a0fb8e2 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -22,8 +22,6 @@
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -79,10 +77,7 @@ public KeyOutputStream getKeyOutputStream() {
     OzoneConfiguration conf = new OzoneConfiguration();
     ReplicationConfig replicationConfig = ReplicationConfig.getDefault(conf);
 
-    OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
-    StreamBufferArgs streamBufferArgs =
-        StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig);
-    return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) {
+    return new KeyOutputStream(replicationConfig, null) {
       @Override
       public synchronized OmMultipartCommitUploadPartInfo
          getCommitUploadPartInfo() {
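
Note for reviewers: the common thread of this patch is replacing the telescoping constructors (previously suppressed with "parameternumber"/"squid:S00107") with constructors that consume the Builder itself, which is what lets ECBlockOutputStreamEntry.Builder shrink to a subclass of BlockOutputStreamEntry.Builder that overrides only build(). Below is a minimal standalone sketch of that idiom; Entry, EcEntry, and BuilderIdiomDemo are hypothetical names for illustration, not Ozone classes.

// BuilderIdiomDemo.java -- standalone sketch of the "constructor consumes the
// Builder" idiom applied by this patch; the names here are hypothetical.
public class BuilderIdiomDemo {

  static class Entry {
    private final String key;
    private final long length;

    // The constructor takes the whole Builder, so adding a new field no
    // longer grows a telescoping parameter list in every subclass.
    Entry(Builder b) {
      this.key = b.key;
      this.length = b.length;
    }

    @Override
    public String toString() {
      return getClass().getSimpleName() + ":" + key + " len=" + length;
    }

    static class Builder {
      private String key;
      private long length;

      Builder setKey(String k) {
        this.key = k;
        return this;
      }

      Builder setLength(long l) {
        this.length = l;
        return this;
      }

      Entry build() {
        return new Entry(this);
      }
    }
  }

  static class EcEntry extends Entry {
    EcEntry(Builder b) {
      super(b);
    }

    // The subclass builder inherits every setter and overrides only build(),
    // using a covariant return type.
    static class Builder extends Entry.Builder {
      @Override
      EcEntry build() {
        return new EcEntry(this);
      }
    }
  }

  public static void main(String[] args) {
    // The inherited setters return the base Builder type, so configure first
    // and call build() in a separate statement.
    final EcEntry.Builder b = new EcEntry.Builder();
    b.setKey("vol/bucket/key").setLength(1024);
    System.out.println(b.build()); // prints: EcEntry:vol/bucket/key len=1024
  }
}

The same two-step pattern (configure the builder, then build() on a separate statement) is visible in the patched ECBlockOutputStreamEntryPool.createStreamEntry() and in the TestECBlockOutputStreamEntry changes above.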