Skip to content

Commit

Permalink
HDDS-9959. Propagate group remove to other datanodes during pipeline close (apache#5827)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandika3 authored Jan 4, 2024
1 parent d7fd453 commit ac68072
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads(), clock,
dnConf.getCommandQueueLimit(), threadNamePrefix))
.addHandler(
new ClosePipelineCommandHandler(pipelineCommandExecutorService))
.addHandler(new ClosePipelineCommandHandler(conf,
pipelineCommandExecutorService))
.addHandler(new CreatePipelineCommandHandler(conf,
pipelineCommandExecutorService))
.addHandler(new SetNodeOperationalStateCommandHandler(conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,38 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;

/**
* Handler for close pipeline command received from SCM.
Expand All @@ -51,11 +61,23 @@ public class ClosePipelineCommandHandler implements CommandHandler {
private final AtomicInteger queuedCount = new AtomicInteger(0);
private long totalTime;
private final Executor executor;
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;

/**
* Constructs a closePipelineCommand handler.
*/
public ClosePipelineCommandHandler(Executor executor) {
public ClosePipelineCommandHandler(ConfigurationSource conf,
Executor executor) {
// Delegate to the factory-based constructor, using the Raft client factory
// that RatisHelper builds from the given configuration.
this(RatisHelper.newRaftClient(conf), executor);
}

/**
* Constructs a closePipelineCommand handler with an explicit Raft client
* factory. Injecting the factory lets callers (for example tests) supply
* their own {@link RaftClient} instances.
*
* @param newRaftClient factory that creates a {@link RaftClient} from a
*                      {@link RaftPeer} and a {@link GrpcTlsConfig}
* @param executor executor kept by this handler; presumably the one the
*                 command handling runs on (TODO confirm against the rest
*                 of the class)
*/
public ClosePipelineCommandHandler(
BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
Executor executor) {
this.newRaftClient = newRaftClient;
this.executor = executor;
}

Expand Down Expand Up @@ -84,6 +106,27 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
XceiverServerSpi server = ozoneContainer.getWriteChannel();
if (server.isExist(pipelineIdProto)) {
server.removeGroup(pipelineIdProto);
if (server instanceof XceiverServerRatis) {
// TODO: Refactor Ratis logic to XceiverServerRatis
// Propagate the group remove to the other Raft peers in the pipeline
XceiverServerRatis ratisServer = (XceiverServerRatis) server;
final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
final boolean shouldDeleteRatisLogDirectory = ratisServer.getShouldDeleteRatisLogDirectory();
peers.stream()
.filter(peer -> !peer.getId().equals(ratisServer.getServer().getId()))
.forEach(peer -> {
try (RaftClient client = newRaftClient.apply(peer, ozoneContainer.getTlsClientConfig())) {
client.getGroupManagementApi(peer.getId())
.remove(raftGroupId, shouldDeleteRatisLogDirectory, !shouldDeleteRatisLogDirectory);
} catch (GroupMismatchException ae) {
// ignore silently since this means that the group has been closed by earlier close pipeline
// command in another datanode
} catch (IOException ioe) {
LOG.warn("Failed to remove group {} for peer {}", raftGroupId, peer.getId(), ioe);
}
});
}
LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
dn.getUuidString());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.ClientId;
Expand Down Expand Up @@ -622,6 +623,10 @@ public RaftServer.Division getServerDivision(RaftGroupId id)
return server.getDivision(id);
}

/**
 * Exposes this server's {@code shouldDeleteRatisLogDirectory} setting.
 *
 * @return the current value of the {@code shouldDeleteRatisLogDirectory}
 *         field
 */
public boolean getShouldDeleteRatisLogDirectory() {
  return shouldDeleteRatisLogDirectory;
}

private void processReply(RaftClientReply reply) throws IOException {
// NotLeader exception is thrown only when the raft server to which the
// request is submitted is not the leader. The request will be rejected
Expand Down Expand Up @@ -919,6 +924,11 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
return minIndex == null ? -1 : minIndex;
}

/**
 * Looks up the Raft peers of the Raft group that backs the given pipeline.
 *
 * @param pipelineId pipeline whose id is used as the Raft group id
 * @return the peers of the group held by this server's division
 * @throws IOException if the division for the group cannot be obtained
 */
public Collection<RaftPeer> getRaftPeersInPipeline(PipelineID pipelineId) throws IOException {
  final RaftServer.Division division =
      server.getDivision(RaftGroupId.valueOf(pipelineId.getId()));
  return division.getGroup().getPeers();
}

public void notifyGroupRemove(RaftGroupId gid) {
raftGids.remove(gid);
// Remove any entries for group leader map
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Test cases to verify ClosePipelineCommandHandler.
 *
 * <p>Every test hands the handler a client factory that always returns the
 * mocked {@link RaftClient}, so interactions recorded on
 * {@code raftClientGroupManager} reflect what the handler actually did.
 */
public class TestClosePipelineCommandHandler {

  private OzoneContainer ozoneContainer;
  private StateContext stateContext;
  private SCMConnectionManager connectionManager;
  private RaftClient raftClient;
  private GroupManagementApi raftClientGroupManager;
  private OzoneConfiguration conf;

  /**
   * Creates fresh mocks before each test and makes the mocked Raft client
   * return the mocked group management API for any peer id.
   */
  @BeforeEach
  public void setup() throws Exception {
    conf = new OzoneConfiguration();
    ozoneContainer = mock(OzoneContainer.class);
    connectionManager = mock(SCMConnectionManager.class);
    raftClient = mock(RaftClient.class);
    raftClientGroupManager = mock(GroupManagementApi.class);
    lenient().when(raftClient.getGroupManagementApi(
        any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
  }

  /**
   * When the pipeline exists locally, the handler must remove the local
   * group once and propagate the removal to the two other peers.
   */
  @Test
  void testPipelineClose() throws IOException {
    final List<DatanodeDetails> datanodes = getDatanodes();
    final DatanodeDetails currentDatanode = datanodes.get(0);
    final PipelineID pipelineID = PipelineID.randomId();
    final SCMCommand<ClosePipelineCommandProto> command =
        new ClosePipelineCommand(pipelineID);
    stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);

    final boolean shouldDeleteRatisLogDirectory = true;
    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
    when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
    Collection<RaftPeer> raftPeers = datanodes.stream()
        .map(RatisHelper::toRaftPeer)
        .collect(Collectors.toList());
    // The local server reports the current datanode's peer id, so the handler
    // filters it out and only contacts the remaining peers.
    when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
    when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
    when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);

    final ClosePipelineCommandHandler commandHandler =
        new ClosePipelineCommandHandler((leader, tls) -> raftClient, MoreExecutors.directExecutor());
    commandHandler.handle(command, ozoneContainer, stateContext, connectionManager);

    verify(writeChannel, times(1))
        .removeGroup(pipelineID.getProtobuf());

    // Three datanodes in the pipeline minus the current one.
    verify(raftClientGroupManager, times(2))
        .remove(any(), eq(shouldDeleteRatisLogDirectory), eq(!shouldDeleteRatisLogDirectory));
  }

  /**
   * When the pipeline no longer exists locally, the handler must neither
   * remove the local group nor send any remove request to other peers.
   */
  @Test
  void testCommandIdempotency() throws IOException {
    final List<DatanodeDetails> datanodes = getDatanodes();
    final DatanodeDetails currentDatanode = datanodes.get(0);
    final PipelineID pipelineID = PipelineID.randomId();
    final SCMCommand<ClosePipelineCommandProto> command =
        new ClosePipelineCommand(pipelineID);
    stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);

    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
    // When the pipeline has been closed earlier by other datanode that received a close pipeline command
    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(false);

    // Inject the mocked client factory: with the conf-based constructor the
    // handler would never touch raftClientGroupManager, so the "no remove
    // call" verification below could not fail.
    final ClosePipelineCommandHandler commandHandler =
        new ClosePipelineCommandHandler((leader, tls) -> raftClient, MoreExecutors.directExecutor());
    commandHandler.handle(command, ozoneContainer, stateContext, connectionManager);

    verify(writeChannel, times(0))
        .removeGroup(pipelineID.getProtobuf());

    verify(raftClientGroupManager, times(0))
        .remove(any(), anyBoolean(), anyBoolean());
  }

  /** Builds three random datanodes; tests treat the first as the current one. */
  private List<DatanodeDetails> getDatanodes() {
    final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
    final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
    final DatanodeDetails dnThree = MockDatanodeDetails.randomDatanodeDetails();
    return Arrays.asList(dnOne, dnTwo, dnThree);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,16 @@
*/
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utility class for Ratis pipelines. Contains methods to create and destroy
* ratis pipelines.
* Utility class for Ratis pipelines.
*/
public final class RatisPipelineUtils {

Expand All @@ -48,56 +35,6 @@ public final class RatisPipelineUtils {

private RatisPipelineUtils() {
}
/**
 * Sends the ratis command to destroy the pipeline on each of its datanodes.
 * A failure on one datanode is logged and does not prevent the remaining
 * datanodes from being processed; no exception is propagated to the caller.
 *
 * @param pipeline - Pipeline to be destroyed
 * @param ozoneConf - Ozone configuration
 * @param grpcTlsConfig - grpc tls configuration used for the Raft clients
 */
public static void destroyPipeline(Pipeline pipeline,
    ConfigurationSource ozoneConf,
    GrpcTlsConfig grpcTlsConfig) {
  final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
  if (LOG.isDebugEnabled()) {
    LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
  }
  for (DatanodeDetails dn : pipeline.getNodes()) {
    try {
      destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
    } catch (IOException e) {
      // Pass the exception itself as the trailing argument so the logger
      // records the stack trace and cause instead of only the message.
      LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
          pipeline.getId(), dn, e);
    }
  }
}

/**
 * Asks a single datanode to remove the Ratis group that backs a pipeline.
 * A short-lived Raft client is opened towards the datanode and closed again
 * once the remove request has been issued.
 *
 * @param dn - Datanode on which pipeline needs to be destroyed
 * @param pipelineID - ID of pipeline to be destroyed
 * @param ozoneConf - Ozone configuration
 * @param grpcTlsConfig - grpc tls configuration
 * @throws IOException if the remove request or closing the client fails
 */
static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
    ConfigurationSource ozoneConf, GrpcTlsConfig grpcTlsConfig)
    throws IOException {
  final String configuredRpcType = ozoneConf.get(
      ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
      ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
  final RetryPolicy policy = RatisHelper.createRetryPolicy(ozoneConf);
  final RaftPeer targetPeer = RatisHelper.toRaftPeer(dn);
  final SupportedRpcType rpc =
      SupportedRpcType.valueOfIgnoreCase(configuredRpcType);
  try (RaftClient raftClient = RatisHelper.newRaftClient(
      rpc, targetPeer, policy, grpcTlsConfig, ozoneConf)) {
    // Flags are passed as (true, false); presumably delete-directory and
    // rename-directory per the Ratis GroupManagementApi - confirm.
    raftClient.getGroupManagementApi(targetPeer.getId())
        .remove(RaftGroupId.valueOf(pipelineID.getId()), true, false);
  }
}

/**
* Return the list of pipelines who share the same set of datanodes
Expand Down

0 comments on commit ac68072

Please sign in to comment.