diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 51290cf80df..96771440549 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -250,8 +250,8 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
         .addHandler(new DeleteContainerCommandHandler(
             dnConf.getContainerDeleteThreads(), clock,
             dnConf.getCommandQueueLimit(), threadNamePrefix))
-        .addHandler(
-            new ClosePipelineCommandHandler(pipelineCommandExecutorService))
+        .addHandler(new ClosePipelineCommandHandler(conf,
+            pipelineCommandExecutorService))
         .addHandler(new CreatePipelineCommandHandler(conf,
             pipelineCommandExecutorService))
         .addHandler(new SetNodeOperationalStateCommandHandler(conf,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index f332ad4f134..5242c8686dc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -16,28 +16,38 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server
     .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 
 /**
  * Handler for close pipeline command received from SCM.
@@ -51,11 +61,23 @@ public class ClosePipelineCommandHandler implements CommandHandler {
   private final AtomicInteger queuedCount = new AtomicInteger(0);
   private long totalTime;
   private final Executor executor;
+  private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
 
   /**
    * Constructs a closePipelineCommand handler.
    */
-  public ClosePipelineCommandHandler(Executor executor) {
+  public ClosePipelineCommandHandler(ConfigurationSource conf,
+      Executor executor) {
+    this(RatisHelper.newRaftClient(conf), executor);
+  }
+
+  /**
+   * Constructs a closePipelineCommand handler.
+   */
+  public ClosePipelineCommandHandler(
+      BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
+      Executor executor) {
+    this.newRaftClient = newRaftClient;
     this.executor = executor;
   }
 
@@ -84,6 +106,27 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
         XceiverServerSpi server = ozoneContainer.getWriteChannel();
         if (server.isExist(pipelineIdProto)) {
           server.removeGroup(pipelineIdProto);
+          if (server instanceof XceiverServerRatis) {
+            // TODO: Refactor Ratis logic to XceiverServerRatis
+            // Propagate the group remove to the other Raft peers in the pipeline
+            XceiverServerRatis ratisServer = (XceiverServerRatis) server;
+            final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId());
+            final Collection<RaftPeer> peers = ratisServer.getRaftPeersInPipeline(pipelineID);
+            final boolean shouldDeleteRatisLogDirectory = ratisServer.getShouldDeleteRatisLogDirectory();
+            peers.stream()
+                .filter(peer -> !peer.getId().equals(ratisServer.getServer().getId()))
+                .forEach(peer -> {
+                  try (RaftClient client = newRaftClient.apply(peer, ozoneContainer.getTlsClientConfig())) {
+                    client.getGroupManagementApi(peer.getId())
+                        .remove(raftGroupId, shouldDeleteRatisLogDirectory, !shouldDeleteRatisLogDirectory);
+                  } catch (GroupMismatchException ae) {
+                    // Ignore silently, since this means the group has already been closed
+                    // by an earlier close pipeline command on another datanode.
+                  } catch (IOException ioe) {
+                    LOG.warn("Failed to remove group {} for peer {}", raftGroupId, peer.getId(), ioe);
+                  }
+                });
+          }
           LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
               dn.getUuidString());
         } else {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 4688ce4b278..6d119b17b3b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -83,6 +83,7 @@
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.protocol.ClientId;
@@ -622,6 +623,10 @@ public RaftServer.Division getServerDivision(RaftGroupId id)
     return server.getDivision(id);
   }
 
+  public boolean getShouldDeleteRatisLogDirectory() {
+    return this.shouldDeleteRatisLogDirectory;
+  }
+
   private void processReply(RaftClientReply reply) throws IOException {
     // NotLeader exception is thrown only when the raft server to which the
     // request is submitted is not the leader. The request will be rejected
@@ -919,6 +924,11 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {
     return minIndex == null ? -1 : minIndex;
   }
 
+  public Collection<RaftPeer> getRaftPeersInPipeline(PipelineID pipelineId) throws IOException {
+    final RaftGroupId groupId = RaftGroupId.valueOf(pipelineId.getId());
+    return server.getDivision(groupId).getGroup().getPeers();
+  }
+
   public void notifyGroupRemove(RaftGroupId gid) {
     raftGids.remove(gid);
     // Remove any entries for group leader map
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
new file mode 100644
index 00000000000..d161f5537ae
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.api.GroupManagementApi;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases to verify ClosePipelineCommandHandler.
+ */
+public class TestClosePipelineCommandHandler {
+
+  private OzoneContainer ozoneContainer;
+  private StateContext stateContext;
+  private SCMConnectionManager connectionManager;
+  private RaftClient raftClient;
+  private GroupManagementApi raftClientGroupManager;
+  private OzoneConfiguration conf;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    ozoneContainer = mock(OzoneContainer.class);
+    connectionManager = mock(SCMConnectionManager.class);
+    raftClient = mock(RaftClient.class);
+    raftClientGroupManager = mock(GroupManagementApi.class);
+    lenient().when(raftClient.getGroupManagementApi(
+        any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
+  }
+
+  @Test
+  void testPipelineClose() throws IOException {
+    final List<DatanodeDetails> datanodes = getDatanodes();
+    final DatanodeDetails currentDatanode = datanodes.get(0);
+    final PipelineID pipelineID = PipelineID.randomId();
+    final SCMCommand<ClosePipelineCommandProto> command =
+        new ClosePipelineCommand(pipelineID);
+    stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+    final boolean shouldDeleteRatisLogDirectory = true;
+    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+    when(writeChannel.getShouldDeleteRatisLogDirectory()).thenReturn(shouldDeleteRatisLogDirectory);
+    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(true);
+    Collection<RaftPeer> raftPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+    when(writeChannel.getServer()).thenReturn(mock(RaftServer.class));
+    when(writeChannel.getServer().getId()).thenReturn(RatisHelper.toRaftPeerId(currentDatanode));
+    when(writeChannel.getRaftPeersInPipeline(pipelineID)).thenReturn(raftPeers);
+
+    final ClosePipelineCommandHandler commandHandler =
+        new ClosePipelineCommandHandler((leader, tls) -> raftClient, MoreExecutors.directExecutor());
+    commandHandler.handle(command, ozoneContainer, stateContext, connectionManager);
+
+    verify(writeChannel, times(1))
+        .removeGroup(pipelineID.getProtobuf());
+
+    verify(raftClientGroupManager, times(2))
+        .remove(any(), eq(shouldDeleteRatisLogDirectory), eq(!shouldDeleteRatisLogDirectory));
+  }
+
+  @Test
+  void testCommandIdempotency() throws IOException {
+    final List<DatanodeDetails> datanodes = getDatanodes();
+    final DatanodeDetails currentDatanode = datanodes.get(0);
+    final PipelineID pipelineID = PipelineID.randomId();
+    final SCMCommand<ClosePipelineCommandProto> command =
+        new ClosePipelineCommand(pipelineID);
+    stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf);
+
+    XceiverServerRatis writeChannel = mock(XceiverServerRatis.class);
+    when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel);
+    // The pipeline has already been closed by another datanode that received a close pipeline command
+    when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(false);
+
+    final ClosePipelineCommandHandler commandHandler =
+        new ClosePipelineCommandHandler(conf, MoreExecutors.directExecutor());
+    commandHandler.handle(command, ozoneContainer, stateContext, connectionManager);
+
+    verify(writeChannel, times(0))
+        .removeGroup(pipelineID.getProtobuf());
+
+    verify(raftClientGroupManager, times(0))
+        .remove(any(), anyBoolean(), anyBoolean());
+  }
+
+  private List<DatanodeDetails> getDatanodes() {
+    final DatanodeDetails dnOne = MockDatanodeDetails.randomDatanodeDetails();
+    final DatanodeDetails dnTwo = MockDatanodeDetails.randomDatanodeDetails();
+    final DatanodeDetails dnThree =
+        MockDatanodeDetails.randomDatanodeDetails();
+    return Arrays.asList(dnOne, dnTwo, dnThree);
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 6c68b808833..04c35cc1feb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -17,29 +17,16 @@
  */
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
+ * Utility class for Ratis pipelines.
  */
 public final class RatisPipelineUtils {
@@ -48,56 +35,6 @@ public final class RatisPipelineUtils {
   private RatisPipelineUtils() {
   }
 
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes.
-   *
-   * @param pipeline      - Pipeline to be destroyed
-   * @param ozoneConf     - Ozone configuration
-   * @param grpcTlsConfig
-   * @throws IOException
-   */
-  public static void destroyPipeline(Pipeline pipeline,
-      ConfigurationSource ozoneConf,
-      GrpcTlsConfig grpcTlsConfig) {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    }
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
-      } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
-            pipeline.getId(), dn, e.getMessage());
-      }
-    }
-  }
-
-  /**
-   * Sends ratis command to destroy pipeline on the given datanode.
-   *
-   * @param dn         - Datanode on which pipeline needs to be destroyed
-   * @param pipelineID - ID of pipeline to be destroyed
-   * @param ozoneConf  - Ozone configuration
-   * @param grpcTlsConfig - grpc tls configuration
-   * @throws IOException
-   */
-  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      ConfigurationSource ozoneConf, GrpcTlsConfig grpcTlsConfig)
-      throws IOException {
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final RaftPeer p = RatisHelper.toRaftPeer(dn);
-    try (RaftClient client = RatisHelper
-        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, grpcTlsConfig, ozoneConf)) {
-      client.getGroupManagementApi(p.getId())
-          .remove(RaftGroupId.valueOf(pipelineID.getId()), true, false);
-    }
-  }
 
   /**
    * Return the list of pipelines who share the same set of datanodes