Skip to content

Commit

Permalink
HDDS-10875. XceiverRatisServer#getRaftPeersInPipeline should be calle…
Browse files Browse the repository at this point in the history
…d before XceiverRatisServer#removeGroup (apache#6696)
  • Loading branch information
ivandika3 authored and jojochuang committed May 23, 2024
1 parent 15a51fe commit e2dd311
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
Expand Down Expand Up @@ -244,6 +245,12 @@ public static BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient(
RatisHelper.createRetryPolicy(conf), tlsConfig, conf);
}

public static BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClientNoRetry(
ConfigurationSource conf) {
return (leader, tlsConfig) -> newRaftClient(getRpcType(conf), leader,
RetryPolicies.noRetry(), tlsConfig, conf);
}

public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
ConfigurationSource configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class ClosePipelineCommandHandler implements CommandHandler {
*/
public ClosePipelineCommandHandler(ConfigurationSource conf,
Executor executor) {
this(RatisHelper.newRaftClient(conf), executor);
this(RatisHelper.newRaftClientNoRetry(conf), executor);
}

/**
Expand Down Expand Up @@ -105,14 +106,16 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
try {
XceiverServerSpi server = ozoneContainer.getWriteChannel();
if (server.isExist(pipelineIdProto)) {
server.removeGroup(pipelineIdProto);
if (server instanceof XceiverServerRatis) {
// TODO: Refactor Ratis logic to XceiverServerRatis
// Propagate the group remove to the other Raft peers in the pipeline
XceiverServerRatis ratisServer = (XceiverServerRatis) server;
final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
final boolean shouldDeleteRatisLogDirectory = ratisServer.getShouldDeleteRatisLogDirectory();
// This might throw GroupMismatchException if the Ratis group has been closed by other datanodes
final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
// Try to send remove group for the other datanodes first, ignoring GroupMismatchException
// if the Ratis group has been closed in the other datanodes
peers.stream()
.filter(peer -> !peer.getId().equals(ratisServer.getServer().getId()))
.forEach(peer -> {
Expand All @@ -122,19 +125,34 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
} catch (GroupMismatchException ae) {
// ignore silently since this means that the group has been closed by earlier close pipeline
// command in another datanode
LOG.debug("Failed to remove group {} for pipeline {} on peer {} since the group has " +
"been removed by earlier close pipeline command handled in another datanode", raftGroupId,
pipelineID, peer.getId());
} catch (IOException ioe) {
LOG.warn("Failed to remove group {} for peer {}", raftGroupId, peer.getId(), ioe);
LOG.warn("Failed to remove group {} of pipeline {} on peer {}",
raftGroupId, pipelineID, peer.getId(), ioe);
}
});
}
// Remove the Ratis group from the current datanode pipeline, might throw GroupMismatchException as
// well. It is a no-op for XceiverServerSpi implementations (e.g. XceiverServerGrpc)
server.removeGroup(pipelineIdProto);
LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
dn.getUuidString());
} else {
LOG.debug("Ignoring close pipeline command for pipeline {} " +
"as it does not exist", pipelineID);
LOG.debug("Ignoring close pipeline command for pipeline {} on datanode {} " +
"as it does not exist", pipelineID, dn.getUuidString());
}
} catch (IOException e) {
LOG.error("Can't close pipeline {}", pipelineID, e);
Throwable gme = HddsClientUtils.containsException(e, GroupMismatchException.class);
if (gme != null) {
// ignore silently since this means that the group has been closed by earlier close pipeline
// command in another datanode
LOG.debug("The group for pipeline {} on datanode {} has been removed by earlier close " +
"pipeline command handled in another datanode", pipelineID, dn.getUuidString());
} else {
LOG.error("Can't close pipeline {}", pipelineID, e);
}
} finally {
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
Expand Down

0 comments on commit e2dd311

Please sign in to comment.