HDDS-10983. EC Key read corruption when the replica index of container in DN mismatches #6779

Merged Jun 20, 2024 (23 commits)
Changes from 3 commits

Commits
a1d5c4a
HDDS-10983. EC Key read corruption when the replica index of containe…
swamirishi Jun 6, 2024
8815940
HDDS-10983. Add Testcase
swamirishi Jun 6, 2024
4287270
HDDS-10983. Make the read api compatible with older clients
swamirishi Jun 7, 2024
2bb1b9d
HDDS-10983. Remove stacktrace
swamirishi Jun 7, 2024
5886cf4
HDDS-10983. Fix checkstyle
swamirishi Jun 7, 2024
cfc7415
HDDS-10983. Address review comments
swamirishi Jun 7, 2024
8edc907
HDDS-10983. Fix testcase
swamirishi Jun 7, 2024
1476d59
HDDS-10983. Add client version test
swamirishi Jun 8, 2024
437594d
HDDS-10983. Fix test cases
swamirishi Jun 9, 2024
65994f3
HDDS-10983. Fix issues
swamirishi Jun 10, 2024
731107c
HDDS-10983. Fix testcases
swamirishi Jun 10, 2024
5857567
HDDS-10983. Fix checkstyle
swamirishi Jun 10, 2024
dd11beb
HDDS-10983. Fix Acceptance test
swamirishi Jun 11, 2024
4c4e2c7
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Jun 11, 2024
0d769da
HDDS-10983. Fix checkstyle
swamirishi Jun 11, 2024
45279a3
HDDS-10983. change pipeline supplier usage
swamirishi Jun 13, 2024
58b089d
HDDS-10983. Address review comments and simplify testcase
swamirishi Jun 14, 2024
e4c0d67
HDDS-10983. Convert to mapToInt
swamirishi Jun 14, 2024
da98011
HDDS-11013. Merge upstream master
swamirishi Jun 17, 2024
5db065f
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Jun 17, 2024
b0764e4
HDDS-10983. Merge master
swamirishi Jun 17, 2024
3ae6768
HDDS-10983. Address reveiw comments
swamirishi Jun 19, 2024
040d6d8
HDDS-10983. Move replica index validation outside block manager
swamirishi Jun 19, 2024
@@ -71,6 +71,7 @@
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.HddsUtils.processForDebug;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;

/**
* {@link XceiverClientSpi} implementation, the standalone client.
@@ -337,8 +338,7 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(

return TracingUtil.executeInNewSpan(spanName,
() -> {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
ContainerCommandRequestProto finalPayload = getContainerCommandRequestProtoBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, validators);
});
@@ -490,8 +490,7 @@ public XceiverClientReply sendCommandAsync(

try (Scope ignored = GlobalTracer.get().activateSpan(span)) {

ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
ContainerCommandRequestProto finalPayload = getContainerCommandRequestProtoBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
XceiverClientReply asyncReply =
@@ -62,6 +62,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;

/**
@@ -204,7 +205,7 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
// it or remove it completely if possible
String id = pipeline.getFirstNode().getUuidString();
ContainerProtos.ContainerCommandRequestProto.Builder builder =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
getContainerCommandRequestProtoBuilder()
Contributor

The patch is huge (122K), hard to review. Size could be reduced by prefactoring:

  • introduce this factory method
  • replace all calls of newBuilder()

in a separate patch, without any functional changes, before the fix.
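
The prefactoring suggested here can be pictured with a small, self-contained sketch. `Request`, `Builder`, `CURRENT_VERSION` and `requestBuilder` are hypothetical stand-ins, not the Ozone protobuf classes, and stamping a version field is only an assumed example of what a shared factory might do. This excerpt does not show what `getContainerCommandRequestProtoBuilder` actually sets.

```java
// Hypothetical stand-ins: Request and Builder are NOT the Ozone protobuf classes.
final class BuilderFactorySketch {

  static final class Request {
    final String cmdType;
    final int version;

    private Request(Builder b) {
      this.cmdType = b.cmdType;
      this.version = b.version;
    }

    static Builder newBuilder() {
      return new Builder();
    }

    static Builder newBuilder(Request prototype) {
      Builder b = new Builder();
      b.cmdType = prototype.cmdType;
      b.version = prototype.version;
      return b;
    }
  }

  static final class Builder {
    private String cmdType = "";
    private int version;

    Builder setCmdType(String type) {
      this.cmdType = type;
      return this;
    }

    Builder setVersion(int v) {
      this.version = v;
      return this;
    }

    Request build() {
      return new Request(this);
    }
  }

  // Assumed default that every request should carry; purely illustrative.
  static final int CURRENT_VERSION = 2;

  // Step 1 of the prefactoring: one factory that wraps newBuilder().
  static Builder requestBuilder() {
    return Request.newBuilder().setVersion(CURRENT_VERSION);
  }

  // Overload for call sites that copy an existing request.
  static Builder requestBuilder(Request prototype) {
    return Request.newBuilder(prototype).setVersion(CURRENT_VERSION);
  }

  public static void main(String[] args) {
    // Step 2: call sites use the factory instead of Request.newBuilder().
    Request request = requestBuilder().setCmdType("StreamInit").build();
    System.out.println(request.cmdType + " v" + request.version);
  }
}
```

Because every call site goes through the factory, a later functional change only has to touch the factory body, which is what keeps the follow-up fix small.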

Contributor Author

OK, will raise a patch for this.

Contributor Author

@adoroszlai @sodonnel I have raised another patch, #6812, since this patch is becoming too big to review.

Contributor

The other PR is merged now.

.setCmdType(ContainerProtos.Type.StreamInit)
.setContainerID(blockID.get().getContainerID())
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
@@ -26,6 +26,8 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -60,6 +62,7 @@ public class ChunkInputStream extends InputStream
private final ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private ContainerProtos.DatanodeBlockID datanodeBlockID;
private final XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final Supplier<Pipeline> pipelineSupplier;
@@ -290,13 +293,27 @@ protected synchronized void releaseClient() {
}
}

/**
* Updates DatanodeBlockId which based on blockId.
*/
private void updateDatanodeBlockId() throws IOException {
DatanodeDetails closestNode = pipelineSupplier.get().getClosestNode();
int replicaIdx = pipelineSupplier.get().getReplicaIndex(closestNode);
ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder();
if (replicaIdx > 0) {
builder.setReplicaIndex(replicaIdx);
}
datanodeBlockID = builder.build();
}

/**
* Acquire new client if previous one was released.
*/
protected synchronized void acquireClient() throws IOException {
if (xceiverClientFactory != null && xceiverClient == null) {
xceiverClient = xceiverClientFactory.acquireClientForReadData(
pipelineSupplier.get());
updateDatanodeBlockId();
Contributor

Please store Pipeline pipeline = pipelineSupplier.get() and pass it to updateDatanodeBlockId().

Contributor Author

done

Contributor

There is one more pipelineSupplier.get() leftover in updateDatanodeBlockId():

int replicaIdx = pipelineSupplier.get().getReplicaIndex(closestNode);
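
A minimal sketch of the shape requested in this thread: the caller invokes the supplier once and passes the resulting pipeline down, so the closest node and its replica index are read from the same object. The `Pipeline` class, the string node names and `replicaIndexOfClosestNode` are hypothetical stand-ins for illustration, and the "0 means no index recorded" default is an assumption of the sketch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class SinglePipelineLookupSketch {

  /** Hypothetical stand-in for a pipeline: one closest node plus a node-to-replica-index map. */
  static final class Pipeline {
    private final String closestNode;
    private final Map<String, Integer> replicaIndexes;

    Pipeline(String closestNode, Map<String, Integer> replicaIndexes) {
      this.closestNode = closestNode;
      this.replicaIndexes = replicaIndexes;
    }

    String getClosestNode() {
      return closestNode;
    }

    // Assumption of this sketch: 0 means "no replica index recorded for this node".
    int getReplicaIndex(String node) {
      return replicaIndexes.getOrDefault(node, 0);
    }
  }

  /** Works only on the pipeline it is given; it never touches the supplier. */
  static int replicaIndexOfClosestNode(Pipeline pipeline) {
    String closestNode = pipeline.getClosestNode();
    return pipeline.getReplicaIndex(closestNode);
  }

  /** Returns {replica index, number of supplier calls}. */
  static int[] demo() {
    AtomicInteger calls = new AtomicInteger();
    Supplier<Pipeline> pipelineSupplier = () -> {
      calls.incrementAndGet();
      return new Pipeline("dn-1", Collections.singletonMap("dn-1", 3));
    };

    Pipeline pipeline = pipelineSupplier.get(); // fetched once by the caller
    int replicaIndex = replicaIndexOfClosestNode(pipeline);
    return new int[] {replicaIndex, calls.get()};
  }

  public static void main(String[] args) {
    int[] result = demo();
    System.out.println("replicaIndex=" + result[0] + ", supplierCalls=" + result[1]);
  }
}
```

If a supplier can return a refreshed pipeline between calls, calling it twice risks pairing a node from one pipeline with an index from another; passing the stored pipeline avoids that.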

}
}

@@ -423,7 +440,7 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)

ReadChunkResponseProto readChunkResponse =
ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, tokenSupplier.get());
readChunkInfo, datanodeBlockID, validators, tokenSupplier.get());

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList()
@@ -176,8 +176,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Arrays.asList(dataLocation))
.setId(PipelineID.valueOf(dataLocation.getUuid())).setReplicaIndexes(
ImmutableMap.of(dataLocation, locationIndex + 1))
.setId(PipelineID.valueOf(dataLocation.getUuid()))
.setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1))
.setState(Pipeline.PipelineState.CLOSED)
.build();

@@ -228,6 +228,7 @@ protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Collections.singletonList(curIndexNode))
.setId(PipelineID.randomId())
.setReplicaIndexes(Collections.singletonMap(curIndexNode, replicaIndex))
.setState(Pipeline.PipelineState.CLOSED)
.build();
blockLocationInfo.setPipeline(pipeline);
@@ -20,33 +20,49 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* BlockID of Ozone (containerID + localID + blockCommitSequenceId).
* BlockID of Ozone (containerID + localID + blockCommitSequenceId + replicaIndex).
*/

public class BlockID {

private final ContainerBlockID containerBlockID;
private long blockCommitSequenceId;
// null value when not set with private constructor.(This is to avoid confusion of replica index 0 & null value).
// This value would be only set when deserializing from ContainerProtos.DatanodeBlockID or copying from another
// BlockID object.
private Integer replicaIndex;
private StackTraceElement[] stackTraceElements;

public BlockID(long containerID, long localID) {
this(containerID, localID, 0);
this(containerID, localID, 0, null);
}

private BlockID(long containerID, long localID, long bcsID) {
private BlockID(long containerID, long localID, long bcsID, Integer repIndex) {
containerBlockID = new ContainerBlockID(containerID, localID);
blockCommitSequenceId = bcsID;
this.replicaIndex = repIndex;
this.stackTraceElements = Thread.currentThread().getStackTrace();
}

public BlockID(BlockID blockID) {
this(blockID.getContainerID(), blockID.getLocalID(), blockID.getBlockCommitSequenceId(),
blockID.getReplicaIndex());
}

public BlockID(ContainerBlockID containerBlockID) {
this(containerBlockID, 0);
this(containerBlockID, 0, null);
}

private BlockID(ContainerBlockID containerBlockID, long bcsId) {
private BlockID(ContainerBlockID containerBlockID, long bcsId, Integer repIndex) {
this.containerBlockID = containerBlockID;
blockCommitSequenceId = bcsId;
this.replicaIndex = repIndex;
this.stackTraceElements = Thread.currentThread().getStackTrace();
}

public long getContainerID() {
@@ -65,6 +81,15 @@ public void setBlockCommitSequenceId(long blockCommitSequenceId) {
this.blockCommitSequenceId = blockCommitSequenceId;
}

// Can return a null value in case it is not set.
public Integer getReplicaIndex() {
return replicaIndex;
}

public void setReplicaIndex(Integer replicaIndex) {
this.replicaIndex = replicaIndex;
}
Contributor

This is unused. If removed, replicaIndex member can be final.

Contributor Author

done
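
As a rough illustration of the point raised in this thread, the sketch below drops the setter so that the replica index can be `final`. `ImmutableBlockIdSketch` is a hypothetical, simplified class written for this note, not the actual `BlockID`; it only keeps the fields needed to show the idea.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class ImmutableBlockIdSketch {
  private final long containerID;
  private final long localID;
  // null means "not set", which stays distinguishable from a replica index of 0.
  private final Integer replicaIndex;

  ImmutableBlockIdSketch(long containerID, long localID, Integer replicaIndex) {
    this.containerID = containerID;
    this.localID = localID;
    this.replicaIndex = replicaIndex;
  }

  // No setter: the value is fixed at construction time.
  Integer getReplicaIndex() {
    return replicaIndex;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ImmutableBlockIdSketch)) {
      return false;
    }
    ImmutableBlockIdSketch other = (ImmutableBlockIdSketch) o;
    return containerID == other.containerID
        && localID == other.localID
        && Objects.equals(replicaIndex, other.replicaIndex); // null-safe comparison
  }

  @Override
  public int hashCode() {
    return Objects.hash(containerID, localID, replicaIndex);
  }

  public static void main(String[] args) {
    Map<ImmutableBlockIdSketch, String> cache = new HashMap<>();
    cache.put(new ImmutableBlockIdSketch(1L, 2L, null), "index not set");
    cache.put(new ImmutableBlockIdSketch(1L, 2L, 0), "index 0");
    // The two keys differ only in replicaIndex (null vs 0), so both entries are kept.
    System.out.println(cache.size());
  }
}
```

With every field participating in `equals` and `hashCode` being final, an instance used as a map key cannot change its hash after insertion.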


public ContainerBlockID getContainerBlockID() {
return containerBlockID;
}
@@ -79,21 +104,33 @@ public String toString() {
public void appendTo(StringBuilder sb) {
containerBlockID.appendTo(sb);
sb.append(" bcsId: ").append(blockCommitSequenceId);
sb.append(" replicaIndex: ").append(replicaIndex);
sb.append(Arrays.stream(stackTraceElements).map(StackTraceElement::toString).collect(Collectors.joining("\n")));
}

@JsonIgnore
public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() {
ContainerProtos.DatanodeBlockID.Builder blockID = getDatanodeBlockIDProtobufBuilder();
if (replicaIndex != null) {
blockID.setReplicaIndex(replicaIndex);
}
return blockID.build();
}

@JsonIgnore
public ContainerProtos.DatanodeBlockID.Builder getDatanodeBlockIDProtobufBuilder() {
return ContainerProtos.DatanodeBlockID.newBuilder().
setContainerID(containerBlockID.getContainerID())
.setLocalID(containerBlockID.getLocalID())
.setBlockCommitSequenceId(blockCommitSequenceId).build();
.setBlockCommitSequenceId(blockCommitSequenceId);
}

@JsonIgnore
public static BlockID getFromProtobuf(
ContainerProtos.DatanodeBlockID blockID) {
public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) {
return new BlockID(blockID.getContainerID(),
blockID.getLocalID(), blockID.getBlockCommitSequenceId());
blockID.getLocalID(),
blockID.getBlockCommitSequenceId(),
blockID.hasReplicaIndex() ? blockID.getReplicaIndex() : null);
}

@JsonIgnore
@@ -107,7 +144,7 @@ public HddsProtos.BlockID getProtobuf() {
public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) {
return new BlockID(
ContainerBlockID.getFromProtobuf(blockID.getContainerBlockID()),
blockID.getBlockCommitSequenceId());
blockID.getBlockCommitSequenceId(), null);
}

@Override
@@ -120,13 +157,13 @@ public boolean equals(Object o) {
}
BlockID blockID = (BlockID) o;
return containerBlockID.equals(blockID.getContainerBlockID())
&& blockCommitSequenceId == blockID.getBlockCommitSequenceId();
&& blockCommitSequenceId == blockID.getBlockCommitSequenceId()
&& Objects.equals(replicaIndex, blockID.getReplicaIndex());
}

@Override
public int hashCode() {
return Objects
.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
blockCommitSequenceId);
return Objects.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(),
blockCommitSequenceId, replicaIndex);
}
}
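
The `getFromProtobuf` and `getDatanodeBlockIDProtobuf` changes in this file map an optional wire field to a nullable `Integer` and back. The sketch below shows that round trip in isolation. `WireBlockId`, `fromWire` and `toWire` are hypothetical names invented for this note; `WireBlockId` merely imitates the `hasX()` / `getX()` accessor pair of an optional field and is not generated protobuf code.

```java
final class ReplicaIndexRoundTripSketch {

  /** Hypothetical wire object imitating an optional int field with has/get accessors. */
  static final class WireBlockId {
    private final boolean hasReplicaIndex;
    private final int replicaIndex;

    private WireBlockId(boolean hasReplicaIndex, int replicaIndex) {
      this.hasReplicaIndex = hasReplicaIndex;
      this.replicaIndex = replicaIndex;
    }

    static WireBlockId unset() {
      return new WireBlockId(false, 0);
    }

    static WireBlockId withIndex(int replicaIndex) {
      return new WireBlockId(true, replicaIndex);
    }

    boolean hasReplicaIndex() {
      return hasReplicaIndex;
    }

    // Like an unset optional int, this reports 0 when the field was never set.
    int getReplicaIndex() {
      return replicaIndex;
    }
  }

  /** Wire to in-memory: an unset field becomes null rather than 0. */
  static Integer fromWire(WireBlockId wire) {
    return wire.hasReplicaIndex() ? wire.getReplicaIndex() : null;
  }

  /** In-memory to wire: the field is only set when a value is present. */
  static WireBlockId toWire(Integer replicaIndex) {
    return replicaIndex != null ? WireBlockId.withIndex(replicaIndex) : WireBlockId.unset();
  }

  public static void main(String[] args) {
    System.out.println(fromWire(toWire(null))); // field never set
    System.out.println(fromWire(toWire(0)));    // field explicitly set to 0
    System.out.println(fromWire(toWire(3)));
  }
}
```

Checking the `has` accessor before reading the value is what keeps "never set" and "set to 0" apart, since the plain getter reports 0 in both cases.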
@@ -32,15 +32,16 @@
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;

/**
* Implementing the {@link Message} interface
* for {@link ContainerCommandRequestProto}.
*/
public final class ContainerCommandRequestMessage implements Message {
public static ContainerCommandRequestMessage toMessage(
ContainerCommandRequestProto request, String traceId) {
final ContainerCommandRequestProto.Builder b
= ContainerCommandRequestProto.newBuilder(request);
final ContainerCommandRequestProto.Builder b = getContainerCommandRequestProtoBuilder(request);
if (traceId != null) {
b.setTraceID(traceId);
}