HDDS-10387. Fix parameter number warning in KeyOutputStream and related classes (#6225)

(cherry picked from commit c672453)
szetszwo authored and xichen01 committed Apr 16, 2024
1 parent 8fdba8e commit 4d6d98c
Showing 8 changed files with 117 additions and 380 deletions.
BlockOutputStreamEntry.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.security.token.Token;

import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.JavaUtils;

/**
* A BlockOutputStreamEntry manages the data writes into the DataNodes.
@@ -60,33 +61,28 @@ public class BlockOutputStreamEntry extends OutputStream {
private long currentPosition;
private final Token<OzoneBlockTokenIdentifier> token;

private BufferPool bufferPool;
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;

@SuppressWarnings({"parameternumber", "squid:S00107"})
BlockOutputStreamEntry(
BlockID blockID, String key,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
long length,
BufferPool bufferPool,
Token<OzoneBlockTokenIdentifier> token,
OzoneClientConfig config,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) {
this.config = config;
private final BufferPool bufferPool;
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;

BlockOutputStreamEntry(Builder b) {
this.config = b.config;
this.outputStream = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.pipeline = pipeline;
this.token = token;
this.length = length;
this.blockID = b.blockID;
this.key = b.key;
this.xceiverClientManager = b.xceiverClientManager;
this.pipeline = b.pipeline;
this.token = b.token;
this.length = b.length;
this.currentPosition = 0;
this.bufferPool = bufferPool;
this.clientMetrics = clientMetrics;
this.streamBufferArgs = streamBufferArgs;
this.bufferPool = b.bufferPool;
this.clientMetrics = b.clientMetrics;
this.streamBufferArgs = b.streamBufferArgs;
}

@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + key + " " + blockID;
}

/**
@@ -362,6 +358,14 @@ public static class Builder {
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;

public Pipeline getPipeline() {
return pipeline;
}

public long getLength() {
return length;
}

public Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
@@ -412,13 +416,7 @@ public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
}

public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID,
key,
xceiverClientManager,
pipeline,
length,
bufferPool,
token, config, clientMetrics, streamBufferArgs);
return new BlockOutputStreamEntry(this);
}
}
}
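
The change above is the core of the fix in this file: the ten-argument constructor that needed the parameternumber/squid:S00107 suppression is replaced by a constructor that reads its fields from the already existing Builder, and build() simply hands this to it. Below is a minimal, self-contained sketch of that pattern; the class and field names are placeholders, not the Ozone ones.

// Minimal sketch of the constructor-takes-builder pattern used above.
// Names are placeholders; only the shape mirrors the Ozone change.
class Entry {
  private final String key;
  private final long length;

  // The package-private constructor reads every field from the builder,
  // so adding a field never widens a parameter list.
  Entry(Builder b) {
    this.key = b.key;
    this.length = b.length;
  }

  static class Builder {
    private String key;
    private long length;

    Builder setKey(String key) {
      this.key = key;
      return this;
    }

    Builder setLength(long length) {
      this.length = length;
      return this;
    }

    Entry build() {
      return new Entry(this); // mirrors "return new BlockOutputStreamEntry(this)"
    }
  }
}

// Example usage: new Entry.Builder().setKey("block-1").setLength(4096).build();

Callers keep the same fluent setter chain they used before; only build() and the constructor itself change.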
BlockOutputStreamEntryPool.java
@@ -26,7 +26,6 @@
import java.util.ListIterator;
import java.util.Map;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -62,7 +61,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
/**
* List of stream entries that are used to write a block of data.
*/
private final List<BlockOutputStreamEntry> streamEntries;
private final List<BlockOutputStreamEntry> streamEntries = new ArrayList<>();
private final OzoneClientConfig config;
/**
* The actual stream entry we are writing into. Note that a stream entry is
@@ -73,7 +72,6 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final OzoneManagerProtocol omClient;
private final OmKeyArgs keyArgs;
private final XceiverClientFactory xceiverClientFactory;
private final String requestID;
/**
* A {@link BufferPool} shared between all
* {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by
@@ -86,65 +84,38 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;

@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(
OzoneClientConfig config,
OzoneManagerProtocol omClient,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber,
boolean isMultipart, OmKeyInfo info,
boolean unsafeByteBufferConversion,
XceiverClientFactory xceiverClientFactory, long openID,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) {
this.config = config;
this.xceiverClientFactory = xceiverClientFactory;
streamEntries = new ArrayList<>();
public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
this.config = b.getClientConfig();
this.xceiverClientFactory = b.getXceiverManager();
currentStreamIndex = 0;
this.omClient = omClient;
this.omClient = b.getOmClient();
final OmKeyInfo info = b.getOpenHandler().getKeyInfo();
this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
.setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
.setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
.setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
.setMultipartUploadPartNumber(partNumber).build();
this.requestID = requestId;
this.openID = openID;
.setReplicationConfig(b.getReplicationConfig())
.setDataSize(info.getDataSize())
.setIsMultipartKey(b.isMultipartKey())
.setMultipartUploadID(b.getMultipartUploadID())
.setMultipartUploadPartNumber(b.getMultipartNumber())
.build();
this.openID = b.getOpenHandler().getId();
this.excludeList = createExcludeList();

this.streamBufferArgs = b.getStreamBufferArgs();
this.bufferPool =
new BufferPool(streamBufferArgs.getStreamBufferSize(),
(int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs
.getStreamBufferSize()),
ByteStringConversion
.createByteBufferConversion(unsafeByteBufferConversion));
this.clientMetrics = clientMetrics;
this.streamBufferArgs = streamBufferArgs;
.createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
this.clientMetrics = b.getClientMetrics();
}

ExcludeList createExcludeList() {
return new ExcludeList(getConfig().getExcludeNodesExpiryTime(),
Clock.system(ZoneOffset.UTC));
}

BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics,
OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) {
streamEntries = new ArrayList<>();
omClient = null;
keyArgs = null;
xceiverClientFactory = null;
config = clientConfig;
streamBufferArgs.setStreamBufferFlushDelay(false);
requestID = null;
int chunkSize = 0;
bufferPool = new BufferPool(chunkSize, 1);

currentStreamIndex = 0;
openID = -1;
excludeList = createExcludeList();
this.clientMetrics = clientMetrics;
this.streamBufferArgs = null;
}

/**
* When a key is opened, it is possible that there are some blocks already
* allocated to it for this open session. In this case, to make use of these
@@ -156,10 +127,8 @@ ExcludeList createExcludeList() {
*
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
* @throws IOException
*/
public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
long openVersion) throws IOException {
public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
// server may return any number of blocks, (0 to any)
// only the blocks allocated in this open session (block createVersion
// equals to open session version)
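
In the rewritten pool constructor, every dependency now comes from KeyOutputStream.Builder getters, and the BufferPool capacity is still computed as streamBufferMaxSize / streamBufferSize buffers of streamBufferSize bytes each. The following is a small, runnable illustration of that sizing arithmetic; the byte values are examples chosen for the illustration, not defaults taken from this commit.

// Illustration of the BufferPool sizing computed in the constructor above.
// The sizes are example values, not configuration defaults from this commit.
public class BufferPoolSizing {
  public static void main(String[] args) {
    long streamBufferSize = 4L * 1024 * 1024;      // 4 MB per buffer
    long streamBufferMaxSize = 32L * 1024 * 1024;  // at most 32 MB kept buffered

    // Same integer division the constructor performs when sizing the pool.
    int capacity = (int) (streamBufferMaxSize / streamBufferSize);
    System.out.println("buffers in pool: " + capacity); // prints 8
  }
}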
ECBlockOutputStreamEntry.java
@@ -23,17 +23,10 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,19 +68,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
private int currentStreamIdx = 0;
private long successfulBlkGrpAckedLen;

@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
OzoneClientConfig config, ContainerClientMetrics clientMetrics,
StreamBufferArgs streamBufferArgs) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
token, config, clientMetrics, streamBufferArgs);
assertInstanceOf(
pipeline.getReplicationConfig(), ECReplicationConfig.class);
this.replicationConfig =
(ECReplicationConfig) pipeline.getReplicationConfig();
this.length = replicationConfig.getData() * length;
ECBlockOutputStreamEntry(Builder b) {
super(b);
this.replicationConfig = assertInstanceOf(b.getPipeline().getReplicationConfig(), ECReplicationConfig.class);
this.length = replicationConfig.getData() * b.getLength();
}

@Override
@@ -433,82 +417,9 @@ public ByteString calculateChecksum() throws IOException {
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
public static class Builder {
private BlockID blockID;
private String key;
private XceiverClientFactory xceiverClientManager;
private Pipeline pipeline;
private long length;
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;

public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
}

public ECBlockOutputStreamEntry.Builder setKey(String keys) {
this.key = keys;
return this;
}

public ECBlockOutputStreamEntry.Builder setXceiverClientManager(
XceiverClientFactory
xClientManager) {
this.xceiverClientManager = xClientManager;
return this;
}

public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) {
this.pipeline = ppln;
return this;
}

public ECBlockOutputStreamEntry.Builder setLength(long len) {
this.length = len;
return this;
}

public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
this.bufferPool = pool;
return this;
}

public ECBlockOutputStreamEntry.Builder setConfig(
OzoneClientConfig clientConfig) {
this.config = clientConfig;
return this;
}

public ECBlockOutputStreamEntry.Builder setToken(
Token<OzoneBlockTokenIdentifier> bToken) {
this.token = bToken;
return this;
}

public ECBlockOutputStreamEntry.Builder setClientMetrics(
ContainerClientMetrics containerClientMetrics) {
this.clientMetrics = containerClientMetrics;
return this;
}

public ECBlockOutputStreamEntry.Builder setStreamBufferArgs(
StreamBufferArgs args) {
this.streamBufferArgs = args;
return this;
}

public static class Builder extends BlockOutputStreamEntry.Builder {
public ECBlockOutputStreamEntry build() {
return new ECBlockOutputStreamEntry(blockID,
key,
xceiverClientManager,
pipeline,
length,
bufferPool,
token, config, clientMetrics, streamBufferArgs);
return new ECBlockOutputStreamEntry(this);
}
}
}
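
With the shared Builder now living in BlockOutputStreamEntry, the EC builder above shrinks to a subclass that only narrows the result type of build() via a covariant return. A self-contained sketch of that subclassing shape follows; the names are placeholders, not the Ozone classes.

// Sketch of a builder subclass that only overrides build() with a covariant
// return type, as ECBlockOutputStreamEntry.Builder now does. Names are placeholders.
class BaseEntry {
  private final long length;

  BaseEntry(Builder b) {
    this.length = b.length;
  }

  static class Builder {
    private long length;

    Builder setLength(long length) {
      this.length = length;
      return this;
    }

    BaseEntry build() {
      return new BaseEntry(this);
    }
  }
}

class EcEntry extends BaseEntry {
  EcEntry(Builder b) {
    super(b);
  }

  static class Builder extends BaseEntry.Builder {
    @Override
    EcEntry build() { // covariant return: EcEntry rather than BaseEntry
      return new EcEntry(this);
    }
  }
}

One general Java caveat of this shape, not specific to this commit: the inherited setters still declare the base Builder as their return type, so a fluent chain of setter calls ending in build() is statically typed to the base entry even though the subclass's build() runs at run time; callers that need the subclass's static type keep the builder in a variable of the subclass type before calling build().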
(remaining changed files not shown)
